zoukankan      html  css  js  c++  java
  • NetMQ

    NetMQ是一个封装了Socket队列的开源库,它是ZeroMQ的.NET移植版,而ZeroMQ的核心是用C++写成的。有人测试过它的性能,几乎可以秒杀其他所有的MQ(MSMQ、RabbitMQ等等,都不是它的对手)。不过它也有一个弱点:消息不支持持久化!

    定义要发送到消息里的对象

     using System;
     using ProtoBuf;

     namespace Model
     {
         /// <summary>
         /// Message payload describing a person.
         /// </summary>
         /// <remarks>
         /// The type carries both <c>[Serializable]</c> (required by
         /// <c>BinaryFormatter</c>) and <c>[ProtoContract]</c> (protobuf-net), so it
         /// can be serialized with either mechanism. The <c>ProtoMember</c> numbers
         /// are the protobuf field tags and must stay stable once messages are
         /// exchanged between processes.
         /// </remarks>
         [Serializable]
         [ProtoContract]
         public class Person
         {
             /// <summary>Numeric identifier of the person (protobuf field 1).</summary>
             [ProtoMember(1)]
             public int Id { get; set; }

             /// <summary>Display name (protobuf field 2).</summary>
             [ProtoMember(2)]
             public string Name { get; set; }

             /// <summary>Date of birth (protobuf field 3).</summary>
             [ProtoMember(3)]
             public DateTime BirthDay { get; set; }

             /// <summary>Postal address; may be null when not supplied (protobuf field 4).</summary>
             [ProtoMember(4)]
             public Address Address { get; set; }
         }
     }
    using System;
    using ProtoBuf;
     
    namespace Model
    {
        /// <summary>
        /// Two-line postal address. Marked for both <c>BinaryFormatter</c>
        /// (<c>[Serializable]</c>) and protobuf-net (<c>[ProtoContract]</c>).
        /// </summary>
        [Serializable, ProtoContract]
        public class Address
        {
            /// <summary>First address line (protobuf field 1).</summary>
            [ProtoMember(1)] public string Line1 { get; set; }

            /// <summary>Second address line (protobuf field 2).</summary>
            [ProtoMember(2)] public string Line2 { get; set; }
        }
    }

    消息的发送者

    using System;
    using System.IO;
    using System.Runtime.Remoting.Channels;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading;
    using System.Threading.Tasks;
    using Model;
    using NetMQ;
    using ProtoBuf;
    using ProtoBuf.Meta;
     
    namespace Ventilator
    {
        /// <summary>
        /// Task ventilator: binds a PUSH socket on tcp://*:5557, signals the start of
        /// a batch to the sink on tcp://localhost:5558, then pushes a batch of
        /// serialized <see cref="Person"/> messages to the connected workers.
        /// </summary>
        sealed class Ventilator
        {
            /// <summary>Number of messages pushed to the workers in one batch.</summary>
            private const int TaskCount = 10000;

            // The sample birthday is a constant, so it is built once from its
            // components instead of being parsed from text on every iteration
            // (DateTime.Parse without a culture depends on the current culture).
            private static readonly DateTime SampleBirthDay = new DateTime(1981, 11, 15);

            /// <summary>
            /// Starts sending the batch on a background task and returns immediately.
            /// </summary>
            /// <remarks>
            /// The task is fire-and-forget, so any failure (for example a failed
            /// bind) is written to standard error instead of being lost silently.
            /// </remarks>
            public void Run()
            {
                Task.Run(() =>
                {
                    try
                    {
                        SendBatch();
                    }
                    catch (Exception ex)
                    {
                        Console.Error.WriteLine("Ventilator failed: " + ex);
                    }
                });
            }

            /// <summary>
            /// Opens the sockets, notifies the sink and pushes <see cref="TaskCount"/>
            /// serialized <see cref="Person"/> messages.
            /// </summary>
            private static void SendBatch()
            {
                using (var ctx = NetMQContext.Create())
                using (var sender = ctx.CreatePushSocket())
                using (var sink = ctx.CreatePushSocket())
                {
                    sender.Bind("tcp://*:5557");
                    sink.Connect("tcp://localhost:5558");

                    // The first message is "0" and signals the start of the batch to the sink.
                    sink.Send("0");

                    Console.WriteLine("Sending tasks to workers");

                    // protobuf-net setting; kept from the original although the
                    // payload below is written with BinaryFormatter.
                    RuntimeTypeModel.Default.MetadataTimeoutMilliseconds = 300000;

                    // NOTE(review): BinaryFormatter is obsolete and unsafe for
                    // untrusted data. The protobuf-net alternative would be
                    // Serializer.PrepareSerializer<Person>() once at start-up followed
                    // by Serializer.Serialize(stream, person); switching requires the
                    // worker to change its deserializer at the same time.
                    var binaryFormatter = new BinaryFormatter();

                    for (int taskNumber = 0; taskNumber < TaskCount; taskNumber++)
                    {
                        Console.WriteLine("Workload : {0}", taskNumber);
                        var person = new Person
                        {
                            Id = taskNumber,
                            Name = "First",
                            BirthDay = SampleBirthDay,
                            Address = new Address { Line1 = "Line1", Line2 = "Line2" }
                        };

                        using (var sm = new MemoryStream())
                        {
                            binaryFormatter.Serialize(sm, person);
                            sender.Send(sm.ToArray());
                        }
                    }
                }
            }
        }
    }
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Text;
    using System.Threading;
    using System.Threading.Tasks;
    using NetMQ;
     
    namespace Ventilator
    {
        /// <summary>
        /// Console entry point of the ventilator process.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Waits for the operator to confirm that the workers are running, starts
            /// the ventilator (which binds its PUSH socket on tcp://*:5557 and sends
            /// the batch in the background) and then waits for Enter before exiting.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            public static void Main(string[] args)
            {
                Console.WriteLine("====== VENTILATOR ======");

                PromptAndWait("Press enter when worker are ready");

                // The ventilator's first message to the sink is "0", which marks
                // the start of the batch.
                Console.WriteLine("Sending start of batch to Sink");
                new Ventilator().Run();

                PromptAndWait("Press Enter to quit");
            }

            /// <summary>Prints <paramref name="prompt"/> and blocks until a line is entered.</summary>
            private static void PromptAndWait(string prompt)
            {
                Console.WriteLine(prompt);
                Console.ReadLine();
            }
        }
    }

    消息的处理者

    using System;
    using System.IO;
    using System.Runtime.Serialization.Formatters.Binary;
    using System.Threading;
    using System.Threading.Tasks;
    using Model;
    using NetMQ;
    using ProtoBuf;
     
    namespace Worker
    {
        /// <summary>
        /// Task worker: pulls serialized <see cref="Person"/> messages from
        /// tcp://localhost:5557, prints them and pushes the person's id to the sink
        /// on tcp://localhost:5558.
        /// </summary>
        sealed class Worker
        {
            /// <summary>
            /// Starts the receive loop on a background task and returns immediately.
            /// </summary>
            /// <remarks>
            /// The task is fire-and-forget, so a fatal failure is written to standard
            /// error instead of being lost silently.
            /// </remarks>
            public void Run()
            {
                Task.Run(() =>
                {
                    try
                    {
                        ProcessTasks();
                    }
                    catch (Exception ex)
                    {
                        Console.Error.WriteLine("Worker failed: " + ex);
                    }
                });
            }

            /// <summary>
            /// Connects the PULL and PUSH sockets and processes incoming messages forever.
            /// Messages that cannot be decoded as a <see cref="Person"/> are logged and
            /// skipped so that one bad payload does not stop the worker.
            /// </summary>
            private static void ProcessTasks()
            {
                using (NetMQContext ctx = NetMQContext.Create())
                using (var receiver = ctx.CreatePullSocket())   // socket to receive messages on
                using (var sender = ctx.CreatePushSocket())     // socket to send results on
                {
                    receiver.Connect("tcp://localhost:5557");
                    sender.Connect("tcp://localhost:5558");

                    // Binary serialization is used because protobuf-net's
                    // Serializer.Deserialize<Person> failed under multi-threading with
                    // "Timeout while inspecting metadata; this may indicate a deadlock",
                    // an error whose own text recommends preparing the serializers
                    // during application initialization.
                    // NOTE(review): BinaryFormatter is obsolete and unsafe on data
                    // received from the network; replacing it requires changing the
                    // ventilator's serializer at the same time.
                    var binaryFormatter = new BinaryFormatter();

                    // Process tasks forever.
                    while (true)
                    {
                        var receivedBytes = receiver.Receive();

                        Person person;
                        try
                        {
                            using (var sm = new MemoryStream(receivedBytes))
                            {
                                person = binaryFormatter.Deserialize(sm) as Person;
                            }
                        }
                        catch (System.Runtime.Serialization.SerializationException ex)
                        {
                            Console.Error.WriteLine("Skipping undecodable message: " + ex.Message);
                            continue;
                        }

                        if (person == null)
                        {
                            // The payload decoded to something that is not a Person.
                            Console.Error.WriteLine("Skipping message that is not a Person");
                            continue;
                        }

                        Console.WriteLine(Describe(person));
                        Console.WriteLine("Sending to Sink:" + person.Id);
                        sender.Send(person.Id + "");
                    }
                }
            }

            /// <summary>
            /// Formats a person for console output; a missing address is printed
            /// with empty lines instead of throwing.
            /// </summary>
            private static string Describe(Person person)
            {
                var address = person.Address ?? new Address();
                return "Person {Id:" + person.Id + ",Name:" + person.Name + ",BirthDay:" +
                       person.BirthDay + ",Address:{Line1:" + address.Line1 +
                       ",Line2:" + address.Line2 + "}}";
            }
        }
    }
    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;
     
    namespace Worker
    {
        /// <summary>
        /// Console entry point of the worker process.
        /// </summary>
        public class Program
        {
            /// <summary>
            /// Creates 50 workers, starts them through <c>Parallel.Invoke</c> limited to
            /// one concurrent start per processor, and keeps the process alive until a
            /// line is entered. Each worker connects a PULL socket to
            /// tcp://localhost:5557 and a PUSH socket to tcp://localhost:5558.
            /// </summary>
            /// <param name="args">Command-line arguments; not used.</param>
            public static void Main(string[] args)
            {
                Console.WriteLine("====== WORKER ======");

                // An earlier variant simply called Run() on 1000 workers in a loop;
                // this version starts a fixed set through Parallel.Invoke instead.
                const int workerCount = 50;
                var starters = new Action[workerCount];
                for (var index = 0; index < workerCount; index++)
                {
                    starters[index] = new Worker().Run;
                }

                var options = new ParallelOptions { MaxDegreeOfParallelism = Environment.ProcessorCount };
                Parallel.Invoke(options, starters);

                Console.ReadLine();
            }
        }
    }
  • 相关阅读:
    关于Maya Viewport 2.0 API 开发的介绍视频
    春节大假
    Some tips about the life cycle of Maya thread pool
    Can I compile and run Dx11Shader for Maya 2015 on my side?
    How to get current deformed vertex positions in MoBu?
    想加入全球首届的 欧特克云加速计划吗?
    三本毕业(非科班),四次阿里巴巴面试,终拿 offer(大厂面经)
    mac、window版编辑器 webstorm 2016... 永久破解方法。
    node 搭载本地代理,处理web本地开发跨域问题
    js 一维数组,转成嵌套数组
  • 原文地址:https://www.cnblogs.com/asd14828/p/10135748.html
Copyright © 2011-2022 走看看