消息生产者把消息发送到brokey,消费端进行消费,总共有两种方式,一个是主动推送,一个是主动拉取
1 push (主动推送): 消息发送到brokey,Brokey主动推送到Consumer进行消费.
优点:消息能够快速被消费.
缺点:如果消息过多,consumer端处理不过来,会造成消息的丢失和堵塞等问题
2:pull(主动拉取):消费端主动把brokey的消息拉取过来消费
优点:控制权在Consumer端,可以设置拉取时间
缺点:时间间隔不好控制,假设设置2秒拉一次,brokey没有消息,一直返回空,会造成连接的资源损耗,时间设置长点,不能及时的消费新的消息.
3:长轮训: consumer连接Brokey后如果没有消息不会立马返回,而是保留连接一段时间,默认时间是个15秒.15秒过后没有消息的话返回在次连接
优点:主动权的话在consumer,有大量的消息的话不会一次性的进行发送.
缺点:consumer要保持与brokey的连接,会占用一定的资源,轮训连接返回对应的consumer
pushConsumer 本质就是长轮训
在brokey端可以配置
longPollingEnable=true 默认是开启的
消息节后后自动处理offset和消息,如果有新的Consumer加入后会做一个负载均衡
pullConsumer 的话需要自己维护offset
public class PullConsumer {
private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();
public static void main(String[] args) throws MQClientException {
DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_group_name_5");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.start();
Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("broker-a");
for (MessageQueue mq : mqs) {
System.out.printf("Consume from the queue: %s%n", mq);
SINGLE_MQ:
while (true) {
try {
PullResult pullResult =
consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
System.out.printf("%s%n", pullResult);
putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
switch (pullResult.getPullStatus()) {
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
break SINGLE_MQ;
case OFFSET_ILLEGAL:
break;
default:
break;
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
consumer.shutdown();
}
private static long getMessageQueueOffset(MessageQueue mq) {
Long offset = OFFSE_TABLE.get(mq);
if (offset != null)
return offset;
return 0;
}
private static void putMessageQueueOffset(MessageQueue mq, long offset) {
OFFSE_TABLE.put(mq, offset);
}