NetMQ是一个封装了Socket队列的开源库,它是ZeroMQ的.NET移植版,而ZeroMQ是用C++写成的。有人测试过它的性能,几乎可以秒杀其他所有的MQ(MSMQ、RabbitMQ等等,都不是它的对手)。不过它也有一个弱点:消息不支持持久化!
定义要发送到消息里的对象
using System;
using ProtoBuf;

namespace Model
{
    /// <summary>
    /// Payload object that is serialized and sent as the body of a message.
    /// </summary>
    /// <remarks>
    /// The type carries both <see cref="SerializableAttribute"/> and the
    /// protobuf-net contract attributes, so it can be written with either
    /// serializer. The <c>ProtoMember</c> numbers are the protobuf field tags
    /// and must stay stable once messages have been exchanged.
    /// </remarks>
    [Serializable]
    [ProtoContract]
    public class Person
    {
        /// <summary>Numeric identifier of the person.</summary>
        [ProtoMember(1)]
        public int Id { get; set; }

        /// <summary>Display name of the person.</summary>
        [ProtoMember(2)]
        public string Name { get; set; }

        /// <summary>Date of birth.</summary>
        [ProtoMember(3)]
        public DateTime BirthDay { set; get; }

        /// <summary>Postal address; may be <c>null</c> when it was never assigned.</summary>
        [ProtoMember(4)]
        public Address Address { get; set; }
    }
}
using System;
using ProtoBuf;

namespace Model
{
    /// <summary>
    /// Two-line postal address, usable with both binary serialization and
    /// protobuf-net (field tags 1 and 2).
    /// </summary>
    [Serializable, ProtoContract]
    public class Address
    {
        /// <summary>First address line.</summary>
        [ProtoMember(1)]
        public string Line1 { get; set; }

        /// <summary>Second address line.</summary>
        [ProtoMember(2)]
        public string Line2 { get; set; }
    }
}
消息的发送者
using System;
using System.IO;
using System.Runtime.Remoting.Channels;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;
using ProtoBuf.Meta;

namespace Ventilator
{
    /// <summary>
    /// Produces a batch of serialized <see cref="Person"/> messages and pushes
    /// them to the workers over a PUSH socket bound to <c>tcp://*:5557</c>.
    /// </summary>
    sealed class Ventilator
    {
        /// <summary>Number of messages pushed to the workers in one batch.</summary>
        private const int TaskCount = 10000;

        /// <summary>
        /// Starts the batch on a background task and returns immediately.
        /// A "0" message is first sent to the sink at
        /// <c>tcp://localhost:5558</c> to signal the start of the batch; then
        /// <see cref="TaskCount"/> persons are serialized and pushed to the
        /// workers. Any failure inside the background task is written to
        /// standard error instead of being silently dropped.
        /// </summary>
        public void Run()
        {
            Task.Run(() =>
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");
                    sink.Connect("tcp://localhost:5558");

                    // "0" tells the sink that a new batch is starting.
                    sink.Send("0");
                    Console.WriteLine("Sending tasks to workers");

                    // Raises protobuf-net's metadata inspection timeout. It only
                    // matters if the protobuf-net serializer is used; kept so the
                    // process-wide setting is unchanged.
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    // Loop-invariant values are created once. The date is built
                    // from its components so the result does not depend on the
                    // current culture the way DateTime.Parse does.
                    var birthDay = new DateTime(1981, 11, 15);

                    // NOTE(review): BinaryFormatter is obsolete and unsafe for
                    // untrusted input. protobuf-net (Serializer.PrepareSerializer<Person>()
                    // once at start-up, then Serializer.Serialize(stream, person)) is the
                    // safer option, but the worker must be switched at the same
                    // time because both sides have to agree on the wire format.
                    var binaryFormatter = new BinaryFormatter();

                    for (int taskNumber = 0; taskNumber < TaskCount; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);

                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = birthDay,
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };

                        using (var sm = new MemoryStream())
                        {
                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            }).ContinueWith(
                // Without this continuation an exception (for example a failed
                // Bind because the port is in use) would vanish unobserved.
                faulted => Console.Error.WriteLine("Ventilator failed: " + faulted.Exception),
                TaskContinuationOptions.OnlyOnFaulted);
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NetMQ;

namespace Ventilator
{
    /// <summary>
    /// Console entry point of the task ventilator.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Waits until the operator confirms that the workers are running,
        /// starts the ventilator batch, and then keeps the process alive until
        /// Enter is pressed again.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args)
        {
            Console.WriteLine("====== VENTILATOR ======");

            // Give the operator time to start the workers before any task is sent.
            Console.WriteLine("Press enter when worker are ready");
            Console.ReadLine();

            // The ventilator itself sends the "0" start-of-batch marker to the
            // sink before it pushes the tasks to the workers.
            Console.WriteLine("Sending start of batch to Sink");
            new Ventilator().Run();

            // Run() returns immediately, so block here to keep the process alive.
            Console.WriteLine("Press Enter to quit");
            Console.ReadLine();
        }
    }
}
消息的处理者
using System;
using System.IO;
using System.Runtime.Serialization.Formatters.Binary;
using System.Threading;
using System.Threading.Tasks;
using Model;
using NetMQ;
using ProtoBuf;

namespace Worker
{
    /// <summary>
    /// Pulls serialized <see cref="Person"/> messages from the ventilator at
    /// <c>tcp://localhost:5557</c>, prints them, and pushes each person's id
    /// to the sink at <c>tcp://localhost:5558</c>.
    /// </summary>
    sealed class Worker
    {
        /// <summary>
        /// Starts the receive loop in the background and returns immediately.
        /// </summary>
        /// <remarks>
        /// The loop blocks forever, so it is started with
        /// <see cref="TaskCreationOptions.LongRunning"/> rather than occupying
        /// a regular thread-pool thread. Any failure that ends the loop is
        /// written to standard error instead of being silently dropped.
        /// </remarks>
        public void Run()
        {
            Task.Factory
                .StartNew(
                    ProcessTasks,
                    CancellationToken.None,
                    TaskCreationOptions.LongRunning | TaskCreationOptions.DenyChildAttach,
                    TaskScheduler.Default)
                .ContinueWith(
                    faulted => Console.Error.WriteLine("Worker failed: " + faulted.Exception),
                    TaskContinuationOptions.OnlyOnFaulted);
        }

