zoukankan      html  css  js  c++  java
  • Rabbit的事务

    加入事务的方法:

    txSelect()  txCommit()  txRollback()

    生产者:

    package com.kf.queueDemo.transactions;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    /**
     * 简单队列  (事务) txSelect()  txCommit()  txRollback()
     * @author kf
     *
     */
    public class TransactionsQueueProducer {
        
        //队列名称
        private static String QUEUENAME = "TRANSACTIONSQUEUE";
        
        public static void main(String[] args) throws IOException, TimeoutException{
            Connection connection = RabbitConnectionUtils.getConnection();
            
            //创建通道
            Channel channel = connection.createChannel();
            
            //通道里放入队列
            /**
             * 第一个参数是  队列名称
             * 第二个参数指 要不要持久化
             */
            channel.queueDeclare(QUEUENAME, false, false, false, null);
            
    /*        //消息体
            String mes = "demo_message汉字";
            
            //发送消息
            *//**
             * 参数为  exchange, routingKey, props, body
             * exchange   交换机
             * routingKey 路由键
             * 
             * body 消息体
             *//*
            channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/
            
            /**
             * 集群环境下,多个消费者情况下。消费者默认采用均摊
             */
            try {
                //开启事务
                channel.txSelect();
                String mes = "demo_message汉字";
                System.out.println("发送消息"+mes);
                channel.basicPublish("", QUEUENAME, null, mes.getBytes());
                //提交事务
                channel.txCommit();
                int  i = 1/0;
            } catch (Exception e) {
                e.printStackTrace();
                //事务回滚
                channel.txRollback();
                System.out.println("生产者发生错误,事务已回滚");
            }finally {
                channel.close();
                connection.close();
            }
            
        }
    
    }

    消费者:

    package com.kf.queueDemo.transactions;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    import com.kf.utils.RabbitConnectionUtils;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    import com.rabbitmq.client.AMQP.BasicProperties;
    
    /**
     * 简单队列消费者  事务
     * @author kf
     *
     */
    public class TransactionsConsumer {
        //队列名称
            private static String QUEUENAME = "TRANSACTIONSQUEUE";
            
            public static void main(String[] args) throws IOException, TimeoutException{
                System.out.println("开始接收消息");
                Connection connection = RabbitConnectionUtils.getConnection();
                
                //创建通道
                final Channel channel = connection.createChannel();
                
                //通道里放入队列
                /**
                 * 第一个参数是  队列名称
                 * 第二个参数指 要不要持久化
                 */
                channel.queueDeclare(QUEUENAME, false, false, false, null);
                
                DefaultConsumer consumer = new DefaultConsumer(channel){
                    //监听队列
                        @Override
                        public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
                                byte[] body) throws IOException {
                            System.out.println("------------进入监听---------");
                            String s = new String(body, "utf-8");
                            System.out.println("获取到的消息是:"+s);
                            //手动应答。
                            /**
                             * 当  channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时  是手动应答模式
                             */
                        //    channel.basicAck(envelope.getDeliveryTag(), false);
                        }
                };
                
                //设置应答模式
                /**
                 * 参数:  对列名,是否自动签收,监听的类
                 */
                System.out.println("获取消息的方法之前");
                channel.basicConsume(QUEUENAME, true, consumer);
                System.out.println("获取消息的方法之后");
                
            }
    
    
    }
  • 相关阅读:
    Codeforces Round #239(Div. 2) 做后扯淡玩
    hdu 3507 Print Article
    prufer序列
    POJ 2778 DNA Sequence
    Codeforces Round #237 (Div. 2)
    poj3352
    图论知识
    POJ 2186
    Codeforces Round #236 (Div. 2)
    POJ 2823 Sliding Window
  • 原文地址:https://www.cnblogs.com/fuguang/p/10660580.html
Copyright © 2011-2022 走看看