zoukankan      html  css  js  c++  java
  • 使用ActiveMQ实现简易聊天功能

    一 什么是消息队列

    我们可以把消息队列比作是一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。目前使用较多的消息队列有ActiveMQ,RabbitMQ,Kafka,RocketMQ

    二 为什么要用消息队列

    使用消息队列主要有两点好处:

    1.通过异步处理提高系统性能(削峰、减少响应所需时间);

    2.降低系统耦合性。如果在面试的时候你被面试官问到这个问题的话,一般情况是你在你的简历上涉及到消息队列这方面的内容,这个时候推荐你结合你自己的项目来回答。

    三 ActiveMQ

    ActiveMQ 是基于 JMS 规范实现的。

    JMS消息列队有两种消息模式,一种是点对点的消息模式,还有一种是订阅的模式。

    四 实现

    ActiveMQ下载地址:http://activemq.apache.org/components/classic/download/

    解压缩apache-activemq-5.xxx-bin.zip到一个目录

    启动ActiveMQ:运行C: apache-activemq-5.xxxinactivemq.bat

    浏览器中输入:http://localhost:8161/admin/ 测试启动情况

    使用点对点方式实现聊天功能

    编写消息发送类和接收类。发送类中需要连接ActiveMQ 服务器,创建队列,发送消息;接收类中需要ActiveMQ 服务器,读取发送者发送消息所用的队列。接收类实现为一个单独的线程,使用监听器模式,每隔一段时间侦听是否有消息到来,若有消息到来,将消息添加到辅助类消息列表中。

    使用2个队列,即对于每一个用户来说,发送消息为一个队列,接受消息为一个队列。

    效果如下:

     1 import org.apache.activemq.ActiveMQConnectionFactory;
     2 
     3 import javax.jms.*;
     4 
     5 import static java.lang.Thread.sleep;
     6 
     7 public class MessageReceiver implements Runnable{
     8     private String url;
     9     private String user;
    10     private String password;
    11     private final String QUEUE;
    12     private Boolean stop;
    13     Connection connection;
    14 
    15     public MessageReceiver(String queue, String url, String user, String password) {
    16         this.url = url;
    17         this.user = user;
    18         this.password = password;
    19         this.QUEUE = queue;
    20         stop = false;
    21     }
    22 
    23     public void run() {
    24         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    25         try {
    26             connection = connectionFactory.createConnection();
    27             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    28             Destination receiveQueue = session.createQueue(QUEUE);
    29             MessageConsumer consumer = session.createConsumer(receiveQueue);
    30             connection.start();
    31             while(!stop) {
    32                 consumer.setMessageListener(new MessageListener() {
    33                     @Override
    34                     public void onMessage(Message message) {
    35                         try {
    36                             //获取到接收的数据
    37                             String text = ((TextMessage) message).getText();
    38                             MessageText.setMsg(text);
    39                         } catch (JMSException e) {
    40                             e.printStackTrace();
    41                         }
    42                     }
    43                 });
    44                 sleep(500);
    45             }
    46         } catch (JMSException e) {
    47             e.printStackTrace();
    48         }catch (InterruptedException e) {
    49             //Thread.currentThread().interrupt();
    50             e.printStackTrace();
    51         }
    52     }
    53 
    54     public void setStop(Boolean stop) {
    55         this.stop = stop;
    56     }
    57 
    58     public void closeConnection(){
    59         try {
    60             connection.close();
    61         } catch (JMSException e) {
    62             e.printStackTrace();
    63         }
    64     }
    65 
    66     public String getUrl() {
    67         return url;
    68     }
    69 
    70     public void setUrl(String url) {
    71         this.url = url;
    72     }
    73 
    74     public String getUser() {
    75         return user;
    76     }
    77 
    78     public void setUser(String user) {
    79         this.user = user;
    80     }
    81 
    82     public String getPassword() {
    83         return password;
    84     }
    85 
    86     public void setPassword(String password) {
    87         this.password = password;
    88     }
    89 }
    MessageReceiver
     1 import org.apache.activemq.ActiveMQConnectionFactory;
     2 
     3 import javax.jms.*;
     4 import java.text.DateFormat;
     5 import java.text.SimpleDateFormat;
     6 import java.util.Date;
     7 
     8 public class MessageSender {
     9     private String url;
    10     private String user;
    11     private String password;
    12     private final String QUEUE;
    13     private Connection connection;
    14     private Session session;
    15     private Destination sendQueue;
    16     private MessageProducer sender;
    17     private TextMessage outMessage;
    18     private DateFormat df;
    19 
    20     public MessageSender(String queue, String url, String user, String password) {
    21         this.url = url;
    22         this.user = user;
    23         this.password = password;
    24         this.QUEUE = queue;
    25     }
    26 
    27     public void init() {
    28         ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
    29         df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    30         try {
    31             connection = connectionFactory.createConnection();
    32             connection.start();
    33             session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
    34             sendQueue = session.createQueue(QUEUE);
    35             sender = session.createProducer(sendQueue);
    36             outMessage = session.createTextMessage();
    37         } catch (JMSException e) {
    38             e.printStackTrace();
    39         }
    40     }
    41 
    42     public void sendMessage(String messageStr) {
    43         try {
    44             outMessage = session.createTextMessage();
    45             String sendStr = df.format(new Date()) + "
    " + QUEUE + ": " + messageStr;
    46             outMessage.setText(sendStr);
    47             sender.send(outMessage);
    48             session.commit();
    49             MessageText.setMsg(sendStr);
    50         } catch (JMSException e) {
    51             e.printStackTrace();
    52         }
    53     }
    54 
    55     public void closeConnection() {
    56         try {
    57             sender.close();
    58             connection.close();
    59         } catch (JMSException e) {
    60             e.printStackTrace();
    61         }
    62     }
    63 
    64     public String getUrl() {
    65         return url;
    66     }
    67 
    68     public void setUrl(String url) {
    69         this.url = url;
    70     }
    71 
    72     public String getUser() {
    73         return user;
    74     }
    75 
    76     public void setUser(String user) {
    77         this.user = user;
    78     }
    79 
    80     public String getPassword() {
    81         return password;
    82     }
    83 
    84     public void setPassword(String password) {
    85         this.password = password;
    86     }
    87 }
    MessageSender
  • 相关阅读:
    C++ 值传递、指针传递、引用传递
    typedef与#define的区别
    const与#define的区别
    头文件重复引用
    多态
    ng双向数据绑定
    angular响应式编程
    angular的一些问题
    npm install 权限的问题
    typescript的入门
  • 原文地址:https://www.cnblogs.com/flyuz/p/10682756.html
Copyright © 2011-2022 走看看