RxJS:合并流:zip运算符


描述

在RxJS中合并流可能会有些棘手。 您需要知道结果流何时发出新值,错误并完成。 取决于使用哪个RxJS运算符,组合的可观察行为可能会有所不同。

目标

这是该系列文章的第一篇,重点是结合流。 主要目的是熟悉属于该组的操作员,并了解何时调用给定观察者的方法。


让我们从一个示例开始,其中有两个数据流。

辅助函数getNewsStream函数返回一个observable,该observable发出给定数量的值(由通道名称标识),并具有提供的频率。

您有两个流:

  • sportsNews $ :5个频率(5秒频率),
  • angularNews $ :3个值,频率为1秒。

生成的news $流从内部可观察对象( sportsNews $angularNews $ )发出一组值。 它发出一对值。

数据流如下:

  • 1秒过去了:angular $(A),体育$(-),新闻$(-),
  • 已通过2秒:angular $(AA),sports $(-),news $(-),
  • 已通过3秒:angular $(AAA |),sports $(-),news $(-),
  • 已通过5秒:angular $(AAA |),sports $(S),news $(AS),
  • 10秒过去了:angular $(AAA |),sports $(SS),news $(AS,AS),
  • 过去了15秒:angular $(AAA |),sports $(SSS),news $(AS,AS,AS |)。

其中A和S分别是角流和运动流发出的值。

如您所见,除非每个内部可观察对象中都有一个对应的值,否则生成(压缩)的流将不会发出值。 此外,如果任何内部流已完成并且所有其值已由所得的可观察值拾取,则压缩后的流将完成。 结果,可观察到的news $不会发出没有相应角度通知的体育新闻。

实时示例:https://jsfiddle.net/pfx2evr6/14/


让我们看看如果有任何内部可观察到的错误会发生什么。

在上面的示例中,当sportsNews $ observable发出第二个值时,将引发错误。 压缩流( news $ )将调用观察者的错误回调,并且如果其内部可观察变量中的任何一个抛出错误,则不会发出任何连续值。 如此一来,您将只收到一个通知和错误消息。

实时示例:https://jsfiddle.net/pfx2evr6/21/


喜欢,爱,恨,鼓掌!