        /// <summary>
        /// Receives messages forever, deserializes each one into a
        /// <see cref="Person"/>, and reports the person's id to the sink.
        /// </summary>
        private static void ProcessTasks()
        {
            using (NetMQContext ctx = NetMQContext.Create())
            using (var receiver = ctx.CreatePullSocket()) // socket to receive messages on
            using (var sender = ctx.CreatePushSocket())   // socket to send results on
            {
                receiver.Connect("tcp://localhost:5557");
                sender.Connect("tcp://localhost:5558");

                // protobuf-net deserialization (Serializer.Deserialize<Person>) failed
                // when many workers ran concurrently with:
                //   "Timeout while inspecting metadata; this may indicate a deadlock.
                //    This can often be avoided by preparing necessary serializers during
                //    application initialization, rather than allowing multiple threads to
                //    perform the initial metadata inspection; please also see the
                //    LockContended event"
                // so binary serialization is used instead.
                //
                // NOTE(review): BinaryFormatter must not be used on data from an
                // untrusted network peer (it allows arbitrary code execution on
                // deserialization). Preparing the protobuf-net serializer once at
                // start-up, as the error message suggests, would allow switching
                // back; the ventilator must be changed at the same time.
                var binaryFormatter = new BinaryFormatter();

                // process tasks forever
                while (true)
                {
                    var receivedBytes = receiver.Receive();

                    Person person;
                    using (var sm = new MemoryStream(receivedBytes))
                    {
                        person = binaryFormatter.Deserialize(sm) as Person;
                    }

                    if (person == null)
                    {
                        // The payload was valid but not a Person; skip it rather
                        // than crash the worker with a NullReferenceException.
                        Console.Error.WriteLine("Ignoring message that is not a Person");
                        continue;
                    }

                    Console.WriteLine(Describe(person));
                    Console.WriteLine("Sending to Sink:" + person.Id);
                    sender.Send(person.Id + "");
                }
            }
        }

        /// <summary>
        /// Formats a person for console output. A missing address is printed
        /// with empty lines instead of throwing.
        /// </summary>
        private static string Describe(Person person)
        {
            Address address = person.Address;
            string line1 = address == null ? null : address.Line1;
            string line2 = address == null ? null : address.Line2;

            return "Person {Id:" + person.Id
                + ",Name:" + person.Name
                + ",BirthDay:" + person.BirthDay
                + ",Address:{Line1:" + line1
                + ",Line2:" + line2
                + "}}";
        }
    }
}
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace Worker
{
    /// <summary>
    /// Console entry point that starts a pool of workers.
    /// </summary>
    public class Program
    {
        /// <summary>
        /// Creates 50 workers, starts all of them, and then keeps the process
        /// alive until Enter is pressed.
        /// </summary>
        /// <param name="args">Command-line arguments; not used.</param>
        public static void Main(string[] args)
        {
            // Each worker connects a PULL socket to tcp://localhost:5557 to
            // collect workload from the ventilator and a PUSH socket to
            // tcp://localhost:5558 to send results to the sink.
            Console.WriteLine("====== WORKER ======");

            // An earlier variant simply looped over 1000 workers and called
            // Run() on each; the current variant starts them via Parallel.Invoke.
            var startActions = new Action[50];
            for (int index = 0; index < startActions.Length; index++)
            {
                var worker = new Worker();
                startActions[index] = worker.Run;
            }

            // Limit the number of concurrently executing start actions to the
            // number of processors.
            var options = new ParallelOptions();
            options.MaxDegreeOfParallelism = Environment.ProcessorCount;

            Parallel.Invoke(options, startActions);

            // Block so the process (and with it the workers) stays alive.
            Console.ReadLine();
        }
    }
}