zoukankan      html  css  js  c++  java
  • 发布订阅-jedis

    ~~~~~~~~~~~~~~~~分割线,正文可直接跳过~~~~~~~~~~~~~~~~

      前段时间做二维码支付项目的时候,扫码枪扫到二维码后需要给用户反馈支付结果,属于双向通信。双向通信常用的手段是客户端轮询和websocet,考虑到用户体验和服务端的压力,最终选择了websocket。由于服务是多节点,websocket只会和其中一个节点保持长连接,远程服务扫描到支付码需要调用支付服务的扣款接口,根据扣款的成功与否给用户通知,支付服务需要发通知的时候并不知道要给哪个长连接发送。

      解决方案是每个节点保存一份websocket的Session,断开连接时清除Session,服务端需要发送通知时哪个节点有此用户的Session就说明客户端是跟此节点保持的长连接。这里判断哪个节点含有用户的Session就需要检查所有的服务节点,此时就引入了发布/订阅,不管远程服务调用了支付服务的哪个节点,都去检查一遍所有的服务节点。

      下图表示远程服务调用支付服务后的流程示意图:

    ~~~~~~~~~~~~~~~~~~~~~~分割线~~~~~~~~~~~~~~~~~~~

    订阅频道:

    JedisPool jedisPool = RedisHelper.getJedisPool();
    SubscriberThread subscriberThread = new SubscriberThread(CHANNEL_QRCODE, jedisPool);
    subscriberThread.setDaemon(true);
    subscriberThread.start();
    //订阅频道是阻塞的,需要开启一个线程
    public
    class SubscriberThread extends Thread { private Subscriber subscriber = new Subscriber(); private String channel; private JedisPool jedisPool; public SubscriberThread(String channel, JedisPool jedisPool) { this.channel = channel; this.jedisPool = jedisPool; } @Override public void run() {while(true){ Jedis redis = this.jedisPool.getResource(); try{ log.info(">>>>>>>>>>订阅频道:{}", channel); redis.subscribe(subscriber, channel); }catch(Exception e){ e.printStackTrace(); } } } }

    发送消息:

    public class PublisherBuilder {
    private String channel;
        private JedisPool jedisPool;
    
        public PublisherBuilder(String channel, JedisPool jedisPool) {
            this.channel = channel;
            this.jedisPool = jedisPool;
        }
    
        public void publisher(String message) {
            log.info("publisher==>message:{}", message);
    
            Jedis jedis = null;
            try {
                jedis = jedisPool.getResource();
                jedis.publish(channel, message);
                log.info("成功发布消息:{}", message);
            } catch (Exception e) {
                e.printStackTrace();
                log.error("发布消息({})出错了, channel:", message, channel);
            } finally {
                if (jedis != null) {
                    jedis.close();
                }
            }
        }
    
    }

    订阅消息:

    public class Subscriber extends JedisPubSub {
    public Subscriber() {
        }
    
        @Override
        public void onMessage(String channel, String message) {       
         //收到消息会调用 log.info(String.format("收到订阅消息, channel %s, message %s", channel, message)); } @Override public void onSubscribe(String channel, int subscribedChannels) {
    //订阅了频道会调用 log.info(">>>>>>>>>频道 {} 订阅成功:{}<<<<<<<<<<<", channel, subscribedChannels); } @Override public void onUnsubscribe(String channel, int subscribedChannels) {
    //取消订阅 会调用 log.info(">>>>>>>>>取消频道 {} 订阅成功: {}<<<<<<<<<<<", channel, subscribedChannels); } }
    “小智”和“佩奇”是同班同学,佩奇同学非常乐于思考,经常提问题,小智同学,聪明绝顶,乐于解答问题。
  • 相关阅读:
    Tomcat启动startup.bat闪退和JRE_HOME错误
    页面布局:一侧固定宽度,一侧自适应
    iOS-数据持久化-CoreData
    iOS-数据持久化-SQlite3
    iOS-数据持久化-偏好设置
    iOS-数据持久化-对象归档
    iOS-数据持久化-属性列表
    iOS-数据持久化基础-沙盒机制
    iOS-数据持久化详细介绍
    iOS-网络处理框架AFN
  • 原文地址:https://www.cnblogs.com/liyefei/p/13144766.html
Copyright © 2011-2022 走看看