一:先进入rabbitmq的安装目录下的bin目录,执行wget -c http://localhost:15672/cli/rabbitmqadmin;(前提是plugin management已经开启),然后就会下载rabbitmqadmin脚本文件到bin里
二:chmod +x rabbitmqadmin,使得所有用户均能够执行此python脚本;
三:可通过./rabbitmqadmin --help查看有哪些命令,注意不能用sh或bash命令,因为它不是shell脚本;
补充:Routing key和binding key可以这么理解,它们本质上都是由Exchange执行,只不过存在两个子模块,一个是通过producer产生key的子模块,一个是通过key找到queue的子模块,然而这两个模块的调用
均通过Exchange,它先将producer信息作为参数调用第一个子模块产生key,然后将key作为参数调用第二个子模块得到queue,然后对queue执行publish/push操作;(理解有误但有参考价值)
四:用法
1.查看user:./rabbitmqadmin list users [name] [tags](后面的命令省略写./rabbitmqadmin但默认都有);(加上name表示只看name列)
2.查看vhosts:list vhosts(这个也一样可以选要显示哪些列,如这里有name和tracing列)
3.list connections
4.list exchanges
5.list bindings(可加source destination_type destination properties_key
)
6.list permissions
7.list channels
8.list parameters
9.list queues
10.list policies
11.list nodes(最好加上列否则太长了,如name type mem_used)
12.show overview(太长,可加rabbitmq_version cluster_name node queue_totals.messages object_totals.queues
)
13.delete queue name=hello
(删除一个queue)
14.delete user name=oth
15.delete exchange name=test
16.delete binding source='kk' destination_type=queue destination=test properties_key=test
17.purge queue name=test
用来清空队列
18.-f raw_json list users
可以格式化输出(其中raw_json可换成pretty_json或long,默认是table的格式化)
19.declare queue name=test durable=true
创建一个queue(此时还没给它配置binding之类的规则),durable是持久的意思;(创建一个queue后rabbitmq会自动为其创建一个binding,source是空字符串的那个,而destination和routing_key都是queue名)
20.publish routing_key=test payload="just for test"
发布一条消息(注意这是在创建的queue是test且是默认配置的情况下成功,如果有自定义配置则不一定能发布,这里用的无名的类型是direct的exchange)
这里还可以指定有哪个exchange来mapping,格式为加个:exchange=my.fanout
21.get queue=test requeue=true
消费一条消息(requequ=true设置为可重复消费)
22.declare exchange name=my.fanout type=fanout
创建自己的exchange
23.declare binding source=my.fanout destination=test routing_key=first创建一个binding(binding就可以理解为一个注册表项,只不过这里不是key-value结构而是多列结构)
补充,默认的Exchange(空字符串的那个)在调用api时是amq.default,amp是Adaptive Message Queue的意思;(15672端口也可以通过post和get生产和消费消息)
补充:查看binding后有source/destination/routing_key三个属性;其中routing_key就是生产者或消费者请求时会产生的key,暂且理解由一个子模块产生,然后source就是exchange,然后exchange根据自己的类型如模糊匹配key找到对应的destination值作为name的queue;然后对此queue操作;
在exchange的外面还有一层context,比如进行查找exchange和给exchange提供routing_key就是由它完成,暂且叫broker把。。
关于Exchange:
有:fanout/direct/topic/headers四种类型
1.fanout:进行最简单的广播,即每个queue都找一边看有没有??(应该是,而且直接忽略routing_key,此时可以指定destination,如果不指定则将找到的queue都publish此payload,虽然此情况下忽略routing_key但还是必须得指定就是这么规定的)
注意这个广播也是前提有对应source是fanout的exchange的binding存在,也就是这么说,publish时指定了exchange则先搜索exchange的表找出此exchange发现是fanout类型,然后此exchange忽略routing_key找binding表里source是此exchange的,发现有
则获取其destination,如果是queue则直接操作(有个destination_type),如果是exchange则再次搜索exchange表(有点像责任链模式),然后将routing_key作为参数调用此exchange的某个方法去继续搜索binding表,直到找出对应的queue并执行publish;
注:我之前的理解有误,routing_key是producer在publish请求时就会携带,还可以携带要由哪个exchange处理的信息(否则默认携带exchange名字是空字符串的那个exchange,type是direct对应匹配);
2.direct:根据整个routing_key进行对应一个queue
3.topic:可以使用点分routing_key模糊匹配(通配符有*和#,由于通配性质故可以组播,即对某一类匹配的queue进行操作,而fanout是对所有的queue都操作[都是以source对应的前提和没有指定destination])
topic里通过.号隔开标识符,标识符不能超过255;然后*匹配 一个 标识符(应该不能为0个),而#可以匹配 0个或多个标识符,如*.a只能匹配类似aa.a而不能匹配a.aa.a,但是#.a则可以匹配.a和a.a和aa.a.a等等;
在创建queue后其routing_key默认就是queue的名字,而其exchange就是系统的第一个无名的exchange是direct类型;
其它:
用户仅能对其所能访问的virtual hosts中的资源进行操作。这里的资源指的是virtual hosts中的exchanges、queues等(这两个具有name),操作包括对资源进行配置、写、读。配置权限可创建、删除、资源并修改资源的行为,写权限可向资源发送消息,读权限从资源获取消息。比如:
exchange和queue的declare与delete分别需要exchange和queue上的配置权限
exchange的bind与unbind需要exchange的读写权限
queue的bind与unbind需要queue写权限exchange的读权限
发消息(publish)需exchange的写权限
获取或清除(get、consume、purge)消息需queue的读权限
对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令如下:set_permissions [-p <vhostpath>] <user> <conf> <write> <
read
>
其中,<conf> <write> <read>的位置分别用正则表达式来匹配特定的资源,如'^(amq.gen.*|amq.default)$'(要有三个,".*"其实就是正则表达式匹配任意字符串的pattern)可以匹配server生成的和默认的exchange,'^$'不匹配任何资源
需要注意的是RabbitMQ会缓存每个connection或channel的权限验证结果、因此权限发生变化后需要重连才能生效。
已知应用场景:(消息中间件的存在允许两个原本耦合的系统之间的处理不是立刻同步的,比如a调用b接口,b反馈是否执行成功,这就存在一个同步关系,而现在是a定时去取b系统执行后发布的执行状态,mes定时取wms的叫料处理状态[无需等待锁物料完成后给MES反馈]),在vhost里是可以有很多queue的;
WCS和WMS之间的指令下发和执行状态反馈:
但WMS下发给WCS指令时可由原先的http请求改为WMS将指令下发到A这个queue;WCS则订阅A这个queue;
WCS将指令执行到可反馈的步骤后将反馈消息下发到B这个queue,WMS则订阅B这个queue;
两个系统在订阅(一条消息只会被一个订阅者读取到,这个跟观察者模式略有不同)后得到消息后对消息进行解析得到其业务类型(故具体的消息最好由一个包装类进行业务包装),然后对真正的消息做业务操作;
n)或者说queue对象已经决定了业务类型,比如A queue就是用于m WMS给 n WCS系统发指令;(如果多个consumer订阅同一个queue或多个producer对同一个queue发布消息还得好好理理对应的业务场景)
n)多个producer和一个consumer业务场景:比如之前的MES叫料,多个生产单位(多个producer账号)可能同时出现叫料,然后这些producer都将叫料任务写到 JL queue,而由一个WMS系统(consumer)来按自己的处理能力去while consume这个JL queue;
每consume到一个叫料任务WMS系统自行决定要先处理完再再次consume还是放入线程池里处理立刻consume下一个;(注意如果需要一直对一个queue获取消息应该用consume而非get,consume是事先订阅了这个queue它们直接存在连接[有点像已经建立tcp连接],而get则类似每次都重新建立tcp连接,只不过在rabbitmq里是叫channel,订阅后channel已经指向了queue)
n)一个或多个producer对应多个consumer的业务场景:比如多个客户端下单(多个producer),然后调度系统将下单任务写到XD queue,而因为是分布式系统,处理下单任务的系统是部署有多个/多套的,每个订单任务处理系统代码都是一样的,它们都能够处理订单;
于是每个订单处理系统都订阅了XD queue,然后消息中间件将订单任务按照一定规律逐条分别给每个订单处理系统处理;(这也是为什么一条消息不会同时发送给多个订阅者,消费者也要做好生产者重复投递消息的情况)
n)如果要实现每个消费者都能消费同一个消息则要通过为每个消费者创建一个queue,然后消息到达后给这些queue都offer即可实现,可以由topic或者配置source/routing_key均相同但destination为这些queue;(topic可以只配置少年binding即可direct则要配置很多)
补充:.erlang.cookie其权限要设置为700,否则启动时报错;(经过多次测试rabbitmq-server貌似只能root启动[其它用户启动虽然没报错但是进程是没开起来的],而rabbitmqctl也要启动的用户来操作,综合考虑还是就用root启动和控制rabbitmq好了),可以sudo启动
问题找出来了,是因为启动rabbitmq需要对/var/lib/rabbitmq有mkdir的权限,可以在root下用chmod 777 -R /var/lib/rabbitmq(或者acl或者group权限均可,反正是递归的rwx权限),然后还有/var/log/rabbitmq的递归权限,之后就可以用其它有这几个目录递归rwx权限的用户启动了;
老版本错误总结:安装rabbitmq时自动安装到了/usr/lib/rabbitmq里,而启动rabbitmq-server需要拥有对rabbitmq的写权限,故启动rabbitmq-server的用户必须拥有对应权限,然而/usr/lib目录不允许将它的写权限给其它用户,因而产生其它用户无法启动的结果
,应该是lib有隐藏权限所以无法赋予其它用户某些权限(似乎又不是用了lsattr没看到隐藏权限),总结如果要能以其它用户且非sudo启动rabbitmq则最好手动安装到其它目录;(rabbitmq启动会做一个mkdir /usr/lib/rabbitmq的操作,因而判断没有此权限启动失败)
补充:rabbitmq里4369是服务相关的端口号,它起到一个端口映射的功能(erlang port mapper daemon,类似dns),即所有的请求都是通过此端口进来,然后rabbitmq会判断其命令类型,如是start,stop,status则会将这个命令转发到25672端口的子服务上,
而rabbitmq启动时是会缓存一个cookie,如果客户端执行如start,stop,status等ctl命令25672端口的子服务会判断对方的类似sessionid的东西是否和自己缓存的一样,不一样说明不是同一个账户在操作,因此通过rabbitmqctl操作是不需要user的;
而由于不能跨文件系统创建链接,故/root下的.erlang.cookie无法链接到/home/xxx下,因此root启动的rabbitmq-server是无法通过其它账户关闭的(但是有个小技巧就是将/root/.erlang.cookie cp到/home/silentdoer目录下就可以通过silentdoer来status了而无需sudo
,因此sudo执行的命令的~是/root;(rabbitmq-server还会检测remote的ip如果不是localhost则不允许某些操作)
15672端口则是可以通过用户名密码的方式管理,但是不包括start/stop之类的控制服务的权限;5672则是amqp端口,即生产者消费者是通过这个端口来实现生产和消费消息的;
sudo 无法cd到/root,根据这个特性实际上可以将一些只能root,sudo也不行的东西放到/root里改个特殊名能够有效防止;