zoukankan      html  css  js  c++  java
  • activeMq-1 快速入门

    Activemq 是一款开源的消息中间件,适合中小型应用使用,遵循JMS规范。

    具体介绍这里就不再阐述了,这里简单说下消息中间件的好处

     1请求结果异步处理

      客户端发送请求以后,服务器可以把相关数据放到消息中间件上,不一定马上处理

     2解耦

     client进程和server服务进程都不一定同时可用。

     3不仅支持11通信,也支持1对多通信

    另外我想说的是,JMS只是定义了接口,并没有给出实现。以下是jms相关的术语介绍

    ProviderMessageProvider):生产者 生产消息

    ConsumerMessageConsumer):消费者 发送消息

    PTPPoint To Point,点对点通信消息模型

    Pub/SubPublish/Subscribe,发布订阅消息模型

    Queue:消息队列,和PTP结合

    Topic:消息主题,和Pub/Sub结合

    ConnectionFactory:连接工厂,JMS用它创建Connnection

    ConnnectionJMS ClientJMS Provider的连接,用于创建session

    Destination:消息目的地,由Session创建

    Session:会话,由Connection创建,发送、接受消息的一个线程

    快速入门

      下载 http://activemq.apache.org/ 启动程序   

       windows系统 (根据你系统 bit不同 选择不同的启动程序

     

     启动完成以后,就可以访问控制台了,web 控制台可以通过

     http://localhost:8161  需要输入用户名以及密码 (默认是admin  admin

     当然这个是可以修改的,默认这个是运行在jetty里面的

    修改conf目录下的jetty-realm.properties

      

    接下来,我们写第一个快速入门程序。具体来说,会有一个生产者用于发送消息,一个消费者用于接收消息。

    引入jar包,我这里为了方便,直接引入的是activemq-all-5.9.0.jar

     

    实际开发中 应该根据需求引入lib目录下的相关包

    =============================================================================================================

    生产者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.DeliveryMode;
    import javax.jms.Destination;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Producer {
    	public static void main(String[] args) throws Exception {
    	
    		ConnectionFactory factory = new ActiveMQConnectionFactory("cc","cz", ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
    		Connection connection = factory.createConnection();
    		// 默认是关闭的 需要开启
    		connection.start();
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination dest = session.createQueue("test");
    		MessageProducer producer = session.createProducer(dest);
    		// 设置非持久化  服务器关闭消息就不再保存
    		producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    		TextMessage textMessage = session.createTextMessage();
    		textMessage.setText("I AM MESSAGE");
    		producer.send(textMessage);
    		try {
    		} finally {
    			if(connection!=null)
    			connection.close();
    		}
    	}
    }
    

    消费者

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnectionFactory;
    
    public class Consumer {
    	public static void main(String[] args) throws Exception {
    
    		ConnectionFactory factory = new ActiveMQConnectionFactory("cc", "cz",
    				ActiveMQConnectionFactory.DEFAULT_BROKER_URL);
    		Connection connection = factory.createConnection();
    		// Connection默认是关闭的 需要开启
    		connection.start();
    		// 设置是否支持事务 和 签收模式
    		Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    		Destination dest = session.createQueue("test");
    		MessageConsumer consumer = session.createConsumer(dest);
    		while (true) {
    			//因为这里我们知道发送的是TextMessage 所以没有做类型判断 类型判断可以用 instanceof
    			TextMessage mes = (TextMessage) consumer.receive();
    			// mes.acknowledge(); 签收消息
    			System.out.println(mes.getText());
    		}
    
    	
    	}
    
    }
    

      

    运行生产者 ,在运行消费者(先后顺序是没有关系的,我这里并没有让消费者停止运行)

      

    详细api介绍

     1安全认证,我们不能让谁都去链接我们的mq服务器,这里我们可以在activemq.xml配置用户名密码 认证

     

     2conection一定要关闭,不然mq不会释放资源。

     3我们通过connection创建session对象的时候,有两个参数

    分别是指 是否支持事务以及消息的签收模式

     签收模式有三种

     Session.AUTO_ACKNOWLEDGE 自动签收 当客户端从recive方法或者onMessage成功返回时 就签收

     Session.CLIENT_ACKNOWLEDGE 需要调用Message调用acknowledge消息才会签收【实际开发中,我们一般使用这种方式】

     Session.DUPS_OK_ACKNOWLEDGE 不必确认对消息的签收,可能会有消息的重复,但是提高了性能。

    4 Producersend方法

    上面的代码中,我们先后指定了目的地,持久化方式,实际上这些都可以不必指定的,而是到send的时候指定。而且在实际业务开发中,往往根据各种判断,来决定将这条消息发往哪个Queue,因此往往不会在MessageProducer创建的时候指定Destination。

    TTL,消息的存活时间,一句话:生产者生产了消息,如果消费者不来消费,那么这条消息保持多久的有效期

    priority,消息优先级,0-9。0-4是普通消息,5-9是加急消息,消息默认级别是4。注意,消息优先级只是一个理论上的概念,也就是说ActiveMQ并不能保证消费的顺序性!

     

    deliveryMode,如果不指定,默认是持久化的消息。如果可以容忍消息的丢失,那么采用非持久化的方式,将会改善性能、减少存储的开销

    当然我们也可以修改消息持久化方式,默认是kahadb

     

    可以修改为mysql ,不过一般不推荐mysql 。

    可以使用leveldb,毕竟这些数据库的性能要较MySQL更高些,事实上我们并不关心消息的“可视化”,更加关心的是消息在持久化的同时更加高效!

     

  • 相关阅读:
    web(零)---tornado使用
    web(一)----tornado nginx配置
    pb- 使用
    Python排序算法之直接插入排序
    敏捷测试中发现的一些问题及改进办法
    加密算法
    共享内存与存储映射(mmap)
    mysql索引的性能分析
    mysql索引
    Innodb的存储及缓存
  • 原文地址:https://www.cnblogs.com/javabigdata/p/7464393.html
Copyright © 2011-2022 走看看