zoukankan      html  css  js  c++  java
  • C# RocketMqHelper

    using org.apache.rocketmq.client.consumer;
    using org.apache.rocketmq.client.consumer.listener;
    using org.apache.rocketmq.client.producer;
    using org.apache.rocketmq.common.consumer;
    using org.apache.rocketmq.common.protocol.heartbeat;
    using System;
    using System.Collections.Generic;
    using System.Configuration;
    using System.Linq;
    
    
    namespace SqlBulkCopyData.消息中间件
    {
        public class RocketMqHelper
        {
            private static readonly string namesrvAddr = null;
            private static IList<DefaultMQProducer> producers = new List<DefaultMQProducer>();
            private static IList<DefaultMQPushConsumer> consumers = new List<DefaultMQPushConsumer>();
    
            private static object producer_lock = new object();
            private static object consumer_lock = new object();
    
            static RocketMqHelper()
            {
                namesrvAddr = ConfigurationManager.AppSettings["RocketMqIp"];
                if(string.IsNullOrEmpty(namesrvAddr))
                    namesrvAddr = "47.106.232.106:9876";
            }
    
            /// <summary>
            /// 创建生产者
            /// </summary>
            /// <param name="group"></param>
            /// <returns></returns>
            public static DefaultMQProducer CreateDefaultMQProducer(string groupName, int queueCount = 6)
            {
                var producer = producers.Where(o => o.getProducerGroup() == groupName).FirstOrDefault();
                if (producer == null)
                {
                    lock (producer_lock)
                    {
                        producer = producers.Where(o => o.getProducerGroup() == groupName).FirstOrDefault();
                        if (producer == null)
                        {
                            producer = new DefaultMQProducer(groupName);
                            producer.setNamesrvAddr(namesrvAddr);
                            producer.setRetryTimesWhenSendFailed(3);
                            producer.setDefaultTopicQueueNums(queueCount);
                            producer.start();
                            producers.Add(producer);
    
                        }
                    }
                }
    
                return producer;
            }
    
            /// <summary>
            /// 创建消费者
            /// </summary>
            /// <param name="group"></param> 
            /// <returns></returns>
            public static DefaultMQPushConsumer CreateDefaultMQPushConsumer<T>(string groupName) where T : MessageListenerConcurrently
            {
                var consumer = consumers.Where(o => o.getConsumerGroup() == groupName).FirstOrDefault();
                if (consumer == null) //双if +lock
                {
                    lock (consumer_lock)
                    {
                        consumer = consumers.Where(o => o.getConsumerGroup() == groupName).FirstOrDefault();
                        if (consumer == null)
                        {
                            consumer = new DefaultMQPushConsumer(groupName);
                            consumer.setNamesrvAddr(namesrvAddr);
                            consumer.setMessageModel(MessageModel.CLUSTERING);
                            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
                            consumer.registerMessageListener(Activator.CreateInstance<T>());
                            consumer.start();
                            consumers.Add(consumer);
                        }
                    }
                }
                return consumer;
            }
        }
    }
    

      

  • 相关阅读:
    android数据恢复
    UVA 690 Pipeline Scheduling
    2017 国庆湖南 Day4
    2017 国庆湖南 Day5
    2017 国庆湖南 Day6
    2017国庆 清北学堂 北京综合强化班 Day1
    2017 国庆湖南Day2
    bzoj 2962 序列操作
    UVA 818 Cutting Chains
    UVA 211 The Domino Effect
  • 原文地址:https://www.cnblogs.com/sunliyuan/p/12953145.html
Copyright © 2011-2022 走看看