使用消息队列异步化系统
前言
前期为了快速开发,项目结构较为混乱,代码维护与功能扩展都比较困难,为了方便后续功能开发,最近对项目进行的重构,顺便在重构的过程中将之前的部分操作进行了异步处理,也第一次实际接触了JMS与消息队列。项目中采用的消息中间件为ActiveMQ。
什么是JMS
Java消息服务(Java Message Service,JMS)应用程序接口是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
Java消息服务的规范包括两种消息模式,点对点和发布者/订阅者。许多提供商支持这一通用框架因此,程序员可以在他们的分布式软件中实现面向消息的操作,这些操作将具有不同面向消息中间件产品的可移植性。
Java消息服务支持同步和异步的消息处理,在某些场景下,异步消息是必要的;在其他场景下,异步消息比同步消息操作更加便利。
Java消息服务支持面向事件的方法接收消息,事件驱动的程序设计现在被广泛认为是一种富有成效的程序设计范例,程序员们都相当熟悉。
在应用系统开发时,Java消息服务可以推迟选择面对消息中间件产品,也可以在不同的面对消息中间件切换。——Wiki
什么是消息队列
在计算机科学中,消息队列(英语:Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自使用者。消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的资料,包含发生的时间,输入装置的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
目前,有很多消息队列有很多开源的实现,包括JBoss Messaging、JORAM、Apache ActiveMQ、Sun Open Message Queue、Apache Qpid和HTTPSQS。
消息队列本身是异步的,它允许接收者在消息发送很长时间后再取回消息,这和大多数通信协议是不同的。例如WWW中使用的HTTP协议是同步的,因为客户端在发出请求后必须等待服务器回应。然而,很多情况下我们需要异步的通信协议。比如,一个进程通知另一个进程发生了一个事件,但不需要等待回应。但消息队列的异步特点,也造成了一个缺点,就是接收者必须轮询消息队列,才能收到最近的消息。
和信号相比,消息队列能够传递更多的信息。与管道相比,消息队列提供了有格式的数据,这可以减少开发人员的工作量。但消息队列仍然有大小限制。——Wiki
正文
基本类图结构如下:
说明:
AsyncWork:消息的处理类接口,定义各类型的消息的处理方式
AsyncWorkProducer:消息的生产者(JMS生产者),负责向消息队列里面放入消息
AsyncWorkConsumer:消息的消费者(JMS消费者),负责从消息队列中消费消息
AsyncWorkFactory:对外提供的服务的工厂类
EmailWork、PushNotificationWork、LoginLogWork...:实现AsyncWork接口,定义消息的具体处理方式
1 public class AsyncWorkProducer { 2 //ConnectionFactory :连接工厂,JMS 用它创建连接 3 private ConnectionFactory connectionFactory; 4 private String queueName = "QueueName"; 5 public AsyncWorkProducer(String queueName){ 6 this.queueName = queueName; 7 init(); 8 } 9 private void init(){ 10 connectionFactory = new ActiveMQConnectionFactory( 11 ActiveMQConnection.DEFAULT_USER, 12 ActiveMQConnection.DEFAULT_PASSWORD, 13 SystemConfiguration.getString("asyc.location")); 14 } 15 public void sendMessage(Message message){ 16 Connection connection = null; 17 try { 18 // Connection :JMS 客户端到JMS Provider 的连接 | 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar 19 connection = connectionFactory.createConnection(); 20 //启动 21 connection.start(); 22 // Session: 一个发送或接收消息的线程 | 获取操作连接 23 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 24 // Destination :消息的目的地;消息发送给谁. 25 Destination destination = session.createQueue(queueName); 26 // MessageProducer:消息发送者 |得到消息生成者【发送者】 27 MessageProducer producer = session.createProducer(destination); 28 //设置不持久化,实际根据项目决定 29 producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); 30 // 发送消息到目的地方 31 producer.send(message); 32 } catch (Exception e) { 33 e.printStackTrace(); 34 }finally{ 35 try { 36 if (null != connection){ 37 connection.close(); 38 } 39 } catch (Throwable ignore) { 40 } 41 } 42 } 43 }
服务工厂类,貌似作用不大:
public class AsyncWorkFactory { private static ConcurrentHashMap<String, AsyncWorkProducer> chm = new ConcurrentHashMap<String, AsyncWorkProducer>(); private AsyncWorkFactory(){} public static AsyncWorkProducer getProducer(String queueName){ AsyncWorkProducer awp = chm.get(queueName); if(awp==null){ awp = new AsyncWorkProducer(queueName); chm.put(queueName, awp); } return awp; } public static void sendMessage(Message message,String queueName){ getProducer(queueName).sendMessage(message); } }
线程监听:
1 public class AsyncWorkConsumer implements Runnable{ 2 // ConnectionFactory :连接工厂,JMS 用它创建连接 3 private ConnectionFactory connectionFactory; 4 private AsycWork work; 5 private String queueName = "QueueName"; 6 public AsyncWorkConsumer(String queueName,AsycWork work){ 7 this.queueName = queueName; 8 this.work = work; 9 init(); 10 } 11 private void init(){ 12 connectionFactory = new ActiveMQConnectionFactory( 13 ActiveMQConnection.DEFAULT_USER, 14 ActiveMQConnection.DEFAULT_PASSWORD, 15 SystemConfiguration.getString("asyc.location")); 16 } 17 @Override 18 public void run() { 19 Connection connection = null; 20 try { 21 // Connection :JMS 客户端到JMS Provider 的连接 | 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar 22 connection = connectionFactory.createConnection(); 23 connection.start(); 24 // Session: 一个发送或接收消息的线程 | 获取操作连接 25 Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE); 26 // Destination :消息的目的地;消息发送给谁. 27 Destination destination = session.createQueue(queueName); 28 // MessageProducer:消息发送者 |得到消息生成者【发送者】 29 MessageConsumer consumer = session.createConsumer(destination); 30 //设置不持久化,实际根据项目决定 31 while (true) { 32 //可设置接收者接收消息的时间 consumer.recevie(xxx) 33 Message message = consumer.receive(); 34 work.execute(message); 35 } 36 } catch (Exception e) { 37 e.printStackTrace(); 38 }finally{ 39 try { 40 if (null != connection){ 41 connection.close(); 42 } 43 } catch (Throwable ignore) { 44 } 45 } 46 } 47 }
回调处理:
1 public class EmailWorker implements AsycWork { 2 private static Logger log = LoggerFactory.getLogger(EmailWorker.class); 3 @Override 4 public void execute(Message message) { 5 ActiveMQMapMessage msg = (ActiveMQMapMessage) message; 6 try { 7 String address = msg.getString("address"); 8 String title = msg.getString("title"); 9 String content = msg.getString("content"); 10 Constants.sendMail(address, title, content); 11 } catch (JMSException e) { 12 log.error("异步邮件发送异常", e); 13 } 14 } 15 }
项目启动时执行如下代码启动线程:
1 Thread emailThread = new Thread(new AsyncWorkConsumer(AsycWork.EMAIL,emailWorker)); 2 emailThread.setDaemon(true); 3 emailThread.start(); 4 //启动线程绑定各种回调 5 Thread normalLogThread = new Thread(new AsyncWorkConsumer(AsycWork.NORMAL_LOG,normalLogWork)); 6 normalLogThread.setDaemon(true); 7 normalLogThread.start(); 8 Thread loginLogThread = new Thread(new AsyncWorkConsumer(AsycWork.LOGIN_LOG,loginLogWorker)); 9 loginLogThread.setDaemon(true); 10 loginLogThread.start();
调用异步的工具类:
1 public class AsyncUtils { 2 private static Logger log = LoggerFactory.getLogger(AsyncUtils.class); 3 public static void log(String type,String operate){ 4 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_NORMAL_LOG)){ 5 return; 6 } 7 try{ 8 User user = (User) SecurityUtils.getSubject().getSession().getAttribute("loginUser"); 9 if(user==null){ 10 return; 11 } 12 OperateLog log = new OperateLog(user.getId(), user.getName(), operate,type, user.getLastLoginIp()); 13 ActiveMQObjectMessage message = new ActiveMQObjectMessage(); 14 message.setObject(log); 15 AsyncWorkFactory.sendMessage(message, AsycWork.NORMAL_LOG); 16 }catch (Exception e) { 17 log.error("日志记录出错!", e); 18 } 19 } 20 public static void sendMail(String address,String title,String content){ 21 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_SEND_MAIL)){ 22 return; 23 } 24 try{ 25 ActiveMQMapMessage message = new ActiveMQMapMessage(); 26 message.setString("address", address); 27 message.setString("title", title); 28 message.setString("content", content); 29 AsyncWorkFactory.sendMessage(message, AsycWork.EMAIL); 30 }catch (Exception e) { 31 log.error("邮件发送出错!",e); 32 } 33 } 34 public static void loginLog(String uid,String ip,Date date){ 35 if(!SystemConfigFromDB.getBoolean(SystemConfigFromDB.NEED_LOG_CLIENTUSER_LOGINLOG)){ 36 return; 37 } 38 try{ 39 ActiveMQMapMessage message = new ActiveMQMapMessage(); 40 message.setString("uid", uid); 41 message.setString("ip", ip); 42 message.setString("date", DateUtils.formatDateTime(date, "yyyy-MM-dd HH:mm:ss")); 43 AsyncWorkFactory.sendMessage(message, AsycWork.LOGIN_LOG); 44 }catch (Exception e) { 45 log.error("邮件发送出错!",e); 46 } 47 } 48 }
在需要异步处理的地方执行类似如下代码:
AsyncUtils.sendMail("xxx@xxx.com", "邮件标题", "邮件内容");//异步发送邮件
这样就可以执行异步操作了。
适用于
异步系统适用于与主要业务逻辑无关的较耗时或不需要同步操作的,失败时不影响主业务逻辑的功能点:
比如:1.在用户注册的时候记录数据做后期统计、发送注册成功邮件等
2.系统操作的日志记录
3.iOS消息推送
4.发送短信
...
在使用异步系统之前,用户注册与注册日志记录是在同一个事务完成的,用户注册失败则不会记录日志,但同时,日志记录发生异常也会引起用户注册失败,日志记录本身是与用户注册这个逻辑不相关的工作,在日志发生异常的时候不应该使用户注册失败。
在使用异步系统之后,用户注册逻辑执行结束后,调用异步的注册日志记录与异步的注册邮件发送功能即可,不用等待日志记录与邮件发送的返回,即可直接返回用户注册成功。将日志与邮件异步处理,既提高了响应速度也使逻辑更加严谨。在发生异常的时候,消息队列会将消息继续保留,留待后续处理。
PS:本文的实现方式大部分为自己摸索的,之前没有接触过类似的模块,所以有些地方都是按照自己的理解处理的,通用的异步系统是不是这种结构本人不是太了解,欢迎交流。后面会介绍一下最新的实现方式,修改为了基于Spring管理的异步系统,将ActiveMQ丢给了Spring,依靠Spring发送与监听消息,相比这个可能会更靠谱一点。