zoukankan      html  css  js  c++  java
  • Go基于观察者模式实现的订阅/发布

    面UCloud的时候问到了这题,下来看了一下是基于观察者模式实现的,仅作记录

    /**
     * @Author: lzw5399
     * @Date: 2021/5/20 20:38
     * @Desc: 基于观察者模式实现的订阅发布
     */
    package main
    
    import (
    	"errors"
    	"fmt"
    	"sync"
    )
    
    func main() {
    	server := NewServer()
    	server.Subscribe("CCTV1", NewBasicClient("张三"))
    	server.Subscribe("CCTV2", NewBasicClient("张三"))
    	server.Subscribe("CCTV1", NewBasicClient("李四"))
    
    	server.Publish("CCTV1", "开播辣!")
    }
    
    // ---server---
    type Server struct {
    	Topics map[string]*Topic  // k=topicName v=topic
    	sync.RWMutex
    }
    
    func NewServer() *Server {
    	return &Server{
    		Topics:  make(map[string]*Topic),
    		RWMutex: sync.RWMutex{},
    	}
    }
    
    func (s *Server) Subscribe(topicName string, client Client) {
    	s.RLock()
    	topic, exist := s.Topics[topicName]
    	s.RUnlock()
    
    	// 存在topic直接添加client
    	if exist {
    		topic.AddClient(client)
    		return
    	}
    
    	// 不存在topic,新建topic,在添加client
    	topic = NewTopic(topicName)
    	topic.AddClient(client)
    	s.Lock()
    	s.Topics[topicName] = topic
    	s.Unlock()
    }
    
    func (s *Server) Publish(topicName string, message string) error {
    	s.RLock()
    	topic, exist := s.Topics[topicName]
    	s.RUnlock()
    
    	if exist {
    		topic.Notify(message)
    		return nil
    	}
    
    	return errors.New("当前topic不存在")
    }
    
    // ---end server---
    
    // ---topic---
    type Topic struct {
    	Name    string
    	clients map[string]Client  // k=clientName v=client
    	sync.RWMutex
    }
    
    func NewTopic(name string) *Topic {
    	return &Topic{
    		Name:    name,
    		clients: make(map[string]Client),
    		RWMutex: sync.RWMutex{},
    	}
    }
    
    func (t *Topic) AddClient(client Client) {
    	// 只添加存在的
    	if _, exist := t.clients[client.Name()]; !exist {
    		t.Lock()
    		t.clients[client.Name()] = client
    		t.Unlock()
    	}
    }
    
    func (t *Topic) Notify(message string) {
    	for _, v := range t.clients {
    		v.ConsumeCallback(message)
    	}
    }
    
    // ---end topic---
    
    // ---client---
    type Client interface {
    	Name() string
    	ConsumeCallback(message string)
    }
    
    type BasicClient struct {
    	name string
    }
    
    func NewBasicClient(name string) *BasicClient {
    	return &BasicClient{
    		name: name,
    	}
    }
    
    func (b *BasicClient) Name() string {
    	return b.name
    }
    
    func (b *BasicClient) ConsumeCallback(message string) {
    	fmt.Printf("当前Client: %s, 接收到的消息为: %s
    ", b.Name(), message)
    }
    
    // ---end client---
    
  • 相关阅读:
    mysql 单表查询
    socket 阻塞,同步、I/O模型
    I/O复用
    send函数和recv函数
    linux网络编程、系统编程
    TCP三次握手、四次挥手
    支持中文的PHP按字符串长度分割成数组代码
    php json_encode 处理中文
    php base64各种上传
    本地创建分支
  • 原文地址:https://www.cnblogs.com/baoshu/p/14791403.html
Copyright © 2011-2022 走看看