zoukankan      html  css  js  c++  java
  • python3发送需要双向认证的wss请求

    python3发送需要双向认证的wss请求
    websocket链接python有很多封装好的库:websocket-client、websockets、aiowebsocket 这里用的websokets
    此次接口要求:1、需要双向认证 2、wss协议 3、发送数据和接受数据都需要序列化和反序列化(probuff)

     1 # encoding = utf-8
     2 import asyncio
     3 import pathlib
     4 import ssl
     5 import websockets
     6 import base64
     7 from pb import conn_ready_pb2
     8 import uuid
     9 import datetime,time
    10 import json
    11 from google.protobuf import json_format
    12 # 服务端声明CLIENT_AUTH
    13 ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    14 #Purpose.CLIENT_AUTH该值表示该上下文可用于对Web客户端进行身份验证(因此,它将用于创建服务器端套接字),carfile服务端证书
    15 ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH,cafile="D://my_project//Ajcloud//ca.crt")
    16 # certfile和keyfile参数为指定包含证书被用来识别连接的本地端可选的文件,客户端证书
    17 ssl_context.load_cert_chain(certfile="D://my_project//Ajcloud//mgr.crt", keyfile="D://my_project//Ajcloud//mgr.key")
    18 ssl_context.verify_mode = ssl.CERT_REQUIRED
    19 
    20 async def hello():
    21     uri = "wss://xxxxxxxxxx"
    22     async with websockets.connect(
    23             uri, ssl=ssl_context
    24     ) as websocket:
    25         message = "xxxxx"
    26         reg = conn_ready_pb2.ConnReadyRequest()
    27         res = conn_ready_pb2.ConnReadyResponse()
    28         reg.header.message_id = str(uuid.uuid1())
    29         datetime_now = datetime.datetime.now()
    30         date_stamp = str(int(time.mktime(datetime_now.timetuple())))
    31         data_microsecond = str("%06d" % datetime_now.microsecond)[0:3]
    32         date_stamp = date_stamp + data_microsecond
    33         reg.header.message_ts = date_stamp
    34         r = reg.SerializeToString()#序列化
    35         b = base64.b64encode(r)  # 将二进制转为base64编码
    36         b2 = str(b, 'utf-8')
    37         message = {"xxx": "xxx", "yyyy": b2}
    38         await websocket.send(json.dumps(message))
    39         print(f"> {message}")
    40         greeting = await websocket.recv()
    41         payload = json.loads(greeting)["payload"]
    42         # base64转码成二进制
    43         payload =base64.b64decode(payload)
    44         res.ParseFromString(payload)#反序列化
    45         json_result = json_format.MessageToJson(res)  # pb转成json格式
    46         print(json_result)
    47         print(f"< {greeting}")
    48 
    49 
    50 asyncio.get_event_loop().run_until_complete(hello())
    ------ 往事如烟,伴着远去的步伐而愈加朦胧。未来似雾,和着前进的风儿而逐渐清晰!
  • 相关阅读:
    Desert King
    Dropping tests
    01分数规划小结
    简单的数学题
    [HAOI2016]放棋子
    [SDOI2017]数字表格
    诸侯放置
    LJJ爱数数
    车的放置
    [SDOI2014]数表
  • 原文地址:https://www.cnblogs.com/cutesnow/p/15045224.html
Copyright © 2011-2022 走看看