zoukankan      html  css  js  c++  java
  • c#生产/消费RabbitMQ

    /// <summary>
    /// Helpers that convert objects to and from UTF-8 encoded JSON using
    /// <c>JsonConvert</c>.
    /// </summary>
    public sealed class JsonSerializer
        {
            /// <summary>
            /// Serializes <paramref name="message"/> to JSON text and encodes that text as UTF-8.
            /// </summary>
            /// <param name="message">The object to serialize.</param>
            /// <returns>The UTF-8 bytes of the JSON representation.</returns>
            public static byte[] Serialize(object message)
            {
                string json = JsonConvert.SerializeObject(message);
                return Encoding.UTF8.GetBytes(json);
            }

            /// <summary>
            /// Decodes <paramref name="bytes"/> as UTF-8 text and deserializes it as JSON
            /// into an instance of <typeparamref name="T"/>.
            /// </summary>
            /// <typeparam name="T">The type to deserialize the JSON into.</typeparam>
            /// <param name="bytes">UTF-8 encoded JSON text.</param>
            /// <returns>The deserialized value, returned as <see cref="object"/>.</returns>
            public static object Deserialize<T>(byte[] bytes)
            {
                string json = Encoding.UTF8.GetString(bytes);
                T value = JsonConvert.DeserializeObject<T>(json);
                return value;
            }
        }
        /// <summary>
        /// Helpers that convert object graphs to and from byte arrays using
        /// <see cref="BinaryFormatter"/>.
        /// </summary>
        /// <remarks>
        /// SECURITY: <see cref="BinaryFormatter"/> is unsafe when used on untrusted input and is
        /// obsolete in current .NET releases. Only deserialize bytes that this application
        /// produced itself; prefer a JSON serializer for anything that crosses a trust boundary.
        /// </remarks>
        public sealed class BinarySerializer
        {
            /// <summary>
            /// Serializes <paramref name="obj"/> with <see cref="BinaryFormatter"/> into a new byte array.
            /// </summary>
            /// <param name="obj">The object graph to serialize; must not be null.</param>
            /// <returns>The serialized bytes.</returns>
            /// <exception cref="System.ArgumentNullException"><paramref name="obj"/> is null.</exception>
            public static byte[] SerializeToBytes(object obj)
            {
                if (obj == null)
                {
                    throw new System.ArgumentNullException("obj");
                }

                var formatter = new BinaryFormatter();
                using (var stream = new MemoryStream())
                {
                    formatter.Serialize(stream, obj);

                    // ToArray copies everything written to the stream regardless of the
                    // current position, so no manual rewind or byte-by-byte copy is needed.
                    return stream.ToArray();
                }
            }

            /// <summary>
            /// Deserializes an object graph from <paramref name="bytes"/> with
            /// <see cref="BinaryFormatter"/>.
            /// </summary>
            /// <param name="bytes">Bytes to deserialize; must not be null.</param>
            /// <returns>The deserialized object graph.</returns>
            /// <exception cref="System.ArgumentNullException"><paramref name="bytes"/> is null.</exception>
            public static object DeserializeFromBytes(byte[] bytes)
            {
                if (bytes == null)
                {
                    throw new System.ArgumentNullException("bytes");
                }

                // SECURITY: never pass data from an untrusted source to this method.
                var formatter = new BinaryFormatter();
                using (var stream = new MemoryStream(bytes))
                {
                    return formatter.Deserialize(stream);
                }
            }
        }
    /// <summary>
    /// Connects to the broker at amqp://192.168.1.1:1234/ as "guest", declares a direct
    /// exchange and the queue "testQueue", binds them with routing key "rk", and publishes a
    /// single JSON-serialized <c>Order</c> to that exchange.
    /// </summary>
    /// <returns>
    /// Always <c>false</c>. NOTE(review): this is returned even after a successful publish;
    /// confirm whether callers rely on the value before changing it.
    /// </returns>
    private static bool RawPublishMessage()
            {
                const string exchangeName = "TestExchangeRouting...";
                const string bindingKey = "rk";
                var brokerUri = new Uri("amqp://192.168.1.1:1234/");

                var factory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    VirtualHost = "/",
                    RequestedHeartbeat = 0,
                    Endpoint = new AmqpTcpEndpoint(brokerUri)
                };

                // Open a connection and a channel on it; both are disposed when the block exits.
                using (var connection = factory.CreateConnection())
                using (var channel = connection.CreateModel())
                {
                    // Declare the exchange and the queue, then bind the queue to the exchange.
                    channel.ExchangeDeclare(exchangeName, "direct");
                    var declaredQueue = channel.QueueDeclare("testQueue", true, false, false, null);
                    channel.QueueBind(declaredQueue.QueueName, exchangeName, bindingKey);

                    // This object is the actual payload being sent.
                    var order = new Order
                    {
                        Id = 100021,
                        Title = "工一一个测试Test"
                    };
                    var payload = JsonSerializer.Serialize(order);

                    // The content type carries the CLR type name of the payload.
                    var messageProperties = channel.CreateBasicProperties();
                    messageProperties.SetPersistent(true);
                    messageProperties.ContentType = typeof(Order).AssemblyQualifiedName;
                    messageProperties.ContentEncoding = "JSON";

                    // Publish the message.
                    channel.BasicPublish(exchangeName, bindingKey, messageProperties, payload);
                    Console.WriteLine("写入成功");
                }

                return false;
            }
    
            /// <summary>
            /// Connects to the broker at amqp://192.168.1.1:1234/ as "guest" and consumes
            /// messages from "testQueue" until a dequeue attempt yields no message or an
            /// error occurs. Each message is acknowledged, deserialized as an <c>Order</c>
            /// and printed to the console together with its routing key.
            /// </summary>
            /// <returns>
            /// Always <c>false</c>. NOTE(review): the value carries no success information;
            /// confirm whether callers rely on it before changing it.
            /// </returns>
            private static bool RawGetMessage()
            {
                Uri uri = new Uri("amqp://192.168.1.1:1234/");
                ConnectionFactory factory = new ConnectionFactory();

                factory.UserName = "guest";
                factory.Password = "guest";
                factory.VirtualHost = "/";
                factory.RequestedHeartbeat = 0;
                factory.Endpoint = new AmqpTcpEndpoint(uri);

                // Open a connection.
                using (IConnection connection = factory.CreateConnection())
                {
                    // Open a channel on the connection.
                    using (IModel channel = connection.CreateModel())
                    {
                        var basicConsumer = new QueueingBasicConsumer(channel);
                        // Messages are acknowledged explicitly with BasicAck below.
                        channel.BasicConsume("testQueue", false, basicConsumer);

                        while (true)
                        {
                            try
                            {
                                BasicDeliverEventArgs delivery;
                                // Dequeue with a timeout argument of 1000; stop draining as soon
                                // as no message is handed back.
                                if (!basicConsumer.Queue.Dequeue(1000, out delivery) || delivery == null)
                                {
                                    break;
                                }

                                // Acknowledge directly on the calling thread; wrapping this
                                // synchronous call in Task.Run(...).Wait() only added a thread hop.
                                // NOTE(review): the message is acknowledged before it is
                                // deserialized, so a message that fails below is not redelivered;
                                // confirm this ordering is intended.
                                channel.BasicAck(delivery.DeliveryTag, false);

                                var body = JsonSerializer.Deserialize<Order>(delivery.Body);
                                Console.WriteLine(string.Format("RoutingKey:{0},Body:{1}", delivery.RoutingKey,
                                     JsonConvert.SerializeObject(body, Formatting.Indented)));
                            }
                            catch (Exception ex)
                            {
                                // Stop consuming on any failure, but report it instead of
                                // discarding the error silently.
                                Console.Error.WriteLine(ex);
                                break;
                            }
                        }

                        channel.Close();
                    }
                    connection.Close();
                }

                return false;
            }
    /// <summary>
    /// Reads the entire content of <paramref name="stream"/> into a new byte array.
    /// The stream is rewound to its beginning first, so the current position on entry
    /// does not matter; on return the position is at the end of the data read.
    /// </summary>
    /// <param name="stream">A stream that supports <c>Length</c> and seeking; must not be null.</param>
    /// <returns>A new array holding <c>stream.Length</c> bytes.</returns>
    /// <exception cref="System.ArgumentNullException"><paramref name="stream"/> is null.</exception>
    /// <exception cref="EndOfStreamException">
    /// The stream ended before <c>Length</c> bytes could be read.
    /// </exception>
    public static byte[] ReadAllBytes(Stream stream)
            {
                if (stream == null)
                {
                    throw new System.ArgumentNullException("stream");
                }

                var bytes = new byte[stream.Length];

                stream.Seek(0, SeekOrigin.Begin);

                // Read in bulk; Stream.Read may return fewer bytes than requested, so loop
                // until the buffer is full.
                var offset = 0;
                while (offset < bytes.Length)
                {
                    var read = stream.Read(bytes, offset, bytes.Length - offset);
                    if (read <= 0)
                    {
                        // Casting ReadByte's -1 to byte would silently store 0xFF here;
                        // fail loudly on a truncated stream instead.
                        throw new EndOfStreamException("Stream ended before all bytes were read.");
                    }

                    offset += read;
                }

                return bytes;
            }
  • 相关阅读:
    新概念英语第三册21-40课(转)
    多线程---线程通信
    多线程----线程同步
    多线程----线程创建的四种方式
    从0开始整合SSM框架-1.mybatis
    easyUI datagrid 动态绑定列名称
    java分享第五天(数组)
    java分享第四天(循环)
    java分享第三天(异常)
    java分享第二天(变量及命名规范)
  • 原文地址:https://www.cnblogs.com/zhshlimi/p/6368388.html
Copyright © 2011-2022 走看看