zoukankan      html  css  js  c++  java
  • com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)

    RabbitMQ 基于Erlang 实现, 客户端可以用Python | Java | Ruby | PHP | C# | Javascript | Go等语言来实现。这里做个java语言的测试。
    首先安装好RabbitMQ 服务端。

    maven依赖
    <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>3.0.4</version>
    </dependency>
    <dependency>
        <groupId>commons-lang</groupId>
        <artifactId>commons-lang</artifactId>
        <version>2.6</version>
    </dependency>
    java测试代码如下:
    //定义队列
    EndPoint.java
    /**
     * Base class for the example's queue endpoints. The constructor connects to the
     * RabbitMQ broker, opens a channel and declares the queue named after the endpoint.
     */
    public abstract class EndPoint{

        protected Channel channel;
        protected Connection connection;
        protected String endPointName;

        /**
         * Connects to the broker and declares the queue for this endpoint.
         *
         * @param endpointName name of the queue to declare and use
         * @throws IOException if the connection, the channel or the queue declaration fails
         */
        public EndPoint(String endpointName) throws IOException{
            this.endPointName = endpointName;

            // Connection settings of the RabbitMQ server used by this example.
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("192.168.163.33");
            factory.setPort(5672);
            factory.setUsername("test");
            factory.setPassword("test");
            // NOTE(review): no virtual host is configured here, so the client library's
            // default is used. If the broker account is bound to another vhost, call
            // factory.setVirtualHost(...) -- confirm against the server setup.

            // The connection must be opened before a channel can be created on it;
            // without this assignment `connection` stays null and the next line fails.
            connection = factory.newConnection();

            channel = connection.createChannel();

            // Declare the queue (non-durable, non-exclusive, no auto-delete, no extra
            // arguments). If the queue does not exist it is created on the server.
            channel.queueDeclare(endpointName, false, false, false, null);
        }


        /**
         * Closes the channel and then the connection. The connection is closed even
         * when closing the channel fails.
         *
         * @throws IOException if closing the channel or the connection fails
         */
        public void close() throws IOException{
            try {
                this.channel.close();
            } finally {
                this.connection.close();
            }
        }
    }
    
    
    
    //生产者
    Producer.java
    /**
     * Endpoint that publishes serialized objects to its queue.
     */
    public class Producer extends EndPoint{

        /**
         * Creates a producer bound to the given queue.
         *
         * @param endPointName name of the queue to publish to
         * @throws IOException if the endpoint cannot be set up
         */
        public Producer(String endPointName) throws IOException{
            super(endPointName);
        }

        /**
         * Serializes the object and publishes it with the queue name as routing key.
         *
         * @param object the message payload
         * @throws IOException if publishing fails
         */
        public void sendMessage(Serializable object) throws IOException {
            byte[] payload = SerializationUtils.serialize(object);
            // Empty exchange name: the message is routed by the queue name alone.
            channel.basicPublish("", endPointName, null, payload);
        }
    }
    
    
    
    //消费队列
    QueueConsumer.java
    /**
     * Endpoint that consumes messages from its queue. It is a {@link Runnable} so the
     * subscription can be started from a separate thread, and it registers itself as
     * the consumer callback.
     */
    public class QueueConsumer extends EndPoint implements Runnable, Consumer{

        /**
         * Creates a consumer bound to the given queue.
         *
         * @param endPointName name of the queue to consume from
         * @throws IOException if the endpoint cannot be set up
         */
        public QueueConsumer(String endPointName) throws IOException{
            super(endPointName);
        }

        /**
         * Starts consuming from the queue with automatic acknowledgement, using this
         * object as the callback. An I/O failure is printed, not rethrown.
         */
        public void run() {
            try {
                //start consuming messages. Auto acknowledge messages.
                channel.basicConsume(endPointName, true,this);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Called when consumer is registered.
         */
        public void handleConsumeOk(String consumerTag) {
            System.out.println("Consumer "+consumerTag +" registered");
        }

        /**
         * Called when new message is available. The body is deserialized into a map
         * and its "message number" entry is printed.
         */
        public void handleDelivery(String consumerTag, Envelope env,
                BasicProperties props, byte[] body) throws IOException {
            // NOTE(review): Java-native deserialization is only safe when every
            // publisher on this queue is trusted; prefer JSON or similar otherwise.
            // Cast to the Map interface so any map implementation is accepted.
            Map<?, ?> message = (Map<?, ?>) SerializationUtils.deserialize(body);
            System.out.println("Message Number "+ message.get("message number") + " received.");
        }

        // The remaining consumer callbacks are intentionally no-ops in this example.
        public void handleCancel(String consumerTag) {}
        public void handleCancelOk(String consumerTag) {}
        public void handleRecoverOk(String consumerTag) {}
        public void handleShutdownSignal(String consumerTag, ShutdownSignalException arg1) {}
    }
    
    
    //测试用例
    Main.java
    /**
     * Test driver: starts a consumer on the "queue" queue in its own thread, then
     * publishes 100000 numbered messages to the same queue.
     */
    public class Main {

        /**
         * Runs the whole test: starts the consumer thread and sends the messages.
         *
         * @throws Exception if an endpoint cannot be created or a message cannot be sent
         */
        public Main() throws Exception{

            QueueConsumer consumer = new QueueConsumer("queue");
            Thread consumerThread = new Thread(consumer);
            consumerThread.start();

            Producer producer = new Producer("queue");

            for (int i = 0; i < 100000; i++) {
                // Declared as HashMap (not Map) because sendMessage needs a Serializable.
                HashMap<String, Integer> message = new HashMap<String, Integer>();
                message.put("message number", i);
                producer.sendMessage(message);
                System.out.println("Message Number "+ i +" sent.");
            }
        }

        /**
         * Program entry point.
         *
         * @param args command-line arguments (unused)
         * @throws Exception if the test cannot be set up or a message cannot be sent
         */
        public static void main(String[] args) throws Exception{
          new Main();
        }
    }

    运行报以下错误

    Exception in thread "main" com.rabbitmq.client.PossibleAuthenticationFailureException: Possibly caused by authentication failure
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:355)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:516)
        at com.rabbitmq.client.ConnectionFactory.newConnection(ConnectionFactory.java:533)
        at com.tony.test.EndPoint.<init>(EndPoint.java:26)
        at com.tony.test.QueueConsumer.<init>(QueueConsumer.java:16)
        at com.tony.test.test.<init>(test.java:13)
        at com.tony.test.test.main(test.java:33)
    Caused by: com.rabbitmq.client.ShutdownSignalException: connection error; reason: java.net.SocketException: Connection reset
        at com.rabbitmq.utility.ValueOrException.getValue(ValueOrException.java:67)
        at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:33)
        at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:343)
        at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:216)
        at com.rabbitmq.client.impl.AMQChannel.rpc(AMQChannel.java:202)
        at com.rabbitmq.client.impl.AMQConnection.start(AMQConnection.java:347)
        ... 6 more
    Caused by: java.net.SocketException: Connection reset
        at java.net.SocketInputStream.read(SocketInputStream.java:209)
        at java.net.SocketInputStream.read(SocketInputStream.java:141)
        at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
        at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
        at java.io.DataInputStream.readUnsignedByte(DataInputStream.java:288)
        at com.rabbitmq.client.impl.Frame.readFrom(Frame.java:95)
        at com.rabbitmq.client.impl.SocketFrameHandler.readFrame(SocketFrameHandler.java:131)
        at com.rabbitmq.client.impl.AMQConnection$MainLoop.run(AMQConnection.java:515)

    解决方法这个 或者这个


    网上Rabbit性能测试

    性能测试

    上图可以看到每秒百万级别的消息进出数量,以及2343条的消息在队列中等待。

    前段时间我调试MQ的时候也报了如标题所示的错误。当时没有人提供MQ的v-host,我试过配置为"/"以及不配置该项,均报同样的错误;后经检查,原因是MQ服务端配置了v-host,客户端需要使用对应的v-host。
  • 相关阅读:
    sql server 纵横表的转换
    url参数的编码解码Demo
    SqlServer 列的增加和删除
    asp.net下ajax.ajaxMethod使用方法(转)
    js中document.all 的用法
    cookie跨域,跨目录访问及单点登录。
    错误记录:html隐藏域的值存字符串时出错
    .NET下用C#实现邮箱激活功能
    js与C#服务端 json数据交互
    sqlserver数据可空插入报错
  • 原文地址:https://www.cnblogs.com/dauber/p/9192838.html
Copyright © 2011-2022 走看看