zoukankan      html  css  js  c++  java
  • springboot项目整合nsq,消费者直连nsqd

        项目需要用到nsq,并且是单点的,网上看到的springboot整合nsq都是先连接lookup后,从lookup获取nsqd的连接信息。由于本项目用到了docker,映射的端口都不是原始端口了。而lookup分发的连接nsqd还是默认的4150端口。所以是连不通的。这时候需要直连nsqd,撇开lookup。

        查看官网  https://nsq.io/clients/client_libraries.html

        由于要用到认证,所以只能选择   nsq-j   。翻看点进去的文档。有个名字是直连消费者的类:  DirectSubscriber  

        看api以及父类的api,感觉可用。

      加入依赖包:

      

     <dependency>
                <groupId>com.sproutsocial</groupId>
                <artifactId>nsq-j</artifactId>
                <version>1.0</version>
            </dependency>
    

      

    import com.sproutsocial.nsq.DirectSubscriber;
    import com.sproutsocial.nsq.MessageDataHandler;
    import com.sproutsocial.nsq.Subscriber;
    import com.tslsmart.big.base.util.DateUtils;
    import com.tslsmart.big.device.constance.NsqConstance;
    import com.tslsmart.big.device.dao.entity.WatchManTaskLog;
    import com.tslsmart.big.device.dao.mapper.WatchManTaskLogMapper;
    import com.tslsmart.big.device.protobuffer.response.Response;
    import com.tslsmart.sz.nsq.properties.NSQCloudProperties;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    import java.util.Objects;
    
    @Component
    @Slf4j
    public class WatchManNsqConsume implements ApplicationRunner {
    
        @Autowired
        private NSQCloudProperties nsqCloudProperties;
    
        @Autowired
        private TaskLogMapper taskLogMapper;
    
        @Override
        public void run(ApplicationArguments applicationArguments) throws InterruptedException {
            Subscriber directSubscriber = null;
            while(Objects.isNull(directSubscriber) || directSubscriber.getConnectionCount() == 0){
                Thread.sleep(500);
                log.info("==========="+DateUtils.getNowDateTime()+"启动消费者=======================");
                String serverAddrs = nsqCloudProperties.getServerAddrs();
                if(StringUtils.isNotEmpty(serverAddrs)){
                    directSubscriber = new DirectSubscriber(2, serverAddrs.split(","));
                    directSubscriber.subscribe(NsqConstance.TSL_DATA_RES, "server_data_receive", new MessageDataHandler() {
                        @Override
                        public void accept(byte[] bytes) {
                            log.info("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台==接收消息====start");
                            Response.Res_Detail builder = null;
                            try {
                                builder = Response.Res_Detail.parseFrom(bytes);
                                int status = builder.getStatus();
                                TaskLog taskLog = new TaskLog();
                                taskLog.setTasklogid(builder.getSeqNo());
                                if(1 == status){
                                    taskLog.setHandleresult(true);
                                }else{
                                    taskLog.setErrmsg(builder.getErrMsg());
                                }
                                TaskLogMapper.updateByPrimaryKey(taskLog);
                                log.info("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台==接收消息====end=={}",
                                        builder.getSeqNo());
                            } catch (Exception e) {
                                log.error("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台出错======"+e.getMessage());
                                log.error("同步报错:",e);
                            }
                        }
                    });
                    //directSubscriber.stop();
                    log.error("==========="+DateUtils.getNowDateTime()+" directSubscriber.stop()======:"+directSubscriber
                            .getConnectionCount());
                }
            }
    
    
        }
    }
    

      

    测试可用。

  • 相关阅读:
    冲刺第二阶段第五天
    找水王2
    冲刺第二阶段第四天
    梦断代码阅读笔记03
    冲刺第二阶段第三天
    冲刺第二阶段第二天
    冲刺第二阶段第一天
    梦断代码阅读笔记02
    第十二周学习进度条
    找水王
  • 原文地址:https://www.cnblogs.com/fuguang/p/14262877.html
Copyright © 2011-2022 走看看