zoukankan      html  css  js  c++  java
  • RabbitMQ 不同系统间调用

    两个 Web 系统部署在同一台机器上,使用 RabbitMQ 收发消息,实现系统间的 RPC 调用。

    1、服务端

    /**
     * Minimal RabbitMQ RPC server.
     *
     * <p>Consumes requests from the {@code rpc_queue} queue and answers each one with the
     * fixed payload {@code "ok"}, published through the default exchange to the queue named
     * in the request's {@code replyTo} property. The request's correlation id is copied
     * into the reply.
     */
    public class TestRPC {
        private static final String RPC_QUEUE_NAME = "rpc_queue";

        /** Charset for request and reply bodies; explicit so it does not depend on the platform default. */
        private static final java.nio.charset.Charset BODY_CHARSET =
                java.nio.charset.StandardCharsets.UTF_8;

        /**
         * Connects to the broker on 127.0.0.1:5672 and serves requests until the consumer
         * or connection fails.
         *
         * @param args unused
         * @throws Exception if connecting, consuming or publishing fails
         */
        public static void main(String args[]) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");

            Connection connection = factory.newConnection();
            try {
                Channel channel = connection.createChannel();

                channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
                // Prefetch count of 1: the next request is delivered only after the current one is acked.
                channel.basicQos(1);

                QueueingConsumer consumer = new QueueingConsumer(channel);
                // autoAck is false; every request is acknowledged explicitly below.
                channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
                System.out.println(" [x] 等待请求");

                while (true) {
                    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                    BasicProperties props = delivery.getProperties();
                    String message = new String(delivery.getBody(), BODY_CHARSET);
                    System.out.println(" [返回] fib(" + message + ")");

                    String replyTo = props.getReplyTo();
                    if (replyTo != null) {
                        BasicProperties replyProps = new BasicProperties.Builder()
                                .correlationId(props.getCorrelationId())
                                .build();
                        String response = "ok";
                        channel.basicPublish("", replyTo, replyProps, response.getBytes(BODY_CHARSET));
                    } else {
                        // Without a reply queue there is nowhere to send the answer. Skip the
                        // publish instead of letting one malformed request stop the server.
                        System.err.println(" [!] request without replyTo, no reply sent");
                    }
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            } finally {
                // The loop only exits through an exception. abort() releases the connection
                // without raising a second exception that would hide the original one.
                connection.abort();
            }
        }
    }

    2、 客户端调用

    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;
    
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.ConsumerCancelledException;
    import com.rabbitmq.client.QueueingConsumer;
    import com.rabbitmq.client.ShutdownSignalException;
    
    /**
     * Blocking RabbitMQ RPC client.
     *
     * <p>Each instance owns one connection, one channel and one server-named reply queue.
     * {@link #call(String)} publishes a request to {@code rpc_queue} and waits on the reply
     * queue for the delivery carrying the matching correlation id. Replies with any other
     * (or no) correlation id are consumed and discarded.
     */
    public class RPCClient {
        /** Charset for request and reply bodies; explicit so it does not depend on the platform default. */
        private static final java.nio.charset.Charset BODY_CHARSET =
                java.nio.charset.StandardCharsets.UTF_8;

        private final Connection connection;
        private final Channel channel;
        private final String requestQueueName = "rpc_queue";
        private final String replyQueueName;
        private final QueueingConsumer consumer;

        /**
         * Connects to the broker on 127.0.0.1:5672, declares a server-named reply queue and
         * starts consuming from it with automatic acknowledgement.
         *
         * @throws IOException if the connection, channel or reply queue cannot be set up
         * @throws TimeoutException if connecting to the broker times out
         */
        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("127.0.0.1");
            factory.setPort(5672);
            factory.setUsername("guest");
            factory.setPassword("guest");

            Connection conn = factory.newConnection();
            Channel ch;
            String replyQueue;
            QueueingConsumer replyConsumer;
            boolean ready = false;
            try {
                ch = conn.createChannel();
                replyQueue = ch.queueDeclare().getQueue();
                replyConsumer = new QueueingConsumer(ch);
                ch.basicConsume(replyQueue, true, replyConsumer);
                ready = true;
            } finally {
                if (!ready) {
                    // Setup failed half-way: release the connection so it does not leak.
                    // abort() discards close errors, so the original exception propagates.
                    conn.abort();
                }
            }

            this.connection = conn;
            this.channel = ch;
            this.replyQueueName = replyQueue;
            this.consumer = replyConsumer;
        }

        /**
         * Sends {@code message} to the request queue and blocks until the matching reply arrives.
         *
         * @param message request payload
         * @return the reply body decoded as text
         * @throws IOException if publishing the request fails
         * @throws ShutdownSignalException if the connection shuts down while waiting
         * @throws ConsumerCancelledException if the reply consumer is cancelled while waiting
         * @throws InterruptedException if the calling thread is interrupted while waiting
         */
        public String call(String message) throws IOException,
                ShutdownSignalException, ConsumerCancelledException,
                InterruptedException {
            String corrId = UUID.randomUUID().toString();

            BasicProperties props = new BasicProperties.Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
            channel.basicPublish("", requestQueueName, props, message.getBytes(BODY_CHARSET));

            while (true) {
                QueueingConsumer.Delivery delivery = consumer.nextDelivery();
                // Compare from the known non-null side: a reply without a correlation id
                // must be skipped rather than cause a NullPointerException.
                if (corrId.equals(delivery.getProperties().getCorrelationId())) {
                    return new String(delivery.getBody(), BODY_CHARSET);
                }
            }
        }

        /**
         * Closes the underlying connection.
         *
         * @throws Exception if closing the connection fails
         */
        public void close() throws Exception {
            connection.close();
        }

        /**
         * Demo entry point: sends a single {@code "hello"} request and prints the reply.
         *
         * @param args unused
         * @throws Exception if the call or the connection handling fails
         */
        public static void main(String[] args) throws Exception {
            RPCClient fibRpc = new RPCClient();
            try {
                System.out.println(" [x] Requesting fib(30)");
                String response = fibRpc.call("hello");
                System.out.println(" [.] Got '" + response + "'");
            } finally {
                // Close the connection even when the call fails.
                fibRpc.close();
            }
        }
    }
  • 相关阅读:
    c++中利用宏定义简化for循环使用
    UVA1152- 枚举 /二分查找
    acm 模板
    Xwindow的文章
    编程语言博客
    csh与bash比较
    关于锁与并发的资料总结
    linux su和sudo命令的区别
    对Memcached使用的总结和使用场景
    iptables配置——NAT地址转换
  • 原文地址:https://www.cnblogs.com/lyon91/p/9111637.html
Copyright © 2011-2022 走看看