zoukankan      html  css  js  c++  java
  • ActiveMQ c# 系列——进阶实例(三)

    前言

    前面介绍了基本的消费者和生产者,那么看下他们之间有什么其他的api。

    正文

    消费者设置等待时间

    生产者生产了5条消息

    改一下消费者。

    static void Main(string[] args)
    {
    
    	Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
    	IConnectionFactory factory = new ConnectionFactory(connecturl);
      
    	using (IConnection connection = factory.CreateConnection())
    	{
    		using (ISession session = connection.CreateSession())
    		{
    			IDestination destination = SessionUtil.GetDestination(session, "queue://test");
    			using (IMessageConsumer consumer=session.CreateConsumer(destination))
    			{
    				connection.Start();
    				//consumer.Listener += new MessageListener(onMessage);
    				while (true)
    				{
    					var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
    					if (message != null)
    					{
    						Console.WriteLine(message.NMSMessageId);
    						Console.WriteLine(message.Text);
    					}
    				}
    			}
    		}
    	}
    }
    

    Receive(TimeSpan.FromSeconds(4)) 表示如果4秒没有消息将不再等待。通过断点调式可以很快的展示出来。

    多个消费者的情况

    我们知道队列是每个生产的东西都只能消费一次,这个就不做试验了,因为这个是队列的基本原理。

    那么我启动两个消费者,那么消费情况是什么样的呢?

    生产者:

    class Program
    {
    	static void Main(string[] args)
    	{
    		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
    		IConnectionFactory factory = new ConnectionFactory(connecturl);
    		using (IConnection connection = factory.CreateConnection())
    		{
    			using (ISession session = connection.CreateSession())
    			{
    				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
    				using (IMessageProducer producer = session.CreateProducer(destination))
    				{
    					//producer.DeliveryMode = MsgDeliveryMode.Persistent;
    					//producer.RequestTimeout = TimeSpan.FromSeconds(2);
    					for (int i = 1; i < 7; i++)
    					{
    						ITextMessage request = session.CreateTextMessage("oh,my friend"+i);
    						producer.Send(request);
    					}
    				}
    			}
    		}
    	}
    }
    

    两个消费者:

    class Program
    {
    	protected static TimeSpan receiveTimeout = TimeSpan.FromSeconds(10);
    	static void Main(string[] args)
    	{
    
    		Uri connecturl = new Uri("activemq:tcp://106.15.250.57:61616");
    		IConnectionFactory factory = new ConnectionFactory(connecturl);
    
    		using (IConnection connection = factory.CreateConnection())
    		{
    			using (ISession session = connection.CreateSession())
    			{
    				IDestination destination = SessionUtil.GetDestination(session, "queue://test");
    				using (IMessageConsumer consumer = session.CreateConsumer(destination))
    				{
    					connection.Start();
    					consumer.Listener += new MessageListener(onMessage);
    					Console.Read();
    					//while (true)
    					//{
    					//    var message = (ITextMessage)consumer.Receive(TimeSpan.FromSeconds(4));
    					//    if (message != null)
    					//    {
    					//        Console.WriteLine(message.NMSMessageId);
    					//        Console.WriteLine(message.Text);
    					//    }
    					//}
    				}
    			}
    		}
    	}
    
    	protected static void onMessage(IMessage receivedMsg)
    	{
    		ITextMessage message = receivedMsg as ITextMessage;
    		if (message != null)
    		{
    			//查询出消息
    			Console.WriteLine(message.Text);
    		}
    	}
    }
    

    消费情况:

    平均分配

  • 相关阅读:
    https单向证书
    单例模式再学习
    sql经常出现死锁解决办法
    sqlserver结束和监视耗时的sql
    如何保持进步
    es6-学习
    javascript修改div大小遮挡页面渲染问题
    报表功能设计思考-初步尝试-第一次
    导出统计数据-经验积累-深入1
    Java中数据类型转换&基本类型变量和对象型变量
  • 原文地址:https://www.cnblogs.com/aoximin/p/13383745.html
Copyright © 2011-2022 走看看