RabbitMQ 是一种消息队列技术,当然还有很多其他的消息队列技术,例如 Kafka、ZeroMQ 。。。等等,各有优劣,并未做过多的了解。
也相似于一个数据库,将数据信息缓存起来处理,避开对大数据的处理,减轻数据库处理数据的压力。可用于实时性不高的异步请求或者需要执行的任务极度耗时等情况。
由消费者订阅队列,例如这个队列是一个通道,然后生产者在一方将队列消息压入队列中,然后消费者便可在另一方读取最早压入的数据,这是一种先进先出的数据结构。
废话不多说,还是先安装用用吧。
首先,需要给 RabbitMQ 安装运行环境 Erlang OTP ,(window环境下)我用的是 otp_win64_20.2.exe。
安装过程:下一步,下一步,然后安装完成--------基本操作。
其次再安装 RabbitMQ,(window环境下)我用的是 rabbitmq-server-3.7.3.exe。
安装过程:下一步,下一步,然后安装完成--------基本操作。
然而并没有结束,真正的操作才刚开始。
1---将window下的.erlang.cookie粘贴到当前登录用户下(亲测 window 7)。
找到这里,复制几下
粘贴到当前登录用户这里来,并覆盖当前的.erlang.cookie
2---以管理员的方式打开cmd,并切换到之前安装的RabbitMQ的路径下回车。
再输入rabbitmqctl status 查看是否安装成功可以正常运行,正常运行的状态是不会显示Error字段。
3---安装 RabbitMQ Web 的管理插件,输入rabbitmq-plugins enable rabbitmq_management 回车。
此时就可以通过 http://127.0.0.1:15672/ 地址来访问本地web管理界面,会有一个默认的管理员账户,其账号和密码都是 guest 然而并不能用。
4---查看本地账户列表的信息,输入 rabbitmqctl.bat list_users 回车。
我已经新增过一个user_admin的超级用户了。
5---新增账户,语法是 rabbitmqctl add_user 用户名 用户密码 ,新增一个用户 账号 wnName 密码 wnPwd。
然后输入 rabbitmqctl.bat list_users 回车,便可看到新增的普通用户。
6--授于用户管理员权限,语法是 rabbitmqctl set_user_tags 用户名 administrator,将 wnName 授予管理员权限。
然后输入 rabbitmqctl.bat list_users 回车,便可看到 wnName 用户已经变成了管理员。
rabbitmq 的语法有很多,普及下基础的吧 。
启动服务 rabbitmq-server start
停止服务 rabbitmq-server stop
查看状态 rabbitmqctl status
查看用户 rabbitmqctl.bat list_users 或 rabbitmqctl list_users
新增用户 rabbitmqctl add_user 用户名 用户密码
删除用户 rabbitmqctl delete_user 用户名
修改密码 rabbitmqctl change_user 用户名 新密码
授予权限 rabbitmqctl set_user_tags 用户名 权限
权限 administrator(超级管理员) monitoring(监控者) management(普通管理员) policymaker(策略制定者)
查看队列 rabbitmqctl list_queues
启动应用 rabbitmqctl stop_app
关闭应用 rabbitmqctl stop_app
清除队列 rabbitmqctl reset
7--- 插入一个小提示,由于是新增的用户,并没有分配虚拟主机权限,然后在代码里面登录的时候就会有问题---None of the specified endpoints were reachable。
操作遗漏 登录之后点击 菜单栏 Admin
解决方法:点击 wnName 进入页面再点击 Set permission (设置 权限) 就脱坑了。
8---撸代码,顺便也写了个多线程的生产者衬托多线程的高效。
队列需要两个对象来完成,以下2个 Demo 是 C# 控制台的代码。
下面是生产者,生成一条信息需要1秒钟。将20条队列消息压入 队列名为 test-User 中,然后20条数据就是 20493 毫秒左右。写入了5个线程并执行同个任务,最后所花 4235 毫秒左右。
//生产者 class Program { //实体类 public class tblUser { public string Name { get; set; } public int Age { get; set; } public int Sex { get; set; } public string CreateTime { get; set; } public int Disable { get; set; } } public static int Interval = 0; public static int num = 1; private static Thread t1; private static Thread t2; private static Thread t3; private static Thread t4; private static Thread t5; public static ConnectionFactory factory = new ConnectionFactory() { //其他的属性不想写,简洁点 不然可能报错 偷一波懒 VirtualHost = "/",//虚拟主机 //rabbitmq 账户信息 UserName = "wnName", Password = "wnPwd", }; static void Main(string[] args) { try { //Task t1 = Task.Factory.StartNew(() => //{ // RunModule(); //}); ///*创建任务 t2 t2 执行 数据集合添加操作*/ //Task t2 = Task.Factory.StartNew(() => //{ // RunModule(); //}); t1 = new Thread(new ThreadStart(RunModule));//无参数的委托 t2 = new Thread(new ThreadStart(RunModule));//无参数的委托 t3 = new Thread(new ThreadStart(RunModule));//无参数的委托 t4 = new Thread(new ThreadStart(RunModule));//无参数的委托 t5 = new Thread(new ThreadStart(RunModule));//无参数的委托 t1.Start(); t2.Start(); t3.Start(); t4.Start(); t5.Start(); } catch (Exception ex) { throw; } Console.ReadKey(); } //队列名 public static string mqName = "test-User"; public static object obj = new object(); static void RunModule() { //压入20条数据 可用 rabbitmqctl list_users 查看 while (num <= 20) { //锁 防止并发 lock (obj) { using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { //QueueDeclare方法中的参数 queue 队列 passive被动 durable持久 exclusive专属 autoDelete自动删除 nowait现在 channel.QueueDeclare(mqName, false, false, false, null); //实例化实体类赋值然后转成json格式压入队列中。为什么要转成json格式?这个格式随意,队列中保存的都是字符串。 var jsonData = JsonConvert.SerializeObject(
new tblUser() { Name = "数据" + num, Age = 0, Sex = 0,
CreateTime = DateTime.Now.ToString("yyyy-MM-dd"), Disable = 1 }); //序列化 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //编码下 var body = Encoding.UTF8.GetBytes(jsonData); channel.BasicPublish("", mqName, properties, body); } } Console.WriteLine("Starting...测试线程,第" + num+"次执行。"); num++; } System.Threading.Thread.Sleep(1000); } //终止线程 t1.Abort(); t2.Abort(); t3.Abort(); t4.Abort(); t5.Abort(); t6.Abort(); } }
下面是消费者,从 test-User 队列中读取信息,消费者每次处理数据需要一秒钟。数据依次从最早压入的队列中取出,牺牲实时处理,避免了高峰大数据造成的拥挤。
class Program { private static Thread get1;//线程一 private static int Interval = 0; private static Thread times;//计时器 public static string mqName = "test-User"; public static ConnectionFactory factory = new ConnectionFactory() { VirtualHost = "/", UserName = "wnName", Password = "wnPwd", }; static void Main(string[] args) { using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { channel.QueueDeclare(mqName, false, false, false, null); var consumer = new QueueingBasicConsumer(channel); channel.BasicConsume(mqName, true, consumer); while (true) { var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); var body = ea.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("获得数据 {0}", message); Thread.Sleep(1000); } } } } }
以上都是个人陋闻,如有问题有劳指正,大家共同学习共同进步。如果有帮助到您,希望您能给动个小手点一下右下方的推荐,谢谢。