Flink核心技术:源码剖析与特性开发

978-7-115-58447-2
作者: 黄伟哲
译者:
编辑: 刘雅思

图书目录:

详情

本书主要分为两部分。第一部分以核心概念和基本应用为脉络,介绍了Flink的核心特性(如检查点机制、时间与窗口、shuffle机制等),部署,DataStream API、Dataset API、Table API的应用,运行时的原理等内容,每一章先对概念做基本的介绍,然后基于应用实例详细分析Flink的设计思想和源码实现。第二部分基于对原理的理解,手把手教读者如何进行定制化的特性开发和性能提升,能够让读者对Flink的理解有质的飞跃。这一部分内容来自作者大量的工作实践,所引用例均源自企业级的真实需求,能够解决非常复杂的现网问题。


图书摘要

版权信息

书名:Flink核心技术:源码剖析与特性开发

ISBN:978-7-115-58447-2

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。

著    黄伟哲

责任编辑 刘雅思

人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

读者服务热线:(010)81055410

反盗版热线:(010)81055315

读者服务:

微信扫码关注【异步社区】微信公众号,回复“e58447”获取本书配套资源以及异步社区15天VIP会员卡,近千本电子书免费畅读。


本书以核心概念和基本应用为脉络,介绍Flink的核心特性(如检查点机制、时间与窗口、混洗机制等)、任务部署、DataStream API、DataSet API、Table API的应用以及运行时原理等内容。每章先对概念进行基本介绍,然后基于应用实例详细分析Flink的设计思想和源码实现,逐步引领读者掌握定制化的开发特性并提升性能,让读者对Flink的理解有质的飞跃。本书内容是作者多年工作实践的总结,能够帮助读者实现真实的企业级需求。

本书适合想要学习Flink设计原理并希望对Flink进行定制化开发的平台开发工程师,需要进行架构设计和技术选型的架构师与项目经理,以及计算机相关专业的学生阅读。


2018年,在北京国家会议中心举办的Flink Forward峰会上,来自阿里巴巴、京东、字节跳动、美团等公司的大数据技术负责人向众多参会者介绍了如何使用Flink解决组织内部的业务问题,做到大规模实践。每个人都在热烈地讨论Flink相关的技术实践和应用场景,很难想象Flink是2014年才正式发布的大数据技术,但在场的所有人都深刻感受到实时计算浪潮的到来。现在看来,Flink的口号“实时即未来”(Real-time is the Future)正一步步地变成现实。

数据的快速产生与快速流动对实时分析提出了很高的要求。如果数据不能被很好地实时处理,那么数据本身所蕴含的实时价值会迅速消失。在Flink之前,流处理的解决方案不尽如人意,Storm、Spark Streaming都只能说是过渡方案。

在大家对流处理的需求日益迫切的同时,Google发表的关于MillWheel和Dataflow的两篇论文让我们依稀看到了下一代流处理技术的模样。恰逢其时,Flink作为新一代流处理计算引擎进入了人们的视野。它吸收了上一代流处理技术的经验,融合了学术界和工业界对下一代流处理技术的理论探索与实践,甫一问世,就迅速成为流处理技术的事实标准。

经过几年的飞速发展和社区的共同努力,Flink通过了众多严苛场景的检验,架构逐渐趋于稳定。本书的出版也可以说是恰逢其时,读者可以从中了解到Flink何以在流处理领域“独步天下”。本书从源码出发,对核心技术进行讲解,有助于读者更好地理解Flink,理解开源技术,实现从学习者、使用者到贡献者的身份转变。

范东来  

华为云MVP,《Spark海量数据处理》作者


Flink自发布以来,在学术界与工业界都得到了极广泛的应用,可以说Flink目前已经是全球最为知名的流处理系统框架了,甚至没有之一。但是Flink的使用场景远不止流处理,它在批处理、图、机器学习等场景中依旧可以工作得很好。Flink遵循MapReduce的设计,为用户提供了一组低级编程API(Java语言),同时设计了更高的抽象Table API,支持以Flink SQL的方式进行数据处理。

目前,湖仓一体架构正逐步得到应用。Flink的批流一体设计理念在湖仓一体架构下有着非常大的架构优势,基本上已经成为湖仓一体架构下计算引擎的首选,这也得益于Flink背后的一些关键设计思想。

Flink定位于流计算引擎。这并不意味着Flink只能做流计算,而是Flink用流处理的设计思想更高维度地抽象了数据处理的过程。Flink不提供持久化能力,而是专注于提供跨数千台机器的实时数据处理能力,同时提供多种编程接口,从设计理念到API提供,统一了批流处理过程。这种设计理念极大程度上降低了不同场景下的数据开发成本以及多服务的维护成本,提供了更友好的架构形态。

Flink与大数据生态无缝集成。Flink与Zookeeper、HDFS、Yarn等大数据组件完美集成,可以很好地融入大数据生态,发挥出1+1>2的效果。

对于大数据生态圈或者从事大数据相关工作的从业者,Flink是一个必学的框架,几乎所有架构下都会出现它的身影。目前Flink也在逐步发展,例如实现Flink Table Store向存储延伸。Flink发布多年,其热度依旧不减,因此无论现在还是未来,Flink都是一个非常值得学习的框架。

从学习难度来说,Flink的学习门槛会比同类其他框架略高。在Flink的设计理念中,批是特殊的流,这种批流一体的架构直接从更高维度抽象了数据处理的理念,比单纯的批处理框架或流处理框架有着更高的理解门槛。Flink的部署也比其他框架更为复杂,除了自身可以构建一套独立的集群,Flink也可以与Yarn集成。但是Flink在Yarn下提供了多种不同的On Yarn形态,需要花更多时间去研究。

伟哲做事非常认真,他会把自己在工作中遇到的一些知识进行逐一记录并反复琢磨,不断向下探索,直到掌握知识的完整原理。他将自己多年的Flink实践经验进行整理,汇聚成书,全面地讲解设计理念、数据传输、计算模型等几乎所有的Flink知识模块。

本书由浅入深,从理念到实现,全面地剖析了Flink的原理,可以作为深入学习Flink整体框架的教程,供读者了解Flink的底层知识;也可以作为Flink技术参考手册,供读者在日常的数据工作中碰到“疑难杂症”时翻阅,从而解决开发问题。

白发川

字节跳动大数据资深技术专家

Thoughtworks中国区前数据智能业务线技术负责人


不知不觉,我们已身处大数据时代。即便是一个在学习和工作中完全不会直接接触到此类技术的人,也会频繁听到“大数据”这个词,仿佛一个企业、一个平台或一项服务不与大数据关联,就没有跟上时代的浪潮。前些年,许多组织或公司徒有“大数据”之名,在技术上仍十分守旧,没有任何创新之实。不过,随着近几年大数据技术普及与传统企业向智能化转型,越来越多的企业都已经具备了获取、处理、存储、分析海量数据的能力。通过用户与大数据服务提供方的互动,大数据逐步渗透到人们生产、生活的各个方面。不得不说,大数据时代真的到来了。

对中小企业来说,它们不具备自研框架的能力,因此更愿意拥抱开源,Apache Hadoop框架及其生态圈的各个框架往往会成为其首选。对大型企业来说,在开发数据工具(计算引擎、存储引擎等)时,也一定会参考这些开源框架的设计思想和代码实现,因为这些框架基本都是从早期研究大数据相关问题的论文中催生的,为后来的框架提供了范式。这些开源框架不仅可以直接运用在真实的生产环境中,解决大数据场景中的一般问题,也可以为个人的学习提供友好的入口。

在我开始参与大数据相关的开发时,Hadoop已成为一种十分经典(甚至给人感觉有点过时)的批处理框架,其在批处理领域几乎已经成为行业标准。那时Spark正处于“新锐巅峰”,不仅在批处理领域能提供比Hadoop更高的性能,在流处理领域也有一定的建树,逐步呈现出“取代”Hadoop的势头。Flink在那时作为“新一代流处理计算引擎”进入我们团队的视野,经过快速研究和验证,我们使用它作为Lambda架构中流处理的一部分,并在生产环境中进行实践。我们在受益于Flink流处理的速度的同时,也对其状态存储机制、时间窗口机制、反压机制等技术细节感到困惑。我们对这几个框架的未来发展保持着期待。

后来,由于工作需要和个人兴趣,我对Flink等框架的源码实现进行了更加细致的研究。在了解了更多企业的技术选型后,我发现Hadoop不但没有被后起之秀淘汰,反而因其稳定性而成为企业处理海量数据的“底牌”;Spark则因其良好的性能和成功的商业化发展,被更加广泛地应用在生产环境的批处理场景中。由此,关于Hadoop和Spark的从入门介绍、最佳实践到源码解析的图书层出不穷,给开发人员带来极大的便利。然而,介绍Flink的图书相对较少,源码解析类的图书则几乎没有。这主要是因为流处理场景比批处理场景复杂得多,而且Hadoop和Spark本身就比Flink更早进入应用领域,研究和应用Flink的团队自然就更少。不过,在批流一体化的方向上,Flink比Spark具有更加先进的设计思想。随着人们对实时性要求的提高,更好的解决方案是将对Spark等框架已有的优化移植到Flink上,而不是为Spark等批处理框架添加更多流处理的特性。实际上,国内许多互联网企业和大数据公司都对流处理服务有广泛的需求,甚至已经开始技术转型。阿里巴巴收购Flink背后的商业公司Data Artisans这个事件也许就暗示了未来技术的发展方向。可以预见,以Flink为代表的批流一体化的计算引擎会在未来占据更加重要的位置。

