一、添加依赖
<!-- RocketMQ --> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.6.0</version> </dependency>
2个核心接口,3个默认实现。
interface MQProducer
--- DefaultMQProducer
interface MQConsumer
--- DefaultMQPushConsumer
--- DefaultMQPullConsumer
DefaultMQProducer是MQProducer的唯一默认实现,其实现 MQProducer 接口的时候 还继承了 ClientConfig类 (客户端配置类),可以配置如 sendMsgTimeout超时时间,producerGroup 生产者组 最大消息容量和是否启用压缩等。
关键方法是 send(Message) 发送一个消息到MQ。
DefaultMQPushConsumer 包含很多可以配置的信息,最主要的有:
messageModel 消息模型 支持以下两种 1、集群消费 2、广播消费
messageListener 消息监听器
consumeThreadMin 消费线程池数量 默认10
consumeThreadMax 消费线程池数量 默认20
Consumer 对象注册一个 Listener 接口,一旦收到消息,Consumer 对象立刻回调 Listener 接口方法, MessageListenerOrderly有序的,MessageListenerConcurrently 无序的。
关键方法有registerMessageListener 注册监听器等。
二、生产者
import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.remoting.exception.RemotingException; /** * 生产者 */ public class RocketProducer { public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException { //设置生产者组名 DefaultMQProducer producer = new DefaultMQProducer("my-group-name-A"); //指定nameServer的地址, 多个地址用分号分隔 producer.setNamesrvAddr("localhost:9876"); //启动实例 producer.start(); Message message = new Message("topic-name-A","tag-name-A","Message : My blog address".getBytes()); producer.send(message); System.out.println("Message sent"); //关闭生产者,释放资源 producer.shutdown(); } }
三、消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.consumer.ConsumeFromWhere; import org.apache.rocketmq.common.message.MessageExt; import java.util.List; /** * 消费者 */ public class RocketConsumer { public static void main(String[] args) throws MQClientException{ //设置消费者组名 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("my-group-name-A"); //设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费 consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); //NameServer地址, 多个地址用分号(;)分隔 consumer.setNamesrvAddr("localhost:9876"); //参数1:topic名字 参数2:tag名字 consumer.subscribe("topic-name-A", "tag-name-A"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage( List<MessageExt> msgs, ConsumeConcurrentlyContext context) { for (MessageExt msg : msgs) { System.out.println(new String(msg.getBody())); } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); //启动,会一直监听消息 consumer.start(); System.out.println("Consumer Started!"); //consumer.shutdown(); } }