zoukankan      html  css  js  c++  java
  • RabbitMQ 简单测试

    RabbitMQ 测试

    RabbitMQ 基于 Erlang 实现,客户端可以用 Python、Java、Ruby、PHP、C#、JavaScript、Go 等语言来实现。这里用 Java 语言做个测试。
    首先安装好RabbitMQ 服务端。

    maven依赖
    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
    </dependency>
    <dependency>
    	<groupId>commons-lang</groupId>
    	<artifactId>commons-lang</artifactId>
    	<version>2.6</version>
    </dependency>
    
    
    java测试代码如下:
    //定义队列
    EndPoint.java
    /**
     * Base class for the RabbitMQ endpoints in this example.
     *
     * <p>On construction it opens a connection to the broker, creates a channel
     * on it and declares the queue named by {@code endpointName}. Subclasses use
     * the protected {@link #channel} and {@link #endPointName} fields to publish
     * to or consume from that queue.
     */
    public abstract class EndPoint{

        /** Channel on which the queue was declared; used by subclasses for publish/consume. */
        protected Channel channel;
        /** Connection to the broker that owns {@link #channel}. */
        protected Connection connection;
        /** Name of the queue this endpoint is bound to. */
        protected String endPointName;

        /**
         * Connects to the broker and declares the queue.
         *
         * @param endpointName name of the queue to declare and use
         * @throws IOException if the connection, the channel or the queue
         *         declaration fails
         */
        public EndPoint(String endpointName) throws IOException{
            this.endPointName = endpointName;

            // Create a connection factory
            ConnectionFactory factory = new ConnectionFactory();

            // Address and credentials of the RabbitMQ server
            factory.setHost("192.168.163.33");
            factory.setPort(5672);
            factory.setUsername("test");
            factory.setPassword("test");

            // Open the connection. Without this the connection field stays null
            // and createChannel() below fails with a NullPointerException.
            connection = factory.newConnection();

            // Create a channel on the connection
            channel = connection.createChannel();

            // Declare the queue for this channel (flags: durable=false,
            // exclusive=false, autoDelete=false, no extra arguments).
            // If the queue does not exist, it will be created on the server.
            channel.queueDeclare(endpointName, false, false, false, null);
        }


        /**
         * Closes the channel and then the connection.
         *
         * <p>The connection is closed even when closing the channel throws, so a
         * failure on the channel does not leave the connection open.
         *
         * @throws IOException if closing the channel or the connection fails
         */
        public void close() throws IOException{
            try {
                this.channel.close();
            } finally {
                this.connection.close();
            }
        }
    }
    
    
    
    //生产者
    Producer.java
    /**
     * Endpoint that publishes serialized objects to its queue.
     */
    public class Producer extends EndPoint{

        /**
         * Creates a producer bound to the given queue.
         *
         * @param endPointName name of the queue to publish to
         * @throws IOException if the endpoint cannot be set up
         */
        public Producer(String endPointName) throws IOException{
            super(endPointName);
        }

        /**
         * Serializes {@code object} and publishes it via the default exchange
         * (empty exchange name), using the queue name as the routing key.
         *
         * @param object payload to serialize and send
         * @throws IOException if publishing fails
         */
        public void sendMessage(Serializable object) throws IOException {
            final byte[] payload = SerializationUtils.serialize(object);
            this.channel.basicPublish("", this.endPointName, null, payload);
        }
    }
    
    
    
    //消费队列
    QueueConsumer.java
    /**
     * Endpoint that consumes messages from its queue and prints the
     * "message number" entry of each received map.
     *
     * <p>It is both the {@link Runnable} that registers the consumer and the
     * {@code Consumer} callback object passed to the channel.
     */
    public class QueueConsumer extends EndPoint implements Runnable, Consumer{

        /**
         * Creates a consumer bound to the given queue.
         *
         * @param endPointName name of the queue to consume from
         * @throws IOException if the endpoint cannot be set up
         */
        public QueueConsumer(String endPointName) throws IOException{
            super(endPointName);
        }

        /**
         * Registers this object as a consumer on the queue with automatic
         * acknowledgement. An {@link IOException} is printed, not rethrown.
         */
        public void run() {
            try {
                // start consuming messages. Auto acknowledge messages.
                channel.basicConsume(endPointName, true, this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Called when consumer is registered.
         */
        public void handleConsumeOk(String consumerTag) {
            System.out.println("Consumer "+consumerTag +" registered");
        }

        /**
         * Called when new message is available. Deserializes the body as a map
         * and prints its "message number" entry.
         *
         * @throws ClassCastException if the deserialized body is not a {@link Map}
         */
        public void handleDelivery(String consumerTag, Envelope env,
                BasicProperties props, byte[] body) throws IOException {
            // Cast to the Map interface with wildcards instead of a raw HashMap:
            // only get() is needed, and any Map implementation is acceptable.
            Map<?, ?> map = (Map<?, ?>) SerializationUtils.deserialize(body);
            System.out.println("Message Number "+ map.get("message number") + " received.");
        }

        // The remaining Consumer callbacks are intentionally no-ops in this test.
        public void handleCancel(String consumerTag) {}
        public void handleCancelOk(String consumerTag) {}
        public void handleRecoverOk(String consumerTag) {}
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
    }
    
    
    //测试用例
    Main.java
    /**
     * Test driver: starts a consumer thread on the queue named "queue" and then
     * publishes 100000 numbered messages to the same queue.
     */
    public class Main {

        /**
         * Runs the whole test: starts the consumer, then sends the messages.
         *
         * @throws Exception if an endpoint cannot be created or a message
         *         cannot be sent
         */
        public Main() throws Exception{

            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer producer = new Producer("queue");

            for (int i = 0; i < 100000; i++) {
                // Declared as HashMap (not Map) because sendMessage requires a
                // Serializable argument.
                HashMap<String, Integer> message = new HashMap<String, Integer>();
                message.put("message number", i);
                producer.sendMessage(message);
                System.out.println("Message Number "+ i +" sent.");
            }
        }

        /**
         * Program entry point.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the test fails to set up or to send a message
         */
        public static void main(String[] args) throws Exception{
            new Main();
        }
    }
    

    运行时报以下错误:

    Exception in thread "main" com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
    	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:355)
    	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
    	at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
    	at com.tony.test.EndPoint.<init>(EndPoint.java:26)
    	at com.tony.test.QueueConsumer.<init>(QueueConsumer.java:16)
    	at com.tony.test.test.<init>(test.java:13)
    	at com.tony.test.test.main(test.java:33)
    Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
    	at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
    	at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
    	at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
    	at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
    	at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
    	at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:347)
    	... 6 more
    Caused by: java.net.SocketException: Connection reset
    	at java.net.SocketInputStream.read(SocketInputStream.java:209)
    	at java.net.SocketInputStream.read(SocketInputStream.java:141)
    	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
    	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
    	at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
    	at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
    	at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
    	at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)
    

    解决方法:参见这个,或者这个。从异常信息(Possibly caused by authentication failure)看,可能是认证失败所致,可先检查连接使用的用户名、密码以及该用户的访问权限。


    网上Rabbit性能测试

    性能测试

    从上图可以看到每秒百万级别的消息进出数量,以及 2343 条消息在队列中等待。

    参考文献:

    getting-started-rabbitmq-java
    rabbitmq-performance-measurements-part-2
    tutorial-one-java

  • 相关阅读:
    [BZOJ4698][SDOI2008]Sandy的卡片(后缀自动机)
    [NOI2015]小园丁与老司机(DP+上下界最小流)
    [BZOJ2007][NOI2010]海拔(对偶图最短路)
    [NOI2018]屠龙勇士(exCRT)
    [NOI2018]归程(可持久化并查集,Kruskal重构树)
    [BZOJ2823][BZOJ1336][BZOJ1337]最小圆覆盖(随机增量法)
    [BZOJ1069][SCOI2007]最大土地面积(水平扫描法求凸包+旋转卡壳)
    [BZOJ1143][CTSC2008]祭祀river(Dilworth定理+二分图匹配)
    [BZOJ3160]万径人踪灭(FFT+Manacher)
    [NOI2015]寿司晚宴
  • 原文地址:https://www.cnblogs.com/tonyY/p/5011417.html
Copyright © 2011-2022 走看看