zoukankan      html  css  js  c++  java
  • rabbitmq代码配置

    package com.qukoucai.test;

    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.AMQP.BasicProperties;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeoutException;
    import java.util.concurrent.atomic.AtomicLong;

    /**
    * Created by coffee on 15/10/28.
    */
    public class Main {

      //这三个值直接去ip:15672 web网站上去添加设置
    static final String exchangeName = "exchange-master";//交换机申明
    static final String routingKey = "queue-master";//队列绑定交换机并制定路由routingKey
    static final String queueName = "queue-master";//队列声明

    private static int producerConnection_size = 0; //消息生产者连接数
    private static int consumerConnection_size = 1; //消费者连接数
    private static final int consumer_size = 1;//每个消费者连接里面开启的consumer数量
    private static int qos = 1; //Qos设置
    private static long sleep_time = 0; //模拟每条消息的处理时间
    private static boolean autoAck = true; //是否默认Ack

    private static Logger logger = LoggerFactory.getLogger(Main.class);
    public static void main(String[] args) throws Exception {
    logger.debug("start");
    final AtomicLong count = new AtomicLong(10000000000L);
    ConnectionFactory factory = new ConnectionFactory();
    factory.setUsername("rabbitmqadmin");
    factory.setPassword("qkc123456");
    // factory.setVirtualHost("test");
    factory.setHost("39.98.232.25");
    factory.setPort(5672);

    /** 如果要进行消息回调,则这里必须要设置为true */
    // CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
    // cachingConnectionFactory.setPublisherConfirms(true);

    //启动监控程序
    Thread t = new Thread(new Runnable() {
    public void run() {
    long c = count.get();
    while (c != 0){
    try{
    Thread.sleep(1000);
    long c1 = count.get();
    logger.debug("每秒消费为:{}Qps",c-c1);
    c=c1;
    }catch (Exception e){
    }
    }
    }
    });
    t.start();
    //启动
    for (int i=0;i<producerConnection_size;i++){
    Connection conn1 = factory.newConnection();
    Thread t1 = producer(conn1, count.get());
    t1.start();
    }
    //启动consumer
    for (int i=0;i<consumerConnection_size;i++){
    Connection conn1 = factory.newConnection();
    Thread t2 = consumer(conn1, count);
    t2.start();
    }
    }

    public static Thread consumer(final Connection conn, final AtomicLong count) throws Exception {
    return new Thread(new Runnable() {
    public void run() {
    logger.debug("start consumer");
    try {
    final CountDownLatch cdl = new CountDownLatch(1000);
    for(int i = 0;i<consumer_size;i++) {
    final Channel channel = conn.createChannel();

    //同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
    //保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
    //在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
    channel.basicQos(i, qos, false);

    //申明消费者
    Consumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    if (count.decrementAndGet() == 0) {
    channel.basicCancel(consumerTag);
    cdl.countDown();
    try {
    channel.close();
    } catch (TimeoutException e) {
    e.printStackTrace();
    }
    }
    try {
    Thread.sleep(sleep_time);
    } catch (InterruptedException e) {
    }
    if (!autoAck){
    getChannel().basicAck(envelope.getDeliveryTag(), true);
    }
    }
    };
    String consumerTag = channel.basicConsume(queueName,autoAck, "testConsumer" + i, consumer);
    logger.debug("consumerTag is {}", consumerTag);
    }
    cdl.await();
    } catch (Exception e) {
    }
    }
    });
    }


    public static Thread producer(final Connection conn, final long count) throws Exception {
    return new Thread(new Runnable() {
    public void run() {
    logger.debug("start send Message");
    try {
    Channel channel = conn.createChannel();
    channel.exchangeDeclare(exchangeName, "direct", true);
    channel.queueDeclare(queueName, true, false, false, null);
    channel.queueBind(queueName, exchangeName, routingKey);
    BasicProperties properties = new BasicProperties.Builder().deliveryMode(2).build();
    for (long i = 0; i < count; i++) {
    byte[] messageBodyBytes = ("{"merchantsId":13}").getBytes();
    channel.basicPublish(exchangeName, routingKey, properties, messageBodyBytes);
    // logger.debug("add message {}",i);
    }
    channel.close();
    conn.close();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    });
    }
    }
  • 相关阅读:
    线段树&&线段树的创建线段树的查询&&单节点更新&&区间更新
    树&二叉树&&满二叉树&&完全二叉树&&完满二叉树
    Git学习记录 力做全网最强入门教程
    Markdown测试
    [转载] c++对结构体数组排序
    c/c++ 中#ifndef和#endif的作用及使用
    交互题(apio2016Gap)
    linux下对拍
    CTSC2017密钥、吉夫特
    省队十连测
  • 原文地址:https://www.cnblogs.com/holyshengjie/p/10621156.html
Copyright © 2011-2022 走看看