在这样的背景下,我想将自己对Flink的学习和研究成果成体系地与大家分享。这些内容较为深入,涉及代码的运行流程、底层的数据结构、类与类的依赖关系等,体现了对源码实现细节和相关论文的分析与思考。希望本书对同样热爱技术的同行来说是一本实用之书。

本书基于Apache Flink发行版1.10.2,从面向开发人员的API的使用方式入手,逐步对Flink的源码展开分析。本书主要有以下特点。

首先,本书对源码的实现细节进行详细的分析,不仅在逻辑层面展示核心组件之间的交互方式,而且从代码层面揭示它们之间的关系。通过对细节的掌握,读者可以对Flink的运行流程、设计思想有更加深刻的认识。

其次,本书内容层次分明、循序渐进,从用户熟悉的API开始介绍,基于业务代码的运行流程逐步深入讲解执行图生成、任务调度、数据传输等内容。对于不同的模块,侧重点有所不同。如果想要系统地、全面地了解一个框架,可以将本书的学习路径当作一种参考范例。

最后,本书基于一些社区的讨论和论文,提供了几种Flink的特性优化方案。这些优化方案抽象自真实的生产场景,用于解决十分复杂的业务问题。

本书从基础知识开始讲解,内容由浅入深、循序渐进,特别适合对技术执着和有热情的人阅读。本书的读者对象包括:

想要学习Flink设计原理的应用开发工程师;

希望对Flink进行定制化开发的平台开发工程师;

需要进行架构设计、技术选型的架构师和项目经理;

熟悉其他大数据计算引擎,想要进一步学习流处理计算框架的工程师;

计算机相关专业对Flink技术感兴趣的学生。

本书不仅会介绍Flink核心API的使用方式,而且会对核心流程与重要组件的源码实现细节进行详细的分析,建议读者在阅读本书的同时运行开发工具,对相关代码进行调试,这样将事半功倍。

本书包含两个部分和两个电子附录。

第一部分为第1~10章。第1章是序篇,对Flink的历史、应用场景、架构等进行总体介绍。第2章介绍Flink的应用,主要包含核心API的使用方式。第3~10章分模块介绍Flink的源码实现及其设计思想,主要包括执行图生成、任务调度和执行、数据传输、时间与窗口、状态与容错等。

第二部分为第11~14章。这些章讲解的是针对Flink核心功能的特性开发。在阅读这些章时,可以回顾第一部分的相关内容,这样更能加深理解。这些增强的特性均可运用在生产环境中,相信读者可以从中得到启发,解决棘手的技术难题。

本书还提供两个电子附录,分别介绍Flink中的资源管理和类型系统。

感谢我的良师益友范东来先生,从我初入职场至今,是你给了我最多、最有益的帮助。在写作本书的过程中,你多次给予我技术、写作方面的建议,由衷感谢你无私的经验分享,从你的身上我不仅学到了对技术的执着,也学到了待人的真诚。

感谢人民邮电出版社杨海玲编辑与刘雅思编辑在本书出版过程中给予我的信任与帮助,是你们在选题、审阅、排版等工作上的辛勤付出,才使本书得以顺利出版。

感谢我的好友贺鹏数次帮助我解决技术上的难题,与你交流时总有拨云见日之感,是你让我感受到了技术人的纯粹。

最后,感谢我的父母一直以来对我的支持、关怀与宽容,你们是最棒的。感谢代依珊在我写作过程中给我的陪伴与鼓励,你就像一束温暖的光,让我在漫长的写作过程中倍感温暖。


