zoukankan      html  css  js  c++  java
  • RabbitMQ 一个demo

    Code:

    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using RabbitMQ.Client;
    using RabbitMQ.Client.Events;
    using System.IO;
    
    
    namespace ConsoleDemo
    {
        class Program
        {
            static void Main(string[] args)
            {
                //step1 
                new Thread(Provider.Write).Start();
                new Thread(Provider.Write).Start();
                new Thread(Provider.Write).Start();
    
                //step2(写入不能多线程,会冲突)
                new Thread(Consumer.Read).Start();
            }
        }
    
        public class Provider
        {
            public static void Write()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "qhong", Password = "hongdada", };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "writeLog", durable: false, exclusive: false, autoDelete: false, arguments: null);
                    for (int i = 0; i < 1994; i++)
                    {
                        string message = i.ToString();
                        var body = Encoding.UTF8.GetBytes(message);
    
                        channel.BasicPublish(exchange: "", routingKey: "writeLog", basicProperties: null, body: body);
                        Console.WriteLine("Program Sent {0}", message);
                    }
                }
            }
        }
    
        public class Consumer
        {
            public static void Read()
            {
                var factory = new ConnectionFactory() { HostName = "localhost", UserName = "qhong", Password = "hongdada", VirtualHost = "/" };
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    channel.QueueDeclare(queue: "writeLog",
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
    
                    var consumer = new EventingBasicConsumer(channel);
                    consumer.Received += (model, ea) =>
                    {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body);
                        ExcuateWriteFile(message);
                        Console.WriteLine(" Receiver Received {0}", message);
                    };
                    channel.BasicConsume(queue: "writeLog",
                                         noAck: true,
                                         consumer: consumer);
    
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
            public static void ExcuateWriteFile(string i)
            {
                using (FileStream fs = new FileStream(@"d:\test.txt", FileMode.Append))
                {
                    using (StreamWriter sw = new StreamWriter(fs, Encoding.Unicode))
                    {
                        sw.WriteLine(i);
                    }
                }
            }
        }
    }

    分成2步执行,第一步生产者往队列里面添加数据

    第二步,消费者读取队列里面的数据并写入文件test.txt

    http://www.cnblogs.com/ericli-ericli/p/5917018.html

    http://www.cnblogs.com/piaolingzxh/p/5448927.html

  • 相关阅读:
    centos 安装 TortoiseSVN svn 客户端
    linux 定时任务 日志记录
    centos6.5 安装PHP7.0支持nginx
    linux root 用户 定时任务添加
    composer 一些使用说明
    laravel cookie写入
    laravel composer 安装指定版本以及基本的配置
    mysql 删除重复记录语句
    linux php redis 扩展安装
    linux php 安装 memcache 扩展
  • 原文地址:https://www.cnblogs.com/hongdada/p/7049861.html
Copyright © 2011-2022 走看看