Using a listener to consume NSQ messages in real time
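1. Event types currently supported in Spring Boot
- ApplicationFailedEvent: published when Spring Boot fails to start
- ApplicationPreparedEvent: published once the application context has been prepared
- ApplicationReadyEvent: published when the context is fully ready
- ApplicationStartedEvent: published when Spring Boot starts (its exact timing differs between Spring Boot 1.x and 2.x)
- SpringApplicationEvent: base class of these events; gives access to the SpringApplication instance
- ApplicationEnvironmentPreparedEvent: published once the environment has been prepared, before the context is created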
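To make the idea of subscribing to one event type concrete, here is a minimal, dependency-free sketch of type-based event delivery. It is not Spring's implementation: `MiniEventPublisher`, `AppEvent`, `ReadyEvent` and `FailedEvent` are hypothetical names invented for the illustration. It only shows that a listener declared for one type ignores the others, while a listener declared for the base type sees them all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Dependency-free model of type-based event delivery. It is NOT Spring's
// implementation; every name in this class is hypothetical.
final class MiniEventPublisher {

    static class AppEvent {}                          // plays the role of a common base event
    static final class ReadyEvent extends AppEvent {}
    static final class FailedEvent extends AppEvent {}

    // Pairs a listener with the event type it declared interest in.
    private static final class Registration<E extends AppEvent> {
        private final Class<E> type;
        private final Consumer<? super E> listener;

        Registration(Class<E> type, Consumer<? super E> listener) {
            this.type = type;
            this.listener = listener;
        }

        void offer(AppEvent event) {
            if (type.isInstance(event)) {             // skip events of other types
                listener.accept(type.cast(event));
            }
        }
    }

    private final List<Registration<?>> registrations = new ArrayList<>();

    <E extends AppEvent> void addListener(Class<E> type, Consumer<? super E> listener) {
        registrations.add(new Registration<>(type, listener));
    }

    // Offers the event to every registration, in registration order.
    void publish(AppEvent event) {
        for (Registration<?> registration : registrations) {
            registration.offer(event);
        }
    }

    // Publishes a failure and then a ready event, returning what the listeners recorded.
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniEventPublisher publisher = new MiniEventPublisher();
        publisher.addListener(ReadyEvent.class, e -> log.add("ready"));
        publisher.addListener(AppEvent.class, e -> log.add("any:" + e.getClass().getSimpleName()));
        publisher.publish(new FailedEvent());
        publisher.publish(new ReadyEvent());
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [any:FailedEvent, ready, any:ReadyEvent]
    }
}
```

The listener registered for `ReadyEvent` never sees the failure, which is the behaviour a listener relies on when it is declared for a single Spring Boot event type.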
2. Here I listen for the ApplicationReadyEvent event by implementing the ApplicationListener<ApplicationReadyEvent> interface
package com.device.nsq.Receiver;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;

import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.github.brainlag.nsq.lookup.NSQLookup;

/**
 * Listens for NSQ messages.
 *
 * Deliberately not annotated with @Component: the listener is registered by hand
 * through SpringApplication.addListeners, and a component-scanned bean would be
 * a second instance that starts a second consumer.
 *
 * @author joey
 */
public class NsqMessageReceiver implements ApplicationListener<ApplicationReadyEvent> {

    private static final Logger logger = LoggerFactory.getLogger(NsqMessageReceiver.class);

    /**
     * Starts consuming NSQ messages once the application is ready.
     */
    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        NSQLookup lookup = new DefaultNSQLookup();
        // The host must be a String. The lookup queries nsqlookupd over HTTP, whose
        // default port is 4161 (4150 is the nsqd TCP port); adjust to your setup.
        lookup.addLookupAddress("127.0.0.1", 4161);
        // Thread pool that runs the message callbacks
        Executor executor = Executors.newFixedThreadPool(20);
        // Subscribe to the topic "topicname" on the channel "channel"
        NSQConsumer registerConsumer = new NSQConsumer(lookup, "topicname", "channel", (message) -> {
            logger.info("Message received: " + new String(message.getMessage(), StandardCharsets.UTF_8));
            // Acknowledge the message so that NSQ does not redeliver it
            message.finished();
        });
        registerConsumer.setExecutor(executor);
        registerConsumer.start();
    }
}
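The receiver combines two ideas: a callback that handles each message, and an executor that decides on which thread the callback runs. The sketch below models that pattern with the JDK only. `CallbackConsumerSketch` is a hypothetical stand-in, not the NSQ client, and the pool size of 4 is an arbitrary choice for the demo.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for a callback-driven consumer; it is NOT the NSQ client.
final class CallbackConsumerSketch {

    private final Executor executor;
    private final Consumer<byte[]> callback;

    CallbackConsumerSketch(Executor executor, Consumer<byte[]> callback) {
        this.executor = executor;
        this.callback = callback;
    }

    // Called once per incoming message. The callback runs on the executor, so a
    // slow handler does not block the thread that delivers messages.
    void deliver(byte[] payload) {
        executor.execute(() -> callback.accept(payload));
    }

    // Feeds the given texts through a 4-thread pool and returns what the callback decoded.
    static List<String> demo(String... texts) {
        List<String> received = new CopyOnWriteArrayList<>(); // safe for concurrent writers
        ExecutorService pool = Executors.newFixedThreadPool(4);
        CallbackConsumerSketch consumer = new CallbackConsumerSketch(
                pool, bytes -> received.add(new String(bytes, StandardCharsets.UTF_8)));
        for (String text : texts) {
            consumer.deliver(text.getBytes(StandardCharsets.UTF_8));
        }
        pool.shutdown(); // accept no new work, let queued callbacks finish
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("callbacks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo("a", "b", "c")); // arrival order may vary between runs
    }
}
```

Because callbacks run concurrently, the handler must be thread-safe and must not assume messages arrive in order. The same caution applies to any state touched inside the callback of the real receiver.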
3. Register the custom listener through the addListeners method of the SpringApplication class
package com.device;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import com.device.nsq.Receiver.NsqMessageReceiver;

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication application = new SpringApplication(Application.class);
        // Register the NSQ listener before the application starts
        application.addListeners(new NsqMessageReceiver());
        application.run(args);
    }
}
Start the application and send a message to the NSQ topic topicname; the program consumes it automatically.
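One way to send a test message is nsqd's HTTP `/pub` endpoint. The sketch below uses the JDK's `java.net.http` client (Java 11 or later) and assumes nsqd listens for HTTP on its default port 4151 on the local machine. The class name `NsqHttpPublish` and its methods are hypothetical helpers, not part of any NSQ library.

```java
import java.io.IOException;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

// Hypothetical helper that publishes one message through nsqd's HTTP API.
final class NsqHttpPublish {

    // Builds a POST to /pub?topic=<topic>; the message travels as the request body.
    static HttpRequest buildPublishRequest(String host, int httpPort, String topic, String message) {
        String query = "topic=" + URLEncoder.encode(topic, StandardCharsets.UTF_8);
        URI uri = URI.create("http://" + host + ":" + httpPort + "/pub?" + query);
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.ofString(message, StandardCharsets.UTF_8))
                .build();
    }

    // Sends the request and returns the HTTP status code reported by nsqd.
    static int publish(String host, int httpPort, String topic, String message)
            throws IOException, InterruptedException {
        HttpResponse<String> response = HttpClient.newHttpClient().send(
                buildPublishRequest(host, httpPort, topic, message),
                HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }
}
```

With nsqd and nsqlookupd running, calling `NsqHttpPublish.publish("127.0.0.1", 4151, "topicname", "hello")` should make the listener log the received text. The host and port are assumptions about a default local setup and may need adjusting.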