zoukankan      html  css  js  c++  java
  • python redis模块消息发布和订阅

    一、需求:

    公司CICD平台前后端是分离的,后端用的是python,前端这块主要用的vue;中间有一个环节是将后端应用发布的结果信息通知给前端处理后展示在前端页面。早期的做法是以redis为媒介,将发布结果以广播的形式推送给redis,然后前端监听redis上对应频道的消息,接收到消息处理后发送给前端。
    git地址:https://github.com/f1017746640/redisapi.git

    二、代码逻辑部分:

    2.1 代码结构:

    2.2 config.ini为redis连接信息:

    [DEFAULT]
    redis_host=10.0.0.201
    redis_port=30213
    redis_db=6
    redis_password=None
    
    [LOCAL]
    
    [DEV]
    redis_host=10.0.0.201
    redis_port=30213
    redis_db=6
    redis_password=
    redis_channel=--deploy--
    
    [QA]
    
    [PROD]

    2.3 rds.py 基于redis模块包装的消息发布和订阅的类:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: rds.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 16:49
    """
    import redis
    
    class RedisHelper(object):
        """
        host: redis ip
        port: redis port
        channel: 发送接送消息的频道
        """
        def __init__(self,
                     host,
                     port,
                     db,
                     channel,
                     password=None):
            self.host = host
            self.port = port
            self.db = db
            self.password = password
            self.channel = channel
            self.__conn = redis.Redis(self.host,
                                      self.port,
                                      self.db,
                                      self.password,
                                      decode_responses=True)
    
        def ping(self):
            try:
                self.__conn.ping()
                return True
            except Exception as e:
                print(e)
                return False
    
        # 发送消息
        def public(self,
                   msg):
            if self.ping():
                self.__conn.publish(self.channel,
                                    msg)
                return True
            else:
                return False
    
        # 订阅
        def subscribe(self):
            if self.ping():
                # 打开收音机
                pub = self.__conn.pubsub()
                # 调频道
                pub.subscribe(self.channel)
                # 准备接收
                pub.parse_response()
                return pub
            else:
                return False

    2.4 public_info.py 发送消息:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: public_info.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 17:06
    """
    import json
    import configparser
    
    from rds import RedisHelper
    
    config = configparser.ConfigParser()
    config_file = "config.ini"
    config.read(config_file)
    sec = config["DEV"]
    
    redis_host = sec["redis_host"]
    redis_port = sec["redis_port"]
    redis_db = sec["redis_db"]
    redis_channel = sec["redis_channel"]
    redis_password = sec["redis_password"]
    if len(redis_password) == 0:
        redis_password = None
    
    def pub_info(app_name,
                 app_version,
                 app_owner,
                 app_desc,
                 deploy_res):
        info = {
            "app_name": app_name,
            "app_version": app_version,
            "app_owner": app_owner,
            "app_desc": app_desc,
            "deploy_res": deploy_res
        }
        obj = RedisHelper(host=redis_host,
                          port=redis_port,
                          db=redis_db,
                          channel=redis_channel,
                          password=redis_password)
        res = obj.public(msg=json.dumps(info))
        if res:
            print('public successful.')
        else:
            print('public fail.')
    
    if __name__ == '__main__':
        app_name = 'inf-demo'
        app_version = '1.1'
        app_owner = 'fengzhihai'
        app_desc = 'inf group demo application'
        deploy_res = 'SUCCESS'
        pub_info(app_name=app_name,
                 app_version=app_version,
                 app_owner=app_owner,
                 app_desc=app_desc,
                 deploy_res=deploy_res)

    2.5 subscribe_info.py 订阅消息:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: subscribe_info.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 17:07
    """
    import time
    import configparser
    
    from rds import RedisHelper
    
    config = configparser.ConfigParser()
    config_file = "config.ini"
    config.read(config_file)
    sec = config["DEV"]
    
    redis_host = sec["redis_host"]
    redis_port = sec["redis_port"]
    redis_db = sec["redis_db"]
    redis_channel = sec["redis_channel"]
    redis_password = sec["redis_password"]
    if len(redis_password) == 0:
        redis_password = None
    
    def sub_info():
        obj = RedisHelper(host=redis_host,
                          port=redis_port,
                          db=redis_db,
                          channel=redis_channel,
                          password=redis_password)
        sub_obj = obj.subscribe()
        while True:
            if sub_obj:
                now_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                         time.localtime(time.time()))
                msg = sub_obj.parse_response()
                print('receive:', now_time, msg)
            else:
                break
    
    if __name__ == '__main__':
        sub_info()

    三、测试:

    3.1 开启循环 监听redis channel消息:

    3.2 发送消息:

    python public_info.py

    修改app_name再次发送消息。

    3.3 监听结果:

    拿到发布结果后,我们就可以对结果进行分析再处理,在前端展示了。

  • 相关阅读:
    编写程序,验证string是ipV4地址
    TCP三次握手和四次挥手
    链表和数组的区别
    cookie和session的区别
    GET和POST的区别
    TCP和UDP的区别
    java HashMap和Hashtable的区别
    java 堆和栈的区别
    最小栈的实现
    关于几个位运算的算法分析
  • 原文地址:https://www.cnblogs.com/fengzhihai/p/12548462.html
Copyright © 2011-2022 走看看