import numpy as np
from matplotlib import pyplot as plt
from multiprocessing import Pipe, Process
from .plotter import Plotter
[docs]
class LivePlotter(Plotter):
def __init__(self) -> None:
super().__init__()
self.fig.canvas.mpl_connect("close_event", self.close)
self.receiving_conn, self.sending_conn = Pipe(duplex=False)
self.cntr = 0
self.updating = True
self.t_data = 0
self.ref_data = 0
self.angle_data = 0
self.motor_data = 0
[docs]
def set_up(self, cycles, freq) -> None:
max_time = cycles // freq
self.ax[0].set_xlim(-.05, 1.01*max_time)
# for generalisation, this range could be increased to the full range of motion of the pendulum.
self.ax[0].set_ylim(-5, 180)
self.ax[1].set_xlim(-.05, 1.01*max_time)
self.ax[1].set_ylim(-5, 105)
self.ax[2].set_xlim(-.05, 1.01*max_time)
self.ax[2].set_ylim(1000/freq - .5, 2000/freq)
self.t_data = np.zeros(cycles)
self.ref_data = np.zeros(cycles)
self.pot_data = np.zeros(cycles)
self.angle_data = np.zeros(cycles)
self.motor_data = np.zeros(cycles)
[docs]
def add_data_to_queue(self, t, ref, pot, angle, motor):
self.sending_conn.send((t, ref, pot, angle, motor))
[docs]
def get_data_from_queue(self):
done = False
while not done:
if self.receiving_conn.poll():
data = self.receiving_conn.recv()
if data[0] == -1:
self.updating = False
else:
self.t_data[self.cntr], self.ref_data[self.cntr], self.pot_data[self.cntr], self.angle_data[self.cntr], self.motor_data[self.cntr] = data
self.cntr += 1
else:
done = True
[docs]
def get_process(self):
return Process(target=self.run, daemon=True)
[docs]
def run(self):
self.updating = True
while self.updating:
self.get_data_from_queue()
if self.cntr > 0:
self.ref_line.set_data(self.t_data[:self.cntr], self.ref_data[:self.cntr])
self.pot_line.set_data(self.t_data[:self.cntr], self.pot_data[:self.cntr])
self.angle_line.set_data(self.t_data[:self.cntr], self.angle_data[:self.cntr])
self.motor_line.set_data(self.t_data[:self.cntr], self.motor_data[:self.cntr])
self.dt_line.set_data(self.t_data[:self.cntr], 1000*np.gradient(self.t_data[:self.cntr]))
self.fig.canvas.draw()
plt.pause(1/60)
plt.show()
[docs]
def close(self, event):
self.updating = False
plt.close(self.fig)