zoukankan      html  css  js  c++  java
  • RxJava2学习笔记(2)

    上一篇已经熟悉了Observable的基本用法,但是如果仅仅只是“生产-消费”的模型,这就体现不出优势了,java有100种办法可以玩这个:)

    一、更简单的多线程

    正常情况下,生产者与消费者都在同一个线程里处理,参考下面的代码:

    final long start = System.currentTimeMillis();
    
    Observable<String> fileSender = Observable.create(emitter -> {
        for (int i = 1; i < 6; i++) {
            Thread.sleep(1000);
            String temp = "thread:" + Thread.currentThread().getId() + " , file " + i + " 的内容";
            System.out.println(temp);
            emitter.onNext(temp);
        }
        emitter.onComplete();
    });
    
    Observer<String> fileHander = new Observer<String>() {
        @Override
        public void onSubscribe(@NonNull Disposable d) {
            System.out.println("准备处理文件...");
        }
    
        @Override
        public void onNext(@NonNull String s) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("thread:" + Thread.currentThread().getId() + " , [" + s + "] 已处理!");
        }
    
        @Override
        public void onError(@NonNull Throwable e) {
            System.out.println("师傅,有妖怪!");
        }
    
        @Override
        public void onComplete() {
            System.out.println("总算完事儿,累屎大爷了!");
            long end = System.currentTimeMillis();
            System.out.println("耗时:" + (end - start));
        }
    };
    
    fileSender.subscribe(fileHander);
    
    Thread.sleep(60000);
    

    假设生产者在读取一堆文件,然后发给消费者处理,通常情况下,这类涉及IO的操作都是很耗时的,我们用sleep(1000)来模拟。

    从输出结果上看,生产者与消费者的thread id相同,耗时约为10s。

    fileSender.subscribe(fileHander);
    

    如果上面这行,换成

    fileSender.subscribeOn(Schedulers.io()) //生产者处理时,放在io线程中
            .observeOn(Schedulers.newThread()) //消费者处理时,用新线程
            .subscribe(fileHander); 
    
    注:subscribeOn() 是生产者发送子弹的线程, observeOn() 则是消费者(靶子)收子弹的线程,如果有多个消费者,每次调用observeOn() 消费者线程便会切换一次
    这样生产者、消费者就变成不同的线程了,跑一下看看:

    可以看到二个线程id不一样,说明分别在不同的线程里,而且总耗时明显缩短了。

    二、更平滑的链式调用

    假设我们有一个经典的在线电商场景:用户提交订单后,马上跳到支付页面付款。传统写法,通常是中规中矩的封装2个方法,依次调用。用rxjava后,可以写得更流畅,先做点准备工作:

    先定义二个服务接口:订单服务(OrderService)以及支付服务(PayService)

    OrderService.java

    public interface OrderService {
        Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws Exception;
    }
    

    PayService.java

    public interface PayService {
        Observable<PayResponse> payOrder(PayRequest request) throws Exception;
    }
    

    然后来二个实现:

    OrderServiceImpl

    public class OrderServiceImpl implements OrderService {
    
        @Override
        public Observable<CreateOrderResponse> createOrder(CreateOrderRequest request) throws InterruptedException {
            System.out.println("threadId:" + Thread.currentThread().getId() + ", 订单创建中:" + request.toString());
            CreateOrderResponse response = new CreateOrderResponse();
            response.setOrderNo(UUID.randomUUID().toString().replace("-", ""));
            response.setOrderStatus("NEW");
            response.setOrderAmount(request.getOrderAmount());
            response.setOrderDesc(request.getOrderDesc());
            return Observable.create(emitter -> emitter.onNext(response));
        }
    }
    

    PayServiceImpl

    public class PayServiceImpl implements PayService {
    
        @Override
        public Observable<PayResponse> payOrder(PayRequest request) throws InterruptedException {
            System.out.println("threadId:" + Thread.currentThread().getId() + ", 正在请求支付:" + request);
            PayResponse response = new PayResponse();
            response.setSuccess(true);
            response.setOrderNo(request.getOrderNo());
            response.setTradeNo(UUID.randomUUID().toString().replace("-", ""));
            return Observable.create(emitter -> emitter.onNext(response));
        }
    }
    

    然后测试一把:

        @Test
        public void test1() throws Exception {
            OrderService orderService = new OrderServiceImpl();
            PayService payService = new PayServiceImpl();
            orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00))) //创建订单
                    //将"创建订单的Response" 转换成 "支付订单的Response"
                    .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                    //支付完成的处理
                    .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
            Thread.sleep(1000);//等待执行完毕
        }
    

    链式的写法,更符合阅读习惯,注:flatMap这个操作,通俗点讲,就是将一种口径的子弹,转换成另一种口径的子弹,然后再继续发射

    输出:

    threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
    threadId:1, 正在请求支付:PayRequest(orderNo=81419b0580d547acbb53955978ace6b8, paymentAmount=8888)
    threadId:1, 支付完成
    

    可以看到,默认情况下,创建订单/支付订单在同一个线程中,结合前面学到的知识,也可以将它们划分到不同的线程里:(虽然就这个场景而言,这样做的意义不大,因为支付前,肯定要等订单先提交,这个没办法并发处理,这里只是意思一下,可以这样做而已)

        @Test
        public void test2() throws Exception {
            OrderService orderService = new OrderServiceImpl();
            PayService payService = new PayServiceImpl();
            orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                    .subscribeOn(Schedulers.newThread())  //(生产者)创建订单时,使用新线程
                    .observeOn(Schedulers.newThread()) //(消费者1)接收订单时,使用新线程
                    .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                    .observeOn(Schedulers.newThread()) //(消费者2)接收支付结果时,使用新线程
                    .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"));
            Thread.sleep(1000);//等待执行完毕
        }
    

    输出:

    threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
    threadId:13, 正在请求支付:PayRequest(orderNo=d5ff7890f22f486bb1bf8aa8e4f0a3bf, paymentAmount=8888)
    threadId:14, 支付完成
    

    从threadId看,已经是不同的线程了。

    上面的代码,都没考虑到出错的情况,如果支付时出异常了,rxjava如何处理呢?

    先改下支付的实现,人为抛个异常:

    public class PayServiceImpl implements PayService {
    
        @Override
        public Observable<PayResponse> payOrder(PayRequest request) throws Exception {
            throw new Exception("支付失败!");
        }
    }
    

    rxjava里有一个重载版本,见: io.reactivex.Observable

        @CheckReturnValue
        @SchedulerSupport("none")
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
            return this.subscribe(onNext, onError, Functions.EMPTY_ACTION, Functions.emptyConsumer());
        }
    

    使用这个版本即可:

        @Test
        public void test3() throws Exception {
            OrderService orderService = new OrderServiceImpl();
            PayService payService = new PayServiceImpl();
            orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                    .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                    .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                            //异常处理
                            err -> System.out.println("支付出错啦:" + err.getMessage()));
            Thread.sleep(1000);//等待执行完毕
        }
    

    输出:

    threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
    支付出错啦:支付失败!  

    如果想在订单创建完后,先做些处理,再进行支付,可以这么写:

        @Test
        public void test4() throws Exception {
            OrderService orderService = new OrderServiceImpl();
            PayService payService = new PayServiceImpl();
            orderService.createOrder(new CreateOrderRequest("iphone X", new BigDecimal(8888.00)))
                    //订单创建完成后的处理
                    .doOnNext(response -> System.out.println("订单创建完成:" + response))
                    .flatMap((Function<CreateOrderResponse, ObservableSource<PayResponse>>) response -> payService.payOrder(new PayRequest(response.getOrderNo(), response.getOrderAmount())))
                    .subscribe(response -> System.out.println("threadId:" + Thread.currentThread().getId() + ", 支付完成"),
                            err -> System.out.println("支付出错啦:" + err.getMessage()));
            Thread.sleep(1000);//等待执行完毕
        }
    

    输出:

    threadId:1, 订单创建中:CreateOrderRequest(orderDesc=iphone X, orderAmount=8888)
    订单创建完成:CreateOrderResponse(orderNo=8c194b1d07c044a8af3771159e1bb2bf, orderDesc=iphone X, orderAmount=8888, orderStatus=NEW)
    支付出错啦:支付失败!
    

    最后再说下flatMap与concatMap,看下面二个示例就明白差异:

        @Test
        public void flatMapTest() throws InterruptedException {
            Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
            }).flatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
                    .delay(10, TimeUnit.MILLISECONDS)
            )
                    .subscribe(s -> System.out.print(s + " "));
            Thread.sleep(5000);
        }
    

      输出:0 1 5 9 2 3 7 4 6 8 

        @Test
        public void concatMapTest() throws InterruptedException {
            Observable.create((ObservableOnSubscribe<Integer>) emitter -> {
                for (int i = 0; i < 10; i++) {
                    emitter.onNext(i);
                }
            }).concatMap((Function<Integer, ObservableSource<String>>) integer -> Observable.fromArray(integer + "")
                    .delay(10, TimeUnit.MILLISECONDS)
            )
                    .subscribe(s -> System.out.print(s + " "));
            Thread.sleep(5000);
        }
    

      输出:0 1 2 3 4 5 6 7 8 9

    结论:flatMap不保证顺序,concatMap能保证顺序

  • 相关阅读:
    希望走过的路成为未来的基石
    第三次个人作业--用例图设计
    第二次结对作业
    第一次结对作业
    第二次个人编程作业
    第一次个人编程作业(更新至2020.02.07)
    Springboot vue 前后分离 跨域 Activiti6 工作流 集成代码生成器 shiro权限
    springcloud 项目源码 微服务 分布式 Activiti6 工作流 vue.js html 跨域 前后分离
    spring cloud springboot 框架源码 activiti工作流 前后分离 集成代码生成器
    java代码生成器 快速开发平台 二次开发 外包项目利器 springmvc SSM后台框架源码
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/rx-java-2-tutorial-2.html
Copyright © 2011-2022 走看看