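Anyone familiar with Jenkins knows that JMS selectors are based on a subset of SQL92 conditional expression syntax. This post shows how to use stomp.py and ActiveMQ to check that a JMS selector behaves as intended.
Q: What is stomp.py?
A: stomp.py is a Python client library that implements the STOMP (Simple (or Streaming) Text Orientated Messaging Protocol) protocol and is used to talk to message brokers such as ActiveMQ and RabbitMQ.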
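To make "text-oriented" concrete, here is a minimal sketch of how a STOMP frame is laid out on the wire: a command line, header lines of the form name:value, a blank line, the body, and a terminating NUL byte. The helper name build_frame is hypothetical and is not part of stomp.py. The sketch also skips the header escaping that newer STOMP versions require, so treat it as an illustration rather than a client implementation.

```python
def build_frame(command, headers, body=""):
    """Serialize a STOMP-style frame (illustration only, no header escaping).

    Layout: COMMAND, one "name:value" line per header, a blank line,
    the body, and a terminating NUL character.
    """
    lines = [command]
    for name, value in headers.items():
        lines.append("%s:%s" % (name, value))
    return "\n".join(lines) + "\n\n" + body + "\x00"


# A SUBSCRIBE frame carrying a selector header, roughly what a client
# sends when it subscribes to a queue with a JMS selector.
frame = build_frame("SUBSCRIBE", {
    "destination": "/queue/test",
    "id": "1",
    "ack": "auto",
    "selector": "Type LIKE 'Tag%'",
})
print(repr(frame))
```

The point to take away is that the selector travels as an ordinary header on the SUBSCRIBE frame, so it is the broker, not the client, that evaluates it.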
To use the stomp.py library, we first need an ActiveMQ server to talk to.
1. Download ActiveMQ
$ export TGZBALL=5.15.2/apache-activemq-5.15.2-bin.tar.gz
$ wget https://archive.apache.org/dist/activemq/$TGZBALL
2. Install ActiveMQ
$ tar zxvf apache-activemq-5.15.2-bin.tar.gz
$ sudo mv apache-activemq-5.15.2 /opt
$ cd /opt
$ sudo ln -s apache-activemq-5.15.2 activemq
3. Start ActiveMQ
$ cd /opt/activemq/bin/linux-x86-64
$ sudo ./activemq start
Starting ActiveMQ Broker...
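Note that the ActiveMQ web console listens on port 8161 by default (STOMP clients connect to a separate port, 61613 by default):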
$ netstat -an | egrep 8161
tcp6       0      0 :::8161                 :::*                    LISTEN
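The same check can be done from Python, which is handy before running a test script against the broker. The sketch below assumes a default ActiveMQ install on localhost, with the web console on 8161 and the STOMP connector on 61613. The helper name port_is_open is hypothetical.

```python
import socket


def port_is_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts and name-resolution failures.
        return False


if __name__ == "__main__":
    # Assumed defaults: 8161 for the web console, 61613 for STOMP.
    for name, port in (("web console", 8161), ("STOMP", 61613)):
        state = "open" if port_is_open("localhost", port) else "closed"
        print("%-12s port %5d: %s" % (name, port, state))
```

If the STOMP port is reported as closed while 8161 is open, the broker is up but the STOMP transport connector may be disabled in the broker configuration.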
To stop ActiveMQ:
$ cd /opt/activemq/bin/linux-x86-64
$ sudo ./activemq stop
Stopping ActiveMQ Broker...
Stopped ActiveMQ Broker.
Once ActiveMQ is running, we can open its web console; the default login username/password is admin/admin.
4. Clone the stomp.py source code
$ mkdir /var/tmp/sp
$ cd /var/tmp/sp
$ git clone https://github.com/jasonrbriggs/stomp.py.git
5. Prepare a Python script, foo.py
 1  #!/usr/bin/python3
 2  import sys
 3  import json
 4  import collections
 5  import time
 6  import stomp
 7
 8
 9  g_rcvmsg_cnt = 0  # counter of received messages
10
11
12  class MyListener(stomp.ConnectionListener):
13      def on_error(self, headers, message):
14          global g_rcvmsg_cnt
15          g_rcvmsg_cnt = 0
16
17          print('Oops!! received an error "%s"' % message)
18
19      def on_message(self, headers, message):
20          global g_rcvmsg_cnt
21          g_rcvmsg_cnt += 1
22
23          print('Bingo! received a message, rcv_cnt = %d' % g_rcvmsg_cnt)
24          print('o message headers : %s' % json.dumps(headers, indent=4))
25          print('o message body : %s ' % message)
26
27
28  def validate():
29      if g_rcvmsg_cnt > 0:
30          print(">>> PASS")
31          return 0
32      else:
33          print(">>> FAIL")
34          return 1
35
36
37  def usage(argv0):
38      sys.stderr.write("Usage: %s <JMS selector> <message headers> " % argv0)
39
40
41  def main(argc, argv):
42      if argc != 3:
43          usage(argv[0])
44          return 1
45
46      jms_selector = argv[1]
47      msg_headers = argv[2]
48
49      d_sub_headers = dict()
50      d_sub_headers['selector'] = jms_selector
51      print('>>> SUB HEADERS', d_sub_headers)
52
53      d_snd_headers = json.loads(msg_headers,
54                                 object_pairs_hook=collections.OrderedDict)
55      print('>>> SND HEADERS', d_snd_headers)
56      print()
57
58      conn = stomp.Connection()
59      conn.set_listener('', MyListener())
60      conn.start()
61
62      # Note default user/password of ActiveMQ is admin/admin
63      user = 'admin'
64      password = 'admin'
65      conn.connect(user, password, wait=True)
66
67      conn.subscribe(destination='/queue/test', id=1, ack='auto',
68                     headers=d_sub_headers)
69
70      s_body = ' '.join(argv[1:])
71      conn.send(destination='/queue/test', body=s_body, headers=d_snd_headers)
72
73      time.sleep(2)
74      conn.disconnect()
75
76      return validate()
77
78
79  if __name__ == '__main__':
80      sys.exit(main(len(sys.argv), sys.argv))
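In the code above,
67      conn.subscribe(destination='/queue/test', id=1, ack='auto',
68                     headers=d_sub_headers)
L67-68: subscribe to the queue using the JMS selector supplied by the user, so the broker only delivers messages whose headers match it;
71      conn.send(destination='/queue/test', body=s_body, headers=d_snd_headers)
L71: send a message whose headers are built from the JSON text supplied by the user.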
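The two header dictionaries are built directly from the command-line arguments. The sketch below isolates that step so it can be tried without a broker. The helper name build_headers is hypothetical and only mirrors what foo.py does inline.

```python
import collections
import json


def build_headers(jms_selector, msg_headers_json):
    """Return (subscribe headers, send headers) the way foo.py builds them."""
    # The selector is passed to the broker as a plain 'selector' header.
    sub_headers = {"selector": jms_selector}
    # The JSON object becomes the message headers, keeping key order.
    snd_headers = json.loads(msg_headers_json,
                             object_pairs_hook=collections.OrderedDict)
    return sub_headers, snd_headers


sub, snd = build_headers("Type LIKE 'Tag%'", '{"Type": "Tag", "foo": 123}')
print(sub)
print(list(snd.items()))
```

Note that foo is still the integer 123 at this point; in the received message printed by foo.py it shows up as the string "123".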
6. Test JMS selectors with foo.py
hdan$ export PYTHONPATH=/var/tmp/sp/stomp.py:$PYTHONPATH
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "tag", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'tag'), ('foo', 123)])

>>> FAIL
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "Tag", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'Tag'), ('foo', 123)])

Bingo! received a message, rcv_cnt = 1
o message headers : {
    "subscription": "1",
    "content-length": "44",
    "timestamp": "1550744115995",
    "destination": "/queue/test",
    "message-id": "ID:huangdan-40431-1550639739706-3:131:-1:1:1",
    "foo": "123",
    "Type": "Tag",
    "expires": "0",
    "priority": "4"
}
o message body : Type LIKE 'Tag%' {"Type": "Tag", "foo": 123}
>>> PASS
hdan$
hdan$ ./foo.py "Type LIKE 'Tag%'" '{"Type": "Tag123", "foo": 123}'
>>> SUB HEADERS {'selector': "Type LIKE 'Tag%'"}
>>> SND HEADERS OrderedDict([('Type', 'Tag123'), ('foo', 123)])

Bingo! received a message, rcv_cnt = 1
o message headers : {
    "message-id": "ID:huangdan-40431-1550639739706-3:132:-1:1:1",
    "destination": "/queue/test",
    "timestamp": "1550744132347",
    "expires": "0",
    "priority": "4",
    "subscription": "1",
    "content-length": "47",
    "foo": "123",
    "Type": "Tag123"
}
o message body : Type LIKE 'Tag%' {"Type": "Tag123", "foo": 123}
>>> PASS
hdan$
Appendix: the SQL92 LIKE operator (quoted)
A like string consists of a set of characters where the percent (%),
underscore (_) and escape character, such as backslash (\) are
treated differently.
* The percent (%) matches any number of characters including
zero characters
* The underscore (_) matches only one character
e.g.
+-------------+---------------------------------------+
| Like string | Meaning                               |
+-------------+---------------------------------------+
| Topic%      | All strings starting with Topic       |
| %/abc/%     | Any string containing /abc/           |
| Name_       | Any string starting with Name and     |
|             | having exactly one more character     |
| Name\_2     | Only the string Name_2                |
+-------------+---------------------------------------+
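These matching rules can be tried offline by translating a LIKE string into a regular expression. The following is a rough sketch, not the broker's implementation: the helper names like_to_regex and like are hypothetical, and backslash is assumed to be the escape character (in a real selector the escape character is declared with an ESCAPE clause).

```python
import re


def like_to_regex(pattern, escape="\\"):
    """Translate a SQL92-style LIKE string into a compiled regex."""
    out = []
    i = 0
    while i < len(pattern):
        ch = pattern[i]
        if ch == escape and i + 1 < len(pattern):
            # An escaped character is matched literally.
            out.append(re.escape(pattern[i + 1]))
            i += 2
            continue
        if ch == "%":
            out.append(".*")      # any number of characters, including zero
        elif ch == "_":
            out.append(".")       # exactly one character
        else:
            out.append(re.escape(ch))
        i += 1
    return re.compile("".join(out), re.DOTALL)


def like(value, pattern, escape="\\"):
    """Return True if the whole value matches the LIKE string."""
    return like_to_regex(pattern, escape).fullmatch(value) is not None


# The three header values from the foo.py runs, against 'Tag%'.
for value in ("tag", "Tag", "Tag123"):
    print(value, like(value, "Tag%"))
```

Matching is case-sensitive, which is consistent with the runs above: the header value tag is not delivered for the selector Type LIKE 'Tag%', while Tag and Tag123 are.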
References: