转载:https://www.cnblogs.com/huny/p/14079320.html
3.使用一个demo测试网站:https://www.websocket.org/echo.html 进行演示。
import json from websocket import create_connection url = 'wss://echo.websocket.org'#websocket连接地址,地址为虚拟地址 #websocket.enableTrace(True) #打开跟踪,查看日志 while True: ws = create_connection(url) #创建连接 data =input('输入传输消息:') #测试数据 # new_data=json.dumps(data,ensure_ascii=False) #将data转化为字符串 ws.send(data) #发送请求 print('收到回复消息:',ws.recv()) #打印服务器响应数据 if data == 'q': ws.close() #关闭连接 break
4.实战
公司开发的websocket接口只是单纯的建立连接进行监听,并不支持发送和接受数据。
import unittest from websocket import create_connection import websocket class Test_websoket(unittest.TestCase): def test_websk(self): '''websocket 消息推送: /socketServer/${userId} ''' url = 'ws://10.10.100.224:9997/socketServer/528' # websocket连接地址 websocket.enableTrace(True) # 打开跟踪,查看日志 ws = create_connection(url) # 创建连接 print(ws.getstatus()) # 打印连接状态 self.assertEqual(101, ws.getstatus(), 'websocket连接错误') # 断言连接状态 # ws.settimeout(10) #设置超时时间 # print(ws.gettimeout()) #获取超时时间 # ws.shutdown() # 关闭连接 if __name__ == '__main__': unittest.main()
注意这里返回的状态是101,表示协议切换的意思,于是可以用来断言是否连接成功。