1、最近有个想做一个传感器数据实时显示的上位机,常规的数据打印太频繁了,无法直观的看出数据的变化。
python下的上位机实现起来简单一点,网上找了一些python界面Tkinter相关资料和python串口的demo.测试实现了简单的数据显示。
Mark 一下问题点:
最大的问题点在于对bytes型数据的拼接:之前的串口解析的代码是在python 2.7平台上实现的,
切换到python3.0之后,测试失踪无法通过。幸而找到了大神相助。
python 2.7code:
1 import os 2 import time 3 import sys, traceback 4 from serial.serialutil import SerialException 5 from serial import Serial 6 7 import struct 8 import binascii 9 10 11 class Arduino: 12 def __init__(self, port="/dev/ttyUSB0", baudrate=115200, timeout=0.5): 13 14 self.port = port 15 self.baudrate = baudrate 16 self.timeout = timeout 17 self.encoder_count = 0 18 self.writeTimeout = timeout 19 self.interCharTimeout = timeout / 30. 20 21 self.WAITING_FF = 0 22 self.WAITING_AA = 1 23 self.RECEIVE_LEN = 2 24 self.RECEIVE_PACKAGE = 3 25 self.RECEIVE_CHECK = 4 26 self.HEADER0 = 0xff 27 self.HEADER1 = 0xaa 28 self.REC_CMD = 5 29 self.count = 0 30 self.data1 = 0 31 self.data2 = 0 32 self.error_flag = 0 33 self.SUCCESS = 0 34 self.FAIL = -1 35 36 self.receive_state_ = self.WAITING_FF 37 self.receive_check_sum_ = 0 38 self.payload_command = '' 39 self.payload_ack = '' 40 self.payload_args = '' 41 self.payload_len = 0 42 self.byte_count_ = 0 43 self.receive_message_length_ = 0 44 45 self.mutex = threading.Thread.allocate_lock() 46 47 # An array to cache analog sensor readings 48 self.analog_sensor_cache = [None] * self.N_ANALOG_PORTS 49 50 # An array to cache digital sensor readings 51 self.digital_sensor_cache = [None] * self.N_DIGITAL_PORTS 52 53 def connect(self): 54 try: 55 print "Connecting to Arduino on port", self.port, "..." 56 self.port = Serial(port=self.port, baudrate=self.baudrate, timeout=self.timeout, writeTimeout=self.writeTimeout) 57 # The next line is necessary to give the firmware time to wake up. 58 time.sleep(1) 59 state_, val = self.get_baud() 60 if val != self.baudrate: 61 time.sleep(1) 62 state_, val = self.get_baud() 63 if val != self.baudrate: 64 raise SerialException 65 print "Connected at", self.baudrate 66 print "Arduino is ready." 67 68 except SerialException: 69 print "Serial Exception:" 70 print sys.exc_info() 71 print "Traceback follows:" 72 traceback.print_exc(file=sys.stdout) 73 print "Cannot connect to Arduino!" 74 os._exit(1) 75 76 def open(self): 77 self.port.open() 78 79 def close(self): 80 self.port.close() 81 82 def send(self, cmd): 83 self.port.write(cmd) 84 85 def receiveFiniteStates(self, rx_data): 86 if self.receive_state_ == self.WAITING_FF: 87 #print str(binascii.b2a_hex(rx_data)) 88 if rx_data == 'xff': 89 self.receive_state_ = self.WAITING_AA 90 self.receive_check_sum_ =0 91 self.receive_message_length_ = 0 92 self.byte_count_=0 93 self.payload_ack = '' 94 self.payload_args = '' 95 self.payload_len = 0 96 self.count = 0 97 98 elif self.receive_state_ == self.WAITING_AA : 99 if rx_data == 'xaa': 100 self.receive_state_ = self.REC_CMD 101 else: 102 self.receive_state_ = self.WAITING_FF 103 elif self.receive_state_ == self.REC_CMD: 104 self.count+=1 105 if self.count == 1: 106 self.data1,=struct.unpack("B",rx_data) 107 elif self.count == 2: 108 self.data2,=struct.unpack("B",rx_data) 109 self.receive_state_ = self.RECEIVE_LEN 110 if self.error_flag == 0 and self.data1 != 0: 111 self.error_flag = 1 112 if self.data2 != 0 and self.error_flag == 0: 113 self.error_flag = 1 114 elif self.receive_state_ == self.RECEIVE_LEN: 115 self.receive_message_length_, = struct.unpack("B",rx_data) 116 self.receive_state_ = self.RECEIVE_PACKAGE 117 elif self.receive_state_ == self.RECEIVE_PACKAGE: 118 if self.byte_count_==0: 119 self.payload_ack = rx_data 120 else: 121 self.payload_args += rx_data 122 self.byte_count_ +=1 123 #print "byte:"+str(byte_count_) +","+ "rece_len:"+str(receive_message_length_) 124 if self.byte_count_ >= self.receive_message_length_: 125 self.receive_state_ = self.RECEIVE_CHECK 126 127 elif self.receive_state_ == self.RECEIVE_CHECK: 128 #if(rx_data == (unsigned char)receive_check_sum_) 129 if 1: 130 self.receive_state_ = self.WAITING_FF 131 #print str(binascii.b2a_hex(value)) 132 #left, right, = struct.unpack('hh', value) 133 #print "left:"+str(left)+", right:"+str(right) 134 return 1 135 else: 136 self.receive_state_ = self.WAITING_FF 137 else: 138 self.receive_state_ = self.WAITING_FF; 139 return 0 140 141 def recv(self, timeout=0.5): 142 timeout = min(timeout, self.timeout) 143 ''' This command should not be used on its own: it is called by the execute commands 144 below in a thread safe manner. Note: we use read() instead of readline() since 145 readline() tends to return garbage characters from the Arduino 146 ''' 147 c = '' 148 value = '' 149 attempts = 0 150 c = self.port.read(1) 151 #print str(binascii.b2a_hex(c)) 152 while self.receiveFiniteStates(c) != 1: 153 c = self.port.read(1) 154 #print str(binascii.b2a_hex(c)) 155 attempts += 1 156 if attempts * self.interCharTimeout > timeout: 157 return 0 158 return 1 159 160 def recv_ack(self): 161 ack = self.recv(self.timeout) 162 return ack == 'OK' 163 164 def execute(self, cmd): 165 self.mutex.acquire() 166 167 try: 168 self.port.flushInput() 169 except: 170 pass 171 172 ntries = 1 173 attempts = 0 174 175 try: 176 self.port.write(cmd) 177 res = self.recv(self.timeout) 178 while attempts < ntries and res !=1 : 179 try: 180 self.port.flushInput() 181 self.port.write(cmd) 182 res = self.recv(self.timeout) 183 except: 184 print "Exception executing command: " + str(binascii.b2a_hex(cmd)) 185 attempts += 1 186 except: 187 self.mutex.release() 188 print "Exception executing command: " + str(binascii.b2a_hex(cmd)) 189 return 0 190 191 self.mutex.release() 192 return 1 193 194 def get_baud(self): 195 ''' Get the current baud rate on the serial port. 196 ''' 197 cmd_str=struct.pack("4B", self.HEADER0, self.HEADER1, 0x01, 0x00) + struct.pack("B", 0x01) 198 if (self.execute(cmd_str))==1 and self.payload_ack == 'x00': 199 val, = struct.unpack('I', self.payload_args) 200 return self.SUCCESS, val 201 else: 202 return self.FAIL, 0 203 204 def get_check_sum(self,list): 205 list_len = len(list) 206 cs = 0 207 for i in range(list_len): 208 #print i, list[i] 209 cs += list[i] 210 cs=cs%255 211 return cs 212 213 def stop(self): 214 ''' Stop both motors. 215 ''' 216 self.drive(0, 0)
在python 3.6 version 中就该修改部分代码:
问题在于bytes类型的拼接:
以下的拼接是正确的:
bytes('','ascii')+bytes([1,2,3,4,5,6,7,8,9])
bytes('','ascii')+bytes([1,2,3,4,5,6,7,8,9])+bytes([1,2,3,4,5,6,7,8,9])
也即是:bytes类型的拼接的初始变量要声明为:bytes('','ascii')
如果声明为其它类型:比如字符串,执行时会直接报错。
即:python2.7代码块中:40行以及94行的声明:
(1)将:40行和94行的 self.payload_args = '' 改为: self.payload_args = bytes('','ascii') (2)将:121行赋值命令: self.payload_args += rx_data 修改为: self.payload_args += bytes(rx_data)
源码已上传到github:链接
目前单纯实现了串口数据的查询: