zoukankan      html  css  js  c++  java
  • Qt实战7.轻量级发布订阅模式

    1 需求描述

    1. 基于Qt实现发布订阅模式;
    2. 发布的消息类型可自定义;
    3. 能够支持多线程使用。

    2 设计思路

    Qt信号槽可看作的是观察者模式的一种实现,信号槽的连接需要知道信号发送者和接收者。
    但是有些情况下我们完全不需要知道发送者和接收者,发送者只管发送主题消息,接收者只管接收自己关心的主题消息,这样使发送者和接收者完全脱耦,它们之间通过一个“中间使者”进行通信,这样便实现了发布订阅模式,发送者称为发布者,接收者称为订阅者。

    一个例子,你在微博上关注了A,同时其他很多人也关注了A,那么当A发布动态的时候,微博就会为你们推送这条动态。A就是发布者,你是订阅者,微博就是调度中心,你和A是没有直接的消息往来的,全是通过微博来协调的(你的关注,A的发布动态)。

    这里设计两个类Subscriber、Publisher,即订阅者和发布者,发布者为单例类,内部维护一个订阅者列表,记录订阅者对应的主题列表,发布主题消息的时候遍历查询进行发送即可。

    3 代码实现

    3.1 发布者

    发布者为单例类,需要私有化构造、析构等函数,其对外只提供publish主题发布接口,主题消息封装为QVariant,代码如下:

    #ifndef PUBLISHER_H
    #define PUBLISHER_H
    
    #include <QObject>
    #include <QReadWriteLock>
    #include <QHash>
    #include <QSet>
    #include <QStringList>
    #include <QScopedPointer>
    
    class Subscriber;
    
    class Publisher : public QObject
    {
        Q_OBJECT
    public:
        /*!
         * rief getInstance 获取Publisher单例指针
         * 
    eturn
         */
        static Publisher *getInstance();
    
        /*!
         * rief publish 发布主题消息
         * param topic 主题名称
         * param msg 消息内容
         */
        void publish(const QString &topic, const QVariant &msg);
    
    private:
        explicit Publisher(QObject *parent = nullptr);
        ~Publisher();
        Publisher(const Publisher &other);
        Publisher& operator=(const Publisher &other);
    
        /*!
         * rief add 订阅者订阅一个主题
         * param object 订阅者对象指针
         * param topic 主题名称
         */
        void add(Subscriber *object, const QString &topic);
    
        /*!
         * rief remove 移除订阅者订阅的对应主题
         * param object 订阅者对象指针
         * param topic 主题名称
         */
        void remove(Subscriber *object, const QString &topic);
    
        /*!
         * rief remove 移除订阅者订阅的所有主题
         * param object 订阅者对象指针
         */
        void remove(Subscriber *object);
    
        /*!
         * rief getTopics 获取订阅者所订阅的主题列表
         * param object 订阅者对象指针
         * 
    eturn
         */
        QStringList getTopics(Subscriber *object);
    
        friend class Subscriber;
    
    private:
        static QReadWriteLock sm_readWriteLock;
    
        static QScopedPointer<Publisher> sm_instance;
        friend struct QScopedPointerDeleter<Publisher>;
    
        QHash<Subscriber *, QSet<QString> > m_objectTopicHash;
        QHash<QString, QVariant> m_topicLastMsgHash;
    };
    
    #endif // PUBLISHER_H
    
    #include "Publisher.h"
    #include "Subscriber.h"
    
    #include <QDebug>
    #include <QReadLocker>
    #include <QWriteLocker>
    
    QScopedPointer<Publisher> Publisher::sm_instance;
    QReadWriteLock Publisher::sm_readWriteLock;
    Publisher::Publisher(QObject *parent) : QObject(parent)
    {
    }
    
    Publisher::~Publisher()
    {
    }
    
    void Publisher::add(Subscriber *object, const QString &topic)
    {
        QWriteLocker locker(&sm_readWriteLock);
    
        if (m_objectTopicHash.keys().contains(object)) {
            auto it = m_objectTopicHash.find(object);
            it.value().insert(topic);
        } else {
            QSet<QString> set = {topic};
            m_objectTopicHash.insert(object, set);
    
            connect(object, &QObject::destroyed, [=]() {
                remove(object);
            });
        }
          
        //订阅后将自动发送最后一次主题消息
        if (m_topicLastMsgHash.keys().contains(topic)) {
            QMetaObject::invokeMethod(object, "topicUpdated", Qt::QueuedConnection,
                                      Q_ARG(QString, topic), Q_ARG(QVariant, m_topicLastMsgHash.value(topic)));
        }
    }
    
    void Publisher::remove(Subscriber *object, const QString &topic)
    {
        QWriteLocker locker(&sm_readWriteLock);
    
        if (m_objectTopicHash.keys().contains(object)) {
            auto it = m_objectTopicHash.find(object);
            it.value().remove(topic);
        }
    }
    
    void Publisher::remove(Subscriber *object)
    {
        QWriteLocker locker(&sm_readWriteLock);
    
        if (m_objectTopicHash.keys().contains(object)) {
            m_objectTopicHash.remove(object);
        }
    }
    
    QStringList Publisher::getTopics(Subscriber *object)
    {
        QReadLocker locker(&sm_readWriteLock);
    
        if (m_objectTopicHash.keys().contains(object)) {
            return QStringList::fromSet(m_objectTopicHash.value(object));
        }
    
        return QStringList();
    }
    
    Publisher *Publisher::getInstance()
    {
        if (sm_instance.isNull()) {
    
            sm_readWriteLock.lockForWrite();
            if (sm_instance.isNull()) {
                sm_instance.reset(new Publisher);
            }
            sm_readWriteLock.unlock();
        }
    
        return sm_instance.data();
    }
    
    void Publisher::publish(const QString &topic, const QVariant &msg)
    {
        QReadLocker locker(&sm_readWriteLock);
    
        auto it = m_objectTopicHash.constBegin();
        while (it != m_objectTopicHash.constEnd()) {
            if (it.value().contains(topic)) {
                QMetaObject::invokeMethod(it.key(), "topicUpdated", Qt::QueuedConnection,
                                          Q_ARG(QString, topic), Q_ARG(QVariant, msg));
                m_topicLastMsgHash.insert(topic, msg);
                ++it;
            }
        }
    }
    

    发布者做了一些处理,m_topicLastMsgHash用于缓存最后一次发送的主题消息,使刚订阅的订阅者能够获取到最新的主题消息。

    3.2 订阅者

    订阅者接口很简单,主要有订阅主题、取消主题订阅、取消所有订阅,代码如下:

    #ifndef SUBSCRIBER_H
    #define SUBSCRIBER_H
    
    #include <QObject>
    #include <QStringList>
    
    class Subscriber : public QObject
    {
        Q_OBJECT
    public:
        explicit Subscriber(QObject *parent = nullptr);
    
        /*!
         * rief subscribe 订阅主题
         * param topic 主题名称
         */
        void subscribe(const QString &topic);
    
        /*!
         * rief unSubscribe 取消订阅
         * param topic 主题名称
         */
        void unSubscribe(const QString &topic);
    
        /*!
         * rief clearSubscribedTopics 取消所有已订阅主题
         */
        void clearSubscribedTopics();
    
        /*!
         * rief topics 获取已订阅的主题列表
         * 
    eturn 主题列表
         */
        QStringList topics();
    
    signals:
        /*!
         * rief topicUpdated 主题消息更新信号
         * param topic 主题名称
         * param var 消息内容
         */
        void topicUpdated(const QString &topic, const QVariant &msg);
    };
    
    #endif // SUBSCRIBER_H
    
    #include "Subscriber.h"
    #include "Publisher.h"
    
    #include <QDebug>
    
    Subscriber::Subscriber(QObject *parent) : QObject(parent)
    {
    }
    
    void Subscriber::subscribe(const QString &topic)
    {
        Publisher *publiser = Publisher::getInstance();
        publiser->add(this, topic);
    }
    
    void Subscriber::unSubscribe(const QString &topic)
    {
        Publisher *publiser = Publisher::getInstance();
        publiser->remove(this, topic);
    }
    
    void Subscriber::clearSubscribedTopics()
    {
        Publisher *publiser = Publisher::getInstance();
        publiser->remove(this);
    }
    
    QStringList Subscriber::topics()
    {
        Publisher *publiser = Publisher::getInstance();
        return publiser->getTopics(this);
    }
    

    到此,发布者通过publish接口发送主题消息,订阅者通过topicUpdated信号接收主题消息,订阅者Subscriber可声明为自定义类的成员或直接继承,使用起来非常简单。

    4 总结

    对于设计模式,一般来说都是经验总结,是解决一类问题的“套路”,既然是经验总结,那么一定具有实用性。设计模式其实也是源于生活,只是程序员用代码的方式给实现了,这个叫面向对象(生活)编程,不要觉得这概念很悬乎,其实就是把生活中的逻辑移植到代码的世界,设计模式让代码结构更加清晰,也更便于后期的维护。当然,设计模式也不要滥用,不然就是瞎搞,杀鸡焉用牛刀,想想还真是这个道理。

    划重点了,在多线程复杂场景下,通过QVariant封装自定义结构体类型时使用队列方式处理信号槽可能会转换异常,原因可能是信号过快,而槽函数又处理的慢,临时解决办法是publish中调用方式QueuedConnection改为DirectConnection,且将读锁改为写锁,这样效率会有所下降,但是更加稳定。终极办法还是让槽函数执行快点吧,最好不要有互斥锁在里面,这样效率和稳定性兼顾。

  • 相关阅读:
    Use Study Groups to Support Learning
    “开闭”原则(OpenClosed principle, OCP)
    我的E72i 开发
    conlution of daily work
    appstore相关查询链接
    sqlite3.0不支持的sql属性
    iOS sdk 运行时函数
    自动化测试部分
    ios下获取mac地址修正版
    修改mac os host
  • 原文地址:https://www.cnblogs.com/luoxiang/p/13592134.html
Copyright © 2011-2022 走看看