# Paste-page header (not part of the program):
# Untitled | unknown | python | 3 years ago | 4.9 kB | 9 | Indexable
from cProfile import label
from operator import ge
import socket
import math
from turtle import color
# For plot real-time
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.lines import Line2D
import numpy as np
import matplotlib.animation as animation
from multiprocessing import Process
from collections import deque
import time
import re, sys
from scipy import linalg
# Intended number of sensor samples to keep.
# NOTE(review): not referenced anywhere in the visible code - confirm it is still needed.
HISTORY_SIZE = 10000
# Intended pause, in seconds, between re-sampling the sensor and redrawing.
# NOTE(review): not referenced in the visible code (the animation is created with interval=0).
INTERVAL = 0.0001
# msg = bytearray()
# Module-level placeholder for a received message. get_imu_data() assigns its
# own local `msg`, so the visible code never updates this value.
msg = b''
# NOTE(review): this flag is never read in the visible code - purpose unclear.
init = True
######################### Initialize socket server and plotter #########################
# Address and port the TCP server listens on
host = '192.168.1.1'
port = 8888
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# server.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
server.bind((host, port))
server.listen(1) # Backlog of 1: a single client connection is expected
print("Server listening on port: ", port)
# accept() blocks, so the script waits here until the client connects.
client, addr = server.accept()
print("Connect from ", str(addr))
# Disabled axes setup, kept for reference (original note: turn this off when
# not calibrating the gyro and magnetometer).
# fig, ax = plt.subplots(1, 1)
# ax.set_aspect(1)
# Animation handles start out empty; visualRealtimeData() assigns aniRealtime.
# NOTE(review): `anim` is never assigned again in the visible code.
anim = None
aniRealtime = None
######################### End of Initialize socket server and plotter #################################
def get_imu_data():
    """Read one chunk from the connected client and return it as text.

    Receives up to 1024 bytes from the module-level ``client`` socket and
    decodes them as UTF-8. The decoded text is also published through the
    module-level ``msg2string`` so other code can read the latest chunk.

    Returns:
        The decoded chunk as a ``str``.
    """
    global msg2string
    raw_chunk = client.recv(1024)
    msg2string = raw_chunk.decode("utf-8")
    return msg2string
# History of every parsed magnetometer reading, one unbounded queue per axis.
mag_x_arr, mag_y_arr, mag_z_arr = (deque() for _axis in range(3))
def emitterData():
    """Yield magnetometer samples parsed from the incoming IMU text stream.

    Each chunk returned by ``get_imu_data()`` is split on commas and newlines
    and its first three fields are read as ``geo_x, geo_y, geo_z``. Every
    successfully parsed sample is also appended to ``mag_x_arr``,
    ``mag_y_arr`` and ``mag_z_arr``.

    Yields:
        A one-element tuple ``(data,)``. ``data`` is ``[x, y, z]`` as floats,
        or an empty list when the chunk held fewer than three fields or a
        field that is not a valid number.
    """
    while True:
        ret = get_imu_data()
        if not isinstance(ret, str):
            continue

        # Parse the chunk we just received rather than going through the
        # global side channel; both hold the same text.
        parseLine = re.split(",|\n", ret)
        print("len(parseLine): ", len(parseLine))
        print(parseLine)

        data = []
        if len(parseLine) >= 3:
            try:
                # geo_x, geo_y, geo_z
                data = [float(field) for field in parseLine[:3]]
            except ValueError:
                # A chunk can end mid-number or contain an empty field;
                # drop that sample instead of killing the generator.
                print("Skipping malformed sample: ", parseLine[:3])
            else:
                # Record history only once all three axes parsed cleanly so
                # the per-axis queues always stay the same length.
                mag_x_arr.append(data[0])
                mag_y_arr.append(data[1])
                mag_z_arr.append(data[2])
        yield data,
