webservlet +redis 的消息发布订阅 ,挺好的
当请求到来,向redis server申请一个频道 ,然后等着另一端架设是B 处理完毕获得到处理信息调用redis ,使用redis 往当前申请的频道号 发送消息,在接收者C此时会收到一个事件的方式处理结果.
redis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
注意事项:
webservlet 启动异步线程有个timeout 超时事件
AsyncContext asyncContext = request.startAsync(); asyncContext.setTimeout(3000);
asyncContext.addListener(new AppAsyncListener(subscriber)); //通过AppAsyncListener 接收超时事件
在超时之后返回事件中可以返回超时的信息给请求端,必须要做关闭链接,异步线程结束 的操作,因为我的项目中订阅了redis消息.在此直关闭链接和关闭异步线程,还是不够的,还需要 中间取消订阅,也就是把事件给解绑了,要不异步线程实际没结束,事件还存在,
当有个redis 消息事件过来,而此时的链接已经关闭,会出现一个bug : AsyncContext has already completed ,异步报文已经结束的bug ,实际此时,事件还是能接收到的.于是异步线程未全部退出,导致每次请求一次,多一个现在在debug内无法退出.
java.lang.IllegalStateException: The request associated with the AsyncContext has already completed processing. at org.apache.catalina.core.AsyncContextImpl.check(AsyncContextImpl.java:521) at org.apache.catalina.core.AsyncContextImpl.getResponse(AsyncContextImpl.java:245)
所以,需要在AppAsyncListener 注入subscriber 订阅者这个对象,这个对象是接收消息的事件对象,扩展了Subscriber extends JedisPubSub.只有这样才能调用unsubscriber解绑所绑定频道其实感觉就是个接触事件绑定.
请求得到之后,需要关闭频道号,关闭这次异步启用的线程,也就是让当前启动的线程退出, 如果这个异步线程未退出,那bug 简直会让你头晕......
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(3000);
Work work = new Work(asyncContext,"3856111302");
subscriber = new Subscriber(asyncContext);
asyncContext.addListener(new AppAsyncListener(subscriber)); // 异步状态监听 类注入 redis订阅者对象subscriber,这样当异步线程出现timeout时,调用AppAsyncListener类中void onTimeout 时 可以通过subscriber 对象调用
// unSubscriber(),解除订阅订阅的频道...关闭链接, 关闭异步线程.此次请求的异步线程完美退出.
new Thread(work).start();
上边是异步线程启动的地方
下边是timeout 后再listener 中调用 suscriber类中的解绑方法
public void unSubscriber() //给timeout 之后 解除事件用 { PrintWriter out; try { out = response.getWriter(); Gson gson = new Gson(); String gsonString = gson.toJson("214"); out.println(gsonString); out.flush(); out.close(); unsubscribe("3856111302"); asyncContext.complete(); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); }
@WebListener public class AppAsyncListener implements AsyncListener { String devcmdid; Subscriber subscriber; public AppAsyncListener(){} public AppAsyncListener(Subscriber subscriber){ // this.devcmdid=devcmdid; this.subscriber=subscriber; } @Override public void onTimeout(AsyncEvent asyncEvent) throws IOException { /* ServletResponse response = asyncEvent.getAsyncContext().getResponse(); PrintWriter out = response.getWriter(); out.write("SystemMessageContents.ErrorCode.MESSAGE_REQUEST_TIMEOUT""214"); //超时214 out.flush(); out.close(); asyncEvent.getAsyncContext().complete(); System.out.println("AppAsyncListener timeout");*/ subscriber.unSubscriber(); // unsubscribe("3856111302"); }
由于异步线程启动之后加载了redis的订阅频道,在这里应该是 加载了redis 的频道事件,事件一旦加载了,这个线程是一直存在的, 它不像别的没有while的线程,运行完了线程就结束了
在此加载了事件后,线程不会出退出
RedisImpl redis = new RedisImpl(); redis.subscribe(subscriber, channel); //通过subscribe 的api去订阅,入参是订阅者和频道名
所以当收到消息后 ,必须运行 asyncContext.complete(); 对这个线程进行关闭,否则会等到 asyncContext的timeout 时间结束之后才结束.这就是我那程序里没加这句,导致每次请求完之后都要运行一次AppAsyncListener implements AsyncListener 里边的timeout事件返回消息
@Override
public void onTimeout(AsyncEvent asyncEvent) throws IOException // 后因结束异常,程序改为上边的方式
@Override public void onMessage(String channel, String message) { //收到消息会调用 ServletResponse response = asyncContext.getResponse(); PrintWriter out; try { out = response.getWriter(); Gson gson = new Gson(); String gsonString = gson.toJson("resever"); out.println(gsonString); out.flush(); out.close(); unsubscribe("3856111302"); asyncContext.complete(); } catch (IOException e) {
2:一个常识方面的bug
public class Subscriber extends JedisPubSub { private AsyncContext asyncContext; ServletResponse response = asyncContext.getResponse(); public Subscriber(AsyncContext asyncContext){ this.asyncContext=asyncContext; response = asyncContext.getResponse(); }
ServletResponse response = asyncContext.getResponse(); 这段程序 在类下边直接去获取,在类初始化时由于还未载入asyncContext,出现asyncContext为null的bug,所以此处应该如下
public class Subscriber extends JedisPubSub { private AsyncContext asyncContext; ServletResponse response =null;//= asyncContext.getResponse(); public Subscriber(AsyncContext asyncContext){ this.asyncContext=asyncContext; response = asyncContext.getResponse(); }
3:类构造方法无参未写出现初始化的错误 .init() 的bug
public class AppAsyncListener implements AsyncListener { String devcmdid; Subscriber subscriber; public AppAsyncListener(){} //后来加上这个问题解决 public AppAsyncListener(Subscriber subscriber){ // this.devcmdid=devcmdid; this.subscriber=subscriber; }
4:线程不停的循环读取redis 的值,当运行 getResource(); 时,而此时redis server未打开,出现JedisConnectionException 抛出异常,这里jedis 做的不够啊,用try catch 还抓取不到这个异常...等有空有时间做处理