本书由异步社区出品,社区(https://www.epubit.com/)为您提供相关资源和后续服务。

本书提供两个电子附录。要获得相关配套资源,请在异步社区本书页面中点击,跳转到下载界面,按提示进行操作即可。注意:为保证购书读者的权益,该操作会给出相关提示,要求输入提取码进行验证。

您还可以扫码右侧二维码, 关注【异步社区】微信公众号,回复“e58447”直接获取,同时可以获得异步社区15天VIP会员卡,近千本电子书免费畅读。

作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。

当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,点击“提交勘误”,输入勘误信息,点击“提交”按钮即可。本书的作者和编辑会对您提交的勘误信息进行审核,确认并接受您的建议后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。

扫描下方二维码,您将会在异步社区微信服务号中看到本书信息及相关的服务提示。

我们的联系邮箱是contact@epubit.com.cn。

如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。

如果您有兴趣出版图书、录制教学视频,或者参与图书技术审校等工作,可以发邮件给本书的责任编辑(liuyasi@ptpress.com.cn)。

如果您来自学校、培训机构或企业,想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。

如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接通过邮件发给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值的内容的动力之源。

“异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。

“异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、AI、测试、前端和网络技术等。

异步社区

微信服务号



目前在生产环境中可供选择的大数据分布式计算框架层出不穷,其中较为流行、稳定且性能最良好的开源框架包括MapReduce和Spark,它们几乎分别成为批量计算和微批计算的行业标准。而且Spark经过多年的发展,在流式计算领域也有着优异的表现。

虽然Spark在多种计算场景下有着不俗的性能表现,但是由于它早期是用于批量计算的,因此在底层架构、数据抽象等方面仍不可避免地保留了许多批量计算的概念。Flink的诞生并不比Spark晚,不过由于它在设计之初就真正将数据当作数据流而非数据集来处理,因此可以说在流式计算方面,Flink拥有比Spark更加先进的计算模型。此外,Flink以流处理为基础,对批处理也有很好的支持。许多人说“Flink是(继Spark之后的)下一代大数据处理引擎”,这并非噱头。本书会从Flink的计算封装逻辑、执行图的生成、数据的交换、状态容错、窗口计算等各方面对这一说法进行全面且深入的探讨。

本章旨在让读者对Flink有总体的了解,以为接下来的学习打好基础。

希望在学习完本章后,读者能够了解:

Flink的发展历史;

Flink支持的应用场景;

Flink的核心特性;

Flink的架构;

实现Flink源码的基本技巧。

Flink最早是德国一些大学中的研究项目,经过多年的发展,其已在现实的生产场景中得到了越来越多的应用。近些年,由于社区的推动和商业上的支持,Flink在流式计算领域相比其他大数据分布式计算引擎有着明显的优势,并且在批量计算、批流一体化的发展上有着令人期待的前景。

在Flink官网,有一栏为Flink Blog(Flink博客),其中会定期发布一些文章来记录Flink发展过程中的重大事件或介绍新引入的重要概念等。Flink Blog中的第一篇文章宣布了Flink 0.6的发布。在Flink官网中可供下载的第一个版本是0.6版本,因为在0.5版本及之前的版本中,项目名称为Stratosphere,即Flink的前身。

Stratosphere项目起源于德国柏林工业大学(Technische Universität Berlin)Volker Markl教授于2008年提出的构想。由于数据库是Volker Markl教授的主要研究方向之一,因此创建该项目的初衷是构建一个以数据库概念为基础、以大规模并行处理(massively parallel processing,MPP)架构为支撑、以MapReduce计算模型为逻辑框架的分布式数据计算引擎。在此构想之上,在该项目中还专门引入了流处理,为后来Flink的发展“添加”了良好的“基因”。

Volker Markl教授联络德国柏林工业大学、德国柏林洪堡大学(Humboldt–Universität zu Berlin)和德国哈索·普拉特纳研究所(Hasso Plattner Institute)的多名科研人员,共同开始Stratosphere项目的研发。在2010年前后,第一版Stratosphere以开源的形式发布。在获得初步的应用和一定范围的关注后,该项目组在2010年至2014年又持续改进并陆续发布了多个版本。从此期间项目组发表的论文可以观察到,该项目已经具备了后来的Flink的雏形,可以看到如JobGraph、ExecutionGraph等执行图的概念以及作业管理器(JobManager)、任务管理器(TaskManager)等组件的架构设计。

随着知名度的提高,Stratosphere项目遇到了命名的困扰。项目组成员发现Stratosphere这个名字早已由一家商业实体注册,他们不得不对项目重新命名。最终,在2014年申请成为Apache软件基金会的孵化器项目后,经过项目组成员投票,项目正式更名为Flink。Flink在德语中意为“敏捷、快速”。同时,项目组决定使用松鼠形象作为商标,也是为了强调“敏捷、快速”的特性(如图1-1所示)。

图1-1 Flink 商标

Flink自从加入Apache后发展十分迅猛。自2014年8月发布0.6版本后,Flink仅用了3个月左右的时间,在2014年11月就发布了0.7版本,该版本包含Flink目前为止最重要的特性之一——Flink Streaming。Flink于2014年年底顺利从孵化器“毕业”,成为Apache顶级项目。随后,Flink逐步添加了在今天看来都属于其核心功能的特性,如一致性语义、事件时间和Table API等。

随着Flink受到社区越来越多的关注,其功能和稳定性也不断得到完善。一方面是因为它的功能特性受到了商学两界的广泛认可;另一方面也是因为要应对其他已经商业化的大数据计算引擎的竞争,越来越多的公司开始将Flink应用在它们真实的现网环境中,并在技术和商业上共同推动Flink的发展。我国很多公司都已经大规模使用Flink作为其分布式计算场景的解决方案,如阿里巴巴、华为、小米等。其中,阿里巴巴已经基于Flink实时计算平台实现了对淘宝、天猫、支付宝等的数据业务的支持。

早期Stratosphere项目的核心成员曾共同创办了一家名叫Data Artisans的公司,其多年来一直致力于Flink的技术发展和商业化。2019年,阿里巴巴收购了Data Artisans公司,并将其开发的分支Blink开源。相信在未来的发展中,凭借阿里巴巴强大的技术储备和商业支持,以及庞大的数据量和丰富的业务场景,Flink的发展一定会迎来新的机遇。

Flink的应用场景十分广泛,下面介绍3种常见的应用。

在许多场景中,需要处理的数据往往来自事件。小到一些交互式的用户行为,大到一些复杂的业务操作,它们都会被转化成一条条数据,进而形成数据流(事件流)。事件驱动型应用的特点在于,一些事件会触发特定的计算或其他外部行为,其典型场景有异常检测、反欺诈等。

在传统架构下,数据流通常会流入数据库,随后应用系统读取数据库中的数据,根据数据触发计算。在这种架构下,数据和计算分离,而且在存取数据时需要进行远程访问。与传统架构不同,Flink利用有状态的数据流来完成整个过程的处理,无须将数据先存入数据库再读取出来。数据流的状态数据在本地(local)维护,并且会周期性地持久化以实现容错。图 1-2 展示了传统事务型应用架构与Flink事件驱动型应用架构的区别。

图1-2 传统事务型应用架构与Flink事件驱动型应用架构的区别

Flink事件驱动型应用架构的优势在于,它的状态数据在本地维护,不需要远程访问数据库,由此获得了更低的延迟、更高的吞吐量。同时,不像传统架构那样多个应用共享同一个数据库,任何对数据库的修改都需要谨慎协调,Flink事件驱动型应用架构中的每一个应用都独立地维护状态,可以灵活地进行扩/缩容。

从上面的介绍可以了解到,实现事件驱动型应用的关键在于支持“有状态的数据流”及容错机制。这是Flink最优秀的设计之一。这部分内容会在后文详细分析。

从历史发展的角度来看,企业要处理的数据量是由小到大变化的,因此不妨从传统企业的角度来看待数据分析型应用的演变。

过去,传统企业的数据分析型应用往往就是商务智能(business intelligence,BI)系统。一个成熟的BI产品是一套集数据清洗、数据分析、数据挖掘、报表展示等功能于一体的完整解决方案。不过,当数据量过大时传统的BI系统会出现性能瓶颈,而且它的底层是基于关系数据库的,处理非结构化数据时会十分乏力。因此,当今企业在进行技术选型和架构设计时,更倾向于选择Hadoop生态系统组件及其相关架构。

早期大数据场景下的数据分析型应用架构如图1-3所示。

图1-3 早期大数据场景下的数据分析型应用架构

图1-3充分体现了数据分析型应用的核心设计思想,即业务系统与分析系统分离。业务系统的数据周期性地转换并加载到数据仓库中,在数据仓库内部经过分层处理,最终标准化的数据被提供给其他应用使用。这种架构与BI系统的主要区别就是整个流程不再有完整的解决方案,而需要技术人员自己选择工具进行开发和组合。

图1-4 流式架构

从传统的BI系统到早期大数据场景下的数据分析型应用架构,始终存在着一个问题,那就是整个过程中所有的抽取、转换、加载(Extract-Transform-Load,ETL)逻辑都是离线进行的,导致整个分析流程具有较高的延迟。由此,流式架构便应运而生。

流式架构的目的是在不丢失数据的前提下保证整个分析流程的低延迟,如图 1-4所示。

图1-4所示的整个流程少了ETL,直接将数据摄入流处理引擎,经过业务处理后输出给其他应用使用。在早期的技术储备条件下,想要保证低延迟,通常就难以保证结果的准确性,因此流式架构仅适用于那些对数据准确性要求不高,而对数据实时性要求极高的场景。

那么,在早期的技术储备条件下,能否通过架构的演进,既保证数据的实时性,又兼顾数据的准确性呢?开源框架Storm的创始人Nathan Marz提出了Lambda架构,有效地解决了这一问题。

Lambda架构的核心思想是实时处理和离线处理共存,实时处理如流处理一般保证数据的实时性,离线处理通过周期性地合并数据来保证数据的最终一致性。Lambda架构如图1-5所示。

图1-5 Lambda架构

在Lambda架构下,批处理层将准确结果写入批处理表,流处理层则将数据实时地写入速度表,批处理表的结果会定期与速度表中的数据合并以保证其准确性。数据应用则根据需求进行查询。

显而易见,虽然Lambda架构在一定程度上同时保证了数据的准确性与实时性,但它需要开发和维护两套系统,这实在是一笔不小的开销。由此,Kafka的核心成员之一Jay Kreps在Lambda架构的基础上提出了Kappa架构,解决了“两套代码实现一套业务逻辑”的问题。Kappa架构舍弃了批处理层,只保留了流处理层。与流式架构不同的是,Kappa架构需要让业务数据先进入支持数据重播的消息队列(如Kafka)。如果数据出现错误,那么再执行一个流处理作业,以对历史数据进行重新计算。当新启动的作业消费到最新的数据时,让外部应用访问新的服务数据库,完成服务的切换。Kappa架构如图1-6所示。

图1-6 Kappa架构

Kappa架构虽然不需要开发两套代码,但是仍然需要维护两套环境。而且,它所能处理的历史数据会受到消息队列存储策略的限制。

从Lambda架构和Kappa架构的提出者的技术背景可以了解到,他们提出的架构方案都是以他们熟悉的组件特性为基础的。Storm无法很好地保证数据的准确性,因此需要利用批处理层来保证数据的最终一致性。Kafka支持数据重播,因此可以只开发流处理层,在必要的时候对数据进行重播,从而保证数据的准确性。

Kappa架构之所以需要在两套环境中来回切换,主要是因为过去的流处理引擎无法保证数据的准确性,所以需要频繁地重新计算。如果流处理引擎能够像批处理引擎一样保证端到端的数据的最终一致性,从理论上来说就意味着一套环境可以解决所有问题。Flink完美地解决了这一问题。

以Flink作为流处理引擎,其架构如图1-7所示。

图1-7 Flink流式分析架构

Flink内部维护了数据流的状态,并以容错机制、窗口机制等特性作为支持,可以保证精确地实现端到端的数据的最终一致性。同时,Flink提供了从SQL到底层API的多层接口,使分析工作变得十分容易。因为Flink本身也能够进行批处理,所以Flink流式分析架构可以很容易地被转换成批处理架构。

对Kappa架构来说,可以直接选用Flink作为其中的流处理引擎,但此时设计两套环境的主要目的不再是保证数据的准确性,而是当Flink业务代码发生变动时可以执行新的作业,待数据消费到相同位置时及时完成服务的切换。

数据管道型应用也常常作为传统ETL流程的替代流程,与传统的ETL流程相比,其优势在于实时性高。Flink以流式的方式处理数据,无须像传统ETL流程一样进行周期性的离线处理。

数据分析型应用实际上包含数据管道型应用,与数据分析型应用不同的是,数据管道型应用的侧重点在于数据的流转。在数据管道型应用中,数据可能仅仅是从一个消息队列流转到另一个消息队列。

前文介绍了Flink的应用场景,我们已经了解到,正是由于Flink拥有一些特性,某些应用和数据架构的实现才成为可能。本节将简单介绍Flink的核心特性和架构。

了解Flink的核心特性有助于阅读源码时把握重点。Flink包括以下核心特性。

批流一体化。Flink可以在底层用同样的数据抽象和计算模型来进行批处理和流处理。事实上,Flink在设计理念上没有刻意强调批处理或流处理,而更多地强调数据的有界或无界。这个特性在企业技术选型中具有举足轻重的作用,因为这意味着如果Flink能够满足业务需求,就无须用两种甚至多种框架分别实现批处理和流处理,这大大降低了架构设计、开发、运维的复杂度,可以节省大量人力成本。

支持有状态计算。从产品的角度来看,Flink最大的“卖点”就是它支持有状态计算,这是实现前文介绍的事件驱动型应用、数据分析型应用等的基础。正如Flink官网首页上介绍的那样——Flink是数据流上的有状态计算。

提供多种时间语义。在流处理中,数据到达Flink系统的顺序很可能与事件本身发生的顺序不同,这就是流处理中常见的数据乱序现象。针对这个问题,Flink中区分了事件时间和处理时间:前者表示事件发生的时间,一般从数据自带的时间戳字段提取;后者表示数据被Flink处理的系统时间。当Flink选用事件时间对数据进行处理时,可以对数据进行排序等操作,从而得到准确的结果;当Flink选用处理时间对数据进行处理时,虽然不一定能得到准确的结果,但可以满足低延迟需求。多种时间语义使Flink可以在不同的需求实现间达到平衡。

轻量级分布式快照。既然Flink支持有状态计算,那么同时提供对状态的持久化功能就能实现容错机制。Flink提供了检查点(checkpoint)机制和相关组件,可实现状态存储与恢复,其最大的特点是,存储状态的操作过程是十分轻量的分布式过程。

支持多种一致性语义。Flink可以精确地满足系统内部的“至少一次”(at least once)语义和“恰好一次”(exactly once)语义。在外部系统的配合下,Flink也可以比较容易地实现端到端的“恰好一次”行为。

多层级API。Flink为用户提供了多个层级的API,用户可以根据自身对于表达力和易用性的需求来选择。不同层级的API可以混用,以实现复杂的业务逻辑。

Flink的特性远不止上面介绍的这些,还包括丰富的连接器、多平台部署等。但主要是上面介绍的这些特性让Flink实现了相对于其他框架的差异化,并深刻地影响了Flink未来的发展方向。

图1-8所示为Flink对应各层级结构的组件架构。

图1-8 Flink组件架构

如果想利用Flink进行业务开发,那么将重点放在API层即可。要想学习Flink的底层原理,对源码进行定制化开发,则必须深入学习API层下面的层级。

图1-8中,最下层为部署层,Flink提供了多种部署模式,在不同的部署模式之上提供的是相同的运行时架构。

运行时层可以说是Flink组件架构中最重要的一层,大部分概念和核心操作定义都在这一层,包括执行图的生成、作业的调度与部署、数据的处理和交换等。这里先对运行时架构和其中的基本概念进行简单介绍,本书后文会对运行时层的各个环节展开分析。

与大部分分布式架构一样,Flink采用的也是“主从架构”。其中,“主”指作业管理器,负责执行图的生成、作业的调度与部署等;“从”指任务管理器,负责任务的执行、数据的交换等。运行时架构如图1-9所示。

图1-9 运行时架构

在该架构中,客户端(Client)负责向作业管理器(JobManager)提交作业,从而生成执行图;JobManager负责任务(Task)的调度和部署,与任务管理器(TaskManager)通信,将任务派发到TaskManager中;TaskManager根据资源情况将任务放在各个任务槽(TaskSlot)中执行,并向JobManager汇报任务状态等信息。任务执行过程中会涉及上下游任务的数据交换,这个过程发生在TaskManager内部或TaskManager之间。

图1-9所示的只是一个简化版的运行时架构,但它也基本涵盖Flink运行时的核心组件和流程。学习Flink源码的过程实际上就是逐步理解该架构的过程。比如,可以针对图1-9提出如下问题。

执行图是如何生成的?

任务是如何拆分的?

调度器(scheduler)是如何调度和部署这些任务的?

TaskManager是如何划分资源的?

任务是如何在TaskManager中执行的?

任务的数据是如何交换的?

JobManager与TaskManager之间还存在其他交互,如有关检查点机制的流程中涉及的通信等。这些问题会在阅读后文后一一得到解决。

本书不仅会对Flink各方面的内容进行概念性的介绍,还会更多地就源码的逻辑、设计思想等进行分析,读者可以在阅读本书的同时用调试源码的方式增强理解,也可以基于自己的理解对源码进行修改。因此,我强烈建议读者用IntelliJ IDEA来调试代码,并用Git进行版本控制,甚至可以在自己的代码仓库中不断地对Flink进行定制化开发。

通过git命令从代码仓库把Flink源码工程下载到本地后,切换到release-1.10.2,执行mvn clean install-DskipTests命令进行构建。构建完成后,在Flink-examples模块下分别找到Flink-examples- streaming、Flink-examples-batch、Flink-examples-table中的WordCount程序,如果程序能够运行成功,则可以开始后面的学习。

考虑到Flink是分布式计算引擎,在学习过程中很可能需要将作业部署到集群环境中执行,因此可以通过远程调试对各个进程中的代码进行追踪。Flink的远程调试主要指对JobManager和TaskManager进行调试,可以在Flink-conf.yaml文件中进行如下配置:

env.java.opts.jobmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5005"
env.java.opts.taskmanager: "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=5006"

在IntelliJ IDEA中,可以通过相应的IP地址和端口号进行调试。

本章首先从发展历史的角度介绍了Flink的发展历程;随后列举了几个典型的应用场景,让读者对使用Flink的场景有直观的感受;接着对Flink的核心特性与架构进行了简单介绍,主要目的是帮助读者建立学习框架;最后介绍了调试源码的工具和技巧。

第2章将参考大多数Flink入门教程的学习路径,列举在Flink应用开发中常用的API,从API层级、编程模型等业务开发角度对Flink的使用进行介绍(不过不会对这部分内容讲解太多)。从第3章起,会从API层的设计思想和代码实现入手进行讲解,以逐步加深读者对源码的学习和理解。

微信扫码关注【异步社区】微信公众号,回复“e58447”获取本书配套资源以及异步社区15天VIP会员卡,近千本电子书免费畅读。


当学习一门新的编程语言时,往往会从“hello world”程序开始,而接触一套新的大数据计算框架时,则一般会从WordCount用例入手。可千万不要小看WordCount,这个用例除了有简洁、易懂的优点,还包含对数据的映射处理和聚合操作,这正是MapReduce编程模型。在分布式架构中,要聚合不同物理节点上的数据,这意味着需要进行网络传输、数据的重分区等。可以说,如果完全了解了WordCount用例在分布式计算框架中的运行原理,基本上就掌握了该框架的核心设计思想。

本章从Flink源码工程中的WordCount程序入手进行讲解,“开门见山”地讲解何谓“Flink编程”。随后会在介绍各个API的语义时对该用例进行简单的修改,引入新的转换操作对数据进行处理。本章会依次介绍DataStream API、DataSet API、Table API和SQL。

希望在学习本章后,读者能够了解:

Flink编程的基本模式;

常用API的语义和用法。

对Flink业务代码开发人员来说,非常重要的是了解如何使用Flink编程模型中的API。Flink将API抽象成了图2-1所示的层级结构。

图2-1 Flink中的API层级结构

在图2-1所示的层级结构中,上层并不是下层的高级封装,该结构展示抽象程度和易用性程度。

底层的状态流处理API的抽象程度最低,而且它只能用于流处理。不过它提供了非常灵活的接口,可以用于自定义底层与状态、时间相关的操作。

DataStream/DataSet API这一层级的API是Flink中的核心API。这一层级中要处理的数据会被抽象成数据流(DataStream)或数据集(DataSet),然后在其上通过定义转换操作实现业务逻辑。这一层级的API的使用风格与Java 8中的Stream编程风格十分类似。

在DataStream/DataSet API之上是Table API。Table API和DataStream/DataSet API不同,不是用复杂的函数定义业务过程的,而是用陈述性的语言加以描述。这样就可大大地降低编程难度,增强描述性。这种语言来自SQL语法,只不过以API的形式呈现出来。既然有了Table API,那么自然可以直接使用SQL来进行描述。这就是最上层的SQL。

总而言之,越上层的API,其描述性和可读性越强;越下层的API,其灵活度越高、表达力越强。多数时候上层API能做到的事,下层API也能做到,反过来却未必。不过,这些API的底层模型是一致的,可以混合使用。

敏锐的读者或许可以从中得到一个推论:当我们用上层API开发业务代码时,在Flink内部会有一个将其转化为底层API的“翻译”过程(比如用SQL开发时,会有各种规则对执行计划进行优化),而这个过程不是业务开发人员可以介入的,得到的结果很有可能不是最优的。这就好比我们用高级语言进行开发,虽然开发效率得到了提高,但是系统的性能往往不如使用底层语言进行开发时那么高。这也提示了我们学习底层实现原理的必要性——从理论上来讲,要想写出性能最佳的Flink业务代码,就应该在理解其转换规则和运行的原理后,用底层的API进行开发。若有余力,则可以思考如何优化转换规则,使框架能够自动生成最佳执行计划。

一般来说,DataStream/DataSet API及其上层API已经能够描述清楚整个业务场景,需要用到底层API的场景较少,因此下面会略过底层的API,直接从核心API层开始介绍。

2.1节提到,DataStream API和DataSet API属于核心API。在探究各种API的底层原理之前,可以这样简单地理解:DataStream API用于处理数据流,对应流处理;DataSet API用于处理数据集,对应批处理。本节主要介绍DataStream API,DataSet API会在2.3节进行详细介绍。

在Flink源码工程的Flink-examples模块下有DataStream版本的WordCount,即org.apache.Flink. streaming.examples.wordcount.WordCount类。这里将其进一步简化,只保留核心的步骤,代码如下:

// (1)获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// (2)指定数据的加载方式
DataStream<String> text = env.fromElements(WordCountData.WORDS);
// (3)指定数据的转换方式
DataStream<Tuple2<String, Integer>> counts =
        text.flatMap(new Tokenizer())
        .keyBy(0).sum(1);
// (4)指定如何输出
counts.print();
// (5)触发程序运行
env.execute();

上述示例中的5个步骤几乎可代表所有Flink业务代码的编程模式(不仅仅是DataStream API)。读者如果接触过Spark等类似Flink的框架,那么应该对这种编程模式并不陌生。下面对这5个步骤稍加解释。

(1)获取执行环境。要想编写一个流处理任务,则需要获取StreamExecutionEnvironment对象,并通过它调用DataStream API。对于其他API,也需要分别获取对应的对象作为入口。在后面介绍其他API时会依次说明对应的类。

(2)指定数据的加载方式。通过执行环境对象env指定数据的加载方式,获取DataStream对象。因为任何一个分布式计算框架对数据的加载、转换、输出操作都是懒执行的,所以这一步并没有真正加载数据,DataStream对象也不会保存数据。它只是封装了加载逻辑,以便继续调用其他接口来定义数据的转换方式。

(3)指定数据的转换方式。这一步与第2步类似,同样是懒执行的,即仅定义数据的转换方式,并不会真正对数据进行处理。flatMap()方法与其他框架中的语义相同。Tokenizer类表示一个分词的处理逻辑,返回一个二元组。keyBy()方法表示对数据进行重分区,按照二元组的第一位(索引为0)来分区,随后对重分区后的数据进行聚合操作。整个逻辑与Hadoop或Spark中的WordCount示例的逻辑完全一致,只是方法名和参数可能有些许差别。

(4)指定如何输出。这一步可以与Spark的编程模式进行对比。在Spark中,算子被分为转换(Transformation)算子和行动(Action)算子。上述代码中的print()看起来对应Spark中的行动算子,但其实两者完全不同。在Spark中,行动算子不仅要定义如何输出,还要肩负生成数据库可用性组(Database Availability Group,DAG)图、划分阶段(stage)、提交作业等多个任务。在DataStream API中,这一步仅定义了数据的输出逻辑,作业的触发执行是在第5步中完成的。

(5)触发程序运行。前面几步中指定了计算逻辑,这些逻辑被封装在env对象中,调用其execute()方法即可完成任务的拆分、提交等工作。这个方法有一个JobExecutionResult类型的返回值,通过该返回值,可以获取任务在执行中返回的结果。

Tokenizer类的代码如下:

public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String,   
Integer>> {
   @Override
   public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
      String[] tokens = value.toLowerCase().split("\\W+");
      for (String token : tokens) {
          if (token.length() > 0) {
              out.collect(new Tuple2<>(token, 1));
          }
      }
   }
}

