重磅!Flink 将重构其核心线程模型
最近,社区提交了一个新的Proposal: 《Change threading-model in StreamTask to a mailbox-based approach》(来自Ververica的Stefan Richter, Piotr Nowojski),用于改进当前StreamTask
这一核心类的线程模型。这个Proposal可谓是千呼万唤始出来,从去年就开始讨论,之前一直迟迟没有太大的动静,最近终于给出了概要设计。接下来,我们就来简单地介绍一下这个Proposal。
动机
改进的动机肯定来自于先前实现的不足或缺陷。StreamTask
现有的线程模型存在着一些缺陷,它可能导致多个潜在的线程去并发访问其内部状态,比如事件处理以及检查点的触发线程。当前,他们都通过一个全局锁(检查点锁)来保证彼此互斥。这种机制有一些劣势:
锁对象必须在类的各种互斥访问的代码段中进行传递,代码可读性很差,使用不当或者漏用则容易造成许多难以定位的问题;
设计不够优雅,锁对象暴露给了面向用户的API(SourceContext)
改进机制
改进机制希望采用类Actor模型的mailbox机制来取代现有的多线程模型,变成:单线程(mailbox 线程)+ 阻塞队列(mail box)的形式。这样action会入mail box,而对状态的变更则由单一的线程来完成,这种方式阻止了数据的共享(这是大部分情况下并发问题的万恶之源)。
当前检查点锁的使用场景分析
检查点锁用于实现对以下三个并发源之间对StreamTask
的组件状态的互斥访问:
事件处理
检查点
Processing-Time的定时器(Event time的触发是同步的,可以归属到第一点)
下一小节会介绍如何对这三个并发源的处理逻辑进行改进。
提议的改进点
文档所包含的改进点主要有如下四个。
Stream Task中的改变
预期会引入一个mailbox
实例字段,它的类型是一个阻塞队列(ArrayBlockingQueue)。mailbox 主线程将承担当前StreamTask#run()
中的任务,不同于现有的实现,以上三个并发源相关的处理逻辑都将会变成letter
事件入队mailbox并被顺序处理。这些事件预期可能以Runnable
作为共同抽象。伪代码如下:
BlockingQueue<Runnable> mailbox = ...
void runMailboxProcessing() {
//TODO: can become a cancel-event through mailbox eventually
Runnable letter;
while (isRunning()) {
while ((letter = mailbox.poll()) != null) {
letter.run();
}
defaultAction();
}
}
void defaultAction() {
// e.g. event-processing from an input
}
当然这段代码只是描述了实现的核心思想,还有很多待优化的细节尚不明确。
客户端使用检查点锁的通用实现
mailbox的概念其实没有对外暴露,它被隐藏在Queue接口的后面,我们可以将Queue传递给检查点锁来保证向后的兼容性。
事件生成与处理
基于mailbox的机制将大大简化这部分的逻辑,很多加锁的代码段将可以被移除。当前的事件处理主循环也将被打破并改写,当前的循环(One/TwoInputStreamTask):
while (running && inputProcessor.processInput())
将会被简化为如下的每一次调用:
inputProcessor.processInput()
在再次检查mailbox的letter之前。
检查点与定时触发器
mailbox机制天然适合这两种并发源(2,3)。试想一下,其实当前Flink的processing-timer就是用的队列排队异步执行来实现的。
与遗留Source的兼容性问题
由于历史原因,当前Flink Source被实现为运行一个无限循环来进行事件生成,这种实现在后续也将会被进行重构(FLIP-27)。但这种模型跟mailbox无法较好地兼容,目前SourceFunction
的Task实现是SourceStreamTask
而常规算子对应的Task则是StreamTask
(SourceStreamTask继承自StreamTask)。
兼容的核心思想是以两个不同的线程来独立运行,SourceFunction对应的事件生成在一个线程上,而Mailbox是另一个线程,并且两者以检查点锁来保持互斥。时序图如下:
这样针对遗留的Source循环还是以独立的一套机制运行,而绝大部分算子的task则运行在mailbox线程上。