生产者(发送时指定目标队列,或者让自动创建的主题默认只创建一个队列):
发送消息时,如果服务器上不存在该 topic,会自动创建该 topic;把自动创建时的默认队列数(defaultTopicQueueNums,默认值为 4)改成 1 即可。对于已经存在的主题,队列数不会因此改变,只能在发送时指定队列。
package com.apacherocketmq.test;

import java.io.UnsupportedEncodingException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;

import com.apacherocketmq.connection.RocketProducer;

/**
 * Demo producer that endlessly sends messages to topic {@code GroupETopicA},
 * always choosing the first queue offered by the client.
 *
 * <p>Each message body has the form {@code " <timestamp> <i> <index>"}, where
 * {@code i} alternates between 0 and 1 and {@code index} is incremented every
 * time {@code i} flips from 0 to 1. After every send a running counter is
 * printed to standard output.
 */
public class BroadcastProducer {

    /** Always picks the first queue in the list handed over by the client. */
    private static final MessageQueueSelector FIRST_QUEUE_SELECTOR = new MessageQueueSelector() {
        @Override
        public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
            // Queue 0.
            return mqs.get(0);
        }
    };

    /** Toggles between 0 and 1 on every loop iteration. */
    private static int i = 0;

    /** Incremented each time {@link #i} flips from 0 to 1. */
    private static int index = 0;

    /** Number printed after each send; starts at 1. */
    private static long count = 1L;

    /**
     * Starts the producer and sends messages in an endless loop.
     *
     * <p>The loop only ends when a send fails with an exception; in that case
     * the producer is shut down before the exception propagates.
     *
     * @param args ignored
     * @throws UnsupportedEncodingException if the body charset is not supported
     * @throws MQClientException if the producer cannot start or a send fails on the client side
     * @throws RemotingException if a send fails at the remoting layer
     * @throws MQBrokerException if the broker rejects a send
     * @throws InterruptedException if the sending thread is interrupted
     */
    public static void main(String[] args) throws UnsupportedEncodingException, MQClientException,
            RemotingException, MQBrokerException, InterruptedException {
        DefaultMQProducer producer = RocketProducer.newInstance();
        producer.setProducerGroup("GroupE");
        producer.start();
        try {
            while (true) {
                if (i == 0) {
                    i = 1;
                    index++;
                } else if (i == 1) {
                    i = 0;
                }
                Message message = new Message("GroupETopicA", "TagE", "e",
                        (" " + time() + " " + i + " " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(message, FIRST_QUEUE_SELECTOR, "ss");
                // To throttle the demo, add e.g. Thread.sleep(2000) here.
                System.out.println(count++);
            }
        } finally {
            // Release the client's resources when a send failure ends the loop.
            producer.shutdown();
        }
    }

    /**
     * Formats the current time.
     *
     * @return the current time as {@code yyyy-MM-dd HH:mm:ss}
     */
    private static String time() {
        // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return df.format(new Date());
    }
}
消费者(只处理指定队列的消息,其他队列的消息直接丢弃;若主题只有一个队列,则无需过滤):
package com.apacherocketmq.test;

import java.nio.charset.StandardCharsets;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import com.apacherocketmq.connection.RocketMQPushConsumer;
import com.jdbc.MyJDBCUtil;

/**
 * Demo broadcast-mode consumer for topic {@code GroupETopicA} that only
 * processes messages from queue 0 and discards everything else.
 *
 * <p>The queue offset of each processed message is written to the
 * {@code common_sys_config} table (item {@code JYLQ}). On the first delivered
 * message the stored value is read back once, and messages whose offset is
 * not greater than that value are reported as old and skipped.
 */
public class BroadcastConsumerTEST {

    /**
     * Configures and starts the push consumer.
     *
     * @param args ignored
     * @throws MQClientException if subscribing or starting the consumer fails
     */
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = RocketMQPushConsumer.newInstance();
        // Message model: either clustering or broadcasting consumption.
        consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setConsumerGroup("GroupE");
        consumer.subscribe("GroupETopicA", "*");
        // Alternatives that were tried here: CONSUME_FROM_LAST_OFFSET, or
        // CONSUME_FROM_TIMESTAMP combined with
        // consumer.setConsumeTimestamp(UtilAll.timeMillisToHumanString3(
        //         System.currentTimeMillis() - (1000 * 60 * 30)));
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        consumer.setConsumeThreadMax(1);
        consumer.setConsumeThreadMin(1);
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            /** True until the stored offset has been loaded from the database. */
            private boolean isFirst = true;

            /** Offset loaded from the database on the first delivery; never refreshed afterwards. */
            private long i = 0L;

            /**
             * Handles the first message of the delivered batch.
             *
             * <p>NOTE(review): only {@code msgs.get(0)} is inspected, which presumably
             * relies on the consumer delivering one message per call - confirm if the
             * batch size is ever raised.
             *
             * @return always {@link ConsumeConcurrentlyStatus#CONSUME_SUCCESS}
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                    ConsumeConcurrentlyContext context) {
                if (isFirst) {
                    i = query("JYLQ");
                    isFirst = false;
                }
                MessageExt first = msgs.get(0);
                long offset = first.getQueueOffset();
                if (first.getQueueId() != 0) {
                    // Discard data from every queue other than queue 0.
                    System.err.println("QueueId NOT 0");
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
                // Decode explicitly as UTF-8 instead of depending on the platform default
                // charset; the producer in these notes encodes with
                // RemotingHelper.DEFAULT_CHARSET.
                String body = new String(first.getBody(), StandardCharsets.UTF_8);
                if (offset > i) {
                    // New message: persist its offset, then report it.
                    update(offset);
                    System.out.println(offset + body + " N ");
                } else {
                    // Offset not beyond the stored one: report as old and skip.
                    System.err.println(offset + body + " O ");
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }

            /**
             * Reads the stored numeric value for a configuration item.
             *
             * @param param value bound to {@code CONFIG_ITEM}
             * @return the stored {@code CONFIG_VALUE}, or 0 when there is no row,
             *         no result set, or the query fails
             */
            private long query(String param) {
                String sql = "SELECT CONFIG_VALUE FROM common_sys_config WHERE CONFIG_ITEM=?";
                // try-with-resources closes the ResultSet; a null resource is tolerated.
                try (ResultSet result = MyJDBCUtil.query(sql, param)) {
                    if (result != null && result.next()) {
                        return result.getLong("CONFIG_VALUE");
                    }
                    System.err.println("No CONFIG_VALUE found for CONFIG_ITEM=" + param);
                } catch (SQLException e) {
                    e.printStackTrace();
                }
                return 0L;
            }

            /**
             * Stores the given offset as the value of configuration item {@code JYLQ}.
             *
             * @param param offset to persist
             */
            private void update(long param) {
                String sql = "UPDATE common_sys_config SET CONFIG_VALUE=? WHERE CONFIG_ITEM='JYLQ'";
                MyJDBCUtil.execute(sql, String.valueOf(param));
            }
        });
        consumer.start();
        System.out.println("Broadcast ConsumerA Started.");
    }

    /**
     * Sleeps for one second.
     *
     * <p>If the thread is interrupted while sleeping, the interrupt flag is
     * restored so callers can observe it.
     *
     * @param i currently unused; the sleep duration is fixed at 1000 ms
     */
    public static void waitFor(long i) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}