Flink针对常用的数据源提供了一些现成的方法,也提供了多个接口让用户自定义数据源。大体上,数据源的读取方式分为4类——从文件读取、从套接字读取、从集合读取和自定义数据源。下面介绍这些分类下的方法示例。

(1)从文件读取。

如果想从文本文件读取数据源,则可以调用readTextFile()方法。方法签名如下:

public DataStreamSource<String> readTextFile(String filePath)

该方法是从文件读取数据的最简单的方法。参数为文本文件的路径。

如果调用更下层的方法,则可以更加灵活地读取其他格式的文件,并且可以监控是否有新的文件生成,以决定是否周期性地读取数据源。读取数据源后会对文件进行分片,并分发给后面的任务进行处理。

(2)从套接字读取。

如果想从套接字读取数据源,则可以调用socketTextStream()方法。方法签名如下:

public DataStreamSource<String> socketTextStream(String hostname, int port)

通过指定主机名与端口号就可以从套接字读取数据源。

(3)从集合读取。

从集合读取数据源一般只在演示代码中存在。常用的方法是fromElements():

public final <OUT> DataStreamSource<OUT> fromElements(OUT... data)

前文介绍的WordCount程序中使用的就是该方法。

(4)自定义数据源。

自定义数据源常用的接口是addSource()方法:

