项目需要用到 NSQ,并且是单点部署。网上看到的 Spring Boot 整合 NSQ 的做法,都是先连接 nsqlookupd,再从 nsqlookupd 获取 nsqd 的连接信息。但本项目使用了 Docker,容器映射到宿主机的端口已经不是原始端口,而 nsqlookupd 下发的 nsqd 地址仍然是默认的 4150 端口,所以客户端连不通。这时就需要撇开 nsqlookupd,直接连接 nsqd。
查看官网 https://nsq.io/clients/client_libraries.html

由于需要用到认证,只能选择 nsq-j。翻看它的文档,发现有一个用于直连 nsqd 的消费者类:DirectSubscriber。
查看该类及其父类的 API,感觉可用。
加入依赖包:
<dependency>
<groupId>com.sproutsocial</groupId>
<artifactId>nsq-j</artifactId>
<version>1.0</version>
</dependency>
import com.sproutsocial.nsq.DirectSubscriber;
import com.sproutsocial.nsq.MessageDataHandler;
import com.sproutsocial.nsq.Subscriber;
import com.tslsmart.big.base.util.DateUtils;
import com.tslsmart.big.device.constance.NsqConstance;
import com.tslsmart.big.device.dao.entity.WatchManTaskLog;
import com.tslsmart.big.device.dao.mapper.WatchManTaskLogMapper;
import com.tslsmart.big.device.protobuffer.response.Response;
import com.tslsmart.sz.nsq.properties.NSQCloudProperties;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.util.Objects;
@Component
@Slf4j
public class WatchManNsqConsume implements ApplicationRunner {
@Autowired
private NSQCloudProperties nsqCloudProperties;
@Autowired
private TaskLogMapper taskLogMapper;
@Override
public void run(ApplicationArguments applicationArguments) throws InterruptedException {
Subscriber directSubscriber = null;
while(Objects.isNull(directSubscriber) || directSubscriber.getConnectionCount() == 0){
Thread.sleep(500);
log.info("==========="+DateUtils.getNowDateTime()+"启动消费者=======================");
String serverAddrs = nsqCloudProperties.getServerAddrs();
if(StringUtils.isNotEmpty(serverAddrs)){
directSubscriber = new DirectSubscriber(2, serverAddrs.split(","));
directSubscriber.subscribe(NsqConstance.TSL_DATA_RES, "server_data_receive", new MessageDataHandler() {
@Override
public void accept(byte[] bytes) {
log.info("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台==接收消息====start");
Response.Res_Detail builder = null;
try {
builder = Response.Res_Detail.parseFrom(bytes);
int status = builder.getStatus();
TaskLog taskLog = new TaskLog();
taskLog.setTasklogid(builder.getSeqNo());
if(1 == status){
taskLog.setHandleresult(true);
}else{
taskLog.setErrmsg(builder.getErrMsg());
}
TaskLogMapper.updateByPrimaryKey(taskLog);
log.info("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台==接收消息====end=={}",
builder.getSeqNo());
} catch (Exception e) {
log.error("==========="+DateUtils.getNowDateTime()+"同步空间数据给平台出错======"+e.getMessage());
log.error("同步报错:",e);
}
}
});
//directSubscriber.stop();
log.error("==========="+DateUtils.getNowDateTime()+" directSubscriber.stop()======:"+directSubscriber
.getConnectionCount());
}
}
}
}
测试可用。