1.安装rabbitmq
2.入门程序
一 安装rabbitmq
下载地址:Installing on Windows — Rabbit
安装完本地访问:http://localhost:15672/
登录:guest guest
二 入门程序
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.ligy</groupId> <artifactId>studymaven1</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.3</version> </dependency> <dependency> <groupId>org.slf4j</groupId> <artifactId>slf4j-simple</artifactId> <version>1.7.25</version> <scope>compile</scope> </dependency> </dependencies> </project>
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.util.Date; public class Main { private final static String NAME = "school2"; public static void main(String[] args) throws Exception { ConnectionFactory cf = new ConnectionFactory(); cf.setHost("127.0.0.1"); cf.setPort(5672); cf.setUsername("guest"); cf.setPassword("guest"); cf.setVirtualHost("/"); for (int i = 0; i < 10; i++) { Connection c = cf.newConnection(); Channel channel = c.createChannel(); String msg = "你好,世界" + i; channel.queueDeclare(NAME, true, false, false, null); channel.basicPublish("", NAME, null, msg.getBytes()); System.out.println("发送了消息:" + msg); c.close(); Thread.sleep(2000); } } }
import com.rabbitmq.client.*; import java.io.IOException; public class Main { private final static String QUEUE_NAME = "school"; public static void main(String[] args) throws Exception { /* 建立连接 */ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost");// MQ的IP factory.setPort(5672);// MQ端口 factory.setUsername("guest");// MQ用户名 factory.setPassword("guest");// MQ密码 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); /* 声明要连接的队列 */ channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println("等待消息产生:"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { super.handleDelivery(consumerTag, envelope, properties, body); //获取交换机 String exchange = envelope.getExchange(); //消息id,用来表示那个消息消费了 long deliveryTag = envelope.getDeliveryTag(); String message=new String(body,"utf-8"); System.out.println("receive" + message); } }; channel.basicConsume(QUEUE_NAME,true ,defaultConsumer); // /* 创建消费者对象,用于读取消息 */ // QueueingConsumer consumer = new QueueingConsumer(channel); // channel.basicConsume(QUEUE_NAME, true, consumer); // int i=1; // /* 读取队列,并且阻塞,即在读到消息之前在这里阻塞,直到等到消息,完成消息的阅读后,继续阻塞循环 */ // while (true) { // QueueingConsumer.Delivery delivery = consumer.nextDelivery(); // String message = new String(delivery.getBody()); // System.out.println("第"+i+"个消息!"); // System.out.println("收到消息'" + message + "'"); // i++; //// Thread.sleep(1000*5); // } } }