create_rabbitmq_federation
前言
1.由于公司环境用到rabbitmq的federation,如果通过web UI创建,耗时耗力,而且还容易出错。所以希望通过rabbitmq的api来实施这一步骤
2.查询官方文档的api,http api最适合完成这一项任务
实验环境准备:
1.linux主机一台,ip:10.0.1.66
2.为了快速实验,安装 docker engine,拉取官方docker镜像:rabbitmq:3.6-alpine
docker pull rabbitmq:3.6-alpine
3.准备需要使用的rabbitmq的插件配置文件enabled_plugins
mkdir -p /home/wu/rabbitmq-test/
echo '[rabbitmq_federation,rabbitmq_federation_management,rabbitmq_management].' > /home/wu/rabbitmq-test/enabled_plugins
#注:配置文件末尾有一个点
4.启动rabbitmq
1)启动一个rabbitmq,假如命名为external-mq
docker run -d -h external-mq --name external-mq -p 5672:5672 -p 15672:15672 -v /home/wu/rabbitmq-test/external:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
2)启动一个rabbitmq,假如命名为internal-mq
docker run -d -h internal-mq --name internal-mq -p 5673:5672 -p 15673:15672 -v /home/wu/rabbitmq-test/internal:/var/lib/rabbitmq -v /home/wu/rabbitmq-test/enabled_plugins:/etc/rabbitmq/enabled_plugins rabbitmq:3.6-alpine
5.配置rabbitmq的用户名密码
1)使用guest:guest登陆
2)在admin标签创建用户,设置密码,并设置权限(这里我创建了用户名user1,密码为password123)
curl操作rabbitmq的http API
1.参照官方文档:http://www.rabbitmq.com/federation.html
由于官方文档的example,没有给出包含设置prefetch-count,reconnect-delay等参数的例子,我也没找到相关的数据格式,所以采用一个小技巧:
公司环境里有已经通过web UI设置过federation-upstream的环境,所以get一下就可以拿到数据格式了
curl -i -u 'admin_user:admin_passowrd' -H "content-type:application/json" -XGET http://10.0.1.100:15672/api/parameters/federation-upstream/%2f/internal-result-send
2.在external-mq上创建federation-upstream:
external-federation-upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5673","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"internal-result-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"internal-result-send"}'
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data ${external-federation-upstream} http://10.0.1.66:15672/api/parameters/federation-upstream/%2f/internal-result-send
3.在external-mq上创建policy
external_policies_body='{"pattern":"^external_result_receive","apply-to":"exchanges","definition":{"federation-upstream":"internal-result-send"},"priority":0}'
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data ${external_policies_body} http://10.0.1.66:15672/api/policies/%2f/external_result_receive
至此,就完成了单边的消息传递
4.internal-mq上创建federation-upstream:
internal-federation-upstream='{"value":{"uri":"amqp://user1:password123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data ${internal-federation-upstream} http://10.0.1.66:15673/api/parameters/federation-upstream/%2f/external-search-send
5.internal-mq上创建policy
internal_policies_body='{"pattern":"^internal_search_receive","apply-to":"exchanges","definition":{"federation-upstream":"external-search-send"},"priority":0}'
curl -i -u 'user1:password123' -H "content-type:application/json" -XPUT --data ${internal_policies_body} http://10.0.1.66:15673/api/policies/%2f/external-search-send
处理password中有特殊字符的问题
1.当设置的password有特殊字符时,执行curl命令后,rabbitmq并不能创建federation,而会报错。
比如,用户名为user1,密码为password#123时,以上操作将无法成功执行;
大致因为是:特殊字符,汉字等都不能被http的url处理,必须经过url百分号编码
2.我的解决办法:利用python的urllib库编码
cat url_percent_encode.py
#!/usr/bin/env python
#-*- coding:utf-8 -*-
import urllib
content = 'password#123'
new_content = urllib.quote_plus(content)
print(new_content)
将执行python脚本后得到的字符串password%23123替换之前的密码字符串即:
如:
body='{"value":{"uri":"amqp://user1:password%23123@10.0.1.66:5672","prefetch-count":20,"reconnect-delay":30,"ack-mode":"on-confirm","trust-user-id":false,"exchange":"external-search-send","max-hops":1},"vhost":"/","component":"federation-upstream","name":"external-search-send"}'
---分割线---
补:python脚本操作
补充一个python操作rabbitmq http api的脚本
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @author: Wu
# @date:18-8-6
import urllib, requests, json, base64
ex_info = {
"username": "admin_user",
"password": "admin#passwd",
"ip": "10.0.1.66",
"manage_port": "15672",
"amqp_port": "5672",
"exchange": "external_result_receive",
"upstream": "internal-result-send",
}
in_info = {
"username": "admin_user",
"password": "admin#passwd",
"ip": "10.0.1.66",
"manage_port": "15673",
"amqp_port": "5673",
"exchange": "internal_search_receive",
"upstream": "external-search-send"
}
def str_percent_encode(content):
new_content = urllib.quote_plus(content)
return new_content
def http_put_federation(username, password, data, url):
j_data = json.dumps(data)
userInfo = "%s:%s" % (username, password)
userInfo = base64.b64encode(userInfo.encode('utf-8'))
auth = 'Basic ' + userInfo
headers = {
'Content-Type': 'application/json',
'authorization': auth,
}
resp = requests.put(url, data=j_data, headers=headers)
return resp
api = "/api/parameters/federation-upstream/%2f/"
ex_url = "http://" + ex_info.get("ip") + ":" + ex_info.get("manage_port") + api + ex_info.get("upstream")
in_url = "http://" + in_info.get("ip") + ":" + in_info.get("manage_port") + api + in_info.get("upstream")
in_mq = in_info.get("username") + ":" + str_percent_encode(in_info.get("password")) + "@" + in_info.get(
"ip") + ":" + in_info.get("amqp_port")
ex_mq = ex_info.get("username") + ":" + str_percent_encode(ex_info.get("password")) + "@" + ex_info.get(
"ip") + ":" + ex_info.get("amqp_port")
ex_data = {
"value": {
"uri": "amqp://" + in_mq,
"prefetch-count": 20, "reconnect-delay": 30,
"ack-mode": "on-confirm",
"trust-user-id": False,
"exchange": ex_info.get("upstream"),
"max-hops": 1
},
"vhost": "/",
"component": "federation-upstream",
"name": ex_info.get("upstream")
}
in_data = {
"value": {
"uri": "amqp://" + ex_mq,
"prefetch-count": 20, "reconnect-delay": 30,
"ack-mode": "on-confirm",
"trust-user-id": False,
"exchange": in_info.get("upstream"),
"max-hops": 1
},
"vhost": "/",
"component": "federation-upstream",
"name": in_info.get("upstream")
}
ex_po_data = {
"pattern": "^" + ex_info.get("exchange"),
"apply-to": "exchanges",
"definition": {
"federation-upstream": ex_info.get("upstream")
},
"priority": 0
}
in_po_data = {
"pattern": "^" + in_info.get("exchange"),
"apply-to": "exchanges",
"definition": {
"federation-upstream": in_info.get("upstream")
},
"priority": 0
}
po_api = "/api/policies/%2f/"
ex_po_url = "http://" + ex_info.get("ip") + ":" + ex_info.get("manage_port") + po_api + ex_info.get("exchange")
in_po_url = "http://" + in_info.get("ip") + ":" + in_info.get("manage_port") + po_api + in_info.get("exchange")
resp = http_put_federation(ex_info.get("username"), ex_info.get("password"), ex_data, ex_url)
print("external create federation:",resp)
resp = http_put_federation(in_info.get("username"), in_info.get("password"), in_data, in_url)
print("internal create federation:",resp)
resp = http_put_federation(ex_info.get("username"), ex_info.get("password"), ex_po_data, ex_po_url)
print("external create policies:",resp)
resp = http_put_federation(in_info.get("username"), in_info.get("password"), in_po_data, in_po_url)
print("external create policies:",resp)
#################使用python3重写,并加入自动创建需要的exchange
#!/usr/bin/env python # -*- coding: utf-8 -*- # @author: Wu # @date:19-2-26 import urllib, requests, json, base64 class Federate: def __init__(self): self.ex_info = { "username": "admin", "password": "adminpassword", "ip": "10.0.1.60", "manage_port": "55788", "amqp_port": "5788", "exchange": "a_result_receive", "upstream": "b-result-send", } self.in_info = { "username": "admin", "password": "adminpassword", "ip": "10.0.1.60", "manage_port": "55789", "amqp_port": "5789", "exchange": "b_search_receive", "upstream": "a-search-send" } self.ex_args_list = [] self.in_args_list = [] self.ex_headers = None self.in_headers = None @staticmethod def str_percent_encode(content): new_content = urllib.parse.quote_plus(content) return new_content @staticmethod def request_put(kwargs): resp = requests.put(**kwargs) return resp def _init_url(self, kwargs): ip = kwargs.get("ip") username = kwargs.get("username") password = self.str_percent_encode(kwargs.get("password")) amqp_port = kwargs.get("amqp_port") manage_port = kwargs.get("manage_port") upstream = kwargs.get("upstream") exchange = kwargs.get("exchange") fed_api = "/api/parameters/federation-upstream/%2f/" exchange_api = "/api/exchanges/%2F/" po_api = "/api/policies/%2f/" fed_url = f"http://{ip}:{manage_port}{fed_api}{upstream}" mq_url = f"{username}:{password}@{ip}:{amqp_port}" exchange_url = f'http://{ip}:{manage_port}{exchange_api}{exchange}' po_url = f'http://{ip}:{manage_port}{po_api}{exchange}' return fed_url, mq_url, exchange_url, po_url @staticmethod def _init_fed_data(opposite_mq, upstream): fed_data = { "value": { "uri": f"amqp://{opposite_mq}", "prefetch-count": 300, "reconnect-delay": 30, "ack-mode": "on-confirm", "trust-user-id": False, "exchange": upstream, "max-hops": 1 }, "vhost": "/", "component": "federation-upstream", "name": upstream } return json.dumps(fed_data) @staticmethod def _init_policy_data(kwargs): po_data = { "pattern": F'^{kwargs.get("exchange")}', "apply-to": "exchanges", "definition": { "federation-upstream": kwargs.get("upstream") }, "priority": 0 } return json.dumps(po_data) @staticmethod def _init_headers(username, password): user_info = f"{username}:{password}" user_info = base64.b64encode(user_info.encode('utf-8')) auth = f'Basic {str(user_info,encoding="utf-8")}' headers = { 'Content-Type': 'application/json', 'authorization': auth, } return headers @staticmethod def _init_exchange_data(kwargs): exchange_data = {"vhost": "/", "name": kwargs.get("exchange"), "type": "topic", "durable": "true", "auto_delete": "false", "internal": "false", "arguments": {}} return json.dumps(exchange_data) def init_all_data(self): ex_fed_kwargs = {} ex_po_kwargs = {} in_fed_kwargs = {} in_po_kwargs = {} ex_exchange_kwargs = {} in_exchange_kwargs = {} ex_fed_kwargs["url"], ex_mq, ex_exchange_kwargs["url"], ex_po_kwargs["url"] = self._init_url(self.ex_info) in_fed_kwargs["url"], in_mq, in_exchange_kwargs["url"], in_po_kwargs["url"] = self._init_url(self.in_info) self.ex_headers = self._init_headers(self.ex_info.get("username"), self.ex_info.get("password")) self.in_headers = self._init_headers(self.in_info.get("username"), self.in_info.get("password")) ex_fed_kwargs["data"] = self._init_fed_data(in_mq, self.ex_info.get("upstream")) in_fed_kwargs["data"] = self._init_fed_data(ex_mq, self.in_info.get("upstream")) ex_po_kwargs["data"] = self._init_policy_data(self.ex_info) in_po_kwargs["data"] = self._init_policy_data(self.in_info) ex_exchange_kwargs["data"] = self._init_exchange_data(self.ex_info) in_exchange_kwargs["data"] = self._init_exchange_data(self.in_info) self.ex_args_list = [ex_fed_kwargs, ex_po_kwargs, ex_exchange_kwargs] self.in_args_list = [in_fed_kwargs, in_po_kwargs, in_exchange_kwargs]
def main():
fd = Federate()
fd.init_all_data()
for kwargs in fd.ex_args_list:
kwargs["headers"] = fd.ex_headers
res = fd.request_put(kwargs)
if res.content:
print(res.content)
print(res)
for kwargs in fd.in_args_list:
kwargs["headers"] = fd.in_headers
res = fd.request_put(kwargs)
if res.content:
print(res.content)
print(res)
if __name__ == "__main__": main()