zoukankan      html  css  js  c++  java
  • [rxjs] Creating An Observable with RxJS

    Create an observable

    // Build an observable that immediately pushes one person object and then completes.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // The callback receives the observer: push the single value, then signal completion.
      observer.onNext({
        name: "Zhentian",
        message: "Hello World!"
      });
      observer.onCompleted();
    });

    // subscribe() is given the next / error / completed handlers, in that order.
    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        console.log(err);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    // Console output:
    //   Zhentian say Hello World!
    //   done

    Async

    // Same observable as before, except the value is pushed from a timer one second later.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Runs when the timer fires: push the person, then complete.
      var emitPerson = function () {
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      setTimeout(emitPerson, 1000);

      // This line runs right after the timer is scheduled, so it is logged first.
      console.log("ansyc finished!");
    });

    // Handlers are passed in next / error / completed order.
    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        console.log(err);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    // Console output:
    //   "ansyc finished!"
    //   "Zhentian say Hello World!"
    //   "done"

    Dispose the async

    When we dispose of the subscription before the timer fires, the timeout callback still runs and logs "Starat timeout". That is not good: the subscriber's onNext() handler will never be invoked, so what we really want is for the code not to enter the setTimeout callback at all.

    // Disposing the subscription without any cleanup: the pending timer is left untouched.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first so we can see whether it still runs after dispose.
      var emitPerson = function () {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      setTimeout(emitPerson, 1000);

      console.log("ansyc finished!");
    });

    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        console.log(err);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    // Dispose after 500 ms, i.e. before the 1000 ms timer above fires.
    setTimeout(function disposeEarly() {
      sub.dispose();
    }, 500);

    /*
    Console output (the timer callback still runs, but no value reaches the subscriber):
    "ansyc finished!"
    "Starat timeout"
    */

    Define the dispose

    We can store the id returned by setTimeout and, in the function returned from the create callback, clear that timeout.

    // Disposing with cleanup: the create callback returns a function that cancels the timer.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first so we can see whether it still runs after dispose.
      var emitPerson = function () {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      // Keep the timer id so the cleanup function below can cancel it.
      var timerId = setTimeout(emitPerson, 1000);

      console.log("ansyc finished!");

      // Note that this is optional, you do not have to return this if you require no cleanup
      return function () {
        clearTimeout(timerId);
      };
    });

    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        console.log(err);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    // Dispose after 500 ms, i.e. before the 1000 ms timer above fires.
    setTimeout(function disposeEarly() {
      sub.dispose();
    }, 500);

    /*
    Console output ("Starat timeout" no longer appears):
    "ansyc finished!"
    */

    Catch error

    If we throw an error inside the setTimeout callback, we find that it is not actually caught by the onError handler.

    // Demonstrates that a throw inside the timer callback does not reach the onError handler.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      var timerId = setTimeout(function () {
        throw "there is an error"; //Throw an error here
        // Deliberately left in place: nothing below the throw is ever reached.
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }, 1000);

      // Note that this is optional, you do not have to return this if you require no cleanup
      return function () {
        clearTimeout(timerId);
      };
    });

    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        var report = "Error: " + err;
        console.log(report);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    /*
    Console output as recorded for the original listing (no "Error: ..." line is printed,
    and the line number refers to that original listing):
    "error"
    "Uncaught there is an error (line 6)"
    */

    What we can do is wrap the code inside the setTimeout callback in a try/catch block and pass the caught error to the observer's onError().

    // Wrap the timer callback body in try/catch and forward the caught value to onError.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      var timerId = setTimeout(function () {
        try {
          throw "there is an error"; //Throw an error here
          // Deliberately left in place: nothing below the throw is ever reached.
          observer.onNext({
            name: "Zhentian",
            message: "Hello World!"
          });
          observer.onCompleted();
        } catch (thrown) {
          // Hand the thrown value to the subscriber's error handler.
          observer.onError(thrown);
        }
      }, 1000);

      // Note that this is optional, you do not have to return this if you require no cleanup
      return function () {
        clearTimeout(timerId);
      };
    });

    var sub = source.subscribe(
      function onNext(person) {
        var greeting = person.name + ' say ' + person.message;
        console.log(greeting);
      },
      function onError(err) {
        var report = "Error: " + err;
        console.log(report);
      },
      function onCompleted() {
        console.log("done");
      }
    );

    /*
    Console output:
    "Error: there is an error"
    */
  • 相关阅读:
    vue03
    vue02
    vue01
    VMware下Linux配置局域网和外网访问
    以太坊solidity编程常见错误(不定期更新)
    elasticsearch的探索之路
    一个rails项目连多个mongo数据库
    分析rails日志,计算响应率
    PostgreSQL新手上路PG::ConnectionBad (FATAL: Peer authentication failed
    cache目录没有权限
  • 原文地址:https://www.cnblogs.com/Answer1215/p/4742631.html
Copyright © 2011-2022 走看看