zoukankan      html  css  js  c++  java
  • 小蜗牛入门---RabbitMQ 新手练习,自带C#控制台简易测试代码。

    RabbitMQ  是一种消息队列技术,当然还有很多其他的消息队列技术,例如 Kafka、ZeroMQ  。。。等等,各有优劣,并未做过多的了解。

    也相似于一个数据库,将数据信息缓存起来处理,避开对大数据的处理,减轻数据库处理数据的压力。可用于实时性不高的异步请求或者需要执行的任务极度耗时等情况。

    由消费者订阅队列,例如这个队列是一个通道,然后生产者在一方将队列消息压入队列中,然后消费者便可在另一方读取最早压入的数据,这是一种先进先出的数据结构。

      废话不多说,还是先安装用用吧。

      首先,需要给 RabbitMQ 安装运行环境 Erlang OTP ,(window环境下)我用的是  otp_win64_20.2.exe。

    安装过程:下一步,下一步,然后安装完成--------基本操作。

      其次再安装 RabbitMQ,(window环境下)我用的是  rabbitmq-server-3.7.3.exe。

    安装过程:下一步,下一步,然后安装完成--------基本操作。

      然而并没有结束,真正的操作才刚开始。

          1---将window下的.erlang.cookie粘贴到当前登录用户下(亲测 window 7)。

        找到这里,复制几下

        

        粘贴到当前登录用户这里来,并覆盖当前的.erlang.cookie

                

      2---以管理员的方式打开cmd,并切换到之前安装的RabbitMQ的路径下回车。

      

        再输入rabbitmqctl status 查看是否安装成功可以正常运行,正常运行的状态是不会显示Error字段。

           

           3---安装 RabbitMQ Web 的管理插件,输入rabbitmq-plugins enable rabbitmq_management 回车。

      

      此时就可以通过 http://127.0.0.1:15672/ 地址来访问本地web管理界面,会有一个默认的管理员账户,其账号和密码都是 guest 然而并不能用。

      

      4---查看本地账户列表的信息,输入 rabbitmqctl.bat list_users 回车。

       我已经新增过一个user_admin的超级用户了。

           

            

      5---新增账户,语法是 rabbitmqctl  add_user  用户名  用户密码 ,新增一个用户  账号 wnName  密码 wnPwd

        然后输入 rabbitmqctl.bat list_users 回车,便可看到新增的普通用户。

      

        

      6--授于用户管理员权限,语法是 rabbitmqctl set_user_tags 用户名 administrator,将 wnName  授予管理员权限。

        然后输入 rabbitmqctl.bat list_users 回车,便可看到 wnName 用户已经变成了管理员。

      

        rabbitmq 的语法有很多,普及下基础的吧 。

           启动服务 rabbitmq-server start

           停止服务    rabbitmq-server stop

           查看状态   rabbitmqctl   status

         查看用户   rabbitmqctl.bat list_users    rabbitmqctl list_users

         新增用户 rabbitmqctl   add_user            用户名      用户密码

         删除用户 rabbitmqctl   delete_user        用户名   

         修改密码 rabbitmqctl   change_user      用户名      新密码

         授予权限 rabbitmqctl   set_user_tags    用户名      权限

           权限    administrator(超级管理员)  monitoring(监控者) management(普通管理员)  policymaker(策略制定者)

         查看队列    rabbitmqctl    list_queues

         启动应用    rabbitmqctl    stop_app

           关闭应用    rabbitmqctl     stop_app

           清除队列    rabbitmqctl     reset

      7--- 插入一个小提示,由于是新增的用户,并没有分配虚拟主机权限,然后在代码里面登录的时候就会有问题---None of the specified endpoints were reachable

         操作遗漏 登录之后点击 菜单栏 Admin

       

        

        解决方法:点击 wnName 进入页面再点击 Set permission (设置 权限) 就脱坑了。

           

      8---撸代码,顺便也写了个多线程的生产者衬托多线程的高效。

       队列需要两个对象来完成,以下2个 Demo 是 C# 控制台的代码。

      下面是生产者,生成一条信息需要1秒钟。将20条队列消息压入 队列名为 test-User 中,然后20条数据就是 20493 毫秒左右。写入了5个线程并执行同个任务,最后所花 4235 毫秒左右。

     //生产者
    class Program
        {
            //实体类
            public class tblUser
            {
                public string Name { get; set; }
                public int Age { get; set; }
                public int Sex { get; set; }
                public string CreateTime { get; set; }
                public int Disable { get; set; }
            }
    
            public static int Interval = 0;
            public static int num = 1;
            private static Thread t1;
            private static Thread t2;
            private static Thread t3;
            private static Thread t4;
            private static Thread t5;
            public static ConnectionFactory factory = new ConnectionFactory()
            {
                //其他的属性不想写,简洁点 不然可能报错  偷一波懒
                VirtualHost = "/",//虚拟主机
                //rabbitmq 账户信息
                UserName = "wnName",
                Password = "wnPwd",
            };
            static void Main(string[] args)
            {
                try
                {
                    //Task t1 = Task.Factory.StartNew(() =>
                    //{
                    //    RunModule();
                    //});
                    ///*创建任务 t2  t2 执行 数据集合添加操作*/
                    //Task t2 = Task.Factory.StartNew(() =>
                    //{
                    //    RunModule();
                    //});
                    t1 = new Thread(new ThreadStart(RunModule));//无参数的委托
                    t2 = new Thread(new ThreadStart(RunModule));//无参数的委托
                    t3 = new Thread(new ThreadStart(RunModule));//无参数的委托
                    t4 = new Thread(new ThreadStart(RunModule));//无参数的委托
                    t5 = new Thread(new ThreadStart(RunModule));//无参数的委托
                    t1.Start();
                    t2.Start();
                    t3.Start();
                    t4.Start();
                    t5.Start();
                }
                catch (Exception ex)
                {
                    throw;
                }
                Console.ReadKey();
            }
            //队列名
            public static string mqName = "test-User";
    
            public static object obj = new object();
            static void RunModule()
            {
                //压入20条数据 可用 rabbitmqctl list_users 查看
                while (num <= 20)
                {
                    //锁 防止并发
                    lock (obj)
                    {
                        using (var connection = factory.CreateConnection())
                        {
                            using (var channel = connection.CreateModel())
                            {
                                //QueueDeclare方法中的参数  queue 队列   passive被动   durable持久  exclusive专属  autoDelete自动删除  nowait现在
                                channel.QueueDeclare(mqName, false, false, false, null);
                                //实例化实体类赋值然后转成json格式压入队列中。为什么要转成json格式?这个格式随意,队列中保存的都是字符串。
                                var jsonData = JsonConvert.SerializeObject(
                        new tblUser() { Name = "数据" + num, Age = 0, Sex = 0,
                        CreateTime = DateTime.Now.ToString("yyyy-MM-dd"), Disable = 1 }); //序列化 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //编码下 var body = Encoding.UTF8.GetBytes(jsonData); channel.BasicPublish("", mqName, properties, body); } } Console.WriteLine("Starting...测试线程,第" + num+"次执行。"); num++; } System.Threading.Thread.Sleep(1000); } //终止线程 t1.Abort(); t2.Abort(); t3.Abort(); t4.Abort(); t5.Abort(); t6.Abort(); } }

        

      下面是消费者,从 test-User 队列中读取信息,消费者每次处理数据需要一秒钟。数据依次从最早压入的队列中取出,牺牲实时处理,避免了高峰大数据造成的拥挤。

       

        class Program
        {
            private static Thread get1;//线程一
            private static int  Interval = 0;
            private static Thread times;//计时器
            public  static string mqName = "test-User";
            public  static ConnectionFactory factory = new ConnectionFactory()
            {
                VirtualHost = "/",
                UserName = "wnName",
                Password = "wnPwd",
            };
            static void Main(string[] args)
            {
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            channel.QueueDeclare(mqName, false, false, false, null);
    
                            var consumer = new QueueingBasicConsumer(channel);
                            channel.BasicConsume(mqName, true, consumer);
    
                            while (true)
                            {
                                var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                                var body = ea.Body;
                                var message = Encoding.UTF8.GetString(body);
                                Console.WriteLine("获得数据 {0}", message);
                            Thread.Sleep(1000);
                            }
                       }
                }
            }
        }
    

      

     

     


    以上都是个人陋闻,如有问题有劳指正,大家共同学习共同进步。如果有帮助到您,希望您能给动个小手点一下右下方的推荐,谢谢。

        

  • 相关阅读:
    CF1480
    网络编程中常见地址结构与转换(IPv4/IPv6)
    inet_pton, inet_ntop
    mktime 夏令时
    C/C++中volatile关键字详解
    STL之vector容器详解
    Linux学习--gdb调试
    Linux编程基础——GDB(设置断点)
    FTP模式简式:PORT/PASV/EPRT/EPSV
    strchr和strstr 函数
  • 原文地址:https://www.cnblogs.com/yuqiuyeyun/p/8489391.html
Copyright © 2011-2022 走看看