zoukankan      html  css  js  c++  java
  • keycloak~扩展事件机制,集成kafka中间件

    对于KC的后台或者接口的操作,当用户,组,角色这些实体状态发生改变时,KC会对外发布事件,而这些事件处理程序我们是可以在后台配置的,默认继承了jboss-logging日志事件,而我们可以在事件管理中去配置自己的事件处理程序。

    事件处理程序SPI

    实现EventListenerProviderFactoryEventListenerProvider这两个SPI即可,我们在这里可以订阅由KC发出现的事件,针对我们感兴趣的事件,去添加处理代码。

    /**
         * 后端管理平台事件
         *
         * @param adminEvent
         * @param b
         */
        @Override
        public void onEvent(AdminEvent adminEvent, boolean b) {
          logger.info(adminEvent.getResourceTypeAsString()))
        }
    

    总结常用的消息类型

    • 操作类型:operationType
      • 后端类型
        • CREATE 新建
        • UPDATE 编辑
        • DELETE 删除
      • 后台资源类型:resourceType
        • REALM_ROLE 域的角色
        • REALM_ROLE_MAPPING 域的角色绑定
        • USER 用户
        • GROUP 组
        • GROUP_MEMBERSHIP 组绑定
        • CLIENT_ROLE 客户端角色
        • CLIENT_ROLE_MAPPING 客户端角色绑定
      • 前端类型
        • LOGIN 登录
        • LOGIN_ERROR 登录失败
        • REGISTER 注册
        • REGISTER_ERROR 注册失败
        • LOGOUT 登出
        • LOGOUT_ERROR 登出失败

    扩展一个KAFKA

    • 需要扩展相关组件
    <dependencies>
         <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.6.0</version>
    </dependency>
    
    • 需要配置你的kafka的生产者
      kc这边是一个kafka的生产者,当它收到由KC发出来的事件之后,把消息发到kafka,其它服务方可以消费这个kafka消息
    /**
     * kafka生产者.
     */
    public class Producer {
    
        private final static String BOOTSTRAP_SERVER = ConfigFactory.getInstance().getStrPropertyValue("kafka.host");
        private final static Logger logger = Logger.getLogger(Producer.class);
        private static KafkaProducer<String, String> producer;
    
        private static KafkaProducer<String, String> getProducer() {
            if (producer == null) {
                //reset thread context
                resetThreadContext();
                // create the producer
                producer = new KafkaProducer<String, String>(getProperties());
            }
            return producer;
        }
    
        public static void publishEvent(String topic, String value) {
    
            // create a producer record
            ProducerRecord<String, String> eventRecord =
                    new ProducerRecord<String, String>(topic, value);
    
            // send data - asynchronous
            getProducer().send(eventRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e != null) {
                        e.printStackTrace();
                    } else {
                        logger.info(String.format("The offset of the record we just sent is:%s", recordMetadata.offset()));
                    }
                }
            });
    
        }
    
        private static void resetThreadContext() {
            Thread.currentThread().setContextClassLoader(null);
        }
    
        public static Properties getProperties() {
            Properties properties = new Properties();
            properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVER);
            properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            properties.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "16384");
            return properties;
        }
    
    
    }
    

    最后,在你的kc事件处理SPI里,调用咱们的KAFKA生产者去生产消息就行了

  • 相关阅读:
    初识PL/SQL
    PL/SQL基本语法
    Oracle命令备忘
    工厂模式之二 工厂方法(Factory Method)
    XMLHttpRequest 原始AJAX初步
    DOM元素的innerHTML属性
    如果用JavaScript获取标准下拉框的"选中值"和"选中文本"
    工厂模式之三 抽象工厂(Abstract Factory)模式
    JavaScript中的动态参数
    JavaScript中的闭包初探
  • 原文地址:https://www.cnblogs.com/lori/p/15235724.html
Copyright © 2011-2022 走看看