public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function)

通过实现SourceFunction接口可以自定义数据源的读取方式。SourceFunction接口如下:

public interface SourceFunction<T> extends Function, Serializable {
   void run(SourceContext<T> ctx) throws Exception;
   void cancel();
}

其中最主要的是实现run()方法。上述几个内置的source在底层也实现了SourceFunction接口。在生产环境中常用抽象类FlinkKafkaConsumerBase的实现类(如FlinkKafkaConsumer010类)来读取Kafka的消息,FlinkKafkaConsumerBase就是一个自定义的SourceFunction。它在run()方法中会循环读取Kafka中的数据。

数据从数据源读取出来后,需要经过转换处理。Flink定义了相当多的转换方法,因此大多数情况下开发人员无须调用底层的API。

(1)Map/FlatMap/Filter。

这3种转换操作的语义与其他框架的一致,可以通过自定义函数将数据流转换成SingleOutput StreamOperator,如:

public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper)
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper)
public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter)

这3个方法的返回值类型SingleOutputStreamOperator类继承自DataStream类。由于Single OutputStreamOperator类在行为上与DataStream类没有太大的不同,因此若官方文档或一些图书中称这3个方法完成了DataStream到DataStream的转换是没有问题的。这里特此说明,以免读者在阅读不同资料时产生歧义。

(2)KeyBy。

keyBy()方法可以通过指定KeySelector()等方法对数据流进行重分区,如:

public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key)

DataStream经过keyBy()方法的转换后会变成KeyedStream。KeyedStream也继承自DataStream,因此它拥有所有DataStream的属性与行为。

(3)Reduce/Fold/Aggregations。

这3种类型的操作必须作用在KeyedStream上,其含义是对键(key)相同的数据进行某种聚合操作。分区后进行聚合操作在业务上是非常普遍的需求,典型的应用是WordCount。

Reduce和Fold操作通过指定自定义函数规定聚合的逻辑,其中Fold操作需要指定初始值,如:

public SingleOutputStreamOperator<T> reduce(ReduceFunction<T> reducer)
public <R> SingleOutputStreamOperator<R> fold(R initialValue, FoldFunction<T, R> folder)

Aggregations操作用于表示一些内置的聚合操作,如:

public SingleOutputStreamOperator<T> sum(int positionToSum)
public SingleOutputStreamOperator<T> min(int positionToMin)
public SingleOutputStreamOperator<T> max(int positionToMax)

经过聚合操作后,数据流又变回了DataStream。

(4)Window/WindowAll与Window Apply/Window Reduce/ Window Fold/Window Aggregations。

Window/WindowAll这两种操作都用于对数据流进行加窗,它们的区别是:前者对KeyedStream加窗,即对单个分区内的数据加窗;后者对DataStream加窗,即对所有数据加窗。两者的返回值类型也不同,分别是WindowedStream和AllWindowedStream。

在WindowedStream和AllWindowedStream上可以定义窗口内的数据转换操作,即Window Apply/Window Reduce/Window Fold/Window Aggregations操作,返回值类型为DataStream。

关于窗口的应用和底层设计原理在第9章会详细介绍。

(5)Union。

利用Union操作可以将多个数据流合并成一个数据流。对前文介绍的示例WordCount进行简单的修改:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
       .keyBy(0).sum(1);
DataStream<Tuple2<String, Integer>> filter1 = counts.filter(value -> value.f1 == 1);
DataStream<Tuple2<String, Integer>> filter2 = counts.filter(value -> value.f1 == 2);
DataStream<Tuple2<String, Integer>> union = filter1.union(filter2);
union.print();
env.execute();

将统计个数为1和2的单词分别过滤出来,再利用Union操作将两个数据流合并,最终的输出结果就是所有统计个数为1或2的单词。

(6)Window Join//Window CoGroup/IntervalJoin。

对两个数据集进行连接(join)是容易理解的,但是对两个无界的数据流进行连接,就需要在相同的时间窗口上进行。Window Join和Window CoGroup操作是在相同时间窗口上对两个数据流进行连接,需要分别指定连接键(join key)。通过观察底层源码能够发现,前者可以理解为后者的一个特例,后者能够更加灵活地对同一分区的数据进行操作。

代码示例如下:

dataStream.join(otherStream)
   .where(<key selector>).equalTo(<key selector>)
   .window(TumblingEventTimeWindows.of(Time.seconds(3)))
   .apply (new JoinFunction () {...});
dataStream.coGroup(otherStream)
   .where(0).equalTo(1)
   .window(TumblingEventTimeWindows.of(Time.seconds(3)))
   .apply (new CoGroupFunction () {...});

IntervalJoin是对两个数据流在某一指定时间区间内进行连接。输入流均为KeyedStream,即连接键已提前指定好。

代码示例如下:

keyedStream.intervalJoin(otherKeyedStream)
   .between(Time.milliseconds(-2), Time.milliseconds(2)) 
   .upperBoundExclusive(true) 
   .lowerBoundExclusive(true) 
   .process(new IntervalJoinFunction() {...});

(7)Connect与CoMap/CoFlatMap。

Connect操作可将两个数据流合并为一个ConnectedStreams。与Union操作不同的是,使用Connect操作合并数据流后,两个数据流还是独立地进行操作的,常用的转换操作是CoMap/CoFlatMap,转换后汇聚成一个数据流输出。返回值类型为DataStream。对WordCount进行简单的修改:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
       .keyBy(0).sum(1);
DataStream<Tuple2<String, Integer>> filter1 = counts.filter(value -> value.f1 == 1);
DataStream<Tuple2<String, Integer>> filter2 = counts.filter(value -> value.f1 == 2);
ConnectedStreams<Tuple2<String, Integer>, Tuple2<String, Integer>> connectedStreams = filter1.connect(filter2);
DataStream<Tuple2<String, Integer>> result = connectedStreams.map(new CoMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>() {
   @Override
   public Tuple2<String, Integer> map1(Tuple2<String, Integer> value) throws Exception {
      value.f1 += 1;
      return value;
   }
   @Override
   public Tuple2<String, Integer> map2(Tuple2<String, Integer> value) throws Exception {
      return value;
   }
});
result.print();
env.execute();

在上面的代码中,先分别过滤出了统计个数为1和2的单词,对两个数据流执行Connect操作,再将一个CoMap作用在数据流上。注意,Connect操作是有顺序的,对于前面的数据流会用map1()方法进行转换,对于后面的数据流会用map2()方法进行转换。上面代码的逻辑是,将统计个数为1的单词的个数再加1,那么最后输出结果中二元组的第二位全部为2。

(8)Split与Select。

Split操作相当于给满足某些条件的数据贴一个标签,标签相同的数据会被输出到同一个位置,其通常与Select操作一起使用。Split操作的返回值类型为SplitStream,执行Select操作后再返回DataStream。对WordCount进行简单的修改:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
       .keyBy(0).sum(1);
SplitStream<Tuple2<String, Integer>> split = counts.split(new OutputSelector<Tuple2<String, Integer>>() {
   @Override
   public Iterable<String> select(Tuple2<String, Integer> value) {
      List<String> list = new ArrayList<>();
      if (value.f1 % 2 == 0) {
         list.add("even"); // 相当于给数据贴标签。一条数据可以有多个标签,即在list中添加多个字符串
      } else {
         list.add("odd");
      }
      return list;
   }
});
// 选择某一标签下的数据。参数可以有多个,表示将多个标签下的数据选择出来并形成数据流
DataStream<Tuple2<String, Integer>> even = split.select("even"); 
even.print();
env.execute();

