  • Creating a RabbitMQ federation through the HTTP API with curl

    create_rabbitmq_federation

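    Preface

    1. Our company environment uses RabbitMQ federation. Creating it through the web UI is slow, tedious, and error-prone, so I wanted to do this step through the RabbitMQ API instead.
    2. After checking the official API documentation, the HTTP API turned out to be the best fit for this task.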

    Lab environment setup:

    1. One Linux host, IP: 10.0.1.66
    2. To get going quickly, install Docker Engine and pull the official Docker image rabbitmq:3.6-alpine

    docker pull rabbitmq:3.6-alpine
    

    3. Prepare the plugin configuration file enabled_plugins that RabbitMQ will use

    mkdir -p /home/wu/rabbitmq-test/
    echo '[rabbitmq_federation,rabbitmq_federation_management,rabbitmq_management].' > /home/wu/rabbitmq-test/enabled_plugins
    # Note: the file content ends with a period
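
    The same file content can also be generated programmatically, which makes the trailing period hard to forget. A minimal sketch; the helper name render_enabled_plugins is hypothetical and not part of RabbitMQ:

```python
def render_enabled_plugins(plugins):
    """Render plugin names as the single Erlang list the file contains.

    The list is terminated by a period, followed by a newline.
    """
    return "[" + ",".join(plugins) + "].\n"


if __name__ == "__main__":
    # Same three plugins as in the echo command above
    print(render_enabled_plugins([
        "rabbitmq_federation",
        "rabbitmq_federation_management",
        "rabbitmq_management",
    ]), end="")
```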
    

    4. Start RabbitMQ

    1) Start one RabbitMQ instance, named external-mq here

    docker run -d -h external-mq  --name external-mq -p 5672:5672 -p 15672:15672 -v /home/wu/rabbitmq-test/external:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
    

    2) Start a second RabbitMQ instance, named internal-mq here

    docker run -d -h internal-mq  --name internal-mq -p 5673:5672 -p 15673:15672 -v /home/wu/rabbitmq-test/internal:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
    

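    5. Configure a RabbitMQ user and password
    1) Log in to the management UI as guest:guest
    2) On the Admin tab, create a user, set its password, and grant it permissions (here I created the user user1 with the password password123)

    Calling the RabbitMQ HTTP API with curl

    1. Refer to the official documentation: http://www.rabbitmq.com/federation.html
    The examples in the official documentation do not show parameters such as prefetch-count or reconnect-delay, and I could not find the data format documented anywhere, so I used a small trick:
    our company environment already had a federation-upstream configured through the web UI, so a GET on it returns the data format.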

    curl -i -u 'admin_user:admin_password' -H "content-type:application/json" -XGET http://10.0.1.100:15672/api/parameters/federation-upstream/%2f/internal-result-send
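
    The same GET can be sketched in Python with only the standard library. The function names build_get_upstream and fetch_json are hypothetical; the host, credentials, and upstream name are the placeholders from the curl command above. The point of the sketch is the URL construction: the default vhost "/" has to be percent-encoded so that it stays a single path segment.

```python
import base64
import json
import urllib.request
from urllib.parse import quote


def build_get_upstream(host, port, user, password, vhost, name):
    """Build (but do not send) a GET request for one federation-upstream."""
    # safe="" makes quote() encode "/" as well, so vhost "/" becomes %2F
    url = "http://{}:{}/api/parameters/federation-upstream/{}/{}".format(
        host, port, quote(vhost, safe=""), quote(name, safe=""))
    token = base64.b64encode("{}:{}".format(user, password).encode("utf-8"))
    req = urllib.request.Request(url, method="GET")
    req.add_header("Authorization", "Basic " + token.decode("ascii"))
    return req


def fetch_json(req):
    """Send the request and decode the JSON body (needs a reachable broker)."""
    with urllib.request.urlopen(req) as resp:
        return json.loads(resp.read().decode("utf-8"))


req = build_get_upstream("10.0.1.100", 15672, "admin_user", "admin_password",
                         "/", "internal-result-send")
print(req.full_url)
```

    Calling fetch_json(req) against a broker that already has the upstream defined should return the stored parameter as a dictionary, which can then serve as the template for the PUT bodies.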
    

    2. Create the federation-upstream on external-mq:

    external_federation_upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5673","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"internal-result-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"internal-result-send"}'
    
    curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${external_federation_upstream}" http://10.0.1.66:15672/api/parameters/federation-upstream/%2f/internal-result-send
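
    Because the body is a long JSON string typed by hand, it can help to sanity-check it before sending. The sketch below is one possible pre-flight check, not something RabbitMQ requires; the function name check_upstream_body is hypothetical, and the rule that "name" should equal the last URL segment simply mirrors the convention used in the examples here.

```python
import json


def check_upstream_body(body_text, url_name):
    """Return a list of problems found in a federation-upstream JSON body."""
    try:
        body = json.loads(body_text)
    except ValueError as exc:
        return ["not valid JSON: {}".format(exc)]
    if not isinstance(body, dict):
        return ["top-level JSON value is not an object"]

    problems = []
    value = body.get("value")
    if not isinstance(value, dict) or "uri" not in value:
        problems.append('missing "value.uri"')
    if body.get("component") != "federation-upstream":
        problems.append('"component" is not "federation-upstream"')
    if body.get("name") != url_name:
        problems.append('"name" differs from the name in the URL')
    return problems


body = ('{"value":{"uri":"amqp://user1:password123@10.0.1.66:5673",'
        '"prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm",'
        '"trust-user-id":false,"exchange":"internal-result-send","max-hops":1},'
        '"vhost":"/","component":"federation-upstream",'
        '"name":"internal-result-send"}')
print(check_upstream_body(body, "internal-result-send"))
```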
    

    3. Create the policy on external-mq

    external_policies_body='{"pattern":"^external_result_receive","apply-to":"exchanges","definition":{"federation-upstream":"internal-result-send"},"priority":0}'
    curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${external_policies_body}" http://10.0.1.66:15672/api/policies/%2f/external_result_receive
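
    The policy pattern is a regular expression that is anchored only at the start, so it applies to every exchange whose name begins with external_result_receive. The sketch below illustrates this using Python's re module as a stand-in for the broker's regex engine (an assumption; for a simple pattern like this the two should behave the same). The function name policy_applies is hypothetical.

```python
import re


def policy_applies(pattern, exchange_name):
    """Return True if the policy pattern matches anywhere in the name."""
    return re.search(pattern, exchange_name) is not None


for name in ("external_result_receive",
             "external_result_receive_v2",
             "my_external_result_receive"):
    print(name, policy_applies("^external_result_receive", name))
```

    The first two names match and the third does not. If only the one exchange should be federated, a pattern that is anchored at both ends, such as ^external_result_receive$, is the stricter choice.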
    

    This completes message delivery in one direction.
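
    To confirm that the link actually came up, the rabbitmq_federation_management plugin exposes link status through the management API under /api/federation-links. The sketch below assumes the response is a JSON list of objects that carry "upstream" and "status" fields; that shape is an assumption worth checking against a real response from your broker version. The function name links_not_running is hypothetical and the sample data is illustrative only.

```python
def links_not_running(links):
    """Return (upstream, status) for every link whose status is not "running"."""
    return [(link.get("upstream"), link.get("status"))
            for link in links
            if link.get("status") != "running"]


# Illustrative data only, not captured from a real broker
sample = [
    {"upstream": "internal-result-send", "status": "running"},
    {"upstream": "other-upstream", "status": "starting"},
]
print(links_not_running(sample))
```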

    4. Create the federation-upstream on internal-mq:

    internal_federation_upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
    curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${internal_federation_upstream}" http://10.0.1.66:15673/api/parameters/federation-upstream/%2f/external-search-send
    

    5. Create the policy on internal-mq

    internal_policies_body='{"pattern":"^internal_search_receive","apply-to":"exchanges","definition":{"federation-upstream":"external-search-send"},"priority":0}'
    curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data "${internal_policies_body}" http://10.0.1.66:15673/api/policies/%2f/external-search-send
    

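    Handling special characters in the password

    1. When the password contains special characters, the curl command runs, but RabbitMQ cannot create the federation and reports an error.
    For example, with the user name user1 and the password password#123, the steps above fail.
    The rough cause: special characters, Chinese characters and the like cannot appear literally in a URL and must be percent-encoded first. The '#' here is the URI fragment delimiter, so everything after it is no longer read as part of the password.
    2. My solution: encode the password with Python's urllib library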

    cat url_percent_encode.py
    
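    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    # quote_plus lives in urllib on Python 2 and in urllib.parse on Python 3
    try:
        from urllib.parse import quote_plus
    except ImportError:
        from urllib import quote_plus
    
    content = 'password#123'
    new_content = quote_plus(content)
    print(new_content)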
    
    

    Replace the earlier password string with the string the Python script prints, password%23123.
    For example:

    body='{"value":{"uri":"amqp://user1:password%23123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
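
    The effect of the encoding can be checked with Python 3's urllib.parse. The sketch below first parses the unencoded URI to show where the '#' cuts it off, then builds an encoded URI and recovers the original password from it. The helper name amqp_uri is hypothetical. It uses quote with safe="" rather than quote_plus, because quote_plus turns a space into '+', which is form encoding; for a password without spaces, such as this one, both produce the same string.

```python
from urllib.parse import quote, unquote, urlsplit


def amqp_uri(user, password, host, port):
    """Build an AMQP URI with percent-encoded credentials."""
    # safe="" encodes every reserved character, including "/", "@", ":" and "#"
    return "amqp://{}:{}@{}:{}".format(
        quote(user, safe=""), quote(password, safe=""), host, port)


# Unencoded: the authority part ends at "#", the rest becomes the fragment
raw = urlsplit("amqp://user1:password#123@10.0.1.66:5672")
print(raw.netloc)
print(raw.fragment)

# Encoded: host, port, and password all survive parsing
good = urlsplit(amqp_uri("user1", "password#123", "10.0.1.66", 5672))
print(good.hostname, good.port, unquote(good.password))
```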
    

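    ---Divider---

    Addendum: doing it from a Python script

    A supplementary Python script that calls the RabbitMQ HTTP API (written for Python 2: it relies on urllib.quote_plus)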

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    
    # @author: Wu
    # @date:18-8-6
    
    import urllib, requests, json, base64
    
    ex_info = {
        "username": "admin_user",
        "password": "admin#passwd",
        "ip": "10.0.1.66",
        "manage_port": "15672",
        "amqp_port": "5672",
        "exchange": "external_result_receive",
        "upstream": "internal-result-send",
    }
    
    in_info = {
        "username": "admin_user",
        "password": "admin#passwd",
        "ip": "10.0.1.66",
        "manage_port": "15673",
        "amqp_port": "5673",
        "exchange": "internal_search_receive",
        "upstream": "external-search-send"
    
    }
    
    
    def str_percent_encode(content):
        new_content = urllib.quote_plus(content)
        return new_content
    
    
    def http_put_federation(username, password, data, url):
        j_data = json.dumps(data)
        userInfo = "%s:%s" % (username, password)
        userInfo = base64.b64encode(userInfo.encode('utf-8'))
        auth = 'Basic ' + userInfo
        headers = {
            'Content-Type': 'application/json',
            'authorization': auth,
        }
        resp = requests.put(url, data=j_data, headers=headers)
        return resp
    
    
    api = "/api/parameters/federation-upstream/%2f/"
    ex_url = "http://" + ex_info.get("ip") + ":" + ex_info.get("manage_port") + api + ex_info.get("upstream")
    in_url = "http://" + in_info.get("ip") + ":" + in_info.get("manage_port") + api + in_info.get("upstream")
    
    in_mq = in_info.get("username") + ":" + str_percent_encode(in_info.get("password")) + "@" + in_info.get(
        "ip") + ":" + in_info.get("amqp_port")
    ex_mq = ex_info.get("username") + ":" + str_percent_encode(ex_info.get("password")) + "@" + ex_info.get(
        "ip") + ":" + ex_info.get("amqp_port")
    
    ex_data = {
        "value": {
            "uri": "amqp://" + in_mq,
            "prefetch-count": 20, "reconnect-delay": 30,
            "ack-mode": "on-confirm",
            "trust-user-id": False,
            "exchange": ex_info.get("upstream"),
            "max-hops": 1
        },
        "vhost": "/",
        "component": "federation-upstream",
        "name": ex_info.get("upstream")
    }
    
    in_data = {
        "value": {
            "uri": "amqp://" + ex_mq,
            "prefetch-count": 20, "reconnect-delay": 30,
            "ack-mode": "on-confirm",
            "trust-user-id": False,
            "exchange": in_info.get("upstream"),
            "max-hops": 1
        },
        "vhost": "/",
        "component": "federation-upstream",
        "name": in_info.get("upstream")
    }
    
    ex_po_data = {
        "pattern": "^" + ex_info.get("exchange"),
        "apply-to": "exchanges",
        "definition": {
            "federation-upstream": ex_info.get("upstream")
        },
        "priority": 0
    }
    
    in_po_data = {
        "pattern": "^" + in_info.get("exchange"),
        "apply-to": "exchanges",
        "definition": {
            "federation-upstream": in_info.get("upstream")
        },
        "priority": 0
    }
    
    po_api = "/api/policies/%2f/"
    ex_po_url = "http://" + ex_info.get("ip") + ":" + ex_info.get("manage_port") + po_api + ex_info.get("exchange")
    in_po_url = "http://" + in_info.get("ip") + ":" + in_info.get("manage_port") + po_api + in_info.get("exchange")
    
    resp = http_put_federation(ex_info.get("username"), ex_info.get("password"), ex_data, ex_url)
    print("external create federation:",resp)
    resp = http_put_federation(in_info.get("username"), in_info.get("password"), in_data, in_url)
    print("internal create federation:",resp)
    resp = http_put_federation(ex_info.get("username"), ex_info.get("password"), ex_po_data, ex_po_url)
    print("external create policies:",resp)
    resp = http_put_federation(in_info.get("username"), in_info.get("password"), in_po_data, in_po_url)
    print("internal create policies:",resp)
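
    Note that the script uses the password in two different forms: the raw password goes into the HTTP Basic authorization header, while the percent-encoded password goes into the AMQP URI. The header part can be sketched in Python 3 as follows; the function name basic_auth_header is hypothetical. On Python 3, base64.b64encode returns bytes, so the token has to be decoded before it is concatenated with a string.

```python
import base64


def basic_auth_header(username, password):
    """Build an HTTP Basic Authorization header value from raw credentials."""
    token = base64.b64encode("{}:{}".format(username, password).encode("utf-8"))
    # b64encode returns bytes on Python 3, so decode before concatenating
    return "Basic " + token.decode("ascii")


# The raw password is used here; percent-encoding applies only to the AMQP URI
header = basic_auth_header("admin_user", "admin#passwd")
print(header)
```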
        
     

    ################# Rewritten for Python 3, with automatic creation of the required exchanges

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    
    # @author: Wu
    # @date:19-2-26
    
    import urllib.parse, requests, json, base64
    
    
    class Federate:
        def __init__(self):
            self.ex_info = {
                "username": "admin",
                "password": "adminpassword",
                "ip": "10.0.1.60",
                "manage_port": "55788",
                "amqp_port": "5788",
                "exchange": "a_result_receive",
                "upstream": "b-result-send",
            }
            self.in_info = {
                "username": "admin",
                "password": "adminpassword",
                "ip": "10.0.1.60",
                "manage_port": "55789",
                "amqp_port": "5789",
                "exchange": "b_search_receive",
                "upstream": "a-search-send"
            }
            self.ex_args_list = []
            self.in_args_list = []
            self.ex_headers = None
            self.in_headers = None
    
        @staticmethod
        def str_percent_encode(content):
            new_content = urllib.parse.quote_plus(content)
            return new_content
    
        @staticmethod
        def request_put(kwargs):
            resp = requests.put(**kwargs)
            return resp
    
        def _init_url(self, kwargs):
            ip = kwargs.get("ip")
            username = kwargs.get("username")
            password = self.str_percent_encode(kwargs.get("password"))
            amqp_port = kwargs.get("amqp_port")
            manage_port = kwargs.get("manage_port")
            upstream = kwargs.get("upstream")
            exchange = kwargs.get("exchange")
            fed_api = "/api/parameters/federation-upstream/%2f/"
            exchange_api = "/api/exchanges/%2F/"
            po_api = "/api/policies/%2f/"
            fed_url = f"http://{ip}:{manage_port}{fed_api}{upstream}"
            mq_url = f"{username}:{password}@{ip}:{amqp_port}"
            exchange_url = f'http://{ip}:{manage_port}{exchange_api}{exchange}'
            po_url = f'http://{ip}:{manage_port}{po_api}{exchange}'
            return fed_url, mq_url, exchange_url, po_url
    
        @staticmethod
        def _init_fed_data(opposite_mq, upstream):
            fed_data = {
                "value": {
                    "uri": f"amqp://{opposite_mq}",
                    "prefetch-count": 300,
                    "reconnect-delay": 30,
                    "ack-mode": "on-confirm",
                    "trust-user-id": False,
                    "exchange": upstream,
                    "max-hops": 1
                },
                "vhost": "/",
                "component": "federation-upstream",
                "name": upstream
            }
            return json.dumps(fed_data)
    
        @staticmethod
        def _init_policy_data(kwargs):
            po_data = {
                "pattern": F'^{kwargs.get("exchange")}',
                "apply-to": "exchanges",
                "definition": {
                    "federation-upstream": kwargs.get("upstream")
                },
                "priority": 0
            }
            return json.dumps(po_data)
    
        @staticmethod
        def _init_headers(username, password):
            user_info = f"{username}:{password}"
            user_info = base64.b64encode(user_info.encode('utf-8'))
            auth = f'Basic {str(user_info,encoding="utf-8")}'
            headers = {
                'Content-Type': 'application/json',
                'authorization': auth,
            }
            return headers
    
        @staticmethod
        def _init_exchange_data(kwargs):
            exchange_data = {"vhost": "/",
                             "name": kwargs.get("exchange"),
                             "type": "topic",
                             "durable": True,
                             "auto_delete": False,
                             "internal": False,
                             "arguments": {}}
            return json.dumps(exchange_data)
    
        def init_all_data(self):
            ex_fed_kwargs = {}
            ex_po_kwargs = {}
            in_fed_kwargs = {}
            in_po_kwargs = {}
            ex_exchange_kwargs = {}
            in_exchange_kwargs = {}
    
            ex_fed_kwargs["url"], ex_mq, ex_exchange_kwargs["url"], ex_po_kwargs["url"] = self._init_url(self.ex_info)
            in_fed_kwargs["url"], in_mq, in_exchange_kwargs["url"], in_po_kwargs["url"] = self._init_url(self.in_info)
    
            self.ex_headers = self._init_headers(self.ex_info.get("username"), self.ex_info.get("password"))
            self.in_headers = self._init_headers(self.in_info.get("username"), self.in_info.get("password"))
    
            ex_fed_kwargs["data"] = self._init_fed_data(in_mq, self.ex_info.get("upstream"))
            in_fed_kwargs["data"] = self._init_fed_data(ex_mq, self.in_info.get("upstream"))
    
            ex_po_kwargs["data"] = self._init_policy_data(self.ex_info)
            in_po_kwargs["data"] = self._init_policy_data(self.in_info)
    
            ex_exchange_kwargs["data"] = self._init_exchange_data(self.ex_info)
            in_exchange_kwargs["data"] = self._init_exchange_data(self.in_info)
    
            self.ex_args_list = [ex_fed_kwargs, ex_po_kwargs, ex_exchange_kwargs]
            self.in_args_list = [in_fed_kwargs, in_po_kwargs, in_exchange_kwargs]
    
    def main():
        fd = Federate()
        fd.init_all_data()
        for kwargs in fd.ex_args_list:
            kwargs["headers"] = fd.ex_headers
            res = fd.request_put(kwargs)
            if res.content:
                print(res.content)
            print(res)
        for kwargs in fd.in_args_list:
            kwargs["headers"] = fd.in_headers
            res = fd.request_put(kwargs)
            if res.content:
                print(res.content)
            print(res)
    
    
    if __name__ == "__main__":
        main()
  • Original article: https://www.cnblogs.com/rootid/p/9407297.html