zoukankan      html  css  js  c++  java
  • python zmq(ZeorMQ)

    pip install pyzmq

    ZeroMQ位于OSI模型的表示层,使用后台异步线程完成消息的接收和发送,大大简化了编程的复杂度。

    传统的TCP Socket连接时1-1的,可以认为"1个socket=1个连接",每个线程独立维护一个socket,但在zmq中实现了1-n,m-n的连接模式,一个zmq socket维护一组连接,用户只可以操作socket,而不可以操作这些连接。zmq socket特殊的机制去区分多个连接,用户不需要关心。

    另外,因为zmq socket使用后台异步线程,因此zmq不允许在线程之间共享socket,不然会报错

    zmq.error.ZMQError: Operation cannot be accomplished in current state

    三种模式:

    一、Request-Reply模式

    一问一答,客户端request,服务器reply

    哪方先先启动都可以,客户端中途断掉和服务端在reply后断掉都无所谓。

    server:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP) # 设置socket的类型
    socket.bind('tcp://*:15000') # 端口绑定
    
    message = socket.recv() # 收到的是byte类型
    print(message)
    
    socket.send_string('copy!')

    client:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:15000')
    socket.send_string('request')
    
    message = socket.recv()
    print(message)

    二、Publisher-Subscriber模式

    一对多,一个发布者,若干订阅者。订阅者端可以通过设置过滤器过滤数据。

    Publisher

    import zmq
    from random import randrange
     
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:15000")
     
    while True:
        socket.send_string("message")

    Subscriber

    import sys
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:15000")
     
    # 过滤器
    zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002"
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter)
    
    for i in range(5):
        msg = socket.recv()
        print msg 
     
    print(msg)

    三、Push-Pull模式

    服务端push,所有连接在服务端的客户端pull,不同的是只有一个客户端可以pull,它们之间存在竞争,具体机制不需要了解,此模式类似于负载均衡。

    Server

    import zmq
    
    context = zmq.Context()
    server = context.socket(zmq.PUSH)
    server.bind('tcp://*:15000')
    
    while True:
        server.send_string('Push')

    Client

    import zmq
    
    context = zmq.Context()
    client = context.socket(zmq.PULL)
    client.connect('tcp://localhost:15000')
     
    while True:
        msg = client.recv()

    问题1:如果客户端既需要pull模式 又需要subscriber模式的socket

    import zmq
     
    context = zmq.Context()
     
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")
     
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
     
    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(subscriber, zmq.POLLIN)
     
    while True:
        try:
            socks = dict(poller.poll())
        except KeyboardInterrupt:
            break
     
        if receiver in socks:
            message = receiver.recv()
     
        if subscriber in socks:
            message = subscriber.recv()

    问题2:在Request-Reply模式下,如果服务端压力过大,如何给服务端负载均衡

     可以通过增加中间代理,来自动分摊来自客户端的Request。

    Server

    import zmq
     
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:15000")
     
    while True:
        message = socket.recv()
        socket.send_string("msg")

    Urgent

    import zmq
     
    # Prepare our context and sockets
    context = zmq.Context()
     
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind("tcp://*:15001")
    backend.bind("tcp://*:15002")
     
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
     
    while True:
        socks = dict(poller.poll())
        
        if socks.get(frontend) == zmq.POLLIN:
            message = frontend.recv_multipart()
            backend.send_multipart(message)
        
        if socks.get(backend) == zmq.POLLIN:
            message = backend.recv_multipart()
            frontend.send_multipart(message)

    Client

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:15001")
     
    for request in range(1,11):
        socket.send_string("Hello")
        message = socket.recv()
    socket.close()
    context.term()
  • 相关阅读:
    ABAPNote001
    ABAPNote002
    共享WinCE6.0 下的一个软件升级程序
    EF参数化查询
    VS2010 调用RFC 时注意(.net4.0) 使用nco
    RDLC报表问题
    参数化查询与拼接SQL代码执行测试
    ABAPNOTE 获取释放的定单
    Wince6.0 + xpsp3+VS2005的一些问题
    BackgroundWorker使用备忘
  • 原文地址:https://www.cnblogs.com/LMIx/p/12677787.html
Copyright © 2011-2022 走看看