一、消息的事务性
1、生产者
一、生产者不开启事务
public class JmsQueueProducer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String QUEUE_NAME = "queue01";
public static final String TEXT_MESSAGE_NAME = "textMessage";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
// 两个参数 transacted=事务,acknowledgeMode=签收模式
// 如果生产者不开启事务,也就是transacted=false,那么第二个参数只要是符合传参的规范,就能将消息发送至MQ服务器中
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i < 4; i++) {
TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
producer.send(queue, textMessage);
}
// 释放资源
producer.close();
session.close();
connection.close();
System.out.println("生产者发送消息至MQ QUEUE......");
}
}
可以看出第二个参数是int类型,它的规范的值有下面这一些:
Session.SESSION_TRANSACTED
Session.ATUO_ACKNOWLEDGE
Session.CLIENT_ACKNOWLEDGE
Session.DUPS_KO_ACKNOWKEDGE
如果我们给acknowledge设置一个int类型的值5,看看会有什么现象
// 给第二个参数设置int类型的数字5
Session session = connection.createSession(false, 5);
运行程序发现报错了,报错的意思就是int类型的数字 5 不是一个有效的参数,能使用的参数下面列举出来了.
Exception in thread "main" javax.jms.JMSException: invalid acknowledgeMode: 5.
Valid values are
Session.AUTO_ACKNOWLEDGE (1),
Session.CLIENT_ACKNOWLEDGE (2),
Session.DUPS_OK_ACKNOWLEDGE (3),
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE (4)
or for transacted sessions Session.SESSION_TRANSACTED (0)
at org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:334)
at queue.JmsQueueProducer.main(JmsQueueProducer.java:23)
二、生产者开启事务
public class JmsQueueProducer {
public static final String BROKER_URL = "tcp://192.168.229.129:61616";
public static final String QUEUE_NAME = "queue01";
public static final String TEXT_MESSAGE_NAME = "textMessage";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(BROKER_URL);
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 1; i < 11; i++) {
TextMessage textMessage = session.createTextMessage(TEXT_MESSAGE_NAME + i);
producer.send(queue, textMessage);
if(i<=5){
session.commit();
}else{
session.rollback();
}
}
System.out.println("生产者成功发送消息至MQ QUEUE......");
} catch (Exception e) {
System.out.println("如果出现异常,消息回滚");
session.rollback();
} finally {
// 释放资源
producer.close();
session.close();
connection.close();
}
}
}
小结:
1、生产者不开启事务的情况下,只要执行send()方法,消息就可以发送至队列,它会默认帮我们commit(),第二个参数只要是符合语法规范的有效值就可以了,理论上是任意的,但是我们比较喜欢使用ATUO_ACKNOWLEDGE
2、生产者开启事务的情况下,执行send()方法并不会把消息发送至MQ服务器,还需要手动commit()进行提交,如果出现异常的情况下可以使用session.rollback进行回滚,由于对于生产者来说,事务的级别高于消息的应答,所以第二个参数可以忽略,一般ATUO_ACKNOWLEDGE就可以了.
3、对于生产者来说,生产者偏向于事务,生产者发送消息到MQ队列中,不关心签收机制是什么,所以不管开启不开启事务,第二个参数一般只要给一个AUTO_ACKNOWLEDGE就可以了
2、消费者
一、消费者不开启事务,自动签收的情况下,只能消费一次消息,不会重复消费消息
// 消费者不开启事务,自动应答
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
二、消费者开启事务、自动签收、并且手动commit(),那么消息就只能被消费一次,这样就可以保证一定的可靠性,否则消息会被重复消费
// 消费者开启事务,自动签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 在session关闭之前,使用session.commit()手动提交
session.commit();
二、消息的签收机制
一般签收机制只对于消费者来说,对于生产者来说,它只负责发送消息,消息的签收机制对它来说没有太大意义
1、消费者不开启事务,自动签收的情况下,消费者可以消费消息,并且只能消费一次消息,不会存在重复消费的情况
// 消费者不开启事务,自动签收
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
2、消费者开启事务,自动签收的情况下,必须使用commit手动提交,否则重复消费消息
// 消费者开启事务,自动签收
Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 如果没有手动提交,消费者可以重复消费消息,并且队列中的消息不会出队列
session.commit();
3、消费者不开启事务,手动签收,必须调用acknowledge()方法来确认签收,否则会重复消费消息
// 消费者不开启事务,手动签收
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 在自动签收的情况下使用message.acknowledge()方法进行签收,如果不手动调用ackonwledge()方法,那么消息会被重复消费
textMessage.acknowledge();
4、消费者开启事务,手动签收,只调用acknowledge()方法,消息会被重复消费
// 消费者开启事务,手动签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
// 如果只加上message.acknowledge()方法,消息会被重复消费
textMessage.acknowledge();
5、消费者开启事务,手动签收,只调用commit()方法,消息不会被重复消费(事务的决定性>签收机制???)
// 消费者开启事务,手动签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
// 如果只加上session.commit()方法,消息不会重复消费
session.commit();
6、消费者开启事务,手动签收,调用commit()方法和acknowledge()方法,消息不会重复消费
// 消费者开启事务,手动签收
Session session = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
// 加上message.acknowledge()方法
textMessage.acknowledge();
// 加上session.commit()方法
session.commit();
三、总结
1、生产者
一、对于生产者来说,它关心的仅仅只是事务,生产者只负责发送消息,消息是否签收,以及消息的签收机制对他来说没有很大意义
二、如果生产者不开启事务,我们为了符合规范,消息签收机制那里我们给一个Session.AUTO_ACKONWLEDGE就可以了
三、如果生产者开启事务,我们为了符合规范,消息签收机制那里我们给一个AUTO_ACKONWLEDGE,并且需要手动调用session.commit(),否则消息不会发送至MQ服务器
2、消费者
一、消费者一般关心的是签收方式,我们一般不开启事务,如果是自动签收,我们什么都不需要做,如果是手动签收,那么我们需要手动调用message.acknowledge()方法
二、消费者特殊情况下如果使用事务,就需要调用session.commit()方法.
三、特殊情况下,如果既开启了事务,又是手动签收,单单开启事务就可以避免消费重复消息
四、为了避免出错,消费者消费消息的时候,如果开启了事务,就调用session.commit(),自动签收就调用acknowledge,使它们成对的出现,这样可以避免不必要的错误.