zoukankan      html  css  js  c++  java
  • 消息队列:快速上手ActiveMQ消息队列的JMS方式使用(两种模式:Topic和Queue的消息推送和订阅)

    1、实现功能

    希望使用一套API,实现两种模式下的消息发送和接收功能,方便业务程序调用

    1、发送Topic

    2、发送Queue

    3、接收Topic

    4、接收Queue

    2、接口设计

    根据功能设计公共调用接口

    1. /**
    2. * 数据分发接口(用于发送、接收消息队列数据)
    3. *
    4. * @author eguid
    5. *
    6. */
    7. public interface MsgDistributeInterface {
    8.  
    9. /**
    10. * 发送到主题
    11. *
    12. * @param topicName -主题
    13. * @param data -数据
    14. * @return
    15. */
    16. public boolean sendTopic(String topicName, byte[] data);
    17.  
    18. /**
    19. * 发送到主题
    20. * @param topicName -主题
    21. * @param data-数据
    22. * @param offset -偏移量
    23. * @param length -长度
    24. * @return
    25. */
    26. boolean sendTopic(String topicName, byte[] data, int offset, int length);
    27.  
    28. /**
    29. * 发送到队列
    30. *
    31. * @param queueName -队列名称
    32. * @param data -数据
    33. * @return
    34. */
    35. public boolean sendQueue(String queueName, byte[] data);
    36.  
    37. /**
    38. * 发送到队列
    39. * @param queueName -队列名称
    40. * @param data -数据
    41. * @param offset
    42. * @param length
    43. * @return
    44. */
    45. public boolean sendQueue(String queueName, byte[] data,int offset, int length);
    46.  
    47. /**
    48. * 接收队列消息
    49. * @param queueName 队列名称
    50. * @param listener
    51. * @throws JMSException
    52. */
    53. void receiveQueue(String queueName, MessageListener listener) throws JMSException;
    54.  
    55. /**
    56. * 订阅主题
    57. * @param topicName -主题名称
    58. * @param listener
    59. * @throws JMSException
    60. */
    61. void receiveTopic(String topicName, MessageListener listener) throws JMSException;
    62. }

    3、基于ActiveMQ的接口实现

    1. /**
    2. * 基于activeMQ的消息生产者/消费者实现(初始化该对象时即初始化连接消息队列,如果无法连接到消息队列,立即抛出异常)
    3. *
    4. * @author eguid
    5. *
    6. */
    7. public class ActiveMQImpl implements MsgDistributeInterface {
    8.  
    9. private String userName;
    10. private String password;
    11. private String brokerURL;
    12. private boolean persistentMode;//持久化模式
    13. //连接工厂
    14. ConnectionFactory connectionFactory;
    15. //发送消息的线程
    16. Connection connection;
    17. // 事务管理
    18. Session session;
    19.  
    20. //存放各个线程订阅模式生产者
    21. ThreadLocal<MessageProducer> topicThreadLocal = new ThreadLocal<MessageProducer>();
    22. //存放各个线程队列模式生产者
    23. ThreadLocal<MessageProducer> queueThreadLocal = new ThreadLocal<MessageProducer>();
    24.  
    25. public ActiveMQImpl(String userName, String password, String brokerURL) throws JMSException {
    26. this(userName, password, brokerURL, true);
    27. }
    28.  
    29. public ActiveMQImpl(String userName, String password, String brokerURL,boolean persistentMode) throws JMSException {
    30. this.userName = userName;
    31. this.password = password;
    32. this.brokerURL = brokerURL;
    33. this.persistentMode=persistentMode;
    34. init();
    35. }
    36.  
    37. public void init() throws JMSException {
    38. try {
    39. // 创建一个链接工厂
    40. connectionFactory = new ActiveMQConnectionFactory(this.userName, this.password, this.brokerURL);
    41. // 从工厂中创建一个链接
    42. connection = connectionFactory.createConnection();
    43. // 开启链接
    44. connection.start();
    45. // 创建一个事务(订阅模式,事务采用自动确认方式)
    46. session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    47. } catch (JMSException e) {
    48. throw e;
    49. }
    50. }
    51.  
    52. @Override
    53. public boolean sendTopic(String topicName, byte[] data) {
    54. return sendTopic(topicName, data, 0, data.length);
    55. }
    56.  
    57. @Override
    58. public boolean sendTopic(String topicName, byte[] data, int offset, int length) {
    59. return send(true, topicName, data, offset, length);
    60. }
    61.  
    62. @Override
    63. public boolean sendQueue(String queueName, byte[] data) {
    64. return sendQueue(queueName, data, 0, data.length);
    65. }
    66.  
    67. @Override
    68. public boolean sendQueue(String queueName, byte[] data, int offset, int length) {
    69. return send(false, queueName, data, offset, length);
    70. }
    71.  
    72. /**
    73. * 发送数据
    74. *
    75. * @param name
    76. * @param data
    77. * @param offset
    78. * @param length
    79. * @param type
    80. * -类型
    81. * @return
    82. */
    83. private boolean send(boolean type, String name, byte[] data, int offset, int length) {
    84. try {
    85. MessageProducer messageProducer = getMessageProducer(name, type);
    86.  
    87. BytesMessage msg = createBytesMsg(data, offset, length);
    88. System.err.println(Thread.currentThread().getName()+"发送消息");
    89. // 发送消息
    90. messageProducer.send(msg);
    91. } catch (JMSException e) {
    92. return false;
    93. }
    94. return false;
    95. }
    96.  
    97. public void receive(String topicName) throws JMSException {
    98. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    99. Topic topic =session.createTopic(topicName);
    100. MessageConsumer consumer=session.createConsumer(topic);
    101. consumer.setMessageListener(new MessageListener() {
    102. @Override
    103. public void onMessage(Message message) {
    104. BytesMessage msg=(BytesMessage) message;
    105. System.err.println(Thread.currentThread().getName()+"收到消息:"+msg.toString());
    106. }
    107. });
    108.  
    109. }
    110. /**
    111. * 创建字节数组消息
    112. *
    113. * @param data
    114. * @param offset
    115. * @param length
    116. * @return
    117. * @throws JMSException
    118. */
    119. private BytesMessage createBytesMsg(byte[] data, int offset, int length) throws JMSException {
    120. BytesMessage msg = session.createBytesMessage();
    121. msg.writeBytes(data, offset, length);
    122. return msg;
    123. }
    124.  
    125. /**
    126. * 创建对象序列化消息
    127. * @param obj
    128. * @return
    129. * @throws JMSException
    130. */
    131. private ObjectMessage createMapMsg(Serializable obj) throws JMSException {
    132. // MapMessage msg = session.createMapMessage();//key-value形式的消息
    133. ObjectMessage msg = session.createObjectMessage(obj);
    134. return msg;
    135. }
    136.  
    137. /**
    138. * 创建字符串消息
    139. * @param text
    140. * @return
    141. * @throws JMSException
    142. */
    143. private TextMessage createTextMsg(String text) throws JMSException {
    144. TextMessage msg = session.createTextMessage(text);
    145. return msg;
    146. }
    147.  
    148.  
    149. /**
    150. * 获取创建者
    151. *
    152. * @param name -名称(主题名称和队列名称)
    153. * @param type -类型(true:topic,false:queue)
    154. * @return
    155. * @throws JMSException
    156. */
    157. private MessageProducer getMessageProducer(String name, boolean type) throws JMSException {
    158. return type?getTopicProducer(name):getQueueProducer(name);
    159. }
    160.  
    161. /**
    162. * 创建或获取队列
    163. * @param queueName
    164. * @return
    165. * @throws JMSException
    166. */
    167. private MessageProducer getQueueProducer(String queueName) throws JMSException {
    168. MessageProducer messageProducer = null;
    169. if ((messageProducer = queueThreadLocal.get()) == null) {
    170. Queue queue = session.createQueue(queueName);
    171. messageProducer = session.createProducer(queue);
    172. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
    173. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
    174. queueThreadLocal.set(messageProducer);
    175. }
    176. return messageProducer;
    177. }
    178.  
    179. /**
    180. * 创建或获取主题
    181. * @param topicName
    182. * @return
    183. * @throws JMSException
    184. */
    185. private MessageProducer getTopicProducer(String topicName) throws JMSException {
    186. MessageProducer messageProducer = null;
    187. if ((messageProducer = topicThreadLocal.get()) == null) {
    188. Topic topic = session.createTopic(topicName);
    189. messageProducer = session.createProducer(topic);
    190. //是否持久化(1-不持久化(如果没有消费者,消息就也会自动失效),2-持久化(如果没有消费者进行消费,消息队列也会缓存消息等待消费者进行消费))
    191. messageProducer.setDeliveryMode(persistentMode?DeliveryMode.PERSISTENT:DeliveryMode.NON_PERSISTENT);
    192. topicThreadLocal.set(messageProducer);
    193. }
    194. return messageProducer;
    195. }
    196.  
    197. public String getPassword() {
    198. return password;
    199. }
    200.  
    201. public void setPassword(String password) {
    202. this.password = password;
    203. }
    204.  
    205. @Override
    206. public void receiveQueue(String queueName,MessageListener listener) throws JMSException {
    207. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    208. Queue topic =session.createQueue(queueName);
    209. MessageConsumer consumer=session.createConsumer(topic);
    210. consumer.setMessageListener(listener);
    211.  
    212. }
    213.  
    214. @Override
    215. public void receiveTopic(String topicName,MessageListener listener) throws JMSException {
    216. final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    217. Topic topic =session.createTopic(topicName);
    218. MessageConsumer consumer=session.createConsumer(topic);
    219. consumer.setMessageListener(listener);
    220. }

    4、测试一下Topic和Queue

    1. public static void main(String[] args) throws JMSException{
    2. //如果创建失败会立即抛出异常
    3. MsgDistributeInterface producter = new ActiveMQImpl("system", "manager", "tcp://127.0.0.1:61616");
    4. Test testMq = new Test();
    5. try {
    6. Thread.sleep(1000);
    7. } catch (InterruptedException e) {
    8. e.printStackTrace();
    9. }
    10. //Thread 1
    11. new Thread(testMq.new ProductorMq(producter)).start();
    12. //Thread 2
    13. new Thread(testMq.new ProductorMq(producter)).start();
    14. //Thread 3
    15. new Thread(testMq.new ProductorMq(producter)).start();
    16. //Thread 4
    17. new Thread(testMq.new ProductorMq(producter)).start();
    18. //Thread 5
    19. new Thread(testMq.new ProductorMq(producter)).start();
    20. //Thread 6
    21. new Thread(testMq.new ProductorMq(producter)).start();
    22.  
    23. //订阅接收线程Thread 1
    24. new Thread(new Runnable() {
    25. @Override
    26. public void run() {
    27. try {
    28. producter.receiveTopic("eguid-topic",new MessageListener() {
    29. @Override
    30. public void onMessage(Message message) {
    31. BytesMessage msg=(BytesMessage) message;
    32. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
    33. }
    34. });
    35. } catch (JMSException e) {
    36. // TODO Auto-generated catch block
    37. e.printStackTrace();
    38. }
    39. }
    40. }).start();
    41. //订阅接收线程Thread 2
    42. new Thread(new Runnable() {
    43. @Override
    44. public void run() {
    45. try {
    46. producter.receiveTopic("eguid-topic",new MessageListener() {
    47. @Override
    48. public void onMessage(Message message) {
    49. BytesMessage msg=(BytesMessage) message;
    50. System.err.println(Thread.currentThread().getName()+"订阅主题消息:"+msg.toString());
    51. }
    52. });
    53. } catch (JMSException e) {
    54. // TODO Auto-generated catch block
    55. e.printStackTrace();
    56. }
    57. }
    58. }).start();
    59. //队列消息生产线程Thread-1
    60. new Thread(testMq.new QueueProductor(producter)).start();
    61. //队列消息生产线程Thread-2
    62. new Thread(testMq.new QueueProductor(producter)).start();
    63. //队列接收线程Thread 1
    64. new Thread(new Runnable() {
    65. @Override
    66. public void run() {
    67. try {
    68. producter.receiveQueue("eguid-queue",new MessageListener() {
    69. @Override
    70. public void onMessage(Message message) {
    71. BytesMessage msg=(BytesMessage) message;
    72. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
    73. }
    74. });
    75. } catch (JMSException e) {
    76. // TODO Auto-generated catch block
    77. e.printStackTrace();
    78. }
    79. }
    80. }).start();
    81. //队列接收线程Thread2
    82. new Thread(new Runnable() {
    83. @Override
    84. public void run() {
    85. try {
    86. producter.receiveQueue("eguid-queue",new MessageListener() {
    87. @Override
    88. public void onMessage(Message message) {
    89. BytesMessage msg=(BytesMessage) message;
    90. System.err.println(Thread.currentThread().getName()+"收到队列消息:"+msg.toString());
    91. }
    92. });
    93. } catch (JMSException e) {
    94. // TODO Auto-generated catch block
    95. e.printStackTrace();
    96. }
    97. }
    98. }).start();
    99. }
    100.  
    101. private class ProductorMq implements Runnable{
    102. Jtt809MsgProducter producter;
    103. public ProductorMq(Jtt809MsgProducter producter){
    104. this.producter = producter;
    105. }
    106.  
    107. @Override
    108. public void run() {
    109. while(true){
    110. try {
    111. String wang=Thread.currentThread().getName()+"Hello eguid! This is topic.";
    112. producter.sendTopic("eguid-topic",wang.getBytes());
    113.  
    114. Thread.sleep(2000);
    115. } catch (InterruptedException e) {
    116. e.printStackTrace();
    117. }
    118. }
    119. }
    120. }
    121.  
    122. private class QueueProductor implements Runnable{
    123. Jtt809MsgProducter producter;
    124. public QueueProductor(Jtt809MsgProducter producter){
    125. this.producter = producter;
    126. }
    127.  
    128. @Override
    129. public void run() {
    130. while(true){
    131. try {
    132. String eguid=Thread.currentThread().getName()+"Hello eguid! This is queue.";
    133. producter.sendQueue("eguid-queue",eguid.getBytes());
    134. Thread.sleep(2000);
    135. } catch (InterruptedException e) {
    136. e.printStackTrace();
    137. }
    138. }
    139. }
    140. }

    -------------------End--------------------

  • 相关阅读:
    spring MVC 后台token防重复提交解决方案
    redis实现分布式锁
    java spring boot项目部署-上
    倒计数锁存器(CountDown Latch)和 CyclicBarrier(同步屏障)
    通过条件注解@Conditional细粒度的选择bean实例
    Netflix中的负载均衡策略
    C# lambda表达式参数的正确使用姿势
    RabbitMQ如何保证发送端消息的可靠投递-发生镜像队列发生故障转移时
    RabbitMQ如何保证发送端消息的可靠投递
    vue项目目录结构详解
  • 原文地址:https://www.cnblogs.com/eguid/p/9667126.html
Copyright © 2011-2022 走看看