zoukankan      html  css  js  c++  java
  • [rxjs] Creating An Observable with RxJS

    Create an observable

    var Observable = Rx.Observable;

    // Observable that hands a single person object to its observer and then
    // signals completion.
    var source = Observable.create(function (observer) {
      observer.onNext({
        name: "Zhentian",
        message: "Hello World!"
      });
      observer.onCompleted();
    });

    // subscribe() takes three callbacks, in order: next value, error, completion.
    var sub = source.subscribe(
      function printGreeting(who) {
        console.log(who.name + ' say ' + who.message);
      },
      function printError(failure) {
        console.log(failure);
      },
      function printDone() {
        console.log("done");
      }
    );

    // Output:
    //Zhentian say Hello World!
    //done

    Async

    var Observable = Rx.Observable;

    // Observable that delivers its value and completion from a timer callback
    // instead of directly inside the create callback.
    var source = Observable.create(function (observer) {

      // Runs when the 1000 ms timer fires: emit the person, then complete.
      function emitPerson() {
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      setTimeout(emitPerson, 1000);

      // Logged right after the timer is scheduled, so it appears first in the output.
      console.log("ansyc finished!");
    });

    var sub = source.subscribe(
      function printGreeting(who) {
        console.log(who.name + ' say ' + who.message);
      },
      function printError(failure) {
        console.log(failure);
      },
      function printDone() {
        console.log("done");
      }
    );

    // Output:
    //"ansyc finished!"
    //"Zhentian say Hello World!"
    //"done"

    Dispose the async

    When we dispose the subscription before the timer fires, we can still see it log "Starat timeout". That is not good: the subscriber's onNext() handler will never be called, so what we want is for the setTimeout callback not to run at all.

    var Observable = Rx.Observable;

    // Same timer-based observable as before. The create callback returns no
    // cleanup function, so nothing cancels the timer when the subscription is
    // disposed.
    var source = Observable.create(function (observer) {

      // Runs when the 1000 ms timer fires.
      function emitPerson() {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      setTimeout(emitPerson, 1000);

      console.log("ansyc finished!");
    });

    var sub = source.subscribe(
      function printGreeting(who) {
        console.log(who.name + ' say ' + who.message);
      },
      function printError(failure) {
        console.log(failure);
      },
      function printDone() {
        console.log("done");
      }
    );

    // Dispose the subscription after 500 ms, i.e. before the 1000 ms timer fires.
    setTimeout(function cancelSubscription() {
      sub.dispose();
    }, 500);

    // Output: the timer callback still runs, but no greeting and no "done" are printed.
    /*
    "ansyc finished!"
    "Starat timeout"
    */

    Define the dispose

    We can keep the id that setTimeout returns and, in the function returned from the create callback, clear that timeout.

    var Observable = Rx.Observable;

    // Timer-based observable that also returns a cleanup function, so the
    // pending timer can be cancelled.
    var source = Observable.create(function (observer) {

      // Runs when the 1000 ms timer fires, unless the timer was cleared first.
      function emitPerson() {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      var timerId = setTimeout(emitPerson, 1000);

      console.log("ansyc finished!");

      // Note that this is optional, you do not have to return this if you require no cleanup
      return function cancelTimer() {
        clearTimeout(timerId);
      };
    });

    var sub = source.subscribe(
      function printGreeting(who) {
        console.log(who.name + ' say ' + who.message);
      },
      function printError(failure) {
        console.log(failure);
      },
      function printDone() {
        console.log("done");
      }
    );

    // Dispose the subscription after 500 ms, i.e. before the 1000 ms timer fires.
    setTimeout(function cancelSubscription() {
      sub.dispose();
    }, 500);

    // Output: "Starat timeout" is no longer logged.
    /*
    "ansyc finished!"
    */

    Catch error

    If we throw an error inside the setTimeout callback, we find that it is not actually caught by the onError handler.

    var Observable = Rx.Observable; // Demo: an error thrown inside the timer callback is NOT delivered to onError.
    
    var source = Observable.create(function(observe){
      
      var id = setTimeout(function(){
        throw "there is an error"; //Throw an error here
        // Everything below in this callback is unreachable: the throw above
        // exits the callback before the person is created or emitted.
        var person = {
          name: "Zhentian",
          message: "Hello World!"
        };  
        
        observe.onNext(person);
        observe.onCompleted();
      }, 1000);
      
      // Note that this is optional, you do not have to return this if you require no cleanup
      return function(){
        clearTimeout(id);
      }
    
    });
    
    // The onError handler below would print "Error: ..." if it were invoked;
    // the recorded output shows that it is not.
    // NOTE(review): presumably because the throw happens later, inside the
    // timer callback, rather than while the create callback itself runs -- confirm.
    var sub = source.subscribe(function onNext(person){
      console.log(person.name + ' say ' + person.message);
    }, function onError(err){
      console.log("Error: " + err);
    }, function onCompleted(){
      console.log("done");
    });
    
    /*
    Recorded output. "line 6" is the position of the throw statement within
    this snippet; no "Error: ..." line from the onError handler appears.

    "error"
    "Uncaught there is an error (line 6)"
    */

    What we can do is wrap the body of the setTimeout callback in a try/catch block and pass the caught error to observe.onError().

    var Observable = Rx.Observable;

    // Timer-based observable whose callback reports any thrown error through
    // the observer's onError channel instead of letting it escape uncaught.
    var source = Observable.create(function (observer) {

      // Runs when the 1000 ms timer fires.
      function emitPersonSafely() {
        try {
          throw "there is an error"; //Throw an error here

          // Not reached while the throw above is in place.
          var person = {
            name: "Zhentian",
            message: "Hello World!"
          };

          observer.onNext(person);
          observer.onCompleted();
        } catch (caught) {
          // Forward the failure to the subscriber's onError handler.
          observer.onError(caught);
        }
      }

      var timerId = setTimeout(emitPersonSafely, 1000);

      // Note that this is optional, you do not have to return this if you require no cleanup
      return function cancelTimer() {
        clearTimeout(timerId);
      };
    });

    var sub = source.subscribe(
      function printGreeting(who) {
        console.log(who.name + ' say ' + who.message);
      },
      function printError(failure) {
        console.log("Error: " + failure);
      },
      function printDone() {
        console.log("done");
      }
    );

    // Output:
    /*
    "Error: there is an error"
    */
  • 相关阅读:
    docker 容器卷及提交
    docker 容器命令及解析
    docker镜像常用命令及解析
    drf 中集成swgger api功能文档
    drf 二次封装Response
    drf 中 自定义 异常处理方法
    drf 中自定义登录 以及token验证
    drf_vue对接极验验证
    django 信号的使用
    element plut tree renderContent
  • 原文地址:https://www.cnblogs.com/Answer1215/p/4742631.html
Copyright © 2011-2022 走看看