zoukankan      html  css  js  c++  java
  • rabbitmq学习——队列

    public class Send {
    public static final String routingKey = "wuqidi_task_durable";
    /*工作队列 也叫任务队列 目的是将任务发送到队列中 由工作者进行处理 在后台的多个工作者中 任务是共享的*/
    public static void main(String[] args) throws Exception{
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    //消息持久化 :在声明队列过程中 第二个参数 是设置持久化 这是为了 当rabbitmq崩溃 关闭
    //消息和队列是否删除 也就是再次开机会不会保留原来信息和队列
    //这里设置为true是保证队列不会消失
    channel.queueDeclare(routingKey, true, false, false, null);


    String con = getTask();
    System.out.println(con);
    //这里设置的是保证消息不会消失 但也不是完全保证 可能会在内存中
    channel.basicPublish("", routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, con.getBytes());

    channel.close();
    connection.close();
    }

    /*在这里随机生成 task 个数*/
    private static String getTask(){
    Random r = new Random();
    int len = r.nextInt(10);
    StringBuffer sb = new StringBuffer();
    for(int i=0; i< 15; i++){
    sb.append("task ");
    }
    if(sb.length()<1){
    sb.append("task");
    }
    return sb.substring(0, sb.length()-1).toString();
    }
    }

    /*循环队列 使用工作队列的一个优点就是可以启动多个接收端 就是工作者,它可以并行工作。采用轮训的方式进行分配任务。*/
    public static void main(String[] args) throws Exception {
    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    final Channel channel = connection.createChannel();
    channel.queueDeclare(Send.routingKey, true, false, false, null);

    //公平调度:
    //在同一时刻 发给同一工作者不准超过1条任务,直到处理完消息作出相应。可以先发送给空闲的工作者。
    channel.basicQos(1);

    Consumer callback = new Consumer() {
    @Override
    public void handleShutdownSignal(String consumerTag,
    ShutdownSignalException sig) {

    }

    @Override
    public void handleRecoverOk(String consumerTag) {
    }

    @Override
    public void handleDelivery(String arg0, Envelope arg1,
    BasicProperties arg2, byte[] arg3) throws IOException {
    String tasks = new String(arg3, "utf-8");
    String[] taskss = tasks.split(" ");
    for(String tem : taskss){
    try {
    Thread.sleep(1000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(tem);
    }
    //这里执行消息确认 ,为的是待任务执行完毕后才能执行删除操作,如果任务执行过程中
    //任务执行失败了 rabbitmq还可以链接其他工作者 进行工作
    channel.basicAck(arg1.getDeliveryTag(), false);
    }

    @Override
    public void handleConsumeOk(String consumerTag) {
    }

    @Override
    public void handleCancelOk(String consumerTag) {
    }

    @Override
    public void handleCancel(String consumerTag) throws IOException {
    }
    };

    channel.basicConsume(Send.routingKey, false, callback);

    channel.close();
    connection.close();

    }
    }

    在实际项目中目前我还没有用到。。。

  • 相关阅读:
    MySql的常用命令
    yum命令配置及使用说明和常见问题处理
    oracle12c创建用户和表空间出现的问题
    oracle云部署
    ORA-12154: TNS:could not resolve the connect identifier specified
    Linux之iptables
    Linux之MySQL
    Linux之apache
    oracle查锁表
    cookie 和 HttpSession
  • 原文地址:https://www.cnblogs.com/core404/p/7644904.html
Copyright © 2011-2022 走看看