zoukankan      html  css  js  c++  java
  • 【译】StackExchange.Redis 中文文档(八)流

    概述

    Stream 数据类型是在 Redis 版本 5.0 中添加的,它表示消息的仅追加日志。redis.io 上记录的所有 stream related commands 已在 StackExchange.Redis 客户端库中实现。阅读"Introduction to Redis Streams",以获取有关原始 Redis 命令以及如何使用流的更多信息。

    写入流

    流中的每条消息或条目均由 StreamEntry 类型表示。每个流的条目包含一个唯一的ID和一个名称/值对数组。名称/值对由 NameValueEntry 类型表示。

    使用以下命令将具有单个名称/值对的简单消息添加到流中:

    var db = redis.GetDatabase();
    var messageId = db.StreamAdd("event_stream", "foo_name", "bar_value");
    // messageId = 1518951480106-0
    

    StreamAdd 返回的消息ID由将消息添加到流中的毫秒时间和序列号组成。如果在同一毫秒时间创建了两个或更多消息,则序列号用于防止ID冲突。

    可以使用以下方法将多个名称/值对写入流:

    var values = new NameValueEntry[]
    {
        new NameValueEntry("sensor_id", "1234"),
        new NameValueEntry("temp", "19.8")
    };
    
    var db = redis.GetDatabase();
    var messageId = db.StreamAdd("sensor_stream", values);
    

    You also have the option to override the auto-generated message ID by passing your own ID to the StreamAdd method. Other optional parameters allow you to trim the stream's length.

    您还可以选择通过将自己的ID传递给StreamAdd方法来覆盖自动生成的消息ID。其他可选参数使您可以调整流的长度。

    db.StreamAdd("event_stream", "foo_name", "bar_value", messageId: "0-1", maxLength: 100);
    

    读取流

    通过使用 StreamReadStreamRange 方法从流中进行读取。

    var messages = db.StreamRead("event_stream", "0-0");
    

    上面的代码将从ID "0-0" 到流的末尾读取所有消息。你可以选择使用可选的 count 参数来限制返回的消息数。

    StreamRead 方法还允许你一次从多个流中读取:

    var streams = db.StreamRead(new StreamPosition[]
    {
        new StreamPosition("event_stream", "0-0"),
        new StreamPosition("score_stream", "0-0")
    });
    
    Console.WriteLine($"Stream = {streams.First().Key}");
    Console.WriteLine($"Length = {streams.First().Entries.Length}");
    

    你可以使用 countPerStream 可选参数来限制每个流返回的消息数。

    StreamRange 方法允许你返回流中的一系列条目。

    var messages = db.StreamRange("event_stream", minId: "-", maxId: "+");
    

    -+ 特殊字符表示可能的最小和最大ID。这些值是没有设置参数时的默认值。你还可以选择通过使用 messageOrder 参数来反向读取流。StreamRange 方法还提供了通过使用 count 参数来限制返回的条目数的功能。

    var messages = db.StreamRange("event_stream",
        minId: "0-0",
        maxId: "+",
        count: 100,
        messageOrder: Order.Descending);
    

    流的信息

    StreamInfo 方法提供了读取有关流的基本信息的能力:流的第一个和最后一个条目,流的长度,使用者组的数量等。此信息可用于以更有效的方式处理流。

    var info = db.StreamInfo("event_stream");
    
    Console.WriteLine(info.Length);
    Console.WriteLine(info.FirstEntry.Id);
    Console.WriteLine(info.LastEntry.Id);
    

    消费者组

    使用使用者组可让你扩展跨多个工作人员或使用者的流的处理。请阅读“Introduction to Redis Streams”,以获取有关消费者群体的详细信息。

    以下内容创建了一个使用者组,并告诉 Redis 从流中的哪个位置开始读取。如果你在第一次创建流之前调用该方法,则默认情况下,StreamCreateConsumerGroup 方法将为你创建流。你可以通过为createStream可选参数传递false来覆盖此默认行为。

    // Returns true if created, otherwise false.
    db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "$");
    // or
    db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", StreamPosition.NewMessages);
    

    特殊字符 "$" 表示使用者组将只读取在创建使用者组之后创建的消息。如果要阅读流中已经存在的消息,则可以提供流中的任何位置。

    // Begin reading from the first position in the stream.
    db.StreamCreateConsumerGroup("events_stream", "events_consumer_group", "0-0");
    

    使用 StreamReadGroup 方法将消息读入使用者。此方法接受消息ID作为参数之一。当ID传递给 StreamReadGroup 时,Redis 将仅返回给定使用者的待处理消息,换句话说,它将仅返回使用者已读取的消息。

    要将新消息读入使用者,请使用特殊字符 ">"StreamPosition.NewMessages">" 特殊字符表示从未读取过的消息,从未传递给其他消费者。请注意,消费者组中的消费者是在调用 StreamReadGroup 方法时首次使用时自动创建的。

    // Read 5 messages into two consumers.
    var consumer_1_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", ">", count: 5);
    var consumer_2_messages = db.StreamReadGroup("events_stream", "events_cg", "consumer_2", ">", count: 5);
    

    消费者读取了一条消息后,其状态对于该消费者变为“待处理”状态,其他任何消费者都无法通过 StreamReadGroup 方法读取该消息。可以通过使用 StreamReadGroup 方法并为消费者提供在待处理消息范围内的ID来读取消费者的待处理消息。

    // Read the first pending message for the "consumer_1" consumer.
    var message = db.StreamReadGroup("events_stream", "events_cg", "consumer_1", "0-0", count: 1);
    

    还可以通过调用 StreamPendingStreamPendingMessages 方法来检索待处理的消息信息。StreamPending 返回有关待处理消息数,每个使用者的待处理消息以及最高和最低待处理消息ID的高级信息。

    var pendingInfo = db.StreamPending("events_stream", "events_cg");
    
    Console.WriteLine(pendingInfo.PendingMessageCount);
    Console.WriteLine(pendingInfo.LowestPendingMessageId);
    Console.WriteLine(pendingInfo.HighestPendingMessageId);
    Console.WriteLine($"Consumer count: {pendingInfo.Consumers.Length}.");
    Console.WriteLine(pendingInfo.Consumers.First().Name);
    Console.WriteLine(pendingInfo.Consumers.First().PendingMessageCount);
    

    使用 StreamPendingMessages 方法检索有关给定使用者的待处理消息的详细信息。

    // Read the first pending message for the consumer.
    var pendingMessages = db.StreamPendingMessages("events_stream",
        "events_cg",
        count: 1,
        consumerName: "consumer_1",
        minId: pendingInfo.LowestPendingMessageId);
    
    Console.WriteLine(pendingMessages.Single().MessageId);
    Console.WriteLine(pendingMessages.Single().IdleTimeInMilliseconds);
    

    消息在等待消费者处理之前,直到通过调用 StreamAcknowledge 方法得到确认为止。消息被确认后, StreamReadGroup 不能再访问。

    // Returns the number of messages acknowledged.
    db.StreamAcknowledge("events_stream", "events_cg", pendingMessage.MessageId);
    

    StreamClaim 方法可用于将消费者使用的消息所有权更改为其他消费者。

    // Change ownership to consumer_2 for the first 5 messages pending for consumer_1.
    var pendingMessages = db.StreamPendingMessages("events_stream",
        "events_cg",
        count: 5,
        consumerName: "consumer_1",
        minId: "0-0");
    
    db.StreamClaim("events_stream",
        "events_cg",
        claimingConsumer: "consumer_2",
        minIdleTimeInMs: 0,
        messageIds: pendingMessages.Select(m => m.MessageId).ToArray());
    

    还有其他几种使用使用者组处理流的方法。请参考 Streams 单元测试以了解这些方法及其用法。

    原文地址:Stream

  • 相关阅读:
    Golang实现mysql where in 查询
    Golang终止程序运行(类似php die; exit;)和打印变量(print_r)
    (转)Unity中protobuf的使用方法
    (转)PlayerPrefs游戏存档
    Unity3d---> IEnumerator
    (转)Unity3D占用内存太大的解决方法
    UICamera(NGUI Event system)原理
    NGUI诡异的drawCall
    (转)U3D DrawCall优化手记
    (转)Unity3D
  • 原文地址:https://www.cnblogs.com/liang24/p/13847215.html
Copyright © 2011-2022 走看看