zoukankan      html  css  js  c++  java
  • saltstack系列(五)——zmq扩展(一)

    问题

    假设我们的一个客户端既有pull又有sub,他们两个都需要接收消息,该如何协调呢,毕竟,当一个socket要收消息的时候,函数recv是阻塞的,所以,我们第一个思路是不让它阻塞?

    实例代码:

    #coding=utf-8  
    ''''' 
    在这里,同时处理多个套接字,那么接收消息的时候,就需要设置noblock 
    不然会在第一个接收消息的地方堵塞 
    '''  
    import zmq  
    import time  
      
    # Prepare our context and sockets  
    context = zmq.Context()  
      
    # Connect to task ventilator  
    receiver = context.socket(zmq.PULL)  
    receiver.connect("tcp://localhost:8000")  
      
    # Connect to weather server  
    subscriber = context.socket(zmq.SUB)  
    subscriber.connect("tcp://localhost:8001")  
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")  
      
    # Process messages from both sockets  
    # We prioritize traffic from the task ventilator  
    while True:  
      
        # Process any waiting tasks  
        while True:  
            try:  
                #用了NOBLOCK,就意味着得不到消息时不会堵塞在这里  
                msg = receiver.recv(zmq.NOBLOCK)  
            except zmq.ZMQError:  
                break  
            # process task  
      
        # Process any waiting weather updates  
        while True:  
            try:  
                msg = subscriber.recv(zmq.NOBLOCK)  
            except zmq.ZMQError:  
                break  
            # process weather update  
      
        # No activity, so sleep for 1 msec  
        time.sleep(0.001)  
    

    通过设置zmq.NOBLOCK,我们可以让recv不再阻塞,但是呢,要捕捉zmq.ZMQError这个异常,这样一来,两个套接字就可以不发生冲突了。

    但是明显,你可以感受得到,这种做法的丑陋,看起来不是那么的优雅,所以我们换一种做法。

    #coding=utf-8  
    ''''' 
    这种方式比msreader要更好一些 
    '''  
    import zmq  
      
    # Prepare our context and sockets  
    context = zmq.Context()  
      
    # Connect to task ventilator  
    receiver = context.socket(zmq.PULL)  
    receiver.connect("tcp://localhost:8000")  
      
    # Connect to weather server  
    subscriber = context.socket(zmq.SUB)  
    subscriber.connect("tcp://localhost:8001")  
    subscriber.setsockopt(zmq.SUBSCRIBE, b"10001")  
      
    # Initialize poll set  
    poller = zmq.Poller()  
    poller.register(receiver, zmq.POLLIN)  
    poller.register(subscriber, zmq.POLLIN)  
      
    # Process messages from both sockets  
    while True:  
        try:  
            socks = dict(poller.poll())  
        except KeyboardInterrupt:  
            break  
      
        if receiver in socks:  
            message = receiver.recv()  
            # process task  
      
        if subscriber in socks:  
            message = subscriber.recv()  
            # process weather update  
    

    这种做法就很想socket的select模式,大家谁也别争,谁也别抢,只要有消息达到,我就通知你们,然后你们各自检查是不是自己的消息。我们在客户端创建多个socket套接字可能是合理的,但是服务端就最好别这么做了,REQ,PUSH,PUB,道理其实也很简单,服务就是服务,多个员工可以挤在一个办公司里办公,哪有多个老板挤在一起办公的。  

      

  • 相关阅读:
    grpc xservice 使用
    modsecurity3.0 nginx 安装
    scrapy docker 基本部署使用
    fabio 安装试用&&实际使用的几个问题
    yugabyte cloud native db 基本试用
    coredns 编译模式添加插件
    gradle 项目构建以及发布maven 私服&& docker 私服构建发布
    groovy gradle 构建配置
    groovy && java 混编 gradle 配置
    gradle 构建包含源码配置
  • 原文地址:https://www.cnblogs.com/yezl/p/6604965.html
Copyright © 2011-2022 走看看