Activemq 是一款开源的消息中间件,适合中小型应用使用,遵循JMS规范。
具体介绍这里就不再阐述了,这里简单说下消息中间件的好处
1请求结果异步处理
客户端发送请求以后,服务器可以把相关数据放到消息中间件上,不一定马上处理
2解耦
client进程和server服务进程都不一定同时可用。
3不仅支持1对1通信,也支持1对多通信
另外我想说的是,JMS只是定义了接口,并没有给出实现。以下是jms相关的术语介绍
Provider(MessageProvider):生产者 生产消息
Consumer(MessageConsumer):消费者 发送消息
PTP:Point To Point,点对点通信消息模型
Pub/Sub:Publish/Subscribe,发布订阅消息模型
Queue:消息队列,和PTP结合
Topic:消息主题,和Pub/Sub结合
ConnectionFactory:连接工厂,JMS用它创建Connnection
Connnection:JMS Client到JMS Provider的连接,用于创建session
Destination:消息目的地,由Session创建
Session:会话,由Connection创建,发送、接受消息的一个线程
快速入门
下载 http://activemq.apache.org/ 启动程序
windows系统 (根据你系统 bit的不同 选择不同的启动程序
启动完成以后,就可以访问控制台了,web 控制台可以通过
http://localhost:8161 需要输入用户名以及密码 (默认是admin admin )
当然这个是可以修改的,默认这个是运行在jetty里面的
修改conf目录下的jetty-realm.properties
接下来,我们写第一个快速入门程序。具体来说,会有一个生产者用于发送消息,一个消费者用于接收消息。
引入jar包,我这里为了方便,直接引入的是activemq-all-5.9.0.jar
实际开发中 应该根据需求引入lib目录下的相关包
=============================================================================================================
生产者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.MessageProducer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Producer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("cc","cz", ActiveMQConnectionFactory.DEFAULT_BROKER_URL); Connection connection = factory.createConnection(); // 默认是关闭的 需要开启 connection.start(); Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("test"); MessageProducer producer = session.createProducer(dest); // 设置非持久化 服务器关闭消息就不再保存 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); TextMessage textMessage = session.createTextMessage(); textMessage.setText("I AM MESSAGE"); producer.send(textMessage); try { } finally { if(connection!=null) connection.close(); } } }
消费者
import javax.jms.Connection; import javax.jms.ConnectionFactory; import javax.jms.Destination; import javax.jms.MessageConsumer; import javax.jms.Session; import javax.jms.TextMessage; import org.apache.activemq.ActiveMQConnectionFactory; public class Consumer { public static void main(String[] args) throws Exception { ConnectionFactory factory = new ActiveMQConnectionFactory("cc", "cz", ActiveMQConnectionFactory.DEFAULT_BROKER_URL); Connection connection = factory.createConnection(); // Connection默认是关闭的 需要开启 connection.start(); // 设置是否支持事务 和 签收模式 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); Destination dest = session.createQueue("test"); MessageConsumer consumer = session.createConsumer(dest); while (true) { //因为这里我们知道发送的是TextMessage 所以没有做类型判断 类型判断可以用 instanceof TextMessage mes = (TextMessage) consumer.receive(); // mes.acknowledge(); 签收消息 System.out.println(mes.getText()); } } }
运行生产者 ,在运行消费者(先后顺序是没有关系的,我这里并没有让消费者停止运行)
详细api介绍
1安全认证,我们不能让谁都去链接我们的mq服务器,这里我们可以在activemq.xml配置用户名密码 认证
2conection一定要关闭,不然mq不会释放资源。
3我们通过connection创建session对象的时候,有两个参数
分别是指 是否支持事务以及消息的签收模式
签收模式有三种
Session.AUTO_ACKNOWLEDGE 自动签收 当客户端从recive方法或者onMessage成功返回时 就签收
Session.CLIENT_ACKNOWLEDGE 需要调用Message调用acknowledge消息才会签收【实际开发中,我们一般使用这种方式】
Session.DUPS_OK_ACKNOWLEDGE 不必确认对消息的签收,可能会有消息的重复,但是提高了性能。
4 Producer的send方法
上面的代码中,我们先后指定了目的地,持久化方式,实际上这些都可以不必指定的,而是到send的时候指定。而且在实际业务开发中,往往根据各种判断,来决定将这条消息发往哪个Queue,因此往往不会在MessageProducer创建的时候指定Destination。
TTL,消息的存活时间,一句话:生产者生产了消息,如果消费者不来消费,那么这条消息保持多久的有效期
priority,消息优先级,0-9。0-4是普通消息,5-9是加急消息,消息默认级别是4。注意,消息优先级只是一个理论上的概念,也就是说ActiveMQ并不能保证消费的顺序性!
deliveryMode,如果不指定,默认是持久化的消息。如果可以容忍消息的丢失,那么采用非持久化的方式,将会改善性能、减少存储的开销
当然我们也可以修改消息持久化方式,默认是kahadb
、
可以修改为mysql ,不过一般不推荐mysql 。
可以使用leveldb,毕竟这些数据库的性能要较MySQL更高些,事实上我们并不关心消息的“可视化”,更加关心的是消息在持久化的同时更加高效!