zoukankan      html  css  js  c++  java
  • 【bird-java】分布式服务间的事件总线EventBus

    什么是EventBus
    EventBus是对发布-订阅模式的一种实现。其以一种非常优雅的方式实现了组件间的解耦与通信,在Android开发、DDD等领域都有非常广泛的应用。

    事件流大致如下:

    • Producer向EventBus发送事件。
    • EventBus向所有监听了该事件的Consumer推送事件。
    • 监听了该事件的Consumer消费事件。


    注:一个组件即可以是Producer,也可以是Consumer。

    分布式服务间的EventBus
    在分布式系统中,事件在服务之间的传递要比单机EventBus复杂很多。有没有一种适用于分布式服务之间的,并且事件传递就像单机一样简单的EventBus呢?在GitHub上搜索了JAVA实现的EventBus,排名前十的几乎都是用于Android或JAVA的单机事件总线。良久之后...还是自己动手吧。集群环境下的EventBus比单机版需要多考虑一些问题,比如:

    1.  服务集群部署的情况下,如何保证每个集群均可订阅该事件,且每个集群只能消费一次该事件。
    2. 如何实现一个服务内部多个`xxxService`订阅同一事件。

    解决方案:

    1. 使用`kafka`实现集群间的发布订阅(基于`topic`),同一集群处于同一个kafka的consumer-group来保证每个集群只会消费一次该事件。
    2. 服务在启动时可反射获得所有实现了`IEventHandler<TEventArg>`的类并缓存,服务消费消息时获取所有注册了该消息的handler并调用其`HandleEvent`方法。

    部分关键源码

    1、事件消息的定义

    public abstract class EventArg implements IEventArg{
    
        private Date eventTime;
    
        public EventArg(){
            eventTime = new Date();
        }
    
        public Date getEventTime() {
            return eventTime;
        }
    
        public void setEventTime(Date eventTime) {
            this.eventTime = eventTime;
        }
    }

    事件消息默认记录创建时间,可扩展其他字段,比如发送时间、标识等。

    2、使用spring-kafka发送消息

    /**
     * kafka事件注册器,向kafka队列中push消息
     */
    @Component
    public class KafkaRegister implements IEventRegister {
    
        @Autowired(required = false)
        private KafkaTemplate<String,IEventArg> kafkaTemplate;
    
        /**
         * 事件注册
         *
         * @param eventArg 事件参数
         */
        @Override
        public void regist(IEventArg eventArg) {
            kafkaTemplate.send(getTopic(eventArg),eventArg);
        }
    
        /**
         * 获取kafka的topic
         *
         *
         * @param eventArg
         * @return topic
         */
        private String getTopic(IEventArg eventArg){
            return eventArg.getClass().getName();
        }
    }

    3、消费kafka消息并执行当前服务中所有订阅了该消息的事件

    /**
     * kafka事件监听器
     */
    public class KafkaEventArgListener implements MessageListener<String,EventArg> {
    
        @Autowired
        private IEventHandlerFactory eventHandlerFactory;
    
        @Override
        public void onMessage(ConsumerRecord<String, EventArg> consumerRecord) {
            if (consumerRecord == null) return;
            EventArg value = consumerRecord.value();
    
            Set<IEventHandler> handlers = eventHandlerFactory.getHandlers(value);
            if (handlers == null) return;
            for (IEventHandler handler : handlers) {
                handler.HandleEvent(value);
            }
        }
    }

     

    EventBus的使用

    1、事件的定义。所有事件均继承于上文EventArg抽象类,示例如下:

    public class TestEventArg extends EventArg{
        private String value;
    
        public String getValue() {
            return value;
        }
    
        public void setValue(String value) {
            this.value = value;
        }
    }

    2、事件发布。示例代码:

    eventBus.push(new TestEventArg());

    3、事件订阅。一个服务发布事件之后,任何服务中的任何`xxxServiceImpl`类均可订阅该事件,实现`IEventHandler<TEventArg>`接口即可完成事件的订阅,示例如下:

    public class FormServiceImpl extends AbstractServiceImpl<Form> implements FormService,IEventHandler<TestEventArg> {
    
        @Override
        public void HandleEvent(TestEventArg eventArg) {
            System.out.println("notify zero======");
        }
    }

     整体来说,使用还是很简单的,EventBus实现与使用示例收录于bird-java项目中,项目地址:https://github.com/liuxx001/bird-java

  • 相关阅读:
    Spring + MySQL + Mybatis + Redis【二级缓存】执行流程分析
    Spring + MySQL + Mybatis + Redis【二级缓存】
    MyBatis的笔记
    Spring事务:一种编程式事务,三种声明式事务
    笔记
    mybatis-generator自定义注释生成
    做准备的笔记
    常用DOS命令和Linux命令
    数据库MongoDB查询语句--持续更新
    SpringBoot集成websocket实现后端向页面发送消息
  • 原文地址:https://www.cnblogs.com/liuyh/p/8305001.html
Copyright © 2011-2022 走看看