zoukankan      html  css  js  c++  java
  • 订阅普通消息 无缓冲的channel

    收发普通消息 - 消息队列RocketMQ版 - 阿里云 https://help.aliyun.com/document_detail/255809.html

    package main
    
    import (
        "fmt"
        "github.com/gogap/errors"
        "strings"
        "time"
    
        "github.com/aliyunmq/mq-http-go-sdk"
    )
    
    func main() {
        // 设置HTTP协议客户端接入点,进入消息队列RocketMQ版控制台实例详情页面的接入点区域查看。
        endpoint := "${HTTP_ENDPOINT}"
        // AccessKey ID阿里云身份验证,在阿里云RAM控制台创建。
        accessKey := "${ACCESS_KEY}"
        // AccessKey Secret阿里云身份验证,在阿里云RAM控制台创建。
        secretKey := "${SECRET_KEY}"
        // 消息所属的Topic,在消息队列RocketMQ版控制台创建。
        //不同消息类型的Topic不能混用,例如普通消息的Topic只能用于收发普通消息,不能用于收发其他类型的消息。
        topic := "${TOPIC}"
        // Topic所属的实例ID,在消息队列RocketMQ版控制台创建。
        // 若实例有命名空间,则实例ID必须传入;若实例无命名空间,则实例ID传入null空值或字符串空值。实例的命名空间可以在消息队列RocketMQ版控制台的实例详情页面查看。
        instanceId := "${INSTANCE_ID}"
        // 您在控制台创建的Group ID。
        groupId := "${GROUP_ID}"
    
        client := mq_http_sdk.NewAliyunMQClient(endpoint, accessKey, secretKey, "")
    
        mqConsumer := client.GetConsumer(instanceId, topic, groupId, "")
    
        for {
            endChan := make(chan int)
            respChan := make(chan mq_http_sdk.ConsumeMessageResponse)
            errChan := make(chan error)
            go func() {
                select {
                case resp := <-respChan:
                    {
                        // 处理业务逻辑。
                        var handles []string
                        fmt.Printf("Consume %d messages---->
    ", len(resp.Messages))
                        for _, v := range resp.Messages {
                            handles = append(handles, v.ReceiptHandle)
                            fmt.Printf("	MessageID: %s, PublishTime: %d, MessageTag: %s
    "+
                                "	ConsumedTimes: %d, FirstConsumeTime: %d, NextConsumeTime: %d
    "+
                                "	Body: %s
    "+
                                "	Props: %s
    ",
                                v.MessageId, v.PublishTime, v.MessageTag, v.ConsumedTimes,
                                v.FirstConsumeTime, v.NextConsumeTime, v.MessageBody, v.Properties)
                        }
    
                        // NextConsumeTime前若不确认消息消费成功,则消息会被重复消费。
                        // 消息句柄有时间戳,同一条消息每次消费拿到的都不一样。
                        ackerr := mqConsumer.AckMessage(handles)
                        if ackerr != nil {
                            // 某些消息的句柄可能超时,会导致消息消费状态确认不成功。
                            fmt.Println(ackerr)
                            if errAckItems, ok := ackerr.(errors.ErrCode).Context()["Detail"].([]mq_http_sdk.ErrAckItem); ok {
                               for _, errAckItem := range errAckItems {
                                 fmt.Printf("	ErrorHandle:%s, ErrorCode:%s, ErrorMsg:%s
    ",
                                   errAckItem.ErrorHandle, errAckItem.ErrorCode, errAckItem.ErrorMsg)
                               }
                            } else {
                               fmt.Println("ack err =", ackerr)
                            }
                            time.Sleep(time.Duration(3) * time.Second)
                        } else {
                            fmt.Printf("Ack ---->
    	%s
    ", handles)
                        }
    
                        endChan <- 1
                    }
                case err := <-errChan:
                    {
                        // Topic中没有消息可消费。
                        if strings.Contains(err.(errors.ErrCode).Error(), "MessageNotExist") {
                            fmt.Println("
    No new message, continue!")
                        } else {
                            fmt.Println(err)
                            time.Sleep(time.Duration(3) * time.Second)
                        }
                        endChan <- 1
                    }
                case <-time.After(35 * time.Second):
                    {
                        fmt.Println("Timeout of consumer message ??")
                        endChan <- 1
                    }
                }
            }()
    
            // 长轮询消费消息。
            // 长轮询表示如果Topic没有消息,则客户端请求会在服务端挂起3s,3s内如果有消息可以消费则立即返回响应。
            mqConsumer.ConsumeMessage(respChan, errChan,
                3, // 一次最多消费3条(最多可设置为16条)。
                3, // 长轮询时间3秒(最多可设置为30秒)。
            )
            <-endChan
        }
    }
  • 相关阅读:
    HDU 6182 A Math Problem 水题
    HDU 6186 CS Course 位运算 思维
    HDU 6188 Duizi and Shunzi 贪心 思维
    HDU 2824 The Euler function 欧拉函数
    HDU 3037 Saving Beans 多重集合的结合 lucas定理
    HDU 3923 Invoker Polya定理
    FZU 2282 Wand 组合数学 错排公式
    HDU 1452 Happy 2004 数论
    HDU 5778 abs 数论
    欧拉回路【判断连通+度数为偶】
  • 原文地址:https://www.cnblogs.com/rsapaper/p/15076023.html
Copyright © 2011-2022 走看看