zoukankan      html  css  js  c++  java
  • NetMQ

    NetMQ是一个封装了Socket队列的开源库,它是ZeroMQ的.NET移植版,而ZeroMQ本身是用C++写成的。有人测试过它的性能,几乎可以秒杀其他所有的MQ(MSMQ、RabbitMQ等等,都不是它的对手),不过它也有一个弱点:消息不支持持久化!

    定义要发送到消息里的对象

     using System;
     using ProtoBuf;

     namespace Model
     {
         /// <summary>
         /// Message payload exchanged between the sender and the workers.
         /// </summary>
         /// <remarks>
         /// <c>[Serializable]</c> makes the type usable with <c>BinaryFormatter</c>;
         /// <c>[ProtoContract]</c>/<c>[ProtoMember]</c> describe the same fields for protobuf-net.
         /// The numbers passed to <c>[ProtoMember]</c> are the protobuf field tags.
         /// </remarks>
         [Serializable]
         [ProtoContract]
         public class Person
         {
             /// <summary>Numeric identifier (protobuf field 1).</summary>
             [ProtoMember(1)]
             public int Id { get; set; }

             /// <summary>Display name (protobuf field 2).</summary>
             [ProtoMember(2)]
             public string Name { get; set; }

             /// <summary>Date of birth (protobuf field 3).</summary>
             [ProtoMember(3)]
             public DateTime BirthDay { get; set; }

             /// <summary>Nested address record (protobuf field 4); may be null if never assigned.</summary>
             [ProtoMember(4)]
             public Address Address { get; set; }
         }
     }
    using System;
    using ProtoBuf;
     
    namespace Model
    {
        /// <summary>
        /// Address data made of two free-text lines.
        /// </summary>
        /// <remarks>
        /// Carries both <c>[Serializable]</c> and protobuf-net contract attributes;
        /// the <c>[ProtoMember]</c> numbers are the protobuf field tags.
        /// </remarks>
        [Serializable, ProtoContract]
        public class Address
        {
            /// <summary>First address line (protobuf field 1).</summary>
            [ProtoMember(1)]
            public string Line1 { get; set; }

            /// <summary>Second address line (protobuf field 2).</summary>
            [ProtoMember(2)]
            public string Line2 { get; set; }
        }
    }

    消息的发送者

    using System;
    using System.IO;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading;
    using System.Threading.Tasks;
    using Model;
    using NetMQ;
    using ProtoBuf;
    using ProtoBuf.Meta;
     
    namespace Ventilator
    {
        /// <summary>
        /// Produces a batch of serialized <see cref="Person"/> messages and pushes them
        /// to whichever workers are connected to the task endpoint.
        /// </summary>
        sealed class Ventilator
        {
            // PUSH endpoint this process binds; workers connect to it to pull tasks.
            private const string TaskEndpoint = "tcp://*:5557";

            // Endpoint this process connects to in order to announce the start of a batch.
            private const string SinkEndpoint = "tcp://localhost:5558";

            // Number of messages sent per batch.
            private const int TaskCount = 10000;

            /// <summary>
            /// Starts sending the batch on a background task and returns immediately.
            /// </summary>
            /// <remarks>
            /// The task is fire-and-forget, so any failure is written to standard error
            /// here; otherwise nobody would ever observe the exception.
            /// </remarks>
            public void Run()
            {
                Task.Run(() =>
                {
                    try
                    {
                        SendBatch();
                    }
                    catch (Exception ex)
                    {
                        Console.Error.WriteLine("Ventilator failed: " + ex);
                    }
                });
            }

            /// <summary>
            /// Binds the task socket, signals the start of the batch and sends
            /// <see cref="TaskCount"/> serialized <see cref="Person"/> messages.
            /// </summary>
            private static void SendBatch()
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    sender.Bind(TaskEndpoint);
                    sink.Connect(SinkEndpoint);

                    // The first message is "0" and marks the start of a batch.
                    sink.Send("0");

                    Console.WriteLine("Sending tasks to workers");

                    // Raises protobuf-net's metadata inspection timeout. The active code path
                    // below uses BinaryFormatter, so this presumably only matters if the
                    // protobuf-net serializer is switched back on.
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    // Loop-invariant values are built once. The date is constructed directly
                    // instead of parsed so it does not depend on the current culture.
                    var birthDay = new DateTime(1981, 11, 15);

                    // NOTE(security): BinaryFormatter is documented by Microsoft as unsafe and
                    // obsolete. It is kept because the receiving side deserializes with the same
                    // format; both sides have to be migrated together.
                    // protobuf-net alternative: Serializer.Serialize(sm, person) after
                    // Serializer.PrepareSerializer<Person>().
                    var binaryFormatter = new BinaryFormatter();

                    for (int taskNumber = 0; taskNumber < TaskCount; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);

                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = birthDay,
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };

                        using (var sm = new MemoryStream())
                        {
                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            }
        }
    }
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
     
    namespace Ventilator
    {
        /// <summary>
        /// Console entry point of the ventilator (task sender) process.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Waits for the operator to confirm that the workers are up, starts the
            /// ventilator, then keeps the process alive until Enter is pressed again.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            public static void Main(string[] args)
            {
                // The ventilator binds a PUSH socket on tcp://*:5557 and sends a batch
                // of tasks to the workers through it.
                Console.WriteLine("====== VENTILATOR ======");

                // Give the operator a chance to start the workers first.
                Console.WriteLine("Press enter when worker are ready");
                Console.ReadLine();

                // The ventilator itself sends the "0" start-of-batch marker;
                // this line only announces it on the console.
                Console.WriteLine("Sending start of batch to Sink");

                // Run() hands the work to a background task and returns at once.
                new Ventilator().Run();

                // Block here so the background task can keep running.
                Console.WriteLine("Press Enter to quit");
                Console.ReadLine();
            }
        }
    }

    消息的处理者

    using System;
    using System.IO;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading;
    using System.Threading.Tasks;
    using Model;
    using NetMQ;
    using ProtoBuf;
     
    namespace Worker
    {
        /// <summary>
        /// Pulls serialized <see cref="Person"/> messages from the task endpoint, prints
        /// them and pushes each person's id to the result endpoint.
        /// </summary>
        sealed class Worker
        {
            // Endpoint tasks are pulled from.
            private const string TaskEndpoint = "tcp://localhost:5557";

            // Endpoint results are pushed to.
            private const string SinkEndpoint = "tcp://localhost:5558";

            /// <summary>
            /// Starts the receive loop in the background and returns immediately.
            /// </summary>
            /// <remarks>
            /// The loop blocks forever, so it is started as a long-running task instead of
            /// occupying a regular thread-pool thread; many workers started at once would
            /// otherwise have to wait for the pool to grow.
            /// </remarks>
            public void Run()
            {
                Task.Factory.StartNew(
                    () => ProcessTasks(),
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning,
                    TaskScheduler.Default);
            }

            /// <summary>
            /// Connects both sockets and processes incoming messages until an error occurs.
            /// Failures are written to standard error because nobody awaits the task.
            /// </summary>
            private static void ProcessTasks()
            {
                try
                {
                    using (NetMQContext ctx = NetMQContext.Create())
                    using (var receiver = ctx.CreatePullSocket())
                    using (var sender = ctx.CreatePushSocket())
                    {
                        receiver.Connect(TaskEndpoint);
                        sender.Connect(SinkEndpoint);

                        // protobuf-net (Serializer.Deserialize<Person>) was tried first, but
                        // failed when used from multiple threads with:
                        //   "Timeout while inspecting metadata; this may indicate a deadlock.
                        //    This can often be avoided by preparing necessary serializers during
                        //    application initialization, rather than allowing multiple threads to
                        //    perform the initial metadata inspection; please also see the
                        //    LockContended event"
                        // so binary serialization is used instead.
                        //
                        // NOTE(security): BinaryFormatter.Deserialize is applied to bytes read
                        // from a TCP socket. Microsoft documents BinaryFormatter as unsafe for
                        // untrusted input; only run this against a trusted sender, and migrate
                        // sender and worker together if the format is changed.
                        var binaryFormatter = new BinaryFormatter();

                        // Process tasks forever.
                        while (true)
                        {
                            var receivedBytes = receiver.Receive();

                            Person person;
                            using (var sm = new MemoryStream(receivedBytes))
                            {
                                person = binaryFormatter.Deserialize(sm) as Person;
                            }

                            if (person == null)
                            {
                                // The payload deserialized to something other than a Person;
                                // skip it instead of failing with a NullReferenceException.
                                Console.Error.WriteLine("Worker skipped a message that is not a Person");
                                continue;
                            }

                            // Address is optional on the model, so guard against null.
                            var address = person.Address;
                            var line1 = address == null ? null : address.Line1;
                            var line2 = address == null ? null : address.Line2;

                            Console.WriteLine("Person {Id:" + person.Id + ",Name:" + person.Name + ",BirthDay:" +
                                              person.BirthDay + ",Address:{Line1:" + line1 +
                                              ",Line2:" + line2 + "}}");
                            Console.WriteLine("Sending to Sink:" + person.Id);
                            sender.Send(person.Id.ToString());
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.Error.WriteLine("Worker failed: " + ex);
                }
            }
        }
    }
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
     
    namespace Worker
    {
        /// <summary>
        /// Console entry point of the worker process.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Creates 50 workers, starts them through <c>Parallel.Invoke</c> and then
            /// waits for Enter so the process stays alive.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            public static void Main(string[] args)
            {
                // Each worker connects a PULL socket to tcp://localhost:5557 to collect
                // work and a PUSH socket to tcp://localhost:5558 to report results.
                Console.WriteLine("====== WORKER ======");

                // One start action per worker instance.
                var startActions = new Action[50];
                for (int i = 0; i < startActions.Length; i++)
                {
                    startActions[i] = new Worker().Run;
                }

                // Invoke the start actions in parallel, capped at the number of cores.
                var options = new ParallelOptions
                {
                    MaxDegreeOfParallelism = Environment.ProcessorCount
                };
                Parallel.Invoke(options, startActions);

                Console.ReadLine();
            }
        }
    }
  • 相关阅读:
    oracle 自动备份
    oracle 常用操作语句
    数据库创建及使用注意事项
    oracle 导入 导出 备份
    http://blog.sina.com.cn/s/blog_5fc8b3810100iw9n.html
    利用普通工具编译的第一个Servlet
    对java:comp/env的研究(转)
    MyEclipse配置tomcat、jdk和发布第一个web项目
    构建 SSH 框架(转)
    Java Project和Web Project
  • 原文地址:https://www.cnblogs.com/asd14828/p/10135748.html
Copyright © 2011-2022 走看看