zoukankan      html  css  js  c++  java
  • python zmq(ZeorMQ)

    pip install pyzmq

    ZeroMQ位于OSI模型的表示层,使用后台异步线程完成消息的接收和发送,大大简化了编程的复杂度。

    传统的TCP Socket连接时1-1的,可以认为"1个socket=1个连接",每个线程独立维护一个socket,但在zmq中实现了1-n,m-n的连接模式,一个zmq socket维护一组连接,用户只可以操作socket,而不可以操作这些连接。zmq socket特殊的机制去区分多个连接,用户不需要关心。

    另外,因为zmq socket使用后台异步线程,因此zmq不允许在线程之间共享socket,不然会报错

    zmq.error.ZMQError: Operation cannot be accomplished in current state

    三种模式:

    一、Request-Reply模式

    一问一答,客户端request,服务器reply

    哪方先先启动都可以,客户端中途断掉和服务端在reply后断掉都无所谓。

    server:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REP) # 设置socket的类型
    socket.bind('tcp://*:15000') # 端口绑定
    
    message = socket.recv() # 收到的是byte类型
    print(message)
    
    socket.send_string('copy!')

    client:

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect('tcp://localhost:15000')
    socket.send_string('request')
    
    message = socket.recv()
    print(message)

    二、Publisher-Subscriber模式

    一对多,一个发布者,若干订阅者。订阅者端可以通过设置过滤器过滤数据。

    Publisher

    import zmq
    from random import randrange
     
    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind("tcp://*:15000")
     
    while True:
        socket.send_string("message")

    Subscriber

    import sys
    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect("tcp://localhost:15000")
     
    # 过滤器
    zip_filter = sys.argv[1] if len(sys.argv) > 1 else "10002"
    socket.setsockopt(zmq.SUBSCRIBE, zip_filter)
    
    for i in range(5):
        msg = socket.recv()
        print msg 
     
    print(msg)

    三、Push-Pull模式

    服务端push,所有连接在服务端的客户端pull,不同的是只有一个客户端可以pull,它们之间存在竞争,具体机制不需要了解,此模式类似于负载均衡。

    Server

    import zmq
    
    context = zmq.Context()
    server = context.socket(zmq.PUSH)
    server.bind('tcp://*:15000')
    
    while True:
        server.send_string('Push')

    Client

    import zmq
    
    context = zmq.Context()
    client = context.socket(zmq.PULL)
    client.connect('tcp://localhost:15000')
     
    while True:
        msg = client.recv()

    问题1:如果客户端既需要pull模式 又需要subscriber模式的socket

    import zmq
     
    context = zmq.Context()
     
    receiver = context.socket(zmq.PULL)
    receiver.connect("tcp://localhost:5557")
     
    subscriber = context.socket(zmq.SUB)
    subscriber.connect("tcp://localhost:5556")
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")
     
    poller = zmq.Poller()
    poller.register(receiver, zmq.POLLIN)
    poller.register(subscriber, zmq.POLLIN)
     
    while True:
        try:
            socks = dict(poller.poll())
        except KeyboardInterrupt:
            break
     
        if receiver in socks:
            message = receiver.recv()
     
        if subscriber in socks:
            message = subscriber.recv()

    问题2:在Request-Reply模式下,如果服务端压力过大,如何给服务端负载均衡

     可以通过增加中间代理,来自动分摊来自客户端的Request。

    Server

    import zmq
     
    context = zmq.Context()
    socket = context.socket(zmq.REP)
    socket.connect("tcp://localhost:15000")
     
    while True:
        message = socket.recv()
        socket.send_string("msg")

    Urgent

    import zmq
     
    # Prepare our context and sockets
    context = zmq.Context()
     
    frontend = context.socket(zmq.ROUTER)
    backend = context.socket(zmq.DEALER)
    frontend.bind("tcp://*:15001")
    backend.bind("tcp://*:15002")
     
    poller = zmq.Poller()
    poller.register(frontend, zmq.POLLIN)
    poller.register(backend, zmq.POLLIN)
     
    while True:
        socks = dict(poller.poll())
        
        if socks.get(frontend) == zmq.POLLIN:
            message = frontend.recv_multipart()
            backend.send_multipart(message)
        
        if socks.get(backend) == zmq.POLLIN:
            message = backend.recv_multipart()
            frontend.send_multipart(message)

    Client

    import zmq
    
    context = zmq.Context()
    socket = context.socket(zmq.REQ)
    socket.connect("tcp://localhost:15001")
     
    for request in range(1,11):
        socket.send_string("Hello")
        message = socket.recv()
    socket.close()
    context.term()
  • 相关阅读:
    shell数组(产生不同的随机数)
    统计服务连接状况
    子网掩码与子网划分
    oracle 12g sqlplus安装
    MySQL的备份和还原
    mysql日志
    mysql用户和权限管理
    mysql show
    CentOS Linux解决Device eth0 does not seem to be present
    mysqldump命令详解(转载)
  • 原文地址:https://www.cnblogs.com/LMIx/p/12677787.html
Copyright © 2011-2022 走看看