(9)Project。

Project操作只能作用在Tuple类型的数据流上。它会选择其中一部分索引对应的数据形成新的Tuple类型的数据流。对WordCount进行简单的修改:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> text = env.fromElements(WordCountData.WORDS);
DataStream<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
       .keyBy(0).sum(1);
DataStream<Tuple2<String, Integer>> filter = counts.filter((FilterFunction<Tuple2<String, Integer>>) value -> value.f1 == 1);
DataStream<Tuple> project = filter.project(0); // 选择索引为0对应的数据形成新的Tuple类型
                                               // 的数据流
project.print();
env.execute();

因为在Flink中Tuple的实现类最多只能到Tuple25,所以要注意不能超出25这个值。

对数据进行转换操作后,往往需要定义输出方式,以将数据输出到外部系统进行存储。大体上,数据输出方式也有4类——输出到文件、输出到套接字、标准输出或标准错误输出和自定义输出。下面介绍这些分类下的方法示例。

(1)输出到文件。

如果想将数据输出到文本文件,则可以调用writeAsText()方法:

public DataStreamSink<T> writeAsText(String path)

调用该方法时需指定文件路径。类似的方法还有writeAsCsv(),其可以用于将数据输出为CSV文件。通常底层调用的是writeUsingOutputFormat()方法,该方法可以用于自定义输出格式(OutputFormat)等。

(2)输出到套接字。

如果想将数据输出到套接字,则可以调用writeToSocket()方法:

public DataStreamSink<T> writeToSocket(String hostName, int port, SerializationSchema<T> schema)

通过指定主机名和端口号输出到套接字。这种输出方式的并行度强制为1。

(3)标准输出或标准错误输出。

如果想实现标准输出或标准错误输出,则可以调用print()方法或printToErr()方法:

public DataStreamSink<T> print()
public DataStreamSink<T> printToErr()

(4)自定义输出。

自定义输出时,可以调用addSink()方法:

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction)

利用这个方法可以实现SinkFunction接口的自定义输出。SinkFunction接口如下:

public interface SinkFunction<IN> extends Function, Serializable {
   default void invoke(IN value) throws Exception {}
   default void invoke(IN value, Context context) throws Exception {
      invoke(value);
   }
}

其中主要是要实现invoke()方法。上述几个内置的sink在底层也实现了SinkFunction接口。在生产环境中常用的用于输出数据到Kafka的sink的基类FlinkKafkaProducerBase也是一个自定义的SinkFunction,它在invoke()方法中实现了将数据发往Kafka的逻辑。

重分区操作实际上是在底层指定一种分区策略,当消息从上游发往下游时,会根据这种分区策略决定发往哪个任务实例。

前面介绍过的KeyBy也实现对数据进行重分区,只不过经过keyBy()方法的处理后,数据流变成了KeyedStream,可以进行一些分区的聚合操作等。与KeyBy操作不同的是,这里要介绍的重分区操作的返回值类型仍是DataStream。

重分区操作可分为如下几种。

随机重分区(random partitioning):顾名思义,即随机地对数据进行重分区,通过调用shuffle()方法实现。

重新平衡(rebalance)分区:通过重新平衡的方式循环地将数据分发到各个分区,通过调用rebalance()方法实现。

本地重新平衡(local rebalance)分区:将下游的并行实例平均分配给每个上游的并行实例,然后上游的并行实例将数据在本地重新平衡地分配给它对应的下游的并行实例,如图2-2所示。重新平衡分区可实现全局平衡,而本地重新平衡分区实现的是上游的每个并行实例级的平衡,通过调用rescale()方法实现。

广播(broadcasting):将数据广播到下游的每个并行实例上,通过调用broadcast()方法实现。

自定义重分区(custom partitioning):通过调用partitionCustom()方法实现。

图2-2 本地重新平衡分区

DataSet API与DataStream API一样,处于核心API层,用于批处理的场景。

在Flink源码工程的Flink-examples模块下有DataSet版本的WordCount,即org.apache.Flink. examples.java.wordcount.WordCount类。这里将其进一步简化,只保留核心的步骤,代码如下:

// (1)获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// (2)指定数据的加载方式
DataSet<String> text = env.fromElements(WordCountData.WORDS);
// (3)指定数据的转换方式
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
              .groupBy(0).sum(1);
// (4)指定如何输出并触发执行
counts.print();

将DataStream版本的WordCount与DataSet版本的WordCount的核心步骤进行对比可以发现,二者的核心逻辑一致,主要的区别在于:一是执行环境的类不同,DataStream版本中是StreamExecutionEnvironment类,DataSet版本中是ExecutionEnvironment类;二是执行转换操作后的返回值类型不同,分别为DataStream和DataSet。

在上面代码的第4步中,print()方法既指定了输出,又触发了作业的执行。这看起来与DataStream版本不同,但实际上是把触发作业执行的execute()方法封装在了print()方法中,因此这与DataStream版本仍是一致的。

在DataSet API中读取数据源时需要指定输入格式(InputFormat)。Flink提供了一些现成的实现方法,如从文件或集合中读取数据,其中内置了对应的InputFormat。

(1)从文件读取。

可以调用readTextFile()方法实现从文本文件读取数据:

public DataSource<String> readTextFile(String filePath)

调用该方法需要指定文件路径。

(2)从集合读取。

从集合读取数据常用在演示代码中,可以调用fromElements()方法:

public final <X> DataSource<X> fromElements(X... data)

(3)自定义输入源。

调用createInput()方法可以自定义InputFormat:

public <X> DataSource<X> createInput(InputFormat<X, ?> inputFormat)

如果将DataSet API中的转换操作与DataStream API中的进行对比,则会发现它们无法一一对应。这主要是因为人们看待数据流和数据集的方式不一样。对于数据流,更多考虑的是如何处理数据流中的单条数据;而对于数据集,更多考虑的是如何对完整数据集进行处理。

下面对相关操作进行介绍。

(1)Map/FlatMap/Filter/MapPartition/Project。

前3种转换操作的语义与其他框架的一致。MapPartition与Map类似,只不过Map操作是对迭代器中的每个元素单独进行操作,而MapPartition是通过传入迭代器来定义转换操作。相关代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(WordCountData.WORDS);
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
              .mapPartition(
                     (MapPartitionFunction<Tuple2<String, Integer>, Tuple2<String,   
                                  Integer>>)
                            (values, out) -> values.forEach(out::collect))
              .groupBy(0)
              .sum(1);
counts.print();

这里Project操作的语义和用法与DataStream API中一致,用于从Tuple类型的数据中提取出元素并返回,形成新的Tuple类型的数据。

(2)分组数据集上的转换操作。

对数据集调用groupBy()方法后,其返回值类型为UnsortedGrouping;对UnsortedGrouping调用sortGroup()方法后,其返回值类型为SortedGrouping,它们都继承自Grouping类。对于UnsortedGrouping,可以执行Reduce/GroupReduce/GroupCombine/Aggregate操作;对于SortedGrouping,可以执行GroupReduce/GroupCombine操作。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(WordCountData.WORDS);
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
              .groupBy(0)
              .sum(1);
DataSet first = counts.groupBy(1).sortGroup(0, Order.DESCENDING).first(5);
first.print();

(3)完整数据集上的聚合操作。

DataSet API中的聚合操作可以直接作用在未分区的完整数据集上,这些操作包括Reduce/ ReduceGroup/ ReduceCombine/Aggregate操作。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(WordCountData.WORDS);
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
              .first(10)
              .groupBy(0)
              .sum(1);
counts.print();

(4)Distinct。

Distinct的语义与其他框架中的一致,用于去重。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(WordCountData.WORDS);
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer()).distinct()
       .groupBy(0).sum(1);
counts.print();

输入数据经过分词处理后,通过distinct()方法可实现对相同的单词去重,因此所有单词的统计结果为1。

(5)Join/Cross/CoGroup。

相比DataStream API中的Join操作,DataSet API中的Join操作更好理解一些,因为不需要考虑无界的数据流,也不需要考虑窗口与时间,其与传统数据库中的Join语义类似。当两个数据集连接到一起后,通过where()和equalTo()方法指定连接键,还可以选择性地调用with()方法添加JoinFunction,用于定义两个数据集连接到一起后的操作。

在Join操作中可以指定JoinHint,如BROADCAST_HASH_FIRST、REPARTITION_SORT_MERGE等,表示两个数据集以什么方式连接。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> text = env.fromElements(WordCountData.WORDS);
// 大数据集
DataSet<Tuple2<String, Integer>> counts =
       text.flatMap(new Tokenizer())
       .groupBy(0).sum(1);
// 小数据集
DataSet<Tuple2<String, Integer>> small = text
       .flatMap(new Tokenizer())
       .map(new MapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
   @Override
   public Tuple2<String, Integer> map(Tuple2<String, Integer> value) throws Exception {
       return new Tuple2<>(value.f0, value.f0.length());
   }
}).first(10);
// 对两个数据集进行连接操作,并指定连接方式
DataSet<Tuple2<Tuple2<String, Integer>, Tuple2<String, Integer>>> result
       = counts.join(small, JoinHint.BROADCAST_HASH_SECOND).where(0).equalTo(0);
result.print();

第一个数据集仍是WordCount的结果,第二个数据集中的每个元素是一个二元组,二元组中的第一位表示单词,第二位表示单词长度,并且只取结果中的前10个,以形成一个小数据集。

