以Java语言,MQ客户端为amqp-client作为示例
1、基本原则
direct模式,由生产者声明队列名,消费者也声明队列名
topic模式,由生产者声明交换器名,由消费者声明队列名+交换器名+绑定关系
即生产者只负责生产消息,至于消息要投递到哪里由消费者指定
2、队列、交换器、消息的持久化配置
队列声明持久化
public void queueDeclare(String queue) { try { if (conn == null) { conn = connectionFactory.newConnection(); } Channel channel = conn.createChannel(); // 声明队列,如果队列不存在则创建之 boolean durable = true; boolean exclusive = false; boolean autoDelete = false; Map<String, Object> arguments = null; channel.queueDeclare(queue, durable, exclusive, autoDelete, arguments); channel.close(); } catch (IOException e) { logger.error("IOException:", e); } catch (TimeoutException e) { logger.error("TimeoutException:", e); } }
交换器声明持久化
// 声明topic交换器 public void topicExchangeDeclare(String exchange) { String type = "topic"; boolean durable = true; exchangeDeclare(exchange, type, durable); } private void exchangeDeclare(String exchange, String type, boolean durable) { try { if (conn == null) { conn = connectionFactory.newConnection(); } Channel channel = conn.createChannel(); // 声明交换器 channel.exchangeDeclare(exchange, type, durable); channel.close(); } catch (IOException e) { logger.error("IOException:", e); } catch (TimeoutException e) { logger.error("TimeoutException:", e); } }
消息发送时指定持久化
// 发送消息 public void send(String exchange, String routingKey, JSONObject json) { try { if (conn == null) { conn = connectionFactory.newConnection(); } Channel channel = conn.createChannel(); String msg = json.toJSONString(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes("utf-8")); channel.close(); } catch (IOException e) { logger.error("IOException:", e); } catch (TimeoutException e) { logger.error("TimeoutException:", e); } }
3、网络闪断、RabbitMQ重启时App的自恢复编码
首先,必须已经指定了队列和交换器的持久化,否则在自恢复时,由于无法找到队列及交换器和绑定关系会报错
需要注意的是,RabbitMQ推荐尽量共用Connection,多个线程之间用不同的Channel
<bean id="connectionFactory" class="com.rabbitmq.client.ConnectionFactory">
<property name="automaticRecoveryEnabled" value="true"></property>
<property name="host" value="${RABBITMQ.SERVER_IP}"></property>
<property name="port" value="${RABBITMQ.SERVER_PORT}"></property>
<property name="username" value="${RABBITMQ.USERNAME}"></property>
<property name="password" value="${RABBITMQ.PASSWORD}"></property>
<property name="virtualHost" value="${RABBITMQ.VIRTUAL_HOST}"></property>
</bean>
设置automaticRecoveryEnabled为true
public class MQConsumer implements Runnable, Consumer { static Logger logger = LoggerFactory.getLogger(MQConsumer.class); protected Connection connection; protected Channel channel; protected String queue; protected ConsumerExecutor executor;// 执行器 private MQConfig config; public MQConsumer(MQConfig config, String queue, ConsumerExecutor executor) { this.config = config; this.queue = queue; this.executor = executor; } @Override public void run() { try { init(); try { channel.basicConsume(queue, true, this); } catch (IOException e) { logger.error("MQ消费处理失败:", e); } } catch (Exception e) { logger.error("mq init() error", e); } } protected void init() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(config.getIp()); factory.setPort(config.getPort()); factory.setUsername(config.getUserName()); factory.setPassword(config.getPassword()); factory.setVirtualHost(config.getvHost()); factory.setAutomaticRecoveryEnabled(true); connection = factory.newConnection(); channel = connection.createChannel(); } @Override public void handleDelivery(String consumerTag, Envelope env, BasicProperties props, byte[] body) throws IOException { String msg = new String(body, "utf-8"); logger.debug("从队列[" + queue + "] 接收消息: " + msg); try { executor.consume(msg); } catch (Exception e) { logger.error("handleDelivery error:", e); } } @Override public void handleCancel(String consumerTag) { logger.info("handleCancel:" + consumerTag); } @Override public void handleCancelOk(String consumerTag) { logger.info("handleCancelOk:" + consumerTag); } @Override public void handleConsumeOk(String consumerTag) { logger.info("handleConsumeOk:" + consumerTag); } @Override public void handleRecoverOk(String consumerTag) { logger.info("handleRecoverOk:" + consumerTag); } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException e) { logger.info("handleShutdownSignal:" + consumerTag); } }
消费者代码示例,只要automaticRecoveryEnabled为true,而且queue和exchange都是持久化的,能够自动恢复,不用手工处理。
4、auto_ack问题
在auto_ack为true时,数据流是这样的:
App从MQ取消息->删除消息->App业务逻辑处理(包括读写数据库等)->发送处理结果(如果有需要)
可以看出当App业务逻辑处理失败时,消息已经被删除了,很多情况下,这是不安全的,所以改为:
App从MQ取消息->App业务逻辑处理(包括读写数据库等)->发送ACK删除消息 ->发送处理结果(如果有需要)
但是由于性能问题一般出现在业务逻辑部分,如果这部分处理慢又会造成拥塞,所以要自已权衡
try { channel.basicConsume(queue, true, this); boolean autoAck = false; channel.basicConsume(queue, autoAck, this); } catch (IOException e) { logger.error("MQ消费处理失败:", e); }
try{ channel.basicAck(env.getDeliveryTag(), true); }catch(Exception e){ logger.error("basicAck error:", e); }
5、超时处理
采用MQ解耦后系统之间虽然是异步处理,但正常情况下响应速度跟同步处理接近。特殊情况下响应慢时很可能消息从发送到被处理已经过去了很长一段时间,前端极可能已经重复提交并完成了业务,所以需要加个快速失败机制。即消息生产者将消息的创建时间带到消息体里,消费者拿到消息后,判断如果是已经过去了指定间隔的消息,则直接失败返回。