RPCServer:
![](s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6)
public class RPCServer { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建接收队列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, true, null); //可运行多个服务器进程,可将负载平均分配到多台服务器上 channel.basicQos(1); System.out.println("receive start"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { //CorrelationId:每个请求都被设置成唯一的值(这里要处理后返回给客户端进行判断) BasicProperties replyProps = new BasicProperties().builder().correlationId(properties.getCorrelationId()).build(); String msg = new String(body); System.out.println("Get "+msg+" md5:"); String msgMd5 = getMd5String(msg); //处理完放至回调队列 channel.basicPublish("", properties.getReplyTo(), replyProps, msgMd5.getBytes()); channel.basicAck(envelope.getDeliveryTag(), false); } }; channel.basicConsume(RPC_QUEUE_NAME, false, consumer); } // 模拟RPC方法 获取MD5字符串 public static String getMd5String(String str) { MessageDigest md5 = null; try { md5 = MessageDigest.getInstance("MD5"); } catch (Exception e) { System.out.println(e.toString()); e.printStackTrace(); return ""; } char[] charArray = str.toCharArray(); byte[] byteArray = new byte[charArray.length]; for (int i = 0; i < charArray.length; i++) byteArray[i] = (byte) charArray[i]; byte[] md5Bytes = md5.digest(byteArray); StringBuffer hexValue = new StringBuffer(); for (int i = 0; i < md5Bytes.length; i++) { int val = ((int) md5Bytes[i]) & 0xff; if (val < 16) hexValue.append("0"); hexValue.append(Integer.toHexString(val)); } return hexValue.toString(); } }
RPCClient:
![](s3://crabby-images/6da44/6da44a3c422e49abcf1dae786223d28e774e2de6)
public class RPCClient { private static final String RPC_QUEUE_NAME = "rpc_queue"; public static void main(String[] args) throws Exception { ConnectionFactory factory = Common.getFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); //创建接收队列 channel.queueDeclare(RPC_QUEUE_NAME, false, false, true, null); /** * 注册回调队列,用于接收rpc响应 */ String replyQueueName = channel.queueDeclare().getQueue(); String CorrId = UUID.randomUUID().toString(); //注册关联Id和响应队列名称 //correlationId提供了一个消息头,用于将当前的消息和先前的某些消息或应用程序特定的ID关联起来 //replyTo指定响应队列 BasicProperties props = new BasicProperties().builder().correlationId(CorrId).replyTo(replyQueueName).build(); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body) throws IOException { if(properties.getCorrelationId().equals(CorrId)) { String str = new String(body); System.out.println("返回md5消息:"+str); } channel.basicAck(envelope.getDeliveryTag(), false); } }; //指定回调消费队列 channel.basicConsume(replyQueueName, false, consumer); /** * 向队列发送消息 */ channel.basicPublish("", RPC_QUEUE_NAME, props, "you are".getBytes()); // channel.close(); // connection.close(); } }