一、安装
pip install websocket-client
二、使用示例
import websocket
try:
import thread
except ImportError:
import _thread as thread
import time
def on_message(ws, message):
    """Print every message received from the server.

    Args:
        ws: The connection the message arrived on (unused).
        message: The received payload.
    """
    print(message)
def on_error(ws, error):
    """Print the error reported for the connection.

    Args:
        ws: The connection the error belongs to (unused).
        error: The error object or message to display.
    """
    print(error)
def on_close(ws, close_status_code, close_msg):
    """Print a fixed banner when the connection is closed.

    Args:
        ws: The connection that was closed (unused).
        close_status_code: Close status code passed by the caller (unused).
        close_msg: Close reason passed by the caller (unused).
    """
    print("### closed ###")
def on_open(ws):
    """Start a background worker that sends three greetings, then closes.

    The worker sleeps one second before each ``ws.send("Hello <i>")`` call
    (i = 0, 1, 2), sleeps one more second, closes the connection and prints
    a termination notice.

    Args:
        ws: The freshly opened connection; must provide ``send`` and
            ``close``.
    """
    # Function-scope import: the surrounding example only imports the
    # low-level thread module, whose documentation recommends the
    # higher-level ``threading`` API instead.
    import threading

    def run(*args):
        for i in range(3):
            time.sleep(1)
            ws.send("Hello %d" % i)
        time.sleep(1)
        ws.close()
        print("thread terminating...")

    worker = threading.Thread(target=run)
    # Daemon thread: like a thread started with start_new_thread, it must
    # not keep the interpreter alive on its own.
    worker.daemon = True
    worker.start()
if __name__ == "__main__":
    # Long-lived connection example: turn on wire tracing, register the
    # four callbacks defined above and block in the client's event loop.
    websocket.enableTrace(True)
    ws = websocket.WebSocketApp(
        "ws://echo.websocket.org/",
        on_open=on_open,
        on_message=on_message,
        on_error=on_error,
        on_close=on_close,
    )
    ws.run_forever()
from websocket import create_connection

# Short-lived connection example: connect, send one message, read one reply.
ws = create_connection("ws://echo.websocket.org/")
try:
    print("Sending 'Hello, World'...")
    ws.send("Hello, World")
    print("Sent")
    print("Receiving...")
    result = ws.recv()
    print("Received '%s'" % result)
finally:
    # Release the socket even when send()/recv() raises.
    ws.close()
三、个人使用示例
在个人实际项目中,服务端使用 Tornado 框架自带的 WebSocket 支持(tornado.websocket)。考虑到上传地图时,如果中途断网,上传进度会与实际情况不一致,因此在上传完成之后,由服务器主动断开连接,并将连接是否已断开的状态(is_close)写入配置文件。
当下次客户端发起上传请求时,先读取该配置文件,判断连接是否已断开,然后再进行后续操作。主要代码如下:
utils.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# create time : 2021/7/2 10:58
import json
import threading
from common.logger import Logger
from common.constant import CONF_PATH
# Module-level lock shared by this module; no code visible in this file
# acquires it.
threadLock = threading.Lock()
# Project logger used by the helpers below.
logger = Logger.getLogger()
def process_exception_handler(call_srv_method, resultEntity, result_code_info, *args, **kwargs):
    """Call a service method and turn any exception into an error result.

    Args:
        call_srv_method: Callable to invoke with ``*args`` and ``**kwargs``.
        resultEntity: Object that receives ``error_msg`` and
            ``result_code = -1`` when the call fails.
        result_code_info: Mapping updated in place; keyed by
            ``call_srv_method``, it stores the response's ``result_code`` on
            success and ``-1`` on failure.
        *args: Positional arguments forwarded to ``call_srv_method``.
        **kwargs: Keyword arguments forwarded to ``call_srv_method``.

    Returns:
        The response of ``call_srv_method`` on success, otherwise
        ``resultEntity`` filled with the error details.
    """
    try:
        response = call_srv_method(*args, **kwargs)
        result_code_info[call_srv_method] = response.result_code
    except Exception as exc:
        # Report the failure to the caller instead of propagating it.
        resultEntity.error_msg = str(exc)
        resultEntity.result_code = -1
        result_code_info[call_srv_method] = -1
        return resultEntity
    return response
class CustomThread(threading.Thread):
    """Thread that runs ``func(ws_manager, device_id)``."""

    def __init__(self, func, ws_manager, device_id):
        """Remember the target callable and the two arguments passed to it.

        Args:
            func: Callable executed in the thread; it is called as
                ``func(ws_manager, device_id)``.
            ws_manager: First argument handed to ``func``.
            device_id: Second argument handed to ``func``.
        """
        super(CustomThread, self).__init__()
        self.func = func
        self.ws_manager = ws_manager
        self.device_id = device_id

    def run(self):
        """Log the target's name, then invoke it."""
        target_name = self.func.__name__
        logger.info("当前{}线程正在运行...".format(target_name))
        # NOTE: the call is not serialized; no lock is taken around it.
        self.func(self.ws_manager, self.device_id)