Cross操作是对两个数据集计算笛卡儿积。CoGroup操作的语义与DataStream API中的一致。

(6)Union。

DataSet API中的Union操作的语义与DataStream API中的一致,表示将两个数据集合并。可以连续执行Union操作合并多个数据集,这些数据集的数据类型必须相同。

下面介绍指定数据输出的方式。

(1)输出到文件。

如果想将数据输出到文本文件,则可以调用writeAsText()方法:

public DataSink<T> writeAsText(String filePath)

调用该方法时需要指定文件路径。类似的方法还有writeAsCsv()等。

(2)标准输出和标准错误输出。

调用print()或printToErr()方法可以实现标准输出或标准错误输出:

public void print() throws Exception
public void printToErr() throws Exception

(3)自定义输出。

调用output()方法可以自定义OutputFormat(输出格式):

public DataSink<T> output(OutputFormat<T> outputFormat)

DataSet API中的重分区操作的返回值类型是DataSet。重分区操作可分为如下几种。

重新平衡(rebalance)分区:数据平均分配到每个下游的任务实例上,通过调用rebalance()方法实现。

哈希重分区(hash-partitioning):根据哈希值进行重分区,通过调用partitionByHash()方法实现。

范围重分区(range-partitioning):根据范围进行重分区,通过调用partitionByRange()方法实现。

自定义重分区(custom partitioning):通过调用partitionCustom()方法实现。

Table API在Flink中的API层级结构中位于核心API的上层,它在底层可转换成数据流或数据集。它的核心思想是对要处理的数据流或数据集注册为一个表(Table),然后用一种直观的方式对其进行关系型操作,如Join、Select、Filter等。由于它是靠上层的API,因此它的灵活度不如下层API的高。目前这套API正在完善中,有些语义尚不支持。

在Flink源码工程的Flink-examples/Flink-examples-table模块下有多个Table API版本的WordCount。不过这些用例将Table API与SQL混合在一起。下面的示例对WordCount进行了修改,其中只包含Table API。

// (1)获取执行环境
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
// (2)指定数据的加载方式
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
// (3)获取Table对象,定义对表的转换操作
tEnv.createTemporaryView("WordCount", input, "word, frequency");
Table table = tEnv
       .from("WordCount")
       .select("word, frequency")
       .groupBy("word")
       .select("word, frequency.sum as frequency");
// (4)指定如何输出并触发程序运行
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

虽然Table API常常与SQL或核心API混合在一起使用,但编程模式与核心API的没有区别。

首先仍然需要获取执行环境。这里的关键是要获取Table API所需的执行环境,即TableEnvironment。

接着是要获取Table对象,这是最重要的一步。获取Table对象有多种方式,上例中是将DataSet对象注册成临时表,再通过临时表创建表。得到Table对象后就可以利用Table API定义转换操作。这部分操作的语义是本节的主要内容。

最后,可以选择性地指定如何输出并触发程序运行。

在整个过程中,Table API可以与其他层级的API混合使用,因此可以在Table对象和DataStream/DataSet对象间自由转换。

上例中的WC类如下:

public static class WC {
   public String word;
   public long frequency;
   public WC() {}
   public WC(String word, long frequency) {
      this.word = word;
      this.frequency = frequency;
   }
   @Override
   public String toString() {
      return "WC " + word + " " + frequency;
   }
}

使用DataStream API或DataSet API时,需要在程序开始处指定执行环境。对DataStream API来说,需要初始化StreamExecutionEnvironment对象;对DataSet API来说,需要初始化ExecutionEnvironment对象。因为Table API在底层要转换成数据流或数据集,所以使用这套API时也需要指定执行环境。

从上层API转换到下层API,可以理解为“翻译”的过程。该“翻译”过程有多种实现方式。目前Flink提供两种“翻译”方式——Old Planner和Blink Planner。但因为Blink Planner做了批流统一,所以在创建执行环境时可以直接指定流模式(StreamingMode)或批模式(BatchMode),在这两种模式下API在底层都会转换成数据流;而Old Planner只能进行流处理,即只能指定流模式,如果想要用Flink原有方式将Table API“翻译”成数据库,则需要初始化BatchTableEnvironment。

总而言之,在初始化执行环境时,存在下面4种情况。

(1)Flink流查询。

StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv);

(2)Flink批查询。

ExecutionEnvironment fbEnv = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment fbTableEnv = BatchTableEnvironment.create(fbEnv);

(3)Blink流查询。

EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
TableEnvironment bsTableEnv = TableEnvironment.create(bsSettings);

(4)Blink批查询。

EnvironmentSettings bbSettings = EnvironmentSettings.newInstance().useBlinkPlanner().  
inBatchMode().build();
TableEnvironment bbTableEnv = TableEnvironment.create(bbSettings);

对第一种情况而言,StreamTableEnvironment.create()方法的内部实际上是这样的:

static StreamTableEnvironment create(StreamExecutionEnvironment executionEnvironment) {
   return create(
      executionEnvironment,
      EnvironmentSettings.newInstance().build());
}

这里利用默认值实例化了EnvironmentSettings对象。因此也可以显式地指定默认值,利用重载的方法完成初始化,如下所示:

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
StreamExecutionEnvironment fsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment fsTableEnv = StreamTableEnvironment.create(fsEnv, fsSettings);

或者用如下方式完成初始化:

EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build();
TableEnvironment fsTableEnv = TableEnvironment.create(fsSettings);

这样就与第三种情况的初始化方式形成了对比。

StreamTableEnvironment接口继承自TableEnvironment接口,提供了更多方法。在流处理中一般直接使用StreamTableEnvironment接口。因而对于上述第三种情况也可以写成如下形式:

StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);

对于初始化内容可以做如下总结。

原生的Flink和Blink分别提供了一套初始化方式。原生的Flink可以直接通过StreamTable Environment或BatchTableEnvironment的create()方法进行初始化;Blink则要实例化EnvironmentSettings对象,需要指定Planner以及是否为流模式。

原生的Flink会根据执行环境的不同,将Table API分别转换成DataStream API或DataSet API;Blink只需要指定流模式,在底层都会转换成DataStream API,能做到批流统一。

因为Blink无论如何都会将Table API转换成DataStream API,所以在底层,原生的Flink对于流处理执行环境的初始化方式与Blink对于执行环境的初始化方式在形式上达成了一致,通过指定Planner就可以进行区分。需要注意的是,如果用Old Planner,则StreamingMode必须为true。

要想用Table API定义数据的转换操作,首先需要获取Table对象。一般在这个过程中就会定义数据的加载方式。如2.4.1节的示例中,通过env.fromElements()方法获取了DataSet对象,并将其注册在执行环境中。

除此以外,还可以定义TableSource,通过TableEnvironment的registerTableSource()方法对其进行注册:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.registerTableSource("WordCount", new CsvTableSource(...));
Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .groupBy("word")
       .select("word, frequency.sum as frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

CsvTableSource类中已经定义好了数据的加载方式。

如果已经获取了Table对象,那么可将其直接注册在执行环境中:

tEnv.registerTable("WordCount", table);

这里所说的注册,主要是指在内存中将临时表的信息维护起来,这样就可以利用TableEnvironment的from()等方法通过指定表名的方式来获取Table对象。如果连接了外部数据源的元信息,那么可以通过指定外部数据源的表名来直接获取Table对象。

总而言之,这种方式是通过内存中或者外部数据源的元信息来构造Table对象的。另外,还可以将DataStream/DataSet直接转换成Table对象。

DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
Table table = tEnv.fromDataSet(input);

这样通过省略注册的步骤,可直接得到Table对象。实际上在注册过程中就是先调用fromDataStream()/fromDataSet()方法构造Table对象,再将其注册到执行环境中。

Table算子的语义来源于SQL,可以看到Table API的方法名和其中的参数都保留了大量SQL语法的“痕迹”。Table API中大多数转换操作同时支持数据流和数据集,不过由于其语义来源于SQL,而SQL的操作对象主要是数据集而非数据流,因此许多语义对数据流并不适用,这些操作包括Union、Intersect、OrderBy等。针对数据流的处理,Table API专门设计了一些转换操作以满足业务需求,这些操作不能作用在数据集上。

Table API中的转换操作相对复杂,下面介绍一些常用的转换操作。在介绍这些转换操作时,如果没有特殊说明,则表示该操作同时支持批处理和流处理。

(1)Select/As。

select()操作已在2.2.3节中介绍过。as操作用于给某一列取一个别名,相关代码如下:

Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .as("word_alias, frequency_alias")
       .groupBy("word_alias")
       .select("word_alias as word, frequency_alias.sum as frequency");

一旦给某一列取了别名,后面的操作就都要以这个别名为准。最后如果要把Table对象转换为DataStream/DataSet,则需要使列名与DataStream/DataSet中数据类型的属性名一致。从上例也可以看出,As操作可以通过as()方法实现,也可以在select()方法的参数中直接使用AS关键字。

(2)Filter/Where。

这两个算子在Table API中的语义完全相同。实际上where()方法在底层调用的就是filter()方法,相关代码如下:

Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .where("word == 'Ciao'")
       .groupBy("word")
       .select("word, frequency.sum as frequency");

(3)InnerJoin/OuterJoin。

Join操作的语义与SQL中的一致,相关代码如下。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
   new WC("Hello", 1),
   new WC("Ciao", 1),
   new WC("Hello", 1));
DataSet<Dict> dict = env.fromElements(
   new Dict("Hello", "English"),
   new Dict("Ciao", "Italian"));
