package com.qukoucai.test;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
/**
* Created by coffee on 15/10/28.
*/
public class Main {
//这三个值直接去ip:15672 web网站上去添加设置
static final String exchangeName = "exchange-master";//交换机申明
static final String routingKey = "queue-master";//队列绑定交换机并制定路由routingKey
static final String queueName = "queue-master";//队列声明
private static int producerConnection_size = 0; //消息生产者连接数
private static int consumerConnection_size = 1; //消费者连接数
private static final int consumer_size = 1;//每个消费者连接里面开启的consumer数量
private static int qos = 1; //Qos设置
private static long sleep_time = 0; //模拟每条消息的处理时间
private static boolean autoAck = true; //是否默认Ack
private static Logger logger = LoggerFactory.getLogger(Main.class);
public static void main(String[] args) throws Exception {
logger.debug("start");
final AtomicLong count = new AtomicLong(10000000000L);
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("rabbitmqadmin");
factory.setPassword("qkc123456");
// factory.setVirtualHost("test");
factory.setHost("39.98.232.25");
factory.setPort(5672);
/** 如果要进行消息回调,则这里必须要设置为true */
// CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
// cachingConnectionFactory.setPublisherConfirms(true);
//启动监控程序
Thread t = new Thread(new Runnable() {
public void run() {
long c = count.get();
while (c != 0){
try{
Thread.sleep(1000);
long c1 = count.get();
logger.debug("每秒消费为:{}Qps",c-c1);
c=c1;
}catch (Exception e){
}
}
}
});
t.start();
//启动
for (int i=0;i<producerConnection_size;i++){
Connection conn1 = factory.newConnection();
Thread t1 = producer(conn1, count.get());
t1.start();
}
//启动consumer
for (int i=0;i<consumerConnection_size;i++){
Connection conn1 = factory.newConnection();
Thread t2 = consumer(conn1, count);
t2.start();
}
}
public static Thread consumer(final Connection conn, final AtomicLong count) throws Exception {
return new Thread(new Runnable() {
public void run() {
logger.debug("start consumer");
try {
final CountDownLatch cdl = new CountDownLatch(1000);
for(int i = 0;i<consumer_size;i++) {
final Channel channel = conn.createChannel();
//同一时刻服务器只发送1条消息给消费者(能者多劳,消费消息快的,会消费更多的消息)
//保证在接收端一个消息没有处理完时不会接收另一个消息,即消费者端发送了ack后才会接收下一个消息。
//在这种情况下生产者端会尝试把消息发送给下一个空闲的消费者。
channel.basicQos(i, qos, false);
//申明消费者
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (count.decrementAndGet() == 0) {
channel.basicCancel(consumerTag);
cdl.countDown();
try {
channel.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
try {
Thread.sleep(sleep_time);
} catch (InterruptedException e) {
}
if (!autoAck){
getChannel().basicAck(envelope.getDeliveryTag(), true);
}
}
};
String consumerTag = channel.basicConsume(queueName,autoAck, "testConsumer" + i, consumer);
logger.debug("consumerTag is {}", consumerTag);
}
cdl.await();
} catch (Exception e) {
}
}
});
}
public static Thread producer(final Connection conn, final long count) throws Exception {
return new Thread(new Runnable() {
public void run() {
logger.debug("start send Message");
try {
Channel channel = conn.createChannel();
channel.exchangeDeclare(exchangeName, "direct", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
BasicProperties properties = new BasicProperties.Builder().deliveryMode(2).build();
for (long i = 0; i < count; i++) {
byte[] messageBodyBytes = ("{"merchantsId":13}").getBytes();
channel.basicPublish(exchangeName, routingKey, properties, messageBodyBytes);
// logger.debug("add message {}",i);
}
channel.close();
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}