zoukankan      html  css  js  c++  java
  • SpringBoot 消费NSQ消息

    使用监听器,来实现实时消费nsq的消息

    一、目前spring boot中支持的事件类型如下

    1. ApplicationFailedEvent:该事件为spring boot启动失败时的操作

    2. ApplicationPreparedEvent:上下文context准备时触发

    3. ApplicationReadyEvent:上下文已经准备完毕的时候触发

    4. ApplicationStartedEvent:spring boot 启动监听类

    5. SpringApplicationEvent:获取SpringApplication

    6. ApplicationEnvironmentPreparedEvent:环境事先准备

    二、这里我使用的是监听ApplicationReadyEvent事件,实现ApplicationListener<ApplicationReadyEvent>接口

    package com.device.nsq.Receiver;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.context.event.ApplicationReadyEvent;
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.ConfigurableApplicationContext;import org.springframework.stereotype.Component;
    import com.github.brainlag.nsq.NSQConsumer;
    import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
    import com.github.brainlag.nsq.lookup.NSQLookup;
    
    /**
     * nsq监听消息
     * 
     * @author joey
     *
     */
    @Component
    public class NsqMessageReceiver implements ApplicationListener<ApplicationReadyEvent> {
        private Logger logger = LoggerFactory.getLogger(NsqMessageReceiver.class);/**
         * 监听nsq消息
         */
        @Override
        public void onApplicationEvent(ApplicationReadyEvent event) {
            NSQLookup lookup = new DefaultNSQLookup();
            Executor executor = Executors.newFixedThreadPool(20);        lookup.addLookupAddress(127.0.0.1, 4150);// 监听topicname的topic
            NSQConsumer registerConsumer = new NSQConsumer(lookup, "topicname", "channel",
                    (message) -> {
                        logger.info("收到消息:" + new String(message.getMessage()));
                        message.finished();
                    });
            registerConsumer.setExecutor(executor);
            registerConsumer.start();
      }
    }

    三、通过SpringApplication类中的addListeners方法将自定义的监听器注册进去

    package com.device;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import com.device.nsq.Receiver.NsqMessageReceiver;
    
    @SpringBootApplication
    public class Application {
        public static void main(String[] args) {
            SpringApplication application = new SpringApplication(Application.class);
            application.addListeners(new NsqMessageReceiver());
            application.run(args);
        }
    }

    启动,向nsq的topicname发送消息,程序会自动进行消费

  • 相关阅读:
    文件读写
    使用HttpClient实现文件的上传下载
    TreeMap
    Linux的目录结构与文件权限
    Hibernate中get()和load()方法的区别
    Hibernate中openSession()与getCurrentSession()的区别与联系
    Hibernate核心类和接口
    Hibernate连接数据库
    Struts2中OGNL表达式的用法
    Struts2中Result的配置
  • 原文地址:https://www.cnblogs.com/JoeyWong/p/9227789.html
Copyright © 2011-2022 走看看