tEnv.createTemporaryView("WordCount", input, "word, frequency");
Table table = tEnv.from("WordCount")
   .select("word, frequency")
   .groupBy("word")
   .select("word as word_alias, frequency.sum as frequency");
Table dictTable = tEnv.fromDataSet(dict);
Table result = table.join(dictTable).where("word_alias == word").select("word, frequency, language");
tEnv.toDataSet(result, Row.class).print();

其中Dict类如下:

public static class Dict {
   public String word;
   public String language;
   public Dict() {}
   public Dict(String word, String language) {
      this.word = word;
      this.language = language;
   }
}

(4)Union/UnionAll/Intersect/IntersectAll/Minus/MinusAll。

这部分操作属于集合操作,分别对应SQL中的关键字UNION、UNION ALL、INTERSECT、INTERSECT ALL、EXCEPT和EXCEPT ALL。除了UNION ALL,其余操作均只能作用在数据集上。以UnionAll操作为例:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
DataSet<WC> input2 = env.fromElements(
       new WC("Hello", 1),
       new WC("Hello", 1),
       new WC("Hello", 1));
tEnv.createTemporaryView("WordCount", input, "word, frequency");
Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .unionAll(tEnv.fromDataSet(input2).select("word, frequency"))
       .groupBy("word")
       .select("word, frequency.sum as frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

(5)In。

In操作对应SQL中的IN关键字。其一般与filter()/where()方法一起使用,相关代码如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
DataSet<String> wordFilter = env.fromElements("Hello");
tEnv.createTemporaryView("WordCount", input, "word, frequency");
tEnv.createTemporaryView("WordFilter", wordFilter, "word");
Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .where("word.in(WordFilter)")
       .groupBy("word")
       .select("word, frequency.sum as frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

(6)OrderBy/Offset/Fetch。

OrderBy操作用于排序,语义与SQL中的一致。Offset和Fetch操作必须用在OrderBy操作的后面,表示从某个偏移量开始取值以及取值数量。这一套转换操作仅针对批处理。

Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .groupBy("word")
       .select("word, frequency.sum as frequency")
       .orderBy("frequency.asc")
       .offset(0)
       .fetch(1);

上例表示根据词频按升序排序,从第一位开始取一个值。

(7)Distinct。

Distinct的语义与SQL中的一致,用于去重。

Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .distinct()
       .groupBy("word")
       .select("word, frequency.sum as frequency");

(8)GroupBy Aggregation/GroupBy Window Aggregation。

GroupBy Aggregation操作就是WordCount中的groupBy()方法加上后续select()中对某一列的聚合操作(frequency.sum)。GroupBy Window Aggregation就是给数据加窗后,在每个窗口内调用groupBy()方法。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
          new WC("Hello", 1, new Timestamp(1600489315000L)),
          new WC("Ciao", 1, new Timestamp(1600489374000L)),
          new WC("Hello", 1, new Timestamp(1600489854000L)));
tEnv.createTemporaryView("WordCount", input, "word, frequency, rowtime");
Table table = tEnv.from("WordCount")
          .select("word, frequency, rowtime")
          .window(Tumble.over("5.minutes").on("rowtime").as("w"))
          .groupBy("word, w")
          .select("word, frequency.sum as frequency, w.start, w.end, w.rowtime");
DataSet<Row> result = tEnv.toDataSet(table, Row.class);
result.print();

既然要加窗,那么必须有一列用于表示时间。此时的WC类被修改为如下形式:

public static class WC {
   public String word;
   public long frequency;
   public Timestamp rowtime;
   public WC() {}
   public WC(String word, long frequency, Timestamp rowtime) {
      this.word = word;
      this.frequency = frequency;
      this.rowtime = rowtime;
   }
   @Override
   public String toString() {
      return "WC " + word + " " + frequency + " " + rowtime;
   }
}

上例中第一个Hello与Ciao在一个窗口中,第二个Hello在另一个窗口中。

(9)Row-based Operations。

这种类型的操作需要定义一个函数,用于对整行进行操作。常用的函数有map()/flatMap()/ aggregate()。

1)map()。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<String> input = env.fromElements("Hello", "Ciao", "Hello");
ScalarFunction func = new MyMapFunction();
tEnv.registerFunction("func", func);
tEnv.createTemporaryView("Word", input, "word");
Table table = tEnv.from("Word")
              .select("word")
              .map("func(word)")
              .as("word, frequency")
              .groupBy("word")
              .select("word, frequency.sum as frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

其中,MyMapFunction类如下:

public static class MyMapFunction extends ScalarFunction {
   public Row eval(String a) {
      return Row.of(a, 1L);
   }
   @Override
   public TypeInformation<?> getResultType(Class<?>[] signature) {
      return Types.ROW(Types.STRING, Types.LONG);
   }
}

2)flatMap()。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<String> input = env.fromElements("Hello, Ciao, Hello");
TableFunction func = new MyFlatMapFunction();
tEnv.registerFunction("func", func);
tEnv.createTemporaryView("Text", input, "line");
Table table = tEnv.from("Text")
              .select("line")
              .flatMap("func(line)")
              .as("word, frequency")
              .groupBy("word")
              .select("word, frequency.sum as frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

其中,MyFlatMapFunction类如下:

public static class MyFlatMapFunction extends TableFunction<Row> {
   public void eval(String str) {
      String[] tokens = str.toLowerCase().split("\\W+");
      for (String token : tokens) {
         if (token.length() > 0) {
            collector.collect(Row.of(token, 1L));
         }
      }
   }
   @Override
   public TypeInformation<Row> getResultType() {
      return Types.ROW(Types.STRING, Types.LONG);
   }
}

3)aggregate()。

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
AggregateFunction myAggFunc = new MySum();
tEnv.registerFunction("myAggFunc", myAggFunc);
tEnv.createTemporaryView("WordCount", input, "word, frequency");
Table table = tEnv.from("WordCount")
       .select("word, frequency")
       .groupBy("word")
       .aggregate("myAggFunc(frequency) as frequency")
       .select("word, frequency");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

其中,MySum类如下:

public static class MySumAcc {
   public Long sum = 0L;
}
public static class MySum extends AggregateFunction<Row, MySumAcc> {
   public void accumulate(MySumAcc acc, Long value) {
      acc.sum += value;
   }
   @Override
   public MySumAcc createAccumulator() {
      return new MySumAcc();
   }
   public void resetAccumulator(MySumAcc acc) {
      acc.sum = 0L;
   }
   @Override
   public Row getValue(MySumAcc acc) {
      return Row.of(acc.sum);
   }
   @Override
   public TypeInformation<Row> getResultType() {
      return new RowTypeInfo(Types.LONG);
   }
}

(10)列操作。

这种类型的操作有AddColumns、AddOrReplaceColumns、DropColumns和RenameColumns等。

(11)表的输出。

Table API可以与其他层级的API混合使用,因此有多种方式可用于定义其输出,比如Table对象在执行环境中注册,再由SQL定义输出;或者将其转换为DataStream/DataSet,再由对应的API定义输出。

Table对象本身可以由insertInto()方法定义输出,其参数为一个注册过的TableSink。

SQL位于Flink中的API层级结构中的顶层。它的使用离不开Table API和Table API所需的执行环境TableEnvironment。TableEnvironment主要提供了sqlQuery()和sqlUpdate()方法。前者用于查询,后者用于表、函数的创建、删除操作以及数据的输入操作等。

下面是SQL版本的WordCount:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
tEnv.createTemporaryView("WordCount", input, "word, frequency");
Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM WordCount GROUP BY word");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

SQL语句中查询的表必须是在执行环境中可以查询到的表。上例还有另一种写法,如下:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
DataSet<WC> input = env.fromElements(
       new WC("Hello", 1),
       new WC("Ciao", 1),
       new WC("Hello", 1));
Table wordCount = tEnv.fromDataSet(input);
Table table = tEnv.sqlQuery("SELECT word, SUM(frequency) as frequency FROM " + wordCount +  
" GROUP BY word");
DataSet<WC> result = tEnv.toDataSet(table, WC.class);
result.print();

其中没有显式的注册步骤,但其实Table对象的toString()方法会将其自身注册在执行环境中。

本章首先展示了Flink应用程序开发中常用API的层级结构,并从核心API层开始对每一层的编程模式和常用接口等进行了介绍。越下层的API灵活度和复杂性越高,越上层的API易用性和表达力越强。这些API的底层实现一致,因而可以互相转换,混合使用。

每一层的编程模式基本相同,都从获取执行环境开始,然后定义数据源、添加转换操作,最终定义数据输出方式并执行。对于核心API层,操作的对象为数据流或数据集,读者学习的重点在于理解各种转换操作的语义。Table API层之上的层操作的对象则是表,许多转换操作的语义与下层的基本相同,读者学习的重点在于了解获取表的方式以及表与数据流/数据集的相互转换。

微信扫码关注【异步社区】微信公众号,回复“e58447”获取本书配套资源以及异步社区15天VIP会员卡,近千本电子书免费畅读。


相关图书

SPSS医学数据统计与分析
SPSS医学数据统计与分析
首席数据官知识体系指南
首席数据官知识体系指南
大数据实时流处理技术实战——基于Flink+Kafka技术
大数据实时流处理技术实战——基于Flink+Kafka技术
大数据安全治理与防范——流量反欺诈实战
大数据安全治理与防范——流量反欺诈实战
搜索引擎与程序化广告:原理、设计与实战
搜索引擎与程序化广告:原理、设计与实战
医疗大数据挖掘与可视化
医疗大数据挖掘与可视化

相关文章

相关课程