zoukankan      html  css  js  c++  java
  • redis实现发布(订阅)消息

    redis实现发布(订阅)消息

    什么是redis的发布订阅(pub/sub)?   Pub/Sub功能(means Publish, Subscribe)即发布及订阅功能。基于事件的系统中,Pub/Sub是目前广泛使用的通信模型,它采用事件作为基本的通信机制,提供大规模系统所要求的松散耦合的交互模式:订阅者(如客户端)以事件订阅的方式表达出它有兴趣接收的一个事件或一类事件;发布者(如服务器)可将订阅者感兴趣的事件随时通知相关订阅者。熟悉设计模式的朋友应该了解这与23种设计模式中的观察者模式极为相似。 

    看到发布订阅的特性,用来做一个简单的实时聊天系统再适合不过了。这是其中之一,当然这样的东西,我们开发中很少涉及到。再举一个常用的,在我们的分布式架构中,常常会遇到读写分离的场景,在写入的过程中,就可以使用redis发布订阅,使得写入值及时发布到各个读的程序中,就保证数据的完整一致性。再比如,在一个博客网站中,有100个粉丝订阅了你,当你发布新文章,就可以推送消息给粉丝们拉。总之场景很多,需要去挖掘。。

    Redis的pub/sub是一种消息通信模式,主要的目的是解除消息发布者和消息订阅者之间的耦合,  Redis作为一个pub/sub的server, 在订阅者和发布者之间起到了消息路由的功能。

    Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

    Redis 客户端可以订阅任意数量的频道。

    当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的所有客户端。

    // *************************************************************************** }
    //
    // Delphi REDIS Client
    //
    // Copyright (c) 2015-2017 Daniele Teti
    //
    // https://github.com/danieleteti/delphiredisclient
    //
    // ***************************************************************************
    //
    // Licensed under the Apache License, Version 2.0 (the "License");
    // you may not use this file except in compliance with the License.
    // You may obtain a copy of the License at
    //
    // http://www.apache.org/licenses/LICENSE-2.0
    //
    // Unless required by applicable law or agreed to in writing, software
    // distributed under the License is distributed on an "AS IS" BASIS,
    // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    // See the License for the specific language governing permissions and
    // limitations under the License.
    //
    // ***************************************************************************
    
    unit MainForm;
    
    interface
    
    uses
      Winapi.Windows, Winapi.Messages, System.SysUtils, System.Variants,
      System.Classes, Vcl.Graphics,
      Vcl.Controls, Vcl.Forms, Vcl.Dialogs, redis.client, redis.commons,
      redis.netlib.indy, System.threading,
      Vcl.StdCtrls;
    
    type
      TForm2 = class(TForm)
        Memo1: TMemo;
        Edit2: TEdit;
        Label1: TLabel;
        Button1: TButton;
        procedure FormCreate(Sender: TObject);
        procedure Edit2KeyUp(Sender: TObject; var Key: Word; Shift: TShiftState);
        procedure FormClose(Sender: TObject; var Action: TCloseAction);
        procedure Button1Click(Sender: TObject);
      private
        _redis: IRedisClient;
        FTask: ITask;
        FClosing: Boolean;
        procedure SendChatMessage;
        procedure OnMessage(const ANickName, AMessage: string);
        { Private declarations }
      public
        { Public declarations }
      end;
    
    var
      Form2: TForm2;
    
    implementation
    
    uses System.json, ShellAPI;
    {$R *.dfm}
    
    
    procedure TForm2.Button1Click(Sender: TObject);
    begin
      ShellExecute(0, pchar('open'), pchar(Application.ExeName), nil, nil, SW_SHOW);
    end;
    
    procedure TForm2.Edit2KeyUp(Sender: TObject; var Key: Word; Shift: TShiftState);
    begin
      if Key = VK_RETURN then
      begin
        SendChatMessage;
        Key := 0;
        Edit2.Clear;
      end;
    end;
    
    procedure TForm2.FormClose(Sender: TObject; var Action: TCloseAction);
    begin
      FClosing := True;
    end;
    
    procedure TForm2.FormCreate(Sender: TObject);
    begin
      FClosing := False;
      Label1.Caption := InputBox('Chat user name',
        'What is your user name in this chat?', 'd.teti' + (100 + Random(999))
        .ToString);
      _redis := NewRedisClient();
      FTask := TTask.Run(
        procedure
        var
          r: IRedisClient;
        begin
          r := NewRedisClient;
          r.SUBSCRIBE(['chat'],
            procedure(channel, message: string)
            var
              jobj: TJSONObject;
              msg, nickname: string;
            begin
              jobj := TJSONObject.ParseJSONValue(message) as TJSONObject;
              nickname := jobj.GetValue<TJSONString>('nickname').Value;
              msg := jobj.GetValue<TJSONString>('message').Value;
              TThread.Synchronize(nil,
                procedure
                begin
                  Self.OnMessage(nickname, msg);
                end);
            end,
            function: Boolean
            begin
              Result := Assigned(Self) and (not FClosing);
            end);
        end);
    end;
    
    procedure TForm2.OnMessage(const ANickName, AMessage: string);
    begin
      Memo1.Lines.Add('[' + ANickName + '] ' + DateTimeToStr(now));
      Memo1.Lines.Add(AMessage);
      Memo1.Lines.Add('---');
    end;
    
    procedure TForm2.SendChatMessage;
    var
      jobj: TJSONObject;
    begin
      jobj := TJSONObject.Create;
      try
        jobj.AddPair('nickname', Label1.Caption).AddPair('message', Edit2.Text);
        _redis.PUBLISH('chat', jobj.ToString);
      finally
        jobj.Free;
      end;
    end;
    
    end.
    

      

  • 相关阅读:
    Shadow SSDT详解、WinDbg查看Shadow SSDT
    两种方法获取shadow ssdt
    r0遍历系统进程方法总结
    枚举PEB获取进程模块列表
    用户层获取TEB PEB结构地址 遍历进程模块.doc
    Process32First 返回FALSE的原因
    windows内核需要注意的
    hive中遇到的问题
    解读:计数器Counter
    hadoop System times on machines may be out of sync. Check system time and time zones.
  • 原文地址:https://www.cnblogs.com/hnxxcxg/p/10481283.html
Copyright © 2011-2022 走看看