zoukankan      html  css  js  c++  java
  • 简单的activemq的封装和使用

    天空中飘着小雨,实在是适合写代码的时节。

    1

      1 package ch02.chat;
      2 
      3 import java.io.Serializable;
      4 
      5 import javax.jms.Connection;
      6 import javax.jms.ConnectionFactory;
      7 import javax.jms.Destination;
      8 import javax.jms.JMSException;
      9 import javax.jms.Message;
     10 import javax.jms.MessageConsumer;
     11 import javax.jms.MessageListener;
     12 import javax.jms.MessageProducer;
     13 import javax.jms.ObjectMessage;
     14 import javax.jms.Session;
     15 /*本工具封装了*/
     16 
     17 
     18 import javax.jms.TextMessage;
     19 import javax.jms.Topic;
     20 import javax.jms.TopicConnection;
     21 import javax.jms.TopicConnectionFactory;
     22 import javax.jms.TopicPublisher;
     23 import javax.jms.TopicSession;
     24 import javax.jms.TopicSubscriber;
     25 
     26 import org.apache.activemq.ActiveMQConnection;
     27 import org.apache.activemq.ActiveMQConnectionFactory;
     28 
     29 public class JMSTopic {
     30     TopicConnectionFactory connectionFactory;
     31     // Connection :JMS 客户端到JMS Provider 的连接
     32     TopicConnection connection = null;
     33     //用来发布的会话
     34     
     35     //TopicSession proSession = null;
     36     //2一个订阅会话
     37    //TopicSession conSession = null;
     38     TopicSession  session=null;
     39     
     40     //主题发布者
     41     MessageProducer producer=null;
     42     //主题
     43     MessageConsumer consumer=null;
     44     
     45     
     46     // Destination :消息的目的地;消息发送给谁.
     47     Destination destination;
     48     // MessageProducer:消息发送者
     49     
     50     //默认构造函数,默认的连接activemq,可以写多个构造函数
     51     public JMSTopic() 
     52     { 
     53         connectionFactory =  new ActiveMQConnectionFactory(
     54                 ActiveMQConnection.DEFAULT_USER,
     55                 ActiveMQConnection.DEFAULT_PASSWORD,
     56                 "tcp://localhost:61616");
     57         try {
     58             connection= connectionFactory.createTopicConnection();
     59         } catch (JMSException e) {
     60             // TODO Auto-generated catch block
     61             e.printStackTrace();
     62         }
     63         try {
     64             connection.start();
     65         } catch (JMSException e) {
     66             // TODO Auto-generated catch block
     67             e.printStackTrace();
     68         }
     69         
     70     }
     71     public JMSTopic(String user,String name) 
     72     { 
     73         connectionFactory =  new ActiveMQConnectionFactory(
     74                 user,
     75                 name,
     76                 "tcp://localhost:61616");
     77         try {
     78             connection= connectionFactory.createTopicConnection();
     79         } catch (JMSException e) {
     80             // TODO Auto-generated catch block
     81             e.printStackTrace();
     82         }
     83         try {
     84             connection.start();
     85         } catch (JMSException e) {
     86             // TODO Auto-generated catch block
     87             e.printStackTrace();
     88         }
     89         
     90     }
     91     
     92     
     93     
     94     
     95     //设计session类型
     96        public void setSession() throws JMSException
     97        {
     98            session= connection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
     99            
    100        }
    101          //设置为原子类型
    102        public void setAtomicSession() throws JMSException
    103        {
    104            session= connection.createTopicSession(true, Session.AUTO_ACKNOWLEDGE);
    105            
    106        }
    107     
    108     //此处先固定消息为String类型
    109     public void writeMessage(String t,String message,int priority )
    110     {
    111         try {
    112             
    113             producer=session.createProducer(session.createTopic(t));
    114             
    115             //使用message构造TextMessage 
    116             TextMessage text=session.createTextMessage();
    117             text.setJMSPriority(priority);
    118             text.setText(message);
    119             producer.send(text);
    120             
    121             
    122             
    123             
    124         } catch (JMSException e) {
    125             // TODO Auto-generated catch block
    126             e.printStackTrace();
    127         }
    128         //创建发布会话应该是可以配置的,此处先固定
    129         
    130         
    131     }
    132     
    133     public void writeMessage(String t,Object o)
    134     {
    135         try {
    136 
    137             
    138             producer=session.createProducer(session.createTopic(t));
    139             
    140             //使用message构造TextMessage 
    141             ObjectMessage text=session.createObjectMessage();
    142             text.setObject((Serializable) o);
    143             producer.send(text);
    144             
    145             
    146             
    147             
    148         } catch (JMSException e) {
    149             // TODO Auto-generated catch block
    150             e.printStackTrace();
    151         }
    152         //创建发布会话应该是可以配置的,此处先固定
    153         
    154         
    155     }
    156         
    157     //使用某个Message监听器来监听某个Topic
    158     public void receiveMsg(String c,MessageListener ml)
    159     {
    160         try {
    161         
    162             Topic t=session.createTopic(c);
    163             consumer=session.createConsumer(t);
    164             //设置过来的监视器
    165             consumer.setMessageListener(ml);
    166             
    167         } catch (JMSException e) {
    168             // TODO Auto-generated catch block
    169             e.printStackTrace();
    170         }
    171         
    172         
    173         
    174         
    175     }
    176     public Message receiveMsg(String c)
    177     {
    178         try {
    179             
    180             Topic t=session.createTopic(c);
    181             consumer=session.createConsumer(t);
    182             //设置过来的监视器
    183             Message message=consumer.receive();
    184             return message;
    185             
    186         } catch (JMSException e) {
    187             // TODO Auto-generated catch block
    188             e.printStackTrace();
    189         }
    190         return null;
    191     }
    192     //同步接收信息
    193     
    194     public void commit() throws JMSException
    195     {
    196         session.commit();
    197     }
    198     public void rollback() throws JMSException
    199     {
    200         session.rollback();
    201     }
    202     public void close() throws JMSException 
    203     {
    204         if(connection!=null) connection.close();
    205         if(session!=null) session.close();
    206         if(producer!=null) session.close();
    207         if(consumer!=null) consumer.close();
    208         
    209         
    210         
    211         
    212     }
    213     
    214 
    215 }
    View Code

    2.如何使用呢?

    a.做个有关事务的实

     1 package ch02.chat;
     2 
     3 import java.util.Scanner;
     4 
     5 import javax.jms.JMSException;
     6 import javax.jms.Session;
     7 
     8 public class ClientTest {
     9     public static void main(String args[]) throws JMSException
    10     {
    11         //第一个例子,建立一个原子的session做个实验看看,这个一个不会发送任何信息到服务器,
    12         JMSTopic jt=new JMSTopic();
    13         //jt.setSession();
    14         try{
    15         
    16         jt.setAtomicSession();
    17         
    18         jt.writeMessage("que1", "hansongjiang",4);
    19         int x=10/0; //会抛出异常,实现回滚,所以que1中不会添加任何信息
    20         jt.writeMessage("que1","hansong",4);
    21         jt.commit();
    22        }
    23         catch(Exception e)
    24         {
    25             jt.rollback();
    26             
    27         }
    28         finally
    29         {
    30     
    31         jt.close();
    32         }
    33     
    34     //如果
    35     /*    jt=new  JMSTopic();
    36         try
    37         {
    38         jt.setSession();
    39         jt.writeMessage("que1", "hansongjiang",4);
    40         //int x=10/0;
    41         jt.writeMessage("que1","zhangsan",4);
    42         
    43         }
    44         catch(Exception e)
    45         {
    46             System.out.println("异常我抓住了");
    47             
    48         }
    49         
    50         */
    51         
    52         
    53     }
    54 
    55 }
    View Code

    入队的信息为的信息为个数为0

    b.非事务的执行后呢?我们使用getAtomicSession获得的session执行后入topic个数为1

     1 package ch02.chat;
     2 
     3 import java.util.Scanner;
     4 
     5 import javax.jms.JMSException;
     6 import javax.jms.Session;
     7 
     8 public class ClientTest {
     9     public static void main(String args[]) throws JMSException
    10     {
    11     
    12     
    13         JMSTopic jt=new  JMSTopic();
    14         try
    15         {
    16         jt.setSession();
    17         jt.writeMessage("que1", "hansongjiang",4);
    18         //int x=10/0;
    19         jt.writeMessage("que1","zhangsan",4);
    20         
    21         }
    22         catch(Exception e)
    23         {
    24             System.out.println("异常我抓住了");
    25             
    26         }
    27         
    28         */
    29         
    30         
    31     }
    32 
    33 }
    View Code
  • 相关阅读:
    [BZOJ2038]小Z的袜子
    [BZOJ5016]一个简单的询问
    [BZOJ1008][HNOI2008]越狱
    [FZU2254]英语考试
    利用Map 的merge方法统计数量
    List 原生态类型
    try-with-resource 关闭 io流
    利用构建器创建对象
    linux 安装 vault
    git 上传文件
  • 原文地址:https://www.cnblogs.com/hansongjiang/p/4077145.html
Copyright © 2011-2022 走看看