def read_ws_conn_conf_info():
    """Load the websocket connection state stored at ``CONF_PATH``.

    Returns:
        The decoded JSON content of the file.
    """
    with open(CONF_PATH, "r") as conf_file:
        return json.load(conf_file)
def write_ws_conn_conf_info(is_close=0):
    """Overwrite ``CONF_PATH`` with the websocket connection state.

    Args:
        is_close: Value stored under the ``"is_close"`` key (default 0).
    """
    with open(CONF_PATH, "w") as conf_file:
        json.dump({"is_close": is_close}, conf_file)
websocket_handler.py
# -*- coding: utf-8 -*-
import uuid
import json
from tornado.websocket import WebSocketHandler
from system_status.entity.system_status_entity import SystemStatusEntity
from login.entity.user_login_entity import UserLoginEntity
from common.result_entity import ResultEntity
from common.jsonrpc_result import JsonrpcResult
from common.logger import Logger
from common.utils import write_ws_conn_conf_info
# Project logger shared by the WebSocket classes in this module.
logger = Logger.getLogger()
class WSHandlerClient:
    """Record tying a device id to its WebSocket handler and connection UUID."""

    def __init__(self, device_id=None, ws_handler=None, ws_uuid=None):
        """Store the three fields unchanged.

        Args:
            device_id: Identifier of the device.
            ws_handler: Handler object serving the connection.
            ws_uuid: Identifier of the connection.
        """
        self.ws_uuid = ws_uuid
        self.ws_handler = ws_handler
        self.device_id = device_id

    def __repr__(self):
        """Return a short description with the device id and connection UUID."""
        template = 'WSHandlerClient [device_id:{}, ws_uuid:{}]'
        return template.format(self.device_id, self.ws_uuid)
class WebSocketManager:
    """In-memory registry of connected WebSocket clients."""

    def __init__(self, device_id=None):
        """Create an empty registry.

        Args:
            device_id: Optional device id stored on the manager.
        """
        self.device_id = device_id
        self.ws_clients = set()
        self.ws_conn_id = 1

    def get_ws_conn_id(self):
        """Increment the connection counter and return its new value.

        The counter starts at 1, so the first call returns 2.
        """
        self.ws_conn_id += 1
        return self.ws_conn_id

    def add_ws_handler(self, ws_client):
        """Register ``ws_client``; adding it twice has no further effect."""
        self.ws_clients.add(ws_client)

    def rm_ws_handler(self, ws_client):
        """Unregister ``ws_client``; unknown clients are ignored.

        A connection can be torn down through more than one path, so a
        repeated removal must not raise ``KeyError``.
        """
        self.ws_clients.discard(ws_client)

    def get_ws_client(self, device_id=None):
        """Return a registered client whose ``device_id`` matches.

        Args:
            device_id: Device id to look for.

        Returns:
            A matching client, or ``None`` when there is no match.
        """
        for ws_client in self.ws_clients:
            if ws_client.device_id == device_id:
                return ws_client
        return None
