一、环境搭建
1.概述
本文主要是通过Python3实现CAN总线上J1939报文接收、发送等功能,通过模拟单帧、多帧实现周期性发送报文等模拟场景。
2.CAN工具
本案例采用的是PCAN-USB工具
PCAN-USB驱动:https://www.peak-system.com/fileadmin/media/files/pcan-basic.zip
3.Python安装
下载地址:https://www.python.org/ftp/python/3.7.9/python-3.7.9-amd64.exe
库:pip install can-j1939
二、项目演示
1.J1939报文接收
2.J1939报文发送
用另一台CAN工具查看报文,接收正常:
三、完整代码
#!/usr/bin/python # _*_ coding:utf-8 _*_ import time, j1939, threading def on_message(priority, pgn, sa, timestamp, data): """接收来自总线的消息""" print(priority, pgn, sa, timestamp, [data[i] for i in range(len(data))]) # 打印消息 def recv_j1939(read_time): """订阅总线消息""" ecu.subscribe(on_message) time.sleep(read_time) ecu.unsubscribe(on_message) # 停止订阅消息 def cycle_send_j1939(): """发送j1939报文""" j1939_loog_data = [{'cycle': 1000, "pgn": 65251, "dp": 0, "pf": 254, "ps": 227, "p": 6, "sa": 0, "data": [0, 25, 132, 112, 63, 137, 64, 31, 133, 128, 37, 133, 32, 53, 136, 155, 66, 255, 255, 164, 3, 0, 0,60, 70, 213, 125, 133, 155, 66, 134, 0, 255, 255, 255, 255, 255, 255, 255,255]}] # j1939长帧报文 j1939_data = [{'cycle': 250, "can_id": 0x18FEDF00, 'pgn': 65247, "data": [130, 128, 37, 125, 251, 255, 255, 240]}, {'cycle': 50, "can_id": 0x18F00400, "pgn": 61444, "data": [14, 189, 130, 0, 0, 0, 0, 125]}, {'cycle': 100, "can_id": 0x18FE9200, "pgn": 65170,"data": [255, 255, 254, 255, 255, 255, 0, 0]}] # j1939短帧报文 for i in j1939_loog_data: i['currenttime'] = int(round(time.time() * 1000)) for i in j1939_data: i['currenttime'] = int(round(time.time() * 1000)) while True: currenttime = int(round(time.time() * 1000)) for i in j1939_loog_data: interval = currenttime - i['currenttime'] if interval >= i['cycle']: ecu.send_pgn(data_page=i["dp"], pdu_format=i["pf"], pdu_specific=i["ps"], priority=i["p"], src_address=i["sa"], data=i["data"]) # 发送长帧报文 print('PGN%d:长帧报文发送成功' % i['pgn']) i['currenttime'] = currenttime # 更新发送时间 for i in j1939_data: interval = currenttime - i['currenttime'] if interval >= i['cycle']: ecu.send_message(can_id=i["can_id"], data=i["data"]) # 发送短帧报文 print('PGN%d:短帧报文发送成功' % i['pgn']) i['currenttime'] = currenttime # 更新发送时间 if __name__ == '__main__': print("初始化总线") ecu = j1939.ElectronicControlUnit() # 创建ECU ecu.connect(bustype='pcan', channel='PCAN_USBBUS1', bitrate=500000) # 连接CAN总线 # s1 = threading.Thread(target=recv_j1939, name="接收J1939消息线程",args=(10,)) # s1.start() s2 = threading.Thread(target=cycle_send_j1939, name="发送J1939消息线程") s2.start() # print('取消初始化') # ecu.disconnect() #断开总线连接