public class Send {
public static final String routingKey = "wuqidi_task_durable";
/*工作队列 也叫任务队列 目的是将任务发送到队列中 由工作者进行处理 在后台的多个工作者中 任务是共享的*/
public static void main(String[] args) throws Exception{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//消息持久化 :在声明队列过程中 第二个参数 是设置持久化 这是为了 当rabbitmq崩溃 关闭
//消息和队列是否删除 也就是再次开机会不会保留原来信息和队列
//这里设置为true是保证队列不会消失
channel.queueDeclare(routingKey, true, false, false, null);
String con = getTask();
System.out.println(con);
//这里设置的是保证消息不会消失 但也不是完全保证 可能会在内存中
channel.basicPublish("", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, con.getBytes());
channel.close();
connection.close();
}
/*在这里随机生成 task 个数*/
private static String getTask(){
Random r = new Random();
int len = r.nextInt(10);
StringBuffer sb = new StringBuffer();
for(int i=0; i< 15; i++){
sb.append("task ");
}
if(sb.length()<1){
sb.append("task");
}
return sb.substring(0, sb.length()-1).toString();
}
}
/*循环队列 使用工作队列的一个优点就是可以启动多个接收端 就是工作者,它可以并行工作。采用轮训的方式进行分配任务。*/
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(Send.routingKey, true, false, false, null);
//公平调度:
//在同一时刻 发给同一工作者不准超过1条任务,直到处理完消息作出相应。可以先发送给空闲的工作者。
channel.basicQos(1);
Consumer callback = new Consumer() {
@Override
public void handleShutdownSignal(String consumerTag,
ShutdownSignalException sig) {
}
@Override
public void handleRecoverOk(String consumerTag) {
}
@Override
public void handleDelivery(String arg0, Envelope arg1,
BasicProperties arg2, byte[] arg3) throws IOException {
String tasks = new String(arg3, "utf-8");
String[] taskss = tasks.split(" ");
for(String tem : taskss){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(tem);
}
//这里执行消息确认 ,为的是待任务执行完毕后才能执行删除操作,如果任务执行过程中
//任务执行失败了 rabbitmq还可以链接其他工作者 进行工作
channel.basicAck(arg1.getDeliveryTag(), false);
}
@Override
public void handleConsumeOk(String consumerTag) {
}
@Override
public void handleCancelOk(String consumerTag) {
}
@Override
public void handleCancel(String consumerTag) throws IOException {
}
};
channel.basicConsume(Send.routingKey, false, callback);
channel.close();
connection.close();
}
}
在实际项目中目前我还没有用到。。。