zoukankan      html  css  js  c++  java
  • activeMq笔记

    对比

    http://jm.taobao.org/2016/04/01/kafka-vs-rabbitmq-vs-rocketmq-message-send-performance/

    安装

    下载地址:http://activemq.apache.org/download.html

    安装教程: http://gerrard-ok.iteye.com/blog/1766203

    解压缩:

    运行: 进入解压后目录下的 bin 目录, 执行 ./activemq start

    .Net使用

     教程:http://www.cnblogs.com/madyina/p/4121458.html#3249312

    下载:http://activemq.apache.org/nms/activemq-downloads.html

    还有一个下载地址比较全: http://archive.apache.org/dist/activemq/apache-nms/

    控制台:http://ip:8161/

    用户名: admin, 密码:admin

    发送连接: tcp://192.168.16.23:61616?wireFormat.maxInactivityDuration=0

    接收连接:failover:(tcp://192.168.16.23:61616?wireFormat.maxInactivityDuration=0&maxInactivityDurationInitalDelay=30000&connection.AsyncSend=true)

     Send方法:

        /// <summary>
        /// Sender for the "FileUploaded" queue: serializes a <c>FileUploadedModel</c> to JSON
        /// and sends it as a text message through a short-lived connection.
        /// </summary>
        public class FileUploadedMq : IActiveMq
        {
            // Broker URI, looked up once in the static constructor.
            private static string ConnString;

            /// <summary>Connection factory built from the broker URI; created once per process.</summary>
            public static ConnectionFactory Factory { get; private set; }

            /// <summary>Declared for subscribers; nothing in this class raises it.</summary>
            public static event Action<FileUploadedModel> Recved;

            static FileUploadedMq()
            {
                ConnString = dbo.GetDbConnString(MqTypeEnum.FileUpload.ToString());

                Factory = new ConnectionFactory(ConnString);
            }

            /// <summary>
            /// Sends <paramref name="Msg"/> (as JSON text) to the "FileUploaded" queue.
            /// A new connection, session and producer are opened for the call and disposed afterwards.
            /// </summary>
            /// <param name="Msg">The payload to serialize and send.</param>
            /// <remarks>
            /// Failures are logged and not rethrown, so the caller is not told whether the send succeeded.
            /// </remarks>
            public void Send(FileUploadedModel Msg)
            {
                try
                {
                    using (IConnection connection = Factory.CreateConnection())
                    // Create a session on the connection.
                    using (ISession session = connection.CreateSession())
                    // Create a producer bound to the target queue.
                    // NOTE(review): ActiveMq.RegisteFileUploaded listens on MqTypeEnum.FileUpload.ToString();
                    // confirm that name matches the "FileUploaded" queue used here.
                    using (var MsgProducer = session.CreateProducer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue("FileUploaded")))
                    {
                        // Create the message object to send.
                        var TxtMsg = MsgProducer.CreateTextMessage();

                        // Tag the message with a "filter" property; presumably consumers can use it
                        // as a selector condition (the only consumer visible here has its selector commented out).
                        TxtMsg.Properties.SetString("filter", "FileUploaded");

                        // The message body is the JSON form of the model.
                        TxtMsg.Text = Msg.ToJson();

                        // Send with explicit delivery mode, priority and time-to-live.
                        // NOTE(review): TimeSpan.MinValue is passed as time-to-live; confirm the NMS provider
                        // treats a non-positive value as "never expires".
                        MsgProducer.Send(TxtMsg, MsgDeliveryMode.NonPersistent, MsgPriority.Normal, TimeSpan.MinValue);
                    }
                }
                catch (Exception ex)
                {
                    // Log the full exception (type, stack trace, inner exceptions), not just the message:
                    // broker/socket failures are usually only diagnosable from the inner exception.
                    InfoTypeEnum.Error.LogTo(ex.ToString());
                }

            }
        }

    注册Receive:

        /// <summary>
        /// Static access point for the application's queues: holds one sender per queue and
        /// provides helpers that attach typed listeners to those queues.
        /// </summary>
        public class ActiveMq
        {
            public static FileUploadedMq FileUploaded = new FileUploadedMq();
            public static FileResolvedMq FileResolved = new FileResolvedMq();
            public static FileFastDfsSavedMq FileFastDfsSaved = new FileFastDfsSavedMq();
            // public static EmailUploadMq EmailUpload=new EmailUploadMq();

            /// <summary>
            /// Attaches a listener to the queue named after <c>MqTypeEnum.FileUpload</c>.
            /// </summary>
            /// <param name="msgRecv">Called with each message body deserialized from JSON.</param>
            /// <param name="ErrorFunc">Optional; called with the deserialized message and the exception when <paramref name="msgRecv"/> throws.</param>
            /// <returns>true if the listener was attached; false if <paramref name="msgRecv"/> is null or setup failed.</returns>
            public static bool RegisteFileUploaded(Action<FileUploadedModel> msgRecv, Action<FileUploadedModel, Exception> ErrorFunc = null)
            {
                // Without a handler the consumer would take messages off the queue and drop them.
                if (msgRecv == null) return false;

                return RegisteMq(MqTypeEnum.FileUpload,
                    json => msgRecv(json.FromJson<FileUploadedModel>()),
                    (json, error) =>
                    {
                        if (ErrorFunc == null) return;
                        ErrorFunc(json.FromJson<FileUploadedModel>(), error);
                    });
            }

            /// <summary>
            /// Attaches a listener to the queue named after <c>MqTypeEnum.FileResolved</c>.
            /// </summary>
            /// <param name="msgRecv">Called with each message body deserialized from JSON.</param>
            /// <param name="ErrorFunc">Optional; called with the deserialized message and the exception when <paramref name="msgRecv"/> throws.</param>
            /// <returns>true if the listener was attached; false if <paramref name="msgRecv"/> is null or setup failed.</returns>
            public static bool RegisteFileResolved(Action<FileResolvedModel> msgRecv, Action<FileResolvedModel, Exception> ErrorFunc = null)
            {
                // Without a handler the consumer would take messages off the queue and drop them.
                if (msgRecv == null) return false;

                return RegisteMq(MqTypeEnum.FileResolved,
                    json => msgRecv(json.FromJson<FileResolvedModel>()),
                    (json, error) =>
                    {
                        if (ErrorFunc == null) return;
                        ErrorFunc(json.FromJson<FileResolvedModel>(), error);
                    });
            }

            /// <summary>
            /// Attaches a listener to the queue named after <c>MqTypeEnum.FileFastDfsSaved</c>.
            /// </summary>
            /// <param name="msgRecv">Called with each message body deserialized from JSON.</param>
            /// <param name="ErrorFunc">Optional; called with the deserialized message and the exception when <paramref name="msgRecv"/> throws.</param>
            /// <returns>true if the listener was attached; false if <paramref name="msgRecv"/> is null or setup failed.</returns>
            public static bool RegisteFileFastDfsSaved(Action<FileFastDfsSavedModel> msgRecv, Action<FileFastDfsSavedModel, Exception> ErrorFunc = null)
            {
                // Without a handler the consumer would take messages off the queue and drop them.
                if (msgRecv == null) return false;

                return RegisteMq(MqTypeEnum.FileFastDfsSaved,
                    json => msgRecv(json.FromJson<FileFastDfsSavedModel>()),
                    (json, error) =>
                    {
                        if (ErrorFunc == null) return;
                        ErrorFunc(json.FromJson<FileFastDfsSavedModel>(), error);
                    });
            }

            /// <summary>
            /// Opens a connection, creates a consumer on the queue named <c>Name.ToString()</c> and
            /// attaches a listener that forwards each non-empty text message body to <paramref name="msgRecv"/>.
            /// The connection is intentionally left open on success so the listener keeps receiving.
            /// </summary>
            /// <param name="Name">Queue identifier; its string form is used as the queue name.</param>
            /// <param name="msgRecv">Receives the raw message text.</param>
            /// <param name="ErrorFunc">Optional; receives the raw text and the exception when <paramref name="msgRecv"/> throws.</param>
            /// <returns>true if the listener was attached; false if <paramref name="msgRecv"/> is null or setup threw.</returns>
            private static bool RegisteMq(MqTypeEnum Name, Action<string> msgRecv, Action<string, Exception> ErrorFunc)
            {
                if (msgRecv == null) return false;

                var QueueName = Name.ToString();
                IConnection connection = null;
                try
                {
                    // NOTE(review): every queue is registered through FileResolvedMq.Factory regardless of Name;
                    // confirm all queues are meant to share that broker URI.
                    var factory = FileResolvedMq.Factory;

                    // Build the connection from the factory.
                    connection = factory.CreateConnection();
                    // Client id for this connection (currently not set).
                    //connection.ClientId = Environment.MachineName + "FileResolvedMqListener";

                    // A listening connection has to be started explicitly.
                    connection.Start();
                    // Create a session on the connection.
                    ISession session = connection.CreateSession();
                    // Create a consumer on the queue (no selector is applied).
                    IMessageConsumer consumer = session.CreateConsumer(new Apache.NMS.ActiveMQ.Commands.ActiveMQQueue(QueueName));//, "filter='FileResolved'");

                    // NOTE(review): this asks for the destination to be deleted right after a consumer was
                    // created on it; left unchanged — confirm the intent (e.g. purging on startup).
                    session.DeleteDestination(QueueName);

                    // Attach the message listener.
                    consumer.Listener += message =>
                    {
                        // 'as' instead of a hard cast: a non-text message is skipped rather than
                        // throwing InvalidCastException inside the listener callback.
                        var txt = message as ITextMessage;
                        if (txt == null || txt.Text.HasValue() == false) { return; }

                        var model = txt.Text;

                        try
                        {
                            msgRecv.Invoke(model);
                        }
                        catch (Exception e)
                        {
                            InfoTypeEnum.Error.LogTo(QueueName + ":在处理过程出现错误!" + connection.ClientId, model.ToJson(), e.Message);

                            if (ErrorFunc != null)
                            {
                                try
                                {
                                    ErrorFunc(model, e);
                                }
                                catch (Exception errorFuncEx)
                                {
                                    // The error callback is best-effort: never let it escape the listener,
                                    // but do not hide its failure either.
                                    InfoTypeEnum.Error.LogTo(QueueName + ":" + errorFuncEx.Message);
                                }
                            }
                        }
                    };
                    return true;
                }
                catch (Exception ex)
                {
                    InfoTypeEnum.Error.LogTo(QueueName + ":" + ex.Message);

                    // Setup failed part-way: release the connection so a started connection
                    // (and any consumer created on it without a listener) is not left behind.
                    if (connection != null)
                    {
                        try
                        {
                            connection.Dispose();
                        }
                        catch (Exception closeEx)
                        {
                            InfoTypeEnum.Error.LogTo(QueueName + ":" + closeEx.Message);
                        }
                    }
                    return false;
                }
            }
        }

    问题

    1. 查看消息显示: Error!  Exception occurred while processing this request, check the log for more information!

    原因: 安装了 JRE8 , 改到 JRE7!

    参考:http://bbs.csdn.net/topics/390811825

    安装老版本:http://www.java.com/zh_CN/download/faq/other_jreversions.xml

  • 相关阅读:
    idea快捷键
    cas的缺点
    mybatis plus 实现逻辑删除
    mybatis plus的条件查询
    POI写非常大的数据量时
    POI中HSSF和XSSF
    JMM是什么
    Android面试收集录10 LruCache原理解析
    Android面试收集录9 IntentService详解
    Android面试收集录8 HandlerThread详解
  • 原文地址:https://www.cnblogs.com/newsea/p/4740834.html
Copyright © 2011-2022 走看看