zoukankan      html  css  js  c++  java
  • [rxjs] Creating An Observable with RxJS

    Create an observable

    var Observable = Rx.Observable;

    // The function handed to create() receives the observer; here it pushes
    // one greeting object straight away and then signals completion.
    var source = Observable.create(function (observer) {
      observer.onNext({
        name: "Zhentian",
        message: "Hello World!"
      });
      observer.onCompleted();
    });

    // Handlers for the three notification kinds, passed to subscribe() in
    // the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log(err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    // Expected console output:
    //Zhentian say Hello World!
    //done

    Async

    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Emits the greeting and completes; scheduled below with a 1000 ms delay.
      var emitGreeting = function () {
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      setTimeout(emitGreeting, 1000);

      // Runs right after the timer is scheduled, i.e. before the greeting
      // is emitted (see the expected output below).
      console.log("ansyc finished!");
    });

    // Handlers passed to subscribe() in the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log(err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    // Expected console output:
    //"ansyc finished!"
    //"Zhentian say Hello World!"
    //"done"

    Dispose the async

    Even after we dispose the subscription, the timeout callback still runs and logs "Starat timeout". That is not good: the subscriber's onNext() handler will never be called anyway, so what we really want is for the setTimeout callback not to run at all.

    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first, then emits the greeting and completes.
      // Nothing here cancels the timer, so it still fires after dispose().
      var emitGreeting = function () {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      setTimeout(emitGreeting, 1000);

      console.log("ansyc finished!");
    });

    // Handlers passed to subscribe() in the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log(err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    // Dispose the subscription at 500 ms, before the 1000 ms timer fires.
    setTimeout(function disposeEarly() {
      sub.dispose();
    }, 500);

    /*
    Expected console output (the timer callback still runs):
    "ansyc finished!"
    "Starat timeout"
    */

    Define the dispose

    We can keep the id that setTimeout returns and, in the function we return from the create callback, clear that timeout.

    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // Timer callback: logs first, then emits the greeting and completes.
      var emitGreeting = function () {
        console.log("Starat timeout");
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      };

      // Keep the timer id so the cleanup function below can cancel it.
      var timerId = setTimeout(emitGreeting, 1000);

      console.log("ansyc finished!");

      // Returning a cleanup function is optional; omit it when nothing
      // needs to be released. Here it cancels the pending timer.
      return function cancelTimer() {
        clearTimeout(timerId);
      };
    });

    // Handlers passed to subscribe() in the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log(err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    // Dispose the subscription at 500 ms, before the 1000 ms timer fires.
    setTimeout(function disposeEarly() {
      sub.dispose();
    }, 500);

    /*
    Expected console output (the timer callback never runs):
    "ansyc finished!"
    */

    Catch error

    If we throw an error inside the timeout callback, we find that it is not actually caught by the onError handler.

    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      // An exception thrown inside the timer callback is not routed to onError.
      var timerId = setTimeout(function () {
        throw "there is an error"; //Throw an error here
        // Never reached because of the throw above; kept to show what the
        // callback would otherwise emit.
        observer.onNext({
          name: "Zhentian",
          message: "Hello World!"
        });
        observer.onCompleted();
      }, 1000);

      // Returning a cleanup function is optional; omit it when nothing
      // needs to be released. Here it cancels the pending timer.
      return function cancelTimer() {
        clearTimeout(timerId);
      };
    });

    // Handlers passed to subscribe() in the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log("Error: " + err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    /*
    Expected console output (the error escapes as an uncaught exception):
    "error"
    "Uncaught there is an error (line 6)"
    */

    What we can do is wrap the body of the timeout callback in a try/catch block and pass the caught error to the observer's onError().

    var Observable = Rx.Observable;

    var source = Observable.create(function (observer) {
      var timerId = setTimeout(function () {
        try {
          throw "there is an error"; //Throw an error here
          // Never reached because of the throw above; kept to show what the
          // callback would otherwise emit.
          observer.onNext({
            name: "Zhentian",
            message: "Hello World!"
          });
          observer.onCompleted();
        } catch (thrown) {
          // Hand the failure to the observer so the subscriber's error
          // handler sees it.
          observer.onError(thrown);
        }
      }, 1000);

      // Returning a cleanup function is optional; omit it when nothing
      // needs to be released. Here it cancels the pending timer.
      return function cancelTimer() {
        clearTimeout(timerId);
      };
    });

    // Handlers passed to subscribe() in the order (onNext, onError, onCompleted).
    var handleNext = function (person) {
      var line = person.name + ' say ' + person.message;
      console.log(line);
    };
    var handleError = function (err) {
      console.log("Error: " + err);
    };
    var handleCompleted = function () {
      console.log("done");
    };

    var sub = source.subscribe(handleNext, handleError, handleCompleted);

    /*
    Expected console output:
    "Error: there is an error"
    */
  • 相关阅读:
    Activity
    python 执行DOS/CMD命令
    MyISAM存储引擎的表级锁定优化
    vbs隐藏cmd窗口
    文件操作一
    Java基础知识
    Linux下利用.htaccess建立多站点
    python使用WMI监视系统-CPU使用率
    python操作mysql数据库注意
    类集框架(二)
  • 原文地址:https://www.cnblogs.com/Answer1215/p/4742631.html
Copyright © 2011-2022 走看看