XM-01/资料/算法文档/JQ_USB_Connection.py

130 lines
3.6 KiB
Python
Raw Normal View History

import serial
import serial.tools.list_ports
import numpy as np
import matplotlib.pyplot as plt
import logging
# 设置中文字体
plt.rcParams['font.sans-serif'] = ['SimHei']
plt.rcParams['axes.unicode_minus'] = False
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# 全局变量
ser = None
alld = np.array([], dtype=np.uint8)
fig, ax = plt.subplots()
cax = ax.imshow(np.zeros((32, 32)), cmap='viridis', interpolation='nearest')
fig.colorbar(cax, ax=ax, label='压力值')
ax.set_title("热力图")
ax.set_xlabel("X轴")
ax.set_ylabel("Y轴")
def setup_serial_port():
global ser
available_ports = list(serial.tools.list_ports.comports())
if not available_ports:
logging.error('未找到可用的串口设备')
return None
print("可用的串口设备:")
for i, port in enumerate(available_ports):
print(f"[{i}] {port.device}")
try:
choice = int(input("请选择串口编号: "))
if 0 <= choice < len(available_ports):
ser = serial.Serial(available_ports[choice].device, 1000000, timeout=1)
logging.info(f'成功连接到串口: {available_ports[choice].device}')
return ser
else:
logging.error("无效的选择")
return None
except ValueError:
logging.error("请输入有效的数字")
return None
def find_packet_start(data):
return np.where(np.all(np.array([data[i:i + 4] for i in range(len(data) - 3)]) == [170, 85, 3, 153], axis=1))[0]
def update_heatmap(matrix):
"""
更新热力图并根据矩阵值动态调整颜色映射范围
"""
min_val = np.min(matrix)
max_val = np.max(matrix)
cax.set_clim(vmin=min_val, vmax=max_val)
cax.set_data(matrix)
ax.figure.canvas.draw()
plt.pause(0.01) # 确保图像更新
def process_matrix_data(matrix_data):
"""处理1024字节的矩阵数据"""
if len(matrix_data) != 1024:
return None
img_data = np.array(matrix_data).flatten()
# 交换行
for i in range(8):
start1, end1 = i * 32, (i + 1) * 32
start2, end2 = (14 - i) * 32, (15 - i) * 32
img_data[start1:end1], img_data[start2:end2] = img_data[start2:end2].copy(), img_data[start1:end1].copy()
img_data = np.roll(img_data, -15 * 32)
return img_data.reshape(32, 32)
def process_serial_data():
global alld
if ser and ser.in_waiting > 0:
receive = ser.read(ser.in_waiting)
alld = np.concatenate([alld, np.frombuffer(receive, dtype=np.uint8)])
if len(alld) >= 4:
index = find_packet_start(alld)
if len(index) > 0:
if index[0] > 0:
alld = alld[index[0]:]
if len(alld) >= 1028:
imgdata = alld[4:1028]
alld = alld[1028:]
if len(imgdata) == 1024:
img_data = imgdata
frameData_b = process_matrix_data(img_data)
# logging.info(f"接收到的矩阵数据:\n{frameData_b}") # 打印矩阵到控制台
update_heatmap(frameData_b)
def main():
global ser
ser = setup_serial_port()
if ser is None:
return
print("热力图显示中,按 Ctrl+C 退出程序")
try:
while True:
process_serial_data()
except KeyboardInterrupt:
logging.info("用户中断程序")
finally:
if ser and ser.is_open:
ser.close()
logging.info("串口已关闭")
if __name__ == "__main__":
main()