130 lines
3.6 KiB
Python
130 lines
3.6 KiB
Python
import serial
|
|
import serial.tools.list_ports
|
|
import numpy as np
|
|
import matplotlib.pyplot as plt
|
|
import logging
|
|
|
|
# Use a font with Chinese glyphs so the Chinese labels below render, and turn
# off the Unicode minus sign so negative tick labels fall back to the ASCII
# hyphen.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Global state shared by the functions below.
ser = None  # open serial port, assigned by setup_serial_port()
alld = np.array([], dtype=np.uint8)  # bytes received but not yet consumed

# The figure is created once at import time; later updates only swap the
# image data and color limits of `cax`.
fig, ax = plt.subplots()
cax = ax.imshow(np.zeros((32, 32)), cmap='viridis', interpolation='nearest')
fig.colorbar(cax, ax=ax, label='压力值')
ax.set(title="热力图", xlabel="X轴", ylabel="Y轴")
|
|
|
|
|
|
def setup_serial_port(baudrate=1000000, timeout=1):
    """List the available serial ports, let the user pick one, and open it.

    The port list is printed to stdout and the choice is read from stdin as
    an index into that list.

    Args:
        baudrate: Baud rate used to open the port.
        timeout: Read timeout in seconds passed to ``serial.Serial``.

    Returns:
        The opened ``serial.Serial`` object (also stored in the module-level
        ``ser``), or ``None`` when no port is available, the input is not a
        valid index, or the port cannot be opened.
    """
    global ser

    available_ports = list(serial.tools.list_ports.comports())
    if not available_ports:
        logging.error('未找到可用的串口设备')
        return None

    print("可用的串口设备:")
    for i, port in enumerate(available_ports):
        print(f"[{i}] {port.device}")

    # Only the integer conversion is guarded here, so a ValueError raised
    # while opening the port is not misreported as bad user input.
    try:
        choice = int(input("请选择串口编号: "))
    except ValueError:
        logging.error("请输入有效的数字")
        return None

    if not 0 <= choice < len(available_ports):
        logging.error("无效的选择")
        return None

    device = available_ports[choice].device
    try:
        ser = serial.Serial(device, baudrate, timeout=timeout)
    except (serial.SerialException, ValueError) as exc:
        # Port busy, permission denied, unsupported settings, ...: report it
        # and let the caller handle the None instead of crashing.
        logging.error('无法打开串口 %s: %s', device, exc)
        return None

    logging.info('成功连接到串口: %s', device)
    return ser
|
|
|
|
|
|
def find_packet_start(data, header=(170, 85, 3, 153)):
    """Return the indices at which *header* occurs in *data*.

    Args:
        data: One-dimensional sequence or array of byte values.
        header: Byte values that mark the start of a packet. Defaults to
            ``(170, 85, 3, 153)``, i.e. ``AA 55 03 99`` in hex.

    Returns:
        A 1-D integer array of start offsets in ascending order. It is empty
        when *data* is shorter than the header or contains no match.
    """
    buf = np.asarray(data)
    n_windows = buf.size - len(header) + 1
    if n_windows <= 0:
        return np.array([], dtype=np.intp)

    # Compare one header byte at a time against the correspondingly shifted
    # view of the buffer; a position matches only if every byte matches.
    hits = np.ones(n_windows, dtype=bool)
    for offset, value in enumerate(header):
        hits &= buf[offset:offset + n_windows] == value
    return np.flatnonzero(hits)
|
|
|
|
|
|
def update_heatmap(matrix):
    """Display *matrix* on the heatmap and rescale the colors to its values.

    The color limits are reset on every call to the minimum and maximum of
    *matrix*, so the color mapping always spans the current frame.
    """
    lowest, highest = np.min(matrix), np.max(matrix)
    cax.set_clim(vmin=lowest, vmax=highest)
    cax.set_data(matrix)

    canvas = ax.figure.canvas
    canvas.draw()
    plt.pause(0.01)  # make sure the image is refreshed
