zoukankan      html  css  js  c++  java
  • rxgo示例

    rxgo使用场景,基于事件的场景。

    package main
    
    import (
    	"context"
    	"fmt"
    	"github.com/reactivex/rxgo/v2"
    	"math/rand"
    	"time"
    )
    
    type User struct {
    	Name string `json:"name"`
    	Age int `json:"age"`
    }
    
    func main() {
    	//observable := rxgo.Just(1, 2, 3, errors.New("unknown"), 4)()
    	//ch := observable.Observe()
    	//for item := range ch {
    	//	if item.Error() {
    	//		fmt.Println("error: ", item.E)
    	//	} else {
    	//		fmt.Println(item.V)
    	//	}
    	//}
    
    
    	//observable := rxgo.Just(1, 2, 3, errors.New("unknown"), 4)()
    	//<-observable.ForEach(func(v interface{}) {
    	//	fmt.Println("received: ", v)
    	//}, func(err error) {
    	//	fmt.Println("error: ", err)
    	//}, func() {
    	//	fmt.Println("completed")
    	//})
    
    
    	//observable := rxgo.Create([]rxgo.Producer{func(ctx context.Context, next chan<- rxgo.Item) {
    	//	next <- rxgo.Of(1)
    	//	next <- rxgo.Of(2)
    	//	next <- rxgo.Error(errors.New("unknown"))
    	//	next <- rxgo.Of(3)
    	//}})
    	//ch := observable.Observe()
    	//for item := range ch {
    	//	if item.Error() {
    	//		fmt.Println("err: ", item.E)
    	//	} else {
    	//		fmt.Println("item: ", item.V)
    	//	}
    	//}
    
    	//ch := make(chan rxgo.Item)
    	//go func() {
    	//	for i := 0; i < 10; i++ {
    	//		ch <- rxgo.Of(i)
    	//	}
    	//	close(ch)
    	//}()
    	//observable := rxgo.FromChannel(ch)
    	//for itrm := range observable.Observe() {
    	//	fmt.Println(itrm.V)
    	//}
    
    
    	//observable := rxgo.Interval(rxgo.WithDuration(5 * time.Second))
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Range(0, 3)
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Just(1, 2, 3)().Repeat(
    	//	3, rxgo.WithDuration(1*time.Second))
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Defer([]rxgo.Producer{func(_ context.Context, ch chan<- rxgo.Item) {
    	//	for i := 0; i < 3; i++ {
    	//		ch <- rxgo.Of(i)
    	//	}
    	//}})
    	//
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    	//
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Just(1, 2, 3)()
    	//observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    	//	return i.(int)*2 + 1, nil
    	//}).Map(func(_ context.Context, i interface{}) (interface{}, error) {
    	//	return i.(int)*3 + 2, nil
    	//})
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Just(
    	//	User{
    	//		Name:"hufan",
    	//		Age:12,
    	//	},
    	//	User{
    	//		Name: "libai",
    	//		Age:  10,
    	//	})()
    	//observable = observable.Marshal(json.Marshal)
    	//for item := range observable.Observe() {
    	//	fmt.Println(string(item.V.([]byte)))
    	//}
    
    
    	//observable := rxgo.Just(
    	//	`{"name":"dj","age":18}`,
    	//	`{"name":"jw","age":20}`,
    	//)()
    	//observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    	//	return []byte(i.(string)), nil
    	//}).Unmarshal(json.Unmarshal, func() interface{} {
    	//	return &User{}
    	//})
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//observable := rxgo.Just(1, 2, 3, 4)()
    	//observable = observable.BufferWithCount(3)
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//ch := make(chan rxgo.Item)
    	//go func() {
    	//	i := 0
    	//	for range time.Tick(time.Second) {
    	//		ch <- rxgo.Of(i)
    	//		i++
    	//	}
    	//}()
    	//observable := rxgo.FromChannel(ch).BufferWithTime(rxgo.WithDuration(3 * time.Second))
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//ch := make(chan rxgo.Item, 1)
    	//go func() {
    	//	i := 0
    	//	for range time.Tick(time.Second) {
    	//		ch <- rxgo.Of(i)
    	//		i++
    	//	}
    	//}()
    	//observable := rxgo.FromChannel(ch).BufferWithTimeOrCount(rxgo.WithDuration(3*time.Second), 2)
    	//for item := range observable.Observe() {
    	//	fmt.Println(item.V)
    	//}
    
    
    	//count := 3
    	//observable := rxgo.Range(0, 10).GroupBy(count, func(item rxgo.Item) int {
    	//	return item.V.(int) % count
    	//}, rxgo.WithBufferedChannel(10))
    	//for subObservable := range observable.Observe() {
    	//	fmt.Println("new observable")
    	//	for item := range subObservable.V.(rxgo.Observable).Observe() {
    	//		fmt.Println(item.V)
    	//	}
    	//}
    
    
    	observable := rxgo.Range(1, 100)
    	observable = observable.Map(func(_ context.Context, i interface{}) (interface{}, error) {
    		time.Sleep(time.Duration(rand.Int31()))
    		return i.(int)*2+1, nil
    	}, rxgo.WithCPUPool()).Filter(func(i interface{}) bool {
    		return i.(int)%2 == 0
    	}).Distinct(func(_ context.Context, i interface{}) (i2 interface{}, err error) {
    		return i, nil
    	}).Skip(2).Take(10)
    	for item := range observable.Observe() {
    		fmt.Println(item.V)
    	}
    
    }
    
    // Just
    func add(value int) func(int) int {
    	return func(a int) int {
    		return a + value
    	}
    }
    
    
    
  • 相关阅读:
    【转载】狐狸的故事
    【android】 资源路径
    【随笔】居然又玩了两天
    【android】 新浪oauth,进度条,AnimationDrawable,Toast,android预定义style
    【随笔】写下现在所想的,开始写博客
    【转】让我们如此享受的慢性毒药
    如何生成SPFILE文件
    如何区别存储过程中的EXIT与RUTURN
    如何设置虚拟机网络
    如何写SQLPLUS例子
  • 原文地址:https://www.cnblogs.com/CherryTab/p/13958375.html
Copyright © 2011-2022 走看看