Java发送我们定义的data队列消息到rabbitmq管理界面消息队列queue里:
public class Producer1 {
public final static String QUEUE_NAME="data";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException{
long startTime=System.currentTimeMillis(); //获取开始时间
//数据准备
List<testObj> list = MockData.mockData();
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ相关信息
factory.setHost("192.168.43.211");
factory.setUsername("test");
factory.setPassword("123456");
//factory.setPort(5672);
//创建一个新的连接
Connection connection = factory.newConnection();
//创建一个通道
Channel channel = connection.createChannel();
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
List<testObj> list1 = new ArrayList<testObj>();
list1 = new MockData().mockData();
//发送消息到队列中
ObjectMapper mapper=new ObjectMapper();
String message = mapper.writeValueAsString(list1);
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
//关闭通道和连接
channel.close();
connection.close();
long endTime=System.currentTimeMillis(); //获取结束时间
System.out.println("程序运行时间: "+(endTime-startTime)/1000.0+"s");
}
}
Python顺利拿到Java数据:
if __name__ == '__main__':
starttime = datetime.datetime.now()
# 创建socket链接
credentials = pika.PlainCredentials('test', '123456')
connection = pika.BlockingConnection(pika.ConnectionParameters(
'192.168.43.211', 5672, '/', credentials))
# 创建管道
channel = connection.channel()
# 创建队列
queue_name = 'data'
channel.queue_declare(queue_name)
# 如果接受到消息就调用回调函数,准备接受消息
# 声明回调函数
def callback(ch, method, properties, body):
message = json.loads(body.decode())
endtime = datetime.datetime.now()
print("拿数据时间:{}".format(endtime - starttime))
list = message
for i in list:
print(i)
channel.basic_consume(callback, queue=queue_name, no_ack=False)
channel.start_consuming()