客户端主动获取消息
channel.basic_get:同步获取消息(拉模式),每次调用只取回一条消息,且每条消息都需要一次请求/响应往返,因此性能比 basic_consume(推模式)低。
# Pull a message from a RabbitMQ queue on demand with ``channel.basic_get``
# over an asynchronous ``pika.SelectConnection``.
#
# Flow: connection opens -> channel opens -> one ``basic_get`` is issued on
# queue 'hello-01'.  A delivered message is printed and acknowledged; a
# ``Basic.GetEmpty`` reply terminates the process.
import logging
import sys
import traceback
import time as timer
from time import time

import pika
from pika.spec import Basic

# Only surface pika's own log records at ERROR level and above.
mylog = logging.getLogger('pika')
mylog.setLevel(logging.ERROR)
ch = logging.StreamHandler()
ch.setLevel(logging.ERROR)
mylog.addHandler(ch)

# Timestamps; only ``start`` is assigned (in on_channel_open) in this script.
start = 0
end = 0

# Shared handles: the connection is created at the bottom of the script and
# the channel is stored by on_channel_open once it is ready.
connection = None
channelg = None


def on_open(connection):
    """Open a channel as soon as the connection is established.

    Args:
        connection: The connection object passed to the open callback.
    """
    connection.channel(on_open_callback=on_channel_open)


def on_message(unused_channel, basic_deliver, properties, body):
    """Print a fetched message and acknowledge it.

    Args:
        unused_channel: Channel the message arrived on; used to send the ack.
        basic_deliver: Delivery metadata; supplies ``delivery_tag``.
        properties: Message properties (not used here).
        body: Raw message payload as bytes, decoded as UTF-8 for printing.
    """
    print("body>>>", body.decode("utf-8"))
    # NOTE: this sleep runs inside the callback, so it blocks the caller
    # (presumably the ioloop thread) for 0.1 s before the ack is sent.
    timer.sleep(0.1)
    unused_channel.basic_ack(delivery_tag=basic_deliver.delivery_tag)


def get_message():
    """Issue one ``basic_get`` on queue 'hello-01'.

    Any exception raised while issuing the request is printed with its
    traceback instead of being propagated.
    """
    try:
        channelg.basic_get(callback=on_message, queue='hello-01')
        # NOTE: only a single basic_get is issued.  The original script had a
        # disabled helper that re-scheduled get_message through
        # ``connection.add_timeout(0.001, get_message)`` to keep polling.
    except Exception:
        print(traceback.format_exc(), ">>>")


def callback(basic_deliver):
    """Handle a reply frame registered via ``channel.add_callback``.

    Args:
        basic_deliver: The received frame; its ``method`` attribute is
            inspected.  (Despite the name it is not a Basic.Deliver object.)

    Raises:
        SystemExit: For any frame whose method is not ``Basic.Ack``.
    """
    print(basic_deliver.method)
    if isinstance(basic_deliver.method, Basic.Ack):
        return
    print('empty', basic_deliver)
    # sys.exit() raises SystemExit from inside the callback; the script's
    # ``except KeyboardInterrupt`` does not catch it, so the process ends
    # without an explicit connection.close().
    sys.exit()


def on_channel_open(channel):
    """Store the opened channel, register the empty-queue handler and fetch.

    Args:
        channel: The newly opened channel.
    """
    global channelg, start
    channelg = channel
    # One-shot handler for Basic.GetEmpty replies.
    channelg.add_callback(callback, replies=(Basic.GetEmpty,), one_shot=True)
    start = time()
    get_message()


credentials = pika.PlainCredentials("wjq", "1234")
parameters = pika.ConnectionParameters(host='192.168.139.128',
                                       credentials=credentials)
connection = pika.SelectConnection(
    parameters=parameters, on_open_callback=on_open)

try:
    connection.ioloop.start()
except KeyboardInterrupt:
    connection.close()
    # Restart the loop after close() so the close request can be processed.
    connection.ioloop.start()
参考地址: