无论在一个软件系统的任何层面,当需要处理“历时过程中逐渐产生的(这段过程的终止时刻可能不存在或无法确定)数据”时,通常使用流的观点看待这些数据(尤其是当需要在这段过程发生期间处理这些数据时)。“数据历时产生”这一事实,可能是业务确实如此所致,也可能是程序员故意做的某种安排(例如通过分批将硬盘上的数据读入内存防止内存不足)。一种称为“响应式编程”的编程范式尤其适合这种情况。
综上,流是时间序列。时间序列与大部分程序员已经习惯使用的空间序列(有序的数据结构,例如列表、数组、字符串等)可以互相转化。
构成流的事件
在该历时过程中产生的每一批数据被称为一个事件(event)。事件,作为描述了某个时刻的信息的数据,由系统中的生产者生成,其特点包括尺寸小、自描述(self-contained)、不可变(immutable)。事件的表现形式可能是人类可读文本、JSON、二进制数据等。人们通常会把相关的一批事件归入同一个 topic 或 stream 之中。
流的使用
本节将讨论作为一种抽象的流的实际用法。
流处理(Stream Processing)
一个流处理器(stream processor)通常有三种对流的可能处理:
- 将流中的数据导入存储系统(例如数据库、缓存、搜索索引等)以备他用。利用数据库的 CDC 实现主从同步是典型应用。产生 CDC 流的数据库作为 leader,其它从库负责消费 CDC 流,该模式有效杜绝了非主从模式下使用 dual write(指应用程序显式地往多个数据库里写入数据) 的种种缺点,例如有很大的状态不同步的可能等
- 把流中的数据直接传给用户(通过电子邮件、通知系统等等)
- 对一个或多个输入流进行处理,输出一个流。这一处理过程被称作 operator 或 job
第3种情况的典型发生场景包括:
- Complex Event Processing(CEP):一个 CEP 系统允许用户使用某种声明式语言(如 SQL、正则表达式等)从流中实时匹配数据,一旦匹配到,则产生一个 complex event。也就是说该系统利用一个或多个流生成一个流
- 对流进行搜索:和上一条类似,但允许用户使用更简单普适的查询方式(而非某种声明式语言)
- 流分析(Stream analytics):如果对流的某些指标感兴趣,则可进行流分析,例如测量某类事件的发生比率/频率、计算流中数据在某窗口下的平均值、从流中发现某种趋势,等等。通常会使用概率算法(见《数据结构3:散列》)来提高计算效率
- 利用流维持物化视图(materialized views,见《3. SQL:DDL部分》)以供他用(通常是用作缓存)。实际上,一个应用程序可以看作一种通过用户输入流得以维持的物化视图(见下文 Event Sourcing 一节)
- 合并多个流(Stream joins):将多个流合并成一个流后,可以更好地进行上述其它应用。有三种形式:
- stream-stream join:例如为分析用户在使用搜索功能时的行为,我们将用户的搜索请求以及相应的返回视为一个个事件,它们构成一个流;用户在搜索页面的点击情视为另一类事件,构成另一个流。通过结合两个流,才能分析某一用户在某次搜索后没有点击哪些条目。进行 stream-stream join 时,流处理器需要维持一定状态,例如,维持根据 seesion ID 索引住的过去一个小时内所有的事件的哈希表,当两个流的新事件发生,都可以更新该哈希表,并可以发出一个事件表示用户的点击情况;每一小时后可以集中发送一个事件表示用户没有点击的情况(我们假设用户在搜索后的一小时没有点击意味着他已经放弃了搜索)
- stream-table join(a.k.a stream enrichment):有时我们想为一个流A中的事件根据我们已有的数据补充一些信息,输出一个由经过补充(enrich)的事件组成的新流。当流A出现新事件时就去数据库中进行相关信息的搜索很可能造成性能问题,一种解决方案是将可能用到的数据先加载到本地内存中或索引在本地硬盘上。这些本地数据可能会过期,这就需要由数据库的CAC机制产生的 table changelog 流来更新本地数据
- table-table join(a.k.a materialized view maintenance):利用多个流来维持一个物化视图(见上文)。这通常需要流处理器维护数据库表(常用作缓存),而这个表的内容通常与某句
SELECT...FROM...JOIN
查询的结果等价,因此得名 table-table join
三类 Stream joins 具有共同点,例如都需要流处理器跨事件地维持状态,而流处理器的状态受各流的各事件的时序影响。这就要求必须考虑实施 stream join 的时机。实际上,并不存在一个“正确的时机”,这意味着实施 stream join 的结果是不确定的(nondeterministic):同样的流、同样的operator,多次应用的结果是不一致的。
流处理时的时间问题
在进行流处理,尤其是实时的流分析时,流处理器通常会设定一个时间窗口(time window),例如“最近五分钟”。但应注意到每个事件至少有两个时刻:由生产者生产出来的时刻(event time)和被流处理器处理的时刻(processing time),其中后者很可能有相当大的相对于前者的时延。除此之外可能有更复杂的情况,例如在手机APP上收集数据作为事件时,存在三种时刻:事件实际发生的时刻(由用户的手机的本地时钟确定,也就是 event time)、设备实发送该事件到服务器的时刻 (由用户的手机的本地时钟确定) 、服务器收到事件的时刻(也就是 processing time)。因此在处理流时设定时间窗口时必须弄清窗口到底是针对哪种时刻的,这决定了窗口的移动逻辑(当窗口决定移动时,称这个窗口 complete 了)。根据移动逻辑的不同,有以下常见的时间窗口类型:
- tumbling window:这种窗口有一个固定时长,且每个事件只会落到一个窗口里。例如一个“每五分钟移动一次的‘最近五分钟’窗口”
- hopping window:有一个固定时长,不同窗口间会有重叠。例如一个“每一分钟移动一次的‘最近五分钟’窗口”
- sliding window:有一个固定时长,窗口将在确保其中最旧的事件和最新的事件的时差不超过该固定时长的前提下尽可能多地收集事件
- session window:不使用固定时长,而根据 seesion ID 来收集事件。当一个 session 在一段时间后没有任何消息,则关闭这个窗口。这种窗口尤其常见于网站分析
当窗口发生移动后才姗姗来迟(可能因为网络传输延迟等)的、本应出现在上一个窗口内的事件被称为 straggler events。流处理器可以忽略这些事件(但应当做个记录,当出现太多 straggler events 时应当有所反应),也可以对此前的输出做一个 correction。
应对流处理过程中的错误
当流处理过程中出现错误时,不可能像一般的数据处理那样简单地抛弃所有处理结果然后重新计算一遍,而是启用基于一定数据粒度的恢复机制,出错时只须抛弃并重计算部分数据。属于一定粒度下的一批数据被称为 microbatching。粒度越小开销越大,但流处理过程的实时性越强。 另一种方案是使用 checkpoint,一旦出现错误,只抛弃上一个 checkpoint 之后的所有输出,并重新进行计算。
以上两种方案的缺点是都无法处理“流处理器的计算有副作用”的情况。为此须为流处理系统引入 atomic commit revisited 的概念,以确保一个流处理器在处理一个事件时,当且仅当没发生错误时才给出输出以及做有副作用的操作,且所有操作要么都成功进行,要么都不进行。实现该概念的方法包括 distributed transactions,或者依赖于操作的幂等性(idempotence)。任何操作都可以通过维护额外的元数据来确保幂等(例如为每个操作指定一个 ID,在实施每个操作前检查 ID 是否和已进行过的操作的 ID 重复了)。
最后,对于需要跨事件地维护状态的流处理器(例如使用任何形式的窗口的流处理器),在发生错误后必须能够恢复其状态。为此,流处理器可以通过某种方式把状态持久化。如果状态的维护是基于很短的窗口的,则通过重新计算也能恢复状态,就无需持久化了。
Event Sourcing
流的一个重要用处是用作 event sourcing。一个程序在某个时刻的状态是从该程序运行起发生的所有事件(用户输入)造成的结果。因此,如果以不可变数据的形式记录(append-only log)下所有事件(或 changelog,是等价的),则可以令程序通过经历(replay) 这些事件进入指定状态( 即 event sourcing )。数据库在事实上只是事件日志(event log)的缓存(保存着最新的状态。而这些状态完全可以通过对changelog 进行 event sourcing 得到)。
基于显式的 event sourcing 思想设计的应用程序通常需要注意以下几点:
- 使用 snapshots storing 机制的辅助,以避免重复进行 event sourcing 以来到某个状态
- 区分 commands 和 events:用户输入,在一开始应当被视为一个 command,只有当同步地验证这个 command 有效后,才能将它转换为一个或多个 event
以不可变数据的形式保存 event log 的好处包括:
- 应用程序从而可以回溯状态。这一特点可以用于实现某些业务,也可以用于 debug
- event log 可以同时被不同主体以不同的方式使用。例如新旧系统交接、转换成不同的形式的数据存入相应的存储系统并确保一致性
- 简化并发控制。唯一需要考虑并发逻辑、对操作进行原子化的只有 event log 处
不可变 event log 的麻烦之处体现在要真正删除它(例如储存空间不够/法律、政治强制力)的时候,因为由它派生的数据可能已出现在各处。
流的传递:消息系统及其实现
为让系统中对事件感兴趣的主体(称为消费者)得到流,需要一种机制将某处产生的流传递给消费者。如果让消费者通过轮询(poll)等方式来检查是否有新事件出现 (pull 模型) ,将造成很多开销,因此通常在系统中加入一种通知机制(notification mechanism),当事件出现时,消费者会通过某种方式(例如回调函数)被动地收到它感兴趣的事件( push 模型)。
通知机制的实现是消息系统 (Messaging Systems) 。消息系统基于发布/订阅模型(publish/subscribe model):生产者(通常有多个)可以将事件推送至一个消息系统(该系统可能具有多个节点。每个节点可以分布在不同的线程/进程/物理机器中)中,随后该消息系统将事件发给所有感兴趣(即进行过订阅的)的消费者(通常有多个)。不同的消息系统的实现细节不尽相同,考察它们时关键在于把握以下两个问题:
- 该消息系统如何处理“生产者生产事件的速度快于消费者处理事件的速度”这一情况?通常有三种选择:
- 作废未被消费者处理的事件
- 在一个队列中缓存(buffer)事件。此种情况应考察如果队列过长会发生什么
- 采用某种背压机制(backpressure),或称流控制机制(flow control),来阻止生产者的事件生产,直到消费者恢复了事件处理能力为止。这实际上是颠倒了一般情况下生产者和消费者的关系,后者对前者产生了主动的影响
- 该消息系统的某个节点崩溃后会发生什么?生产者发送的消息是否会因此丢失?
实现:生产者和消费者直接通信
消息系统最简单的实现就是让生产者直接和消费者通信。UDP多播就是一个典型例子。这种实现的缺点在于一旦消费者离线则生产者的消息都白白发送了,生产者崩溃后也不得不重新生产未发出的事件,造成了不必要的开销。另一方面,各生产者和各消费者间错综复杂的通信关系也加大了系统的复杂程度。
实现:消息队列(Message queue)
消息队列又称消息中介(message broker),是常见的一种消息系统实现。通过维持一个消息服务器,并让生产者和消费者以客户端的身份使用它(各生产者将事件发给中介,中介将事件异步发给相应的消费者),来实现生产者和消费者的通信。
由于存在一个集中处理所有数据的中介,与数据持久性有关的处理可以统一由该中介进行。数据可以被储存在内存或硬盘中从而确保一定的持久性,因此面对处理速度赶不上消息生产速度的情况,消息不会被作废或是触发背压机制。
注意,由于是异步派发消息给消费者,一个消息从产生到被处理可能会经历相当可观的时延。
消息队列与数据库的异同
消息队列,同作为一个数据存放场所,和数据库有以下异同:
- 消息队列通常是为在内存中快速进出的小数据集设计的,积累的数据太多(乃至需要它进行硬盘I/O)会严重影响它派发消息的性能
- 客户端可以以多种方式检索数据库和消息队列中的数据。不过消息队列一般并不支持非常特定的查询(arbitrary queries),但有能力在查询集更新后通知客户端(而数据库做不到这点,查询集更新后,客户端需要重新发起查询获得新数据)
对多消费者的消息派发
当多个消费者订阅了同一 topic 时,面对生产者发来的属于该 topic 的事件,消息队列一般有两种派发方案:
- 根据某种负载均衡算法,将该消息派发给一个特定的消费者
- 将该消息发送给所有消费者(fan-out)
应当结合具体的业务情况结合使用这两种方案。
Ack 和消息重传
为确保发至消费者的事件确实被消费者处理了,消息队列会要求消费者客户端在处理完成后发送一个 ack,在此之后消息队列才会从队列中删掉这个消息。如果没收到 ack 则进行消息重传。为应对“事件已被消费者处理,只是 ack 因某种原因丢失了”的情况,通常会使用某种原子提交协议(atomic commit protocol,见 P360)。
注意,一旦触发重传,可能意味着各事件派发的时序将发生错乱。如果各事件是时序无关的,则这不是一个问题。
实现:log-based 消息系统
一类常见的 log-based 消息系统(例如kafka)以 append-only log 的形式在硬盘上记录事件(而非先在内存中记录直到内存满了才写到硬盘):生产者通过在日志文件末尾写入数据来传递信息,消费者通过顺序读取日志文件来得到信息,并在日志结尾处主动等待新内容的写入。其中,消费者应当维持一个状态记录自己消费到哪一个事件了(可方便别的消费者前来接手),消息系统则只需周期性地为各消费者做记录,从而减小开销。
日志通常可以分片、分机存放,以避免占用一个硬盘的太多空间(注意,占有空间过大时旧日志会被删除,包括未来得及被消费的部分。应用应当实现一个事件积压情况监控的机制)。生产者和消费者可以使用任何一片日志(当然,细节由消息系统为它们隐藏了)。
为实现负载均衡,消息系统可以令客户端(消费者或生产者)长期使用指定的分片,且每个客户端以单线程进行消费,注意这会导致需要准备同分片数量一样多的消费者,以及如果消费某个事件的时间过长,将阻塞之后的事件消费;要实现 fan-out 也容易,让所有消费者都来读取某个日志即可。
log-based 消息系统最适合事件吞吐量很高,但消费每个事件的开销较少的情况。
现实中的流
CDC:一种基于数据库的流的生产机制
数据库的 Change Data Capture(CDC)机制在近年来开始流行。实现该机制后,数据库会将每次数据改动的情况记录成 log 供外部读取,从而成为一种流的源。该类流可被用于导入其它数据存储系统(称为 derived data systems),且处在此类集群中时,生产 change data capture 流的数据库是集群的 leader。将该流完整记录下,可以恢复出完整的数据;或从某个时刻后部分地记录下,则可以基于该时刻的 snapshot 恢复出数据。为解决 log 过大的问题,可以引入 log compaction 机制,该机制通过完整分析 log,确认每个 key 的最新值,从而可以删掉该 key 的值的中间变化的过程。
在实现上, CDC 可借助数据库的触发器功能,但这样做的性能开销颇大;另一种常见做法是异步解析数据库的 changelog(用于数据库内部使用的数据文件)。
Node.js 中的流(Stream)
本节介绍 Node.js 标准库中提供的 Stream 抽象。
Node.js 中的 Stream 除了表示上文所述的作为历时产生数据的源的抽象(Readable
),也能作为数据的目的地的抽象(Writable
、Duplex
、Transform
)。充当后者的 Stream 起到一个数据蓄水池的作用,例如用于控制程序输出至I/O的数据的量和频率,或是方便程序员之后从蓄水池中取出数据使用。
Readable Stream 的功能
Readable Stream 具有以下功能:
-
支持两种模式为消费者产生数据:
-
流动模式(flowing mode):实际上就是使用 push 模型的流。进入该模式的 Readable Stream 对象通过
data
事件给出数据。令 Readable Stream 对象进入该模式的方法有:-
在对象的初始状态下订阅 其
data
事件 -
在对象的初始状态下调用
pipe
方法 -
在对象的暂停模式下调用
resume
方法
-
-
暂停模式(paused mode): 实际上就是使用 pull 模型的流。消费者通过订阅其
readable
事件来知道什么时候有数据可读,随后通过read
方法读取之。进入该模式的方法有:- 在 Readable Stream 对象并没有 pipe 至一个目的地的前提下调用
pause
方法 - 移除其全部的 pipe 目的地
- 在 Readable Stream 对象并没有 pipe 至一个目的地的前提下调用
-
-
数据的产生必然是异步的,即必然发生在下一个 Tick 或之后
-
会维护一个内部缓冲区用于实现 pull 模型。待发放给消费者的数据都存放在缓冲区里。缓冲区里一有数据,就触发
readable
事件 。缓冲区的大小可在构造 Stream 对象时设置 -
支持通过管道(
pipe
)把数据交给作为消费者的 Writable Stream。尽管此时将进入流动模式,但 Writable Stream 内部实际上是以 pull 模型的方式使用它的,因为 Writable Stream 需要根据自己的需要从 Readable Stream 处获得数据。这种 Writable Stream 作为消费者控制 Readable Stream 生产数据的节奏的机制称为背压 (back pressure) 机制
Writable Stream 的功能
Writable Stream(含 Writable
、Duplex
、Transform
)具有以下功能:
- 充当管道的目的地,并调节作为源的 Readable Stream 的生产。内部会维护一个缓冲区,根据缓冲区的情况按需拉取上游数据
基于流的响应式编程:以 Rxjs 为例
流为模拟具有内部状态的对象提供了另一种方式,即——用流表示某对象内部状态的历时过程。从本质上说,流将时间显式地表示了出来,因此就松开了被模拟的世界里的时间与求值过程中事件发生的顺序之间的紧密联系。——《SICP》
由于流自带时序信息,命令式编程所依赖的语句间的先后关系变得不再重要,甚至不再必要,这与声明式,尤其是函数式编程极为契合,因为 FP 中的纯函数们从来不关心时序(否则就不纯了)。
以 Rxjs 为例。Rxjs 中流被抽象为 Observable, 这是比 Node.js 中的 Stream 更抽象的事物。二者有何不同?Node 的 Stream 是和 Unix 哲学紧密契合的概念,非常好用,很简单,容易使用,这是它的优点;但它局限在 IO,通用性不如 Observable,而且提供的操作也仅仅限于 pipe 等最基础的操作,虽然有 event-stream 这样的第三方库加入了大量的实用操作(如map,join,split,merge等),但其功能丰富程度远不如Observable,为 composition 所做的努力也远不如 Observable。
Observable 更适用于声明式编程。而 Node Stream 更应该被理解为一种对产生值、使用值的过程的过程式封装(证据是后者暴露了大量能够干涉数据生产过程的 API)。
通过在“流”这一抽象层次操作数据,程序员可以避免基于底层抽象维护程序状态,因为状态已经由历时的事件来表示了(甚至可以把程序视为“无状态”的。因为事件具有不可变的性质)。并且将流的生产逻辑(处理输入)和消费逻辑(改变内存中的数据/进行输出)解耦,也利于模块化。这种范式契合很多场景,例如 UI 编程。将用户在界面上的反复操作看作一个流,并基于它进行各种流处理、流传递操作,并得到一个输出流供其它内外组件消费(例如渲染界面)。