zoukankan      html  css  js  c++  java
  • 用zmq的pub/sub+flask实现异步通信的研究

    zmq_client监听端代码:

    #coding=utf8
    ## client.py
    import zmq
    import sys
    import time
    import logging
    import os
    
    HOST = '10.1.240.229'#服务器运行的监听地址,需要与服务器运行脚本中设定的监听地址一致,若同一台服务器
    PORT = '4444'#服务器运行的监听端口,不可以与对外业务接口相同
    
    logging.basicConfig(filename='subscriber.log', level=logging.INFO)
    
    
    class ZClient(object):
    
        def __init__(self, host=HOST, port=PORT):
            """Initialize Worker"""
            self.host = host
            self.port = port
            self._context = zmq.Context()
            self._subscriber = self._context.socket(zmq.SUB)
            print "Client Initiated"
        
        def receive_message(self):
            """Start receiving messages"""
            self._subscriber.connect('tcp://{}:{}'.format(self.host, self.port))
            self._subscriber.setsockopt(zmq.SUBSCRIBE, b"")
        
            while True:
                print 'listening on tcp://{}:{}'.format(self.host, self.port)
                message = self._subscriber.recv()
                time.sleep(1)
                print "sub",message
                time.sleep(100)#可以放置延时非常大的任务
                print "sub",message
                logging.info(
                    '{}   - {}'.format(message, time.strftime("%Y-%m-%d %H:%M")))
    
    if __name__ == '__main__':
        zs = ZClient()
        zs.receive_message() 

    服务端代码,这里采用flask:

    #coding=utf8
    # server.py 
    import time 
    import zmq 
    import json
    import flask
    
    HOST = '10.1.240.229' #若写成127.0.0.1,则为默认本机ip
    PORT = '4444' #监听端口
    
    _context = zmq.Context() 
    _publisher = _context.socket(zmq.PUB) 
    url = 'tcp://{}:{}'.format(HOST, PORT) 
    
    def publish_message(message):
        try: 
            _publisher.bind(url) 
            time.sleep(1) 
            _publisher.send(message) 
        
        except Exception as e: 
            print "error {}".format(e) 
        finally:
            _publisher.unbind(url) 
    
    from flask import Flask 
    from flask import request 
    app = Flask(__name__) 
    
    @app.route("/index/", methods=['POST']) 
    def lowerString(): 
    
        received_dict = json.loads(flask.request.data)
        #_strn = request.args.get('param') 
        #response = 'lower case of {} is {}'.format(_strn, _strn.lower()) #将请求转换成小写
        #print received_dict
        response=received_dict
        ret=json.dumps(response)
        publish_message(ret )
        return ret
    
    if __name__ == '__main__': 
        host="10.1.240.202"#对外服务地址
        port = 7001#对外服务端口
        debug = True
        app.run(host, port, debug)

    用户访问服务器的对外服务端口时,服务器会进行处理,将部分消息send到一直监听的监听端,然后不需要关心监听端如何处理就给用户端一个消息结果。监听端收到服务端发来的消息后进行处理,处理完毕后进行相应操作。

    这种框架一个好处是大大提高服务器的运行效率,不需要等待全部处理完毕后再返回,还有个好处是服务器和监听端可以部署在不同的机器上,可以根据业务需求合理调配硬件资源

    要深入的人可以参看http://blog.csdn.net/kwsy2008/article/details/49449781这篇博客以及博主的其他文章

    以下附上从http://www.cnblogs.com/rainbowzc/p/3357594.html搬来的原理图

  • 相关阅读:
    关于MySQL INT类型长度的疑惑
    uwp 的锁屏功能
    QT5 动态链接库的创建和使用
    Qt 自定义事件的实现
    学习k8s本地虚拟机安装minikube
    建议收藏!细说HashMap实现,Hash冲突模拟思路讲解。
    95. Unique Binary Search Trees II
    96. Unique Binary Search Trees(dp)
    574 React系列(一)
    如何在Win7电脑上增加新磁盘分区?
  • 原文地址:https://www.cnblogs.com/slqt/p/5481376.html
Copyright © 2011-2022 走看看