class Scope:
    """Scrolling three-trace (x, y, z) plot drawn on a Matplotlib axes.

    Every stored sample advances an internal clock by ``dt`` and adds one
    point to each trace. Once the clock runs past the visible window, the
    buffers are cut down to their last point and the axes are re-windowed.
    """

    def __init__(self, ax, maxt=10, dt=0.02, lim_left=-50., lim_right=50.):
        """Create the three traces and attach them to ``ax``.

        Args:
            ax: Axes object the lines are added to.
            maxt: Width of the visible time window.
            dt: Time step added per stored sample (0.02 s is 50 Hz).
            lim_left: Initial lower y-axis limit.
            lim_right: Initial upper y-axis limit.
        """
        self.ax = ax
        self.dt = dt
        self.maxt = maxt
        self.lim_left = lim_left
        self.lim_right = lim_right
        # All four buffers start with one point and must stay equally long.
        self.tdata = [0]
        self.xdata = [0]
        self.ydata = [0]
        self.zdata = [0]
        self.line_x = Line2D(self.tdata, self.xdata, color='r', label="x")
        self.line_y = Line2D(self.tdata, self.ydata, color='g', label="y")
        self.line_z = Line2D(self.tdata, self.zdata, color='b', label="z")
        self.ax.add_line(self.line_x)
        self.ax.add_line(self.line_y)
        self.ax.add_line(self.line_z)
        self.ax.set_ylim(self.lim_left, self.lim_right)
        self.ax.set_xlim(0, self.maxt)
        # Not used by this class; kept so existing attribute access still works.
        self.preData = [0] * 4

    def update_xyz(self, data):
        """Append one sample to the traces and return the updated lines.

        Args:
            data: A frame holding at least three values (x, y, z), possibly
                wrapped in extra length-1 dimensions. Frames with fewer than
                three values are ignored.

        Returns:
            The tuple ``(line_x, line_y, line_z)``.
        """
        lastt = self.tdata[-1]
        if lastt > self.tdata[0] + self.maxt:  # reset the arrays
            # Re-centre the y-axis on the median of the x trace just shown.
            median = np.median(self.xdata)
            self.tdata = [self.tdata[-1]]
            self.xdata = [self.xdata[-1]]
            self.ydata = [self.ydata[-1]]
            self.zdata = [self.zdata[-1]]
            self.ax.set_ylim(median - 10, median + 10)
            self.ax.set_xlim(self.tdata[0], self.tdata[0] + self.maxt)
            self.ax.figure.canvas.draw()

        # atleast_1d keeps len() valid even if the frame squeezes to a scalar.
        data = np.atleast_1d(np.squeeze(data))
        print("Data Update: ", data)
        if len(data) >= 3:
            # Advance the clock only when a full sample is stored. Otherwise
            # tdata outgrows the value buffers and the lines receive
            # mismatched x/y lengths.
            self.tdata.append(self.tdata[-1] + self.dt)
            self.xdata.append(data[0])
            self.ydata.append(data[1])
            self.zdata.append(data[2])

        self.line_x.set_data(self.tdata, self.xdata)
        self.line_y.set_data(self.tdata, self.ydata)
        self.line_z.set_data(self.tdata, self.zdata)
        return self.line_x, self.line_y, self.line_z
def visualRealtimeData():
    """Open the live x/y/z plot and block until ``plt.show()`` returns.

    Builds a figure with a single axes, wraps it in a ``Scope`` and drives it
    with a ``FuncAnimation`` whose frames come from ``emitterData``. The
    animation object is stored in the module-level ``aniRealtime``.
    """
    global aniRealtime

    # Initial y-axis window: 20 units either side of 101057.
    # NOTE(review): 101057 is presumably the expected baseline of the
    # readings - confirm against the sensor.
    baseline, half_span = 101057, 20

    figure, axes = plt.subplots(1, 1)
    scope = Scope(
        axes,
        lim_left=baseline - half_span,
        lim_right=baseline + half_span,
    )
    axes.legend()

    # emitterData is handed over as a generator function; each value it
    # yields becomes the argument of one scope.update_xyz call.
    aniRealtime = animation.FuncAnimation(
        figure,
        scope.update_xyz,
        frames=emitterData,
        interval=0,
        blit=True,
    )
    print("============= Blocking in plt.show() ===============")
    # plt.autoscale(enable=True, axis='x', tight=True)
    plt.show()
# Script entry point: start the live plot; the call blocks inside plt.show().
visualRealtimeData()
# Paste-page footer (not part of the program): Editor is loading...