您的位置:首页 > 其它

创建和订阅简单可观察序列

2017-03-06 14:41 309 查看
您不需要手动实现IObservable 接口来创建可观察的序列。 同样,您不需要实现IObserver 来订阅序列。 通过安装Reactive Extension程序集,您可以利用Observable类型,该类型提供了许多静态LINQ运算符,用于创建具有零个,一个或多个元素的简单序列。 此外,Rx提供了订阅扩展方法,根据代理采用OnNext,OnError和OnCompleted处理程序的各种组合。

创建和订阅简单序列

以下示例使用Observable类型的Range运算符创建一个简单的observable数字集合。 观察者使用Observable类的Subscribe方法订阅此集合,并提供作为处理OnNext,OnError和OnCompleted的委托的操作。

Range运算符有几个重载。 在我们的示例中,它创建一个以x开头的整数序列,然后生成y个序列号。

一旦订阅发生,值将被发送到观察者。 OnNext委托打印输出值。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}


  当观察者订阅可观察序列时,调用Subscribe方法的线程可以不同于序列运行直到完成的线程。因此,Subscribe调用是异步的,因为调用者不会被阻塞,直到序列的观察完成。这将在使用计划程序主题中的更多详细信息。

  请注意,Subscribe方法返回一个IDisposable,以便您可以取消订阅序列并轻松处理它。当在observable序列上调用Dispose方法时,观察器将停止监听observable的数据。通常,您不需要显式调用Dispose,除非您需要提前取消订阅,或者当源可观察序列具有比观察者更长的寿命时。 Rx中的订阅设计用于不使用终结器的火灾忘记场景。当IDisposable实例由垃圾收集器收集时,Rx不会自动处理预订。但是,请注意,Observable运算符的默认行为是尽快处理订阅(即,发布OnCompleted或OnError消息时)。例如,代码var x = Observable.Zip(a,b).Subscribe();将向序列a和b订阅x。如果a抛出错误,x将立即从b取消订阅。

  您还可以调整代码示例以使用Observable类型的Create运算符,该运算符从指定的OnNext,OnError和OnCompleted操作代理创建并返回观察器。然后,您可以将此观察器传递给Observable类型的Subscribe方法。以下示例显示如何执行此操作。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

namespace SimpleSequence
{
class Program
{
static void Main(string[] args)
{
IObservable<int> source = Observable.Range(1, 10);
IObserver<int> obsvr = Observer.Create<int>(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
IDisposable subscription = source.Subscribe(obsvr);
Console.WriteLine("Press ENTER to unsubscribe...");
Console.ReadLine();
subscription.Dispose();
}
}
}


  除了从头创建可观察序列之外,您还可以将现有枚举器,.NET事件和异步模式转换为可观察序列。 本节中的其他主题将向您展示如何做到这一点。

  请注意,本主题仅显示几个可以从头创建可观察序列的运算符。 要了解有关其他LINQ运算符的更多信息,请参阅使用LINQ运算符查询可观察序列。

使用定时器

以下示例使用Timer运算符创建序列。 该序列将在经过5秒后推出第一个值,然后它将每隔1秒推出后续值。 为了说明的目的,我们将Timestamp运算符链接到查询,以便每个推出的值将在发布时附加。 通过这样做,当我们订阅这个源序列时,我们可以接收它的值和时间戳。

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();


输出将类似于:

当前时间:5/31/2011 5:35:08 PM

按任意键取消订阅

0:5/31/2011 5:35:13 PM -07:00

1:5/31/2011 5:35:14 PM -07:00

2:5/31/2011 5:35:15 PM -07:00

通过使用Timestamp运算符,我们验证了第一个项目确实在序列开始后5秒被推出,每个项目在1秒后发布。

将Ienumerable集合转换为可观察序列

使用ToObservable运算符,可以将通用ienumerable集合转换为observable序列并进行订阅。

IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };

IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
x => Console.WriteLine("OnNext: {0}", x),
ex => Console.WriteLine("OnError: {0}", ex.Message),
() => Console.WriteLine("OnCompleted"));
Console.ReadKey();


冷与热观察

Cold observables在订阅时开始运行,即,当调用Subscribe时,observable序列仅开始向观察者推送值。值也不会在订阅者之间共享。这不同于热的可观察量,例如鼠标移动事件或股票行情,即使在订阅活动之前已经产生值。当观察者订阅热可观察序列时,它将获得流中的当前值。热可观察序列在所有订户之间共享,并且每个订户被推送到序列中的下一个值。例如,即使没有人订阅了特定的股票行情,股票代码将继续根据市场变动更新其价值。当用户注册该股票的权益时,它将自动获得最新的股票。

下面的例子演示了一个冷的可观察序列。在这个例子中,我们使用Interval操作符来创建一个简单的可观察的序列,在特定的时间间隔(在这种情况下,每1秒钟)。

然后两个观察者订阅这个序列并打印出它的值。您会注意到,每个订户的序列都会重置,其中第二个订阅将从第一个值重新启动序列。

IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));

IDisposable subscription1 = source.Subscribe(
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));

IDisposable subscription2 = source.Subscribe(
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));

Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();


  在下面的示例中,我们使用Publish运算符将先前的cold observable序列源转换为热序列,该运算符返回一个名为hot的IConnectableObservable实例。发布运算符提供了通过向多个订户广播单个订阅来共享订阅的机制。热作为代理并订阅源,然后当它从源接收值时,将它们推送到其自己的订阅者。要建立对备份源的预订并开始接收值,我们使用IConnectableObservable.Connect()方法。由于IConnectableObservable继承IObservable,因此即使在开始运行之前,我们也可以使用Subscribe订阅此热序列。请注意,在示例中,当订阅1订阅时,热序列尚未启动。因此,没有值被推送到订户。调用Connect后,值随后被推送到subscription1。延迟3秒后,subscription2订阅hot并开始立即从当前位置(在这种情况下为3)直到结束接收值。输出如下所示:

  

Current Time: 6/1/2011 3:38:49 PM

Current Time after 1st subscription: 6/1/2011 3:38:49 PM

Current Time after Connect: 6/1/2011 3:38:52 PM

Observer 1: OnNext: 0

Observer 1: OnNext: 1

Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM

Observer 1: OnNext: 2

Observer 1: OnNext: 3

Observer 2: OnNext: 3

Observer 1: OnNext: 4

Observer 2: OnNext: 4

Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence

IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence

IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
x => Console.WriteLine("Observer 1: OnNext: {0}", x),
ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);

IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
x => Console.WriteLine("Observer 2: OnNext: {0}", x),
ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
() => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();


返回
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: