zoukankan      html  css  js  c++  java
  • [rxjs] Creating An Observable with RxJS

    Create an observable

    // Minimal synchronous example: the observable pushes one value and then
    // completes.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Push the single value, then signal the end of the sequence.
      observer.onNext({
        name: "Zhentian",
        message: "Hello World!"
      });
      observer.onCompleted();
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log(failure);
      },
      function () {
        console.log("done");
      }
    );

    // Console output:
    //   Zhentian say Hello World!
    //   done

    Async

    // Asynchronous example: the value is delivered from a timer callback.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Pushes the value and completes the sequence; invoked by the timer.
      function emitPerson() {
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      setTimeout(emitPerson, 1000);

      // Logged right after the timer is scheduled, i.e. before it fires.
      console.log("ansyc finished!");
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log(failure);
      },
      function () {
        console.log("done");
      }
    );

    // Console output:
    //   "ansyc finished!"
    //   "Zhentian say Hello World!"
    //   "done"

    Dispose the async

    When we dispose the subscription, we can see that it still logs "Starat timeout". That is not good: the subscriber's onNext handler will never be called after disposal, so what we really want is for the setTimeout callback not to run at all.

    // Disposing without cleanup: the subscription is disposed after 500 ms,
    // but nothing cancels the 1000 ms timer, so its callback still runs.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first, then tries to push the value.
      function emitPerson() {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      setTimeout(emitPerson, 1000);

      // Logged right after the timer is scheduled, i.e. before it fires.
      console.log("ansyc finished!");
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log(failure);
      },
      function () {
        console.log("done");
      }
    );

    // Dispose the subscription before the 1000 ms timer fires.
    setTimeout(function () {
      sub.dispose();
    }, 500);

    /*
    Console output:
    "ansyc finished!"
    "Starat timeout"
    */

    Define the dispose

    We can keep the id returned by setTimeout and, in the function we return from the create callback, clear that timeout.

    // Disposing with cleanup: the create callback returns a function that
    // clears the pending timer.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first, then pushes the value and completes.
      function emitPerson() {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }

      var timerId = setTimeout(emitPerson, 1000);

      // Logged right after the timer is scheduled, i.e. before it fires.
      console.log("ansyc finished!");

      // Returning a cleanup function is optional; omit it when there is
      // nothing to release.
      return function () {
        clearTimeout(timerId);
      };
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log(failure);
      },
      function () {
        console.log("done");
      }
    );

    // Dispose the subscription before the 1000 ms timer fires.
    setTimeout(function () {
      sub.dispose();
    }, 500);

    /*
    Console output:
    "ansyc finished!"
    */

    Catch error

    If we throw an error inside the setTimeout callback, we find that it is not actually caught by the onError handler.

    // Uncaught error: the timer callback throws before pushing anything.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      var timerId = setTimeout(function () {
        throw "there is an error"; // Throw an error here

        // Never reached because of the throw above.
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }, 1000);

      // Returning a cleanup function is optional; omit it when there is
      // nothing to release.
      return function () {
        clearTimeout(timerId);
      };
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log("Error: " + failure);
      },
      function () {
        console.log("done");
      }
    );

    /*
    Console output:
    "error"
    "Uncaught there is an error (line 6)"
    */

    What we can do is wrap the body of the timeout callback in a try/catch block and pass the caught error to the observer's onError().

    // Caught error: the timer callback wraps its work in try/catch and
    // forwards whatever was thrown to the observer's onError.
    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      var timerId = setTimeout(function () {
        try {
          throw "there is an error"; // Throw an error here

          // Never reached because of the throw above.
          observer.onNext({
            name: "Zhentian",
            message: "Hello World!"
          });
          observer.onCompleted();
        } catch (thrown) {
          observer.onError(thrown);
        }
      }, 1000);

      // Returning a cleanup function is optional; omit it when there is
      // nothing to release.
      return function () {
        clearTimeout(timerId);
      };
    });

    // Handlers are passed in the order: onNext, onError, onCompleted.
    var sub = source.subscribe(
      function (who) {
        console.log(who.name + ' say ' + who.message);
      },
      function (failure) {
        console.log("Error: " + failure);
      },
      function () {
        console.log("done");
      }
    );

    /*
    Console output:
    "Error: there is an error"
    */
  • 相关阅读:
    《JavaWeb从入门到改行》JSP+EL+JSTL大杂烩汤
    Linux下进程线程,Nignx与php-fpm的进程线程方式
    solr全文检索实现原理
    LSM树以及在hbase中的应用
    MySQL的MyISAM与InnoDB的索引方式
    MySQL的innoDB存储引擎的运作方式,数据结构等
    Redis作缓存
    Redis的几点积累
    Redis数据库各种数据结构的内部实现。
    正则表达式!!!
  • 原文地址:https://www.cnblogs.com/Answer1215/p/4742631.html
Copyright © 2011-2022 走看看