|
|
|
|
|
|
def process_matrix_data(matrix_data):
    """Rearrange a 1024-value frame into the 32x32 matrix that is displayed.

    The input is read as 32 rows of 32 values. In the result, input rows
    15..31 come first in their original order, followed by input rows 14..0
    (the first 15 rows in reverse order). Values inside a row keep their
    order.
    NOTE(review): this row order presumably mirrors the sensor's scan
    order; confirm against the device documentation.

    Args:
        matrix_data: 1024 values, given as a NumPy array, a sequence of
            ints, or raw ``bytes``/``bytearray``.

    Returns:
        A new ``(32, 32)`` array, or ``None`` when *matrix_data* does not
        have length 1024. The input is never modified.
    """
    if len(matrix_data) != 1024:
        return None

    if isinstance(matrix_data, (bytes, bytearray)):
        # np.array(b"...") would yield a single 0-d byte-string element, so
        # raw byte buffers are decoded explicitly.
        flat = np.frombuffer(matrix_data, dtype=np.uint8)
    else:
        flat = np.asarray(matrix_data)

    rows = flat.reshape(32, 32)
    # Same result as reversing rows 0..14 in place and then rotating the
    # frame up by 15 rows; concatenate always returns a fresh array.
    return np.concatenate((rows[15:], rows[14::-1]))
|
|
|
|
|
|
def process_serial_data():
    """Read pending serial bytes and display at most one complete frame.

    New bytes are appended to the module-level buffer ``alld``. A frame is a
    4-byte header (located with ``find_packet_start``) followed by 1024
    payload bytes. When a complete frame is buffered, it is removed from the
    buffer, rearranged by ``process_matrix_data`` and drawn with
    ``update_heatmap``.

    Does nothing when the port is not open or no bytes are waiting.
    """
    global alld

    header_len = 4
    frame_len = header_len + 1024

    if not ser or ser.in_waiting <= 0:
        return

    received = ser.read(ser.in_waiting)
    alld = np.concatenate([alld, np.frombuffer(received, dtype=np.uint8)])

    if len(alld) < header_len:
        return

    starts = find_packet_start(alld)
    if len(starts) == 0:
        # No header anywhere in the buffer: everything except the last few
        # bytes is unusable. Keep only the tail that could still be the
        # beginning of a header split across two reads, so the buffer cannot
        # grow without bound on a noisy or misconfigured link.
        alld = alld[-(header_len - 1):]
        return

    # Drop any bytes that precede the first header.
    if starts[0] > 0:
        alld = alld[starts[0]:]

    if len(alld) < frame_len:
        return  # header seen, payload still incomplete

    payload = alld[header_len:frame_len]
    alld = alld[frame_len:]

    # NOTE(review): only one frame is consumed per call; if frames arrive
    # faster than they can be drawn, the backlog in `alld` keeps growing.
    update_heatmap(process_matrix_data(payload))
|
|
|
|
|
|
def main():
    """Open a serial port chosen by the user and stream frames to the heatmap.

    Runs until interrupted with Ctrl+C or until the serial link fails. The
    port is closed on the way out in either case. Returns immediately when
    no port could be opened.
    """
    global ser

    ser = setup_serial_port()
    if ser is None:
        return

    print("热力图显示中,按 Ctrl+C 退出程序")
    try:
        while True:
            if ser.in_waiting > 0:
                process_serial_data()
            else:
                # Nothing to read: hand control to the GUI event loop for a
                # moment instead of spinning. This keeps the window
                # responsive and avoids pinning a CPU core while idle.
                plt.pause(0.01)
    except KeyboardInterrupt:
        logging.info("用户中断程序")
    except (serial.SerialException, OSError) as exc:
        # E.g. the device was unplugged while streaming.
        logging.error("串口通信错误: %s", exc)
    finally:
        if ser and ser.is_open:
            ser.close()
            logging.info("串口已关闭")
|
|
|
|
|
|
# Start the viewer only when this file is executed as a script.
if __name__ == "__main__":
    main()