目录:
1、Timer:设定Obervable执行时间间隔。
2、Enumerable to Observable转换
3、Observable to Enumerable 转换
4、zip 匹配的两个序列合并操作,返回一个合并后的值
5、Buffer 缓存集合
6、TimeOut 超出预订时间,取消订阅事件。当超出时间后可以Catch这个异常。
7、Merge 合并两个Observable
8、CombineLatest 最近输入的两个数字相比较
9、Publish 共享订阅事件
10、Interval 订阅事件的时间间隔
11、Group 分组
1、Timer:设定Obervable执行时间间隔。
Subscrible 订阅事件,Dispose 取消订阅的事件。Timer的前一个参数是开始时间,第二个是 时间间隔。
var observable = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)) .Where(x => x % 2 == 0).Select(x => new { First = x, Second = x * x }); var disposable = observable.Subscribe(a => Console.WriteLine("Value is {0} double Value is {1}", a.First, a.Second), () => Console.WriteLine("Completed")); Console.ReadLine(); disposable.Dispose();
结果
2、Enumerable to Observable转换
IEnumerable<char> c = "abc"; IObservable<char> observable = c.ToObservable(); observable.Subscribe(x => Console.WriteLine(x),()=>Console.WriteLine("Completed"));
3、Observable to Enumerable 转换
IObservable<long> observable = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); var enumerable = observable.ToEnumerable(); foreach (var l in enumerable) { Console.WriteLine(l); }
4、zip 匹配的两个序列合并操作,返回一个合并后的值
IObservable<int> observable1 = new int[] { 1, 2, 3 }.ToObservable(); IObservable<int> observable2 = new int[] { 5, 3, 8, 4 }.ToObservable(); var zip = observable1.Zip(observable2, (x1, x2) => x1 * x2); zip.Subscribe(x => Console.WriteLine(x));
5、Buffer 缓存集合
当我集合不断在增加的时候,我需要在增加到一定数量,才执行操作。如下:每过3秒 输出一次结果。
IObservable<long> observable = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); var buffer = observable.Buffer(TimeSpan.FromSeconds(3)); buffer.Subscribe(x => { Console.WriteLine("after 3 second"); foreach (var l in x) { Console.WriteLine("value is {0}", l); } });
6、TimeOut 超出预订时间,取消订阅事件。当超出时间后可以Catch这个异常。
IObservable<long> sequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); sequence = sequence.Buffer(3).SelectMany(values => values); sequence = sequence.Timeout(TimeSpan.FromSeconds(2)); IDisposable subscription = sequence.Subscribe(value => Console.WriteLine("Value produced is {0}", value));
这里TimeOut设置了2秒,而Buffer是3秒,也就是要3秒后输出结果,但这时的3秒超出了我们设定的TimeOut,也就取消了这个订阅事件
,什么都不会输出。我们捕获这个异常。
IObservable<long> sequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); sequence = sequence.Buffer(3).SelectMany(values => values); sequence = sequence.Timeout(TimeSpan.FromSeconds(2)); IDisposable subscription = sequence.Subscribe(value => Console.WriteLine("Value produced is {0}", value), error => Console.WriteLine("Error occurred [{0}]", error.Message));
对这个异常做处理。
IObservable<long> sequence = Observable.Timer(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(1)); sequence = sequence.Buffer(3).SelectMany(values => values); sequence = sequence.Timeout(TimeSpan.FromSeconds(2)).Catch(new long[] { 1, 3, 5 }.ToObservable()); IDisposable subscription = sequence.Subscribe(value => Console.WriteLine("Value produced is {0}", value), () => Console.WriteLine("Sequence completed normally"));
7、Merge 合并两个Observable
IObservable<int> observable1 = new int[] { 1, 2, 3 }.ToObservable(); IObservable<int> observable2 = new int[] { 5, 3, 8, 4 }.ToObservable(); observable1.Merge(observable2).Subscribe(x=>Console.WriteLine(x));
8、CombineLatest 最近输入的两个数字相比较
IObservable<int> observable1 = new int[] { 1, 2, 3 }.ToObservable(); IObservable<int> observable2 = new int[] { 5, 3, 8, 4 }.ToObservable(); var combineLatest = observable1.CombineLatest(observable2, (x1, x2) => x1 + x2).Subscribe(x=>Console.WriteLine(x));
A和B的序列,任何一边在引发变化时都会取出两边最新的值输出。
9、Publish 共享订阅事件
var unshared = Observable.Range(1, 4); // Each subscription starts a new sequence unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #1: " + i)); unshared.Subscribe(i => Console.WriteLine("Unshared Subscription #2: " + i)); Console.WriteLine(); // By using publish the subscriptions are shared, but the sequence doesn't start until Connect() is called. var shared = unshared.Publish(); shared.Subscribe(i => Console.WriteLine("Shared Subscription #1: " + i)); shared.Subscribe(i => Console.WriteLine("Shared Subscription #2: " + i)); shared.Connect();
10、Interval 订阅事件的时间间隔
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).TimeInterval(); observable.Subscribe(x=>Console.WriteLine(x));
Timestamp 可以输出精确到秒后面7位数。
var observable = Observable.Interval(TimeSpan.FromMilliseconds(750)).Timestamp(); observable.Subscribe(x=>Console.WriteLine(x));
11、Group 分组
var timeToStop = new ManualResetEvent(false); var keyPresses = KeyPresses().ToObservable(); var groupedKeyPresses = from k in keyPresses group k by k.Key into keyPressGroup select keyPressGroup; Console.WriteLine("Press Enter to stop. Now bang that keyboard!"); groupedKeyPresses.Subscribe(keyPressGroup => { int numberPresses = 0; keyPressGroup.Subscribe(keyPress => { Console.WriteLine( "You pressed the {0} key {1} time(s)!", keyPress.Key, ++numberPresses); }, () => timeToStop.Set()); }); timeToStop.WaitOne();
static IEnumerable<ConsoleKeyInfo> KeyPresses() { for (; ; ) { var currentKey = Console.ReadKey(true); if (currentKey.Key == ConsoleKey.Enter) yield break; else yield return currentKey; } }