zoukankan      html  css  js  c++  java
  • python redis模块消息发布和订阅

    一、需求:

    公司CICD平台前后端是分离的,后端用的是python,前端这块主要用的vue;中间有一个环节是将后端应用发布的结果信息通知给前端处理后展示在前端页面。早期的做法是以redis为媒介,将发布结果以广播的形式推送给redis,然后前端监听redis上对应频道的消息,接收到消息处理后发送给前端。
    git地址:https://github.com/f1017746640/redisapi.git

    二、代码逻辑部分:

    2.1 代码结构:

    2.2 config.ini为redis连接信息:

    [DEFAULT]
    redis_host=10.0.0.201
    redis_port=30213
    redis_db=6
    redis_password=None
    
    [LOCAL]
    
    [DEV]
    redis_host=10.0.0.201
    redis_port=30213
    redis_db=6
    redis_password=
    redis_channel=--deploy--
    
    [QA]
    
    [PROD]

    2.3 rds.py 基于redis模块包装的消息发布和订阅的类:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: rds.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 16:49
    """
    import redis
    
    class RedisHelper(object):
        """
        host: redis ip
        port: redis port
        channel: 发送接送消息的频道
        """
        def __init__(self,
                     host,
                     port,
                     db,
                     channel,
                     password=None):
            self.host = host
            self.port = port
            self.db = db
            self.password = password
            self.channel = channel
            self.__conn = redis.Redis(self.host,
                                      self.port,
                                      self.db,
                                      self.password,
                                      decode_responses=True)
    
        def ping(self):
            try:
                self.__conn.ping()
                return True
            except Exception as e:
                print(e)
                return False
    
        # 发送消息
        def public(self,
                   msg):
            if self.ping():
                self.__conn.publish(self.channel,
                                    msg)
                return True
            else:
                return False
    
        # 订阅
        def subscribe(self):
            if self.ping():
                # 打开收音机
                pub = self.__conn.pubsub()
                # 调频道
                pub.subscribe(self.channel)
                # 准备接收
                pub.parse_response()
                return pub
            else:
                return False

    2.4 public_info.py 发送消息:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: public_info.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 17:06
    """
    import json
    import configparser
    
    from rds import RedisHelper
    
    config = configparser.ConfigParser()
    config_file = "config.ini"
    config.read(config_file)
    sec = config["DEV"]
    
    redis_host = sec["redis_host"]
    redis_port = sec["redis_port"]
    redis_db = sec["redis_db"]
    redis_channel = sec["redis_channel"]
    redis_password = sec["redis_password"]
    if len(redis_password) == 0:
        redis_password = None
    
    def pub_info(app_name,
                 app_version,
                 app_owner,
                 app_desc,
                 deploy_res):
        info = {
            "app_name": app_name,
            "app_version": app_version,
            "app_owner": app_owner,
            "app_desc": app_desc,
            "deploy_res": deploy_res
        }
        obj = RedisHelper(host=redis_host,
                          port=redis_port,
                          db=redis_db,
                          channel=redis_channel,
                          password=redis_password)
        res = obj.public(msg=json.dumps(info))
        if res:
            print('public successful.')
        else:
            print('public fail.')
    
    if __name__ == '__main__':
        app_name = 'inf-demo'
        app_version = '1.1'
        app_owner = 'fengzhihai'
        app_desc = 'inf group demo application'
        deploy_res = 'SUCCESS'
        pub_info(app_name=app_name,
                 app_version=app_version,
                 app_owner=app_owner,
                 app_desc=app_desc,
                 deploy_res=deploy_res)

    2.5 subscribe_info.py 订阅消息:

    #!/usr/bin/env python
    # encoding: utf-8
    """
       > FileName: subscribe_info.py
       > Author: FZH
       > Mail: fengzhihai@ilarge.cn
       > CreatedTime: 2020-03-22 17:07
    """
    import time
    import configparser
    
    from rds import RedisHelper
    
    config = configparser.ConfigParser()
    config_file = "config.ini"
    config.read(config_file)
    sec = config["DEV"]
    
    redis_host = sec["redis_host"]
    redis_port = sec["redis_port"]
    redis_db = sec["redis_db"]
    redis_channel = sec["redis_channel"]
    redis_password = sec["redis_password"]
    if len(redis_password) == 0:
        redis_password = None
    
    def sub_info():
        obj = RedisHelper(host=redis_host,
                          port=redis_port,
                          db=redis_db,
                          channel=redis_channel,
                          password=redis_password)
        sub_obj = obj.subscribe()
        while True:
            if sub_obj:
                now_time = time.strftime('%Y-%m-%d %H:%M:%S',
                                         time.localtime(time.time()))
                msg = sub_obj.parse_response()
                print('receive:', now_time, msg)
            else:
                break
    
    if __name__ == '__main__':
        sub_info()

    三、测试:

    3.1 开启循环 监听redis channel消息:

    3.2 发送消息:

    python public_info.py

    修改app_name再次发送消息。

    3.3 监听结果:

    拿到发布结果后,我们就可以对结果进行分析再处理,在前端展示了。

  • 相关阅读:
    设计教训。
    爆牙齿的世界杯日记(小组末轮AB组)
    [维多利亚2 MOD] RecoverMingV(Vic2版复明) V1.1.3(201254更新),兼容AHD 2.31beta
    [文明5建筑MOD] Gunpowder Magazine and Firecracker Workshop (火药库与爆竹坊)。祝大家春节快乐!
    IVY Bridge : There's more than 22nm(IVB的新指令)
    ID3D11DeviceContext::Dispatch与numthread笔记
    如何在各个版本的VC及64位下使用CPUID指令
    [x86]SIMD指令集发展历程表(MMX、SSE、AVX等)
    C++AMP的tiled_index线程编号属性笔记
    x264编码参数大测试:09 trellis(crf26)
  • 原文地址:https://www.cnblogs.com/fengzhihai/p/12548462.html
Copyright © 2011-2022 走看看