class WSHandler(WebSocketHandler):
    """Tornado WebSocket endpoint used by the APP clients.

    Every state change (open, login, close) is mirrored to the connection
    config file through ``write_ws_conn_conf_info``.

    NOTE(review): that file holds a single ``is_close`` flag, so it looks
    like all connections share it -- confirm this is intended when more
    than one client can be connected.
    """

    def initialize(self, ws_manager=None):
        """Store the client registry and reset the per-connection state.

        Args:
            ws_manager: Registry exposing ``add_ws_handler`` and
                ``rm_ws_handler``.
        """
        self.ws_manager = ws_manager
        self.ws_uuid = None
        self.ws_client = None
        self.is_close = False

    def check_origin(self, origin):
        """Accept the connection regardless of its origin.

        NOTE(review): returning True unconditionally disables the
        cross-origin check; restrict it if browsers from untrusted sites
        can reach this endpoint.
        """
        return True

    def open(self):
        """Called when a connection is established; assign it a UUID."""
        self.ws_uuid = str(uuid.uuid4())
        self.is_close = 0
        logger.info("open method is_close state:{}".format(self.is_close))
        write_ws_conn_conf_info(self.is_close)
        logger.info("[{}]-websocket 建立连接...".format(self.ws_uuid))

    @property
    def get_ws_uuid(self):
        """UUID of this connection (the access is logged)."""
        logger.info("当前websocket连接对象的UUID: {}".format(self.ws_uuid))
        return self.ws_uuid

    def on_message(self, message):
        """Dispatch a JSON request sent by the client.

        Handles the ``system_status`` and ``login`` methods (matched
        case-insensitively); other methods are ignored. Malformed payloads
        are logged and dropped instead of raising.

        Args:
            message: The received frame, either text or UTF-8 bytes.
        """
        logger.info("当客户端发送消息过来时被调用......")
        # Text frames arrive as str and binary frames as bytes; only the
        # latter need decoding.
        if isinstance(message, bytes):
            msg_data = message.decode('utf-8')
        else:
            msg_data = message
        logger.info("APP发送过来的数据... {}\n".format(msg_data))
        try:
            ws_data = json.loads(msg_data)
        except ValueError as ex:
            logger.error(ex)
            return
        if not isinstance(ws_data, dict):
            logger.error("unexpected websocket payload: {}".format(msg_data))
            return
        ws_method = ws_data.get('method')
        ws_params = ws_data.get('params')
        ws_conn_id = ws_data.get('id')
        # A missing or non-string method simply matches no branch.
        method_name = ws_method.lower() if hasattr(ws_method, 'lower') else None
        if method_name == 'system_status':
            self._handle_system_status(ws_params)
        elif method_name == 'login':
            self._handle_login(ws_params, ws_conn_id)

    def _handle_system_status(self, ws_params):
        """Parse a system-status report and log it."""
        systemStatusEntity = SystemStatusEntity()
        systemStatusEntity.from_dict(ws_params)
        logger.info("系统状态数据:{}".format(systemStatusEntity))

    def _handle_login(self, ws_params, ws_conn_id):
        """Register the device of a login request and send the reply."""
        user_login_entity = UserLoginEntity()
        user_login_entity.from_dict(ws_params)
        device_id = user_login_entity.device_id
        if device_id:
            if self.ws_client is not None:
                # A repeated login replaces the earlier registration
                # instead of leaving a stale entry in the registry.
                self.ws_manager.rm_ws_handler(self.ws_client)
            self.ws_client = WSHandlerClient(device_id, self, self.get_ws_uuid)
            self.ws_manager.add_ws_handler(self.ws_client)
        resultEntity = ResultEntity()
        resultEntity.result_code = 0
        json_rpc_ret = JsonrpcResult()
        json_rpc_ret.result = [resultEntity]
        json_rpc_ret.id = ws_conn_id
        to_app_data = json_rpc_ret.to_json_string()
        logger.info("返回给APP: 用户登录数据\n:{}".format(to_app_data))
        self.is_close = 0
        logger.info("on_message method is_close state:{}".format(self.is_close))
        write_ws_conn_conf_info(self.is_close)
        self.write_message(to_app_data)

    def _mark_closed(self, client_log, state_log):
        """Unregister the client once and persist the closed state."""
        if self.ws_client is not None:
            logger.info(client_log.format(self.ws_uuid))
            self.ws_manager.rm_ws_handler(self.ws_client)
            # close() is normally followed by on_close(); forget the client
            # so the second call does not remove it again.
            self.ws_client = None
        self.is_close = 1
        write_ws_conn_conf_info(self.is_close)
        logger.info(state_log.format(self.is_close))

    def on_close(self):
        """Called after the connection has been closed."""
        self._mark_closed("[{}]-websocket 客户端正在关闭 ...",
                          "on_close method is_close state:{}")

    def close(self, code=None, reason=None):
        """Close the connection from the server side.

        Args:
            code: Optional close status code forwarded to Tornado.
            reason: Optional close reason forwarded to Tornado.
        """
        self._mark_closed("[{}]-websocket连接正在关闭...",
                          "close method is_close state:{}")
        # Without delegating to Tornado the socket would stay open.
        super(WSHandler, self).close(code, reason)

    def write_msgs(self, message=None):
        """Send ``message`` to the client and disconnect after the final
        upload-progress report.

        Args:
            message: JSON string, or a dict (which ``write_message``
                serializes to JSON).

        Raises:
            TypeError, ValueError: If ``message`` is neither a dict nor
                valid JSON text.
        """
        if isinstance(message, dict):
            payload = message
        else:
            payload = json.loads(message)
        method = (payload.get("method") or "").lower()
        params = payload.get("params") or {}
        upload_finished = (method == "map_upload_progress"
                           and params.get("reply_type") == 3)
        try:
            self.write_message(message)
        except Exception as ex:
            logger.error(ex)
        if upload_finished:
            # Disconnect only after the final progress message was handed
            # to Tornado; writing to an already closed socket fails.
            self.close()