Spark海量数据处理 技术详解与平台实战

978-7-115-50700-6
作者: 范东来
译者:
编辑: 杨海玲
分类: Spark

图书目录:

详情

本书基于Spark发行版2.4.4写作而成,包含大量的实例与一个完整项目,层次分明,循序渐进。全书分为3部分,涵盖了技术理论与实战,读者可以从实战中巩固学习到的知识。第一部分主要围绕BDAS(伯克利数据分析栈),不仅介绍了如何开发Spark应用的基础内容,还介绍了Structured Streaming、Spark机器学习、Spark图挖掘、Spark深度学习等高级主题,此外还介绍了Alluxio系统。第二部分实现了一个企业背景调查系统,比较新颖的是,该系统借鉴了数据湖与Lambda架构的思想,涵盖了批处理、流处理应用开发,并加入了一些开源组件来满足需求,既是对本书第一部分很好的巩固,又完整呈现了一个实时大数据应用的开发过程。第三部分是对全书的总结和展望。 本书适合准备学习Spark的开发人员和数据分析师,以及准备将Spark应用到实际项目中的开发人员和管理人员阅读,也适合计算机相关专业的高年级本科生和研究生学习和参考,对于具有一定的Spark使用经验并想进一步提升的数据科学从业者也是很好的参考资料。

图书摘要

版权信息

书名:Spark海量数据处理:技术详解与平台实战 

ISBN:978-7-115-50700-6

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。

著    范东来

责任编辑 杨海玲

人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

读者服务热线:(010)81055410

反盗版热线:(010)81055315


本书基于Spark发行版2.4.4写作而成,包含大量的实例与一个完整项目,层次分明,循序渐进。全书分为3部分,涵盖了技术理论与实战,读者可以从实战中巩固学习到的知识。第一部分主要围绕BDAS(伯克利数据分析栈),不仅介绍了如何开发Spark应用的基础内容,还介绍了Structured Streaming、Spark机器学习、Spark图挖掘、Spark深度学习等高级主题,此外还介绍了Alluxio系统。第二部分实现了一个企业背景调查系统,比较新颖的是,该系统借鉴了数据湖与Lambda架构的思想,涵盖了批处理、流处理应用开发,并加入了一些开源组件来满足需求,既是对本书第一部分很好的巩固,又完整呈现了一个实时大数据应用的开发过程。第三部分是对全书的总结和展望。

本书适合准备学习Spark的开发人员和数据分析师,以及准备将Spark应用到实际项目中的开发人员和管理人员阅读,也适合计算机相关专业的高年级本科生和研究生学习和参考,对于具有一定的Spark使用经验并想进一步提升的数据科学从业者也是很好的参考资料。


在数字经济时代,数据是最重要的资源要素;同时,新的衍生数据又在源源不断地产生,企业面临的一个基本问题就是如何管理和利用这些数据,这对传统的数据处理方法与分析框架提出了新的诉求和挑战,也是全球业界与学界最为关心的问题。为了满足大数据时代对信息的快速处理的需求,一个分布式的开源计算框架Apache Spark应运而生。

经过十年的发展,Spark 已经发展成为目前大数据处理的标杆,在整个业界得到了广泛的使用。对大数据工程师来说,用Spark构建数据管道无疑是很好的选择,而对数据科学家来说,Spark也是高效的数据探索工具。

作者是我在业界的同事,他是一名大数据架构师,在工作中运用Spark和相关数据处理框架很好地完成了工作任务和创新项目,使公司大数据处理系统高效稳定地运转并驱动整个公司的业务发展。在与数据爱恨纠缠的工作过程中,作者积累了大量的实践经验,我很高兴并支持他将在实践中取得的经验系统性地总结出来,并写成本书。我相信本书会为学习Spark的读者带来很大的帮助。

袁先智博士

中山大学和苏州大学特聘教授,International Journal of Financial Engineering主编,

成都数联铭品科技有限公司资深副总裁与首席风险官


2013年是大数据元年,对于大数据从业者来说,从技术的乏善可陈到百花齐放,从以前大数据只是一个计算机学科的名词,到现在全民对大数据的喜闻乐见,距今不过几年。随着大数据与人工智能写进了“十三五”规划,在国家层面的设计中,大数据占有重要的一席之地。大数据将会深刻地影响各行各业和人们的衣食住行已是毋庸置疑的事实。

Spark源自加州大学伯克利分校的AMP实验室,经过开源社区的多年耕耘,目前已发展成大数据生态圈中最重要的技术之一。Spark的流行不仅在计算机从业人员中,而且一些业务人员、产品经理也经常用Spark进行数据分析,Spark的影响力可见一斑。

在IT时代全面转向DT时代的过程中,机遇与挑战并存,掌握Spark对于想快人一步的读者无疑是非常重要甚至是必备的,帮助他们以最快的速度达到目标正是本书的写作目的。

本书基于Spark发行版2.4,循序渐进,主要分为基本理论、应用实践和总结。本书主要有下面3个特点。

本书内容由浅入深,既适合初学者入门,也适合有一定基础的技术人员进一步提高技术水平。书中还包含大量实战案例,特别适合循序渐进地学习。

本书的读者对象包括:

本书一共分为3部分,分别为第一部分“基础篇”、第二部分“应用篇”和第三部分“总结篇”,共16章。

第一部分主要围绕BDAS(伯克利数据分析栈),不仅会介绍如何开发Spark应用的基础内容,还会介绍Structured Streaming、Spark机器学习、Spark图挖掘、Spark深度学习等高级主题,在第一部分的最后一章还会介绍Alluxio。这一部分中包含了很多完整的示例,并附带了真实的数据集,读者可以自己进行试验。

第二部分会实现一个企业背景调查系统,该系统借鉴了数据湖与Lambda架构的思想,涵盖了批处理、流处理应用开发,并加入了一些开源组件来满足需求,因此,这一部分既是对第一部分很好的巩固,又完整呈现了一个大数据应用开发过程。

第三部分是对全书的总结和展望,主要来源于一些业界和学界的进展。

写书有点儿像是跳水比赛,完成一系列动作后跳入水中,但在浮出水面之前,运动员却无法知道评委的评分。我期待着读者的评价就像跳水运动员期待评委的评分。由于作者水平有限,编写时间仓促,书中难免会出现一些不妥之处,恳请读者批评指正。读者可以将对本书的反馈和疑问发到ddna_1022@163.com,我将尽力为读者提供满意的回复。

感谢数联铭品(BBD)董事长曾途先生为工程师文化与创新精神营造了宽松环境。

感谢人民邮电出版社杨海玲编辑对我一如既往的信任。

感谢封强、赵运枫、蒋松、周秋香、耿绮灵、黄伟哲、罗清扬在我写作过程中对我的帮助。

感谢我的夫人吴静宜在我写作的过程中对我的支持。

谨以此书纪念我的姥姥李琦珺。

范东来

2019年10月


第一部分主要围绕BDAS(伯克利数据分析栈),不仅介绍了如何开发Spark应用的基础内容,还介绍了Structured Streaming、Spark机器学习、Spark图挖掘、Spark深度学习等高级主题,此外还介绍了Alluxio系统。


Spark从2009年问世以来,就以星火燎原之势迅速席卷业界。我还记得在2011年左右,在搜索引擎中输入“Spark”的关键字,置顶结果是雪佛兰的Spark汽车,而如今,Spark已经成了事实上的大数据处理的工业标准、技术栈和行业领先解决方案,不得不令人感叹技术的日新月异。

目前Spark项目被托管在GitHub上,从GitHub上的统计来看,Spark无论是从Contributor还是从commit数量上来说,都可以说是最活跃的开源项目之一,如图1-1和图1-2所示。

图1-1 Spark Contributor人数日益增加

图1-2 Spark活跃度

Spark诞生于学界,成长于社区,它利用了开源社区的力量与文化,集中了世界上的一群天才们,夜以继日地为Spark做出贡献,这也是Spark发展迅速的原因之一。本章将介绍Spark的背景、生态圈以及思想,为本书后面的学习打下基础。

本章包含以下内容:

Hadoop雏形来源于谷歌公司在2004年到2006年间的3篇论文,从这个层面上来说,Hadoop来自业界。而与Hadoop不同,Spark诞生于加州大学伯克利分校的AMP(Algorithms, Machines, and People)实验室,有着非常浓厚的学术背景,借助于伯克利在计算机科学方面强劲的实力,可谓厚积薄发。虽然Spark在2009年才正式发布,但是它具有很明显的后发优势,针对Hadoop(MapReduce)的不足,有针对性地进行设计与开发,一出手便彰显其不凡特性,迅速赢得了开发者的喜爱。在2012年的时候,正是Hadoop如日中天的时代,Spark也才发布了0.6版本,但当时的情况却是开发者都期待着Spark 1.0的发布,由此可见一斑。Spark从一个追赶者变成领导者用了不到几年时间,AMP实验室也因此闻名世界,在2016年的秋天,伯克利关闭了AMP实验室并在其基础上成立了一个新的RISE(Real-time Intelligence with Secure Execution)实验室,如图1-3所示,在该实验室的官网,可以看到RISE实验室获得了国内外多家企业的资助,且与业界领先公司建立了紧密的合作,如蚂蚁金服、华为、微软等。

图1-3 AMP实验室与RISE实验室

AMP实验室在规划Spark的蓝图时,就没有将其定位为一个工具或者是一个普通的开源项目,而是为其规划了一个生态圈,AMP实验室将其称为伯克利数据分析栈(Berkeley Data Analytics Stack,BDAS),它集成了若干AMP实验室开发的组件,BDAS的初衷是解读大数据,Spark是其中最重要的一个组件,如图1-4所示。

图1-4 BDAS(伯克利数据分析栈)

从图1-4可以看到,在Spark生态圈中,除了HDFS以外,留给Hadoop的位置已经不多了。Hadoop只在存储层和资源管理层占有两个位置,即便如此还面临着Alluxio与Mesos的威胁。Spark生态圈的目的是成为大数据技术的工业标准。

本节的最后,再来谈谈Spark与Hadoop之间的关系。首先这两个名字后面都代表了业界先进的大数据技术生态圈,从这个角度上来说,两者肯定是存在竞争关系的,但在实际情况中,由于Hadoop存量用户数太多,加之Spark生态圈技术成熟度的一些问题,因此,更多情况下,Spark和Hadoop会互补形成生产环境的解决方案,它们之间的关系是竞争与合作并存。

在2013年,Spark还只是一个Apache孵化器项目,Spark核心开发团队成立了一家名为Databricks的公司,专注于基于Spark为行业提供高质量的解决方案。Databricks创始人团队中很多都是Spark项目的Committer,在一定程度上,该公司可以影响Spark的发展方向。Databricks与基于Hadoop做商业化的Cloudera和Hontworks有异曲同工之妙,后者班底也是Hadoop的开发团队,如图1-5所示,该图为Databricks的标志。

图1-5 Databricks

Databricks核心平台产品统一分析平台(Unified Analytics Platform)的最大特点就是“易用”,无论是做数据探索还是数据处理都有不错的交互体验,而且涵盖了整个数据科学工程的方方面面,将工程师和科学家从烦琐的工作中解放出来,这与Cloudera的CDH和Hontworks的HDP相比,无疑更受用户的喜爱。

Databricks每年都会组织召开Spark Summit,该会议已经成为Spark开发者和用户的技术盛会。在会上,可以获得Spark较新发展动向、特性以及大量行业应用分享,喜爱大数据与Spark的读者不妨多多关注。Spark Summit是系列会议,第一次是2013年12月在旧金山举行的,之后每年都会举办一次主会议(Spark Summit)和几次分会议(如Spark Summit Europe、Spark Summit East、Spark Summit China等)。2018年6月,Spark Summit改为“Spark + AI Summit”,如图1-6所示,体现了大数据与人工智能的结合。

图1-6 2018年6月Spark Summit

值得一提的是,Databricks这家初创公司在商业化的道路上也走得非常成功。2019年2月5日,Databricks宣布融资2.5亿美元,公司估值27.5亿美元,投资方不乏微软这类业界巨头。

GitHub是全球最大的社会化编程及代码托管网站,随着GitHub的出现,软件开发者才真正拥有了源代码,世界各地的人都可以比从前更加容易地获得源代码,将其自由更改并加以公开。如今,世界上众多的程序员都在通过GitHub来公开源代码,越来越多的项目也选择GitHub进行托管,其中有Linux这种史诗怪兽级项目,也不乏Akka这种精致小巧的项目。Apache Spark也将代码托管在GitHub上,这样就能利用GitHub强大的多人协作能力,轻而易举地使分散在世界各地素未谋面的程序员共同为Spark进行开发。GitHub独有的Pull Request机制,能够使不同背景、不同地域、互不相识的程序员产生化学反应,不得不说是一件很奇妙的事情。

我们从GitHub上面可以看到Spark Contributor中有了很多中国人的面孔,这是非常值得高兴的一件事情,作者也曾经通过向Spark贡献代码而成为Spark Contributor的一员。在大数据时代,中国很有希望实现弯道超车。本节将介绍如何通过GitHub的Pull Request功能,向Spark贡献代码。步骤如下。

(1)Fork Spark项目到自己账号下。你需要注册一个GitHub账号,这样你在GitHub下就有了一个自己的域名,然后,搜索Spark项目,并点击Fork。这样,在你的账号(域名)下面就会有一份Spark的代码仓库。这是属于你自己的代码仓库,你可以将其任意修改,而不会影响Spark项目,如图1-7所示。

图1-7 Fork Spark

(2)创建一个自己用于修改的分支。在创建之前,我们已经在自己的代码仓库上有了一份Spark代码的镜像,这时我们当然可以在这份镜像的master分支上直接进行修改并发起Pull Request。但是更好的做法是用自己的master分支去追踪Spark项目的master分支(因为master分支会随着Pull Request不断变化),并根据当前的master分支创建一个用于修改的分支,我们在该分支上进行修改,修改完成后,再发起Pull Request。我们创建了mybranch分支,如图1-8所示。

图1-8 创建自己的分支

(3)将当前的代码仓库切换到mybranch分支。现在我们准备对当前的新分支进行修改,因此切换到待修改的分支。如图1-9所示。

图1-9 切换到自己的分支

(4)修改代码。选定一个文件,点击修改按钮,进行修改,如图1-10所示。

图1-10 修改代码

修改完成后,commit到mybranch分支,如图1-11所示。

图1-11 提交到自己的分支

(5)处理冲突,提交Pull Request。修改完代码后,就可以准备提交一个Pull Request,在之前还需要考虑一个问题:在我们修改代码的这段时间,Spark的master分支是否已经接受了别人Pull Request而发生了变化,如果是的话,我们需要先处理冲突,再进行提交。

首先,我们通过Pull Request选项卡新建一个Pull Request,如图1-12所示。

图1-12 创建一个PR

这时候会出现你自己修改的版本与你要提交的Spark版本对比的页面,如图1-13所示。

图1-13 版本对比

在页面的后半部分,会显示两个版本之间的差异。如图1-14所示。

图1-14 处理冲突

这时候,如果你发现Spark的master版本已经接受了别人Pull Request而和你当初修改的版本相比发生了变化,那么你需要进行修改以防止出现冲突,处理完毕后,我们就可以完成创建Pull Request了,如图1-15所示。

图1-15 提交PR

我们可以在新建Pull Request页面将关联的Jira issue编号作为题目(如果有的话),便于Committer审阅,另外在正文中尽可能加上对该Pull Request的描述,最好写上关联Jira issue的链接,Jira会自动将该Pull Request进行关联。如果Spark Committer认可了你的Pull Request,那么具有写权限的Committer就会将你的Pull Request合并到Spark的master分支上。

完成了这5步,就完成了向Spark贡献代码的任务了。此外,在学习Spark的过程中,也可以将GitHub作为一个理想的源码阅读、心得记录、代码演练的环境。用好GitHub,将对你学习开源软件起到事半功倍的重要作用。

此外,上述贡献代码的步骤可以用Git的客户端以命令行的形式在本地完成,需要先熟悉Git shell操作,读者可以自行尝试。Git客户端可以在GitHub官网下载。

开源软件与Windows操作系统、Oracle数据库这类商业软件相比有着截然不同的创造力与参与感,甚至在某些理念上,开源软件与商业软件也是完全不同的,但是商业化是开源软件健康发展的必经之路。近年来,开源软件在商业化的道路上做过很多尝试与积极探索,如Databricks这种成功案例,也有一些不太成功的例子,如Hortonworks市值缩水至原来的五分之一,最后不得不和Cloudera合并,但总体来说,开源软件在商业化实践方面逐渐走出了一条路,如2018年10月,大名鼎鼎的全文搜索引擎软件Elasticsearch背后的商业化公司Elastic在纽交所上市,上市首日股价翻倍,达到了惊人的50亿美元;2019年1月,基于Kafka商业化运营的公司Confluent融资1.25亿美元,公司估值超过25亿美元。

Spark在诞生之初就提供了多种编程语言接口:Scala、Java、Python、SQL,在后面的版本中又加入了R语言编程接口。对于Spark来说,虽然内核由Scala编写而成,但是编程语言从来就不是它的重点,从Spark提供这么多的编程接口来看,Spark鼓励不同背景的人去使用Spark完成自己的数据探索工作。尽管如此,不同编程语言在开发效率、执行效率等方面还是有些不同,本书将目前Spark各种编程语言优缺点罗列如表1-1所示。

表1-1 Spark开发语言对比

编程语言

类型

开发效率

执行效率

成熟度

支持类型

Scala

编译型

原生支持

Java

编译型

原生支持

Python

解释型

PySpark

R

解释型

SparkR

SQL

解释型

原生支持

Scala作为Spark的开发语言当然得到原生支持,也非常成熟,它简洁的语法也能显著提高开发效率;Java也是Spark原生支持的开发语言,但是Java语法冗长且不支持函数式编程(1.8以后支持),导致它的API设计得比较冗余以及不合理,再加上需要编译执行,Java开发效率无疑是最低的,但Java程序员基数特别大,Java API对于这些用户来说无疑是很友好的;Python与R语言都是解释型脚本语言,不用编译直接运行,尤其是Python更以简洁著称,开发效率自不必说,此外Python与R语言本身也支持函数式编程,这两种语言在开发Spark作业时也是非常自然,但由于其执行原理是计算任务在每个节点安装的Python或R的环境中执行,结果通过管道输出给Spark Worker,因此效率要比Scala与Java低,而且PySpark、SparkR这两个中间件成熟度有待提高。SQL是Spark原生支持的开发语言,从各个维度上来说都是最优的,因此一般情况下,用Spark SQL解决问题是最优选择。

对于刚刚开始学习Spark的用户,一开始最好选择一门自己最熟悉的语言,这样Spark的学习曲线比较平缓。如果从零开始,建议在Scala与Python中间选择,Scala作为Spark的原生开发语言,如果想要深入了解Spark有必要掌握;Python在开发速度方面的优势可以赋予开发人员极强的数据工程实践能力,加之Python在数据科学领域的广泛应用,可以更好地发挥Spark在数据处理方面的优势。基于以上原因,本书主要的开发语言为Scala,有些案例与实战会用Python实现。

在Spark诞生之初,就有人诟病为什么AMP实验室选了一个如此小众的语言——Scala,很多人还将原因归结为学院派的高冷,但后来事实证明,选择Scala是非常正确的,Scala很多特性与Spark本身理念非常契合,可以说它们是天生一对。Scala背后所代表的函数式编程思想也越来越为人所知。函数式编程思想早在50多年前就被提出,但当时的硬件性能太弱,并不能发挥出这种思想的优势。目前多核CPU大行其道,函数式编程在并发方面的优势也逐渐显示出了威力。这就好像Java在被发明之初,总是有人说消耗内存太多、运行速度太慢,但是随着硬件性能的翻倍,Java无疑是一种非常好的选择。

函数式编程属于声明式编程,与其相对的是命令式编程,命令式编程是按照“程序是一系列改变状态的命令”来建模的一种建模风格,而函数式编程思想是“程序是表达式和变换,以数学方程的形式建立模型,并且尽可能避免可变状态”。函数式编程会有一些类别的操作,如映射、过滤或者归约,每一种都有不同的函数作为代表,如filter、map、reduce。这些函数实现的是低阶变换,而用户定义的函数将作为这些函数的参数来实现整个方程,用户自定义的函数成为高阶变换。

命令式编程将计算机程序看成动作的序列,程序运行的过程就是求解的过程,而函数式编程则是从结果入手,用户通过函数定义了从最初输入到最终输出的映射关系,从这个角度上来说,用户编写代码描述了用户的最终结果(我想要什么),而并不关心(或者说不需要关心)求解过程,因此函数式编程绝对不会去操作某个具体的值,这类似于用户编写的代码:

select class_no, count(*) from student_info group by class_no

对,SQL是很典型的声明式编程,用户只需要告诉SQL引擎统计每个班的人数,至于底层是怎么执行的,用户不需要关心。在《数据库系统概论》(第5版)提到:数据库会把用户提交的SQL查询转化为等价的扩展关系代数表达式,用户用函数式编程的思想进行编码的话,其实就是在直接描述这个关系代数表达式。这么说或许有些抽象,让我们来看个例子,有一个数据清洗的任务,需要将姓氏集合中的单字符姓名(脏数据)去掉,并将首字母大写,最后再拼成一个逗号分隔的字符串,先来看看命令式的实现(Python版):

family_names = ["ann","bob","c","david"]
clean_family_names = []
for i in range(len(family_names)):
   family_name = family_names[i]
   if (len(family_name) > 1):
      clean_family_names.append(family_name.capitalize())
print clean_family_names

再来看看函数式(Scala版)的实现:

val familyNames = List("ann","bob","c","david")
println(
   familyNames.filter(p => p.length() > 1).
   map(f => f.capitalize).
   reduce((a,b) => a + "," + b).toString()
)

从这个例子我们可以看出,在命令式编程的版本中,只执行了一次循环,在函数式编程的版本里,循环执行了3次(filter、map、reduce),每一次只完成一种逻辑(用户编写的匿名函数),从性能上来说,当然前者更为优秀,这说明了在硬件性能羸弱时,函数式的缺点会被放大,但我们也看到了,在函数式编程的版本不用维护外部状态i,这对于并行计算场景非常友好。

在严格的函数式编程中,所有函数都遵循数学函数的定义,必须有自变量(入参),必须有因变量(返回值)。用户定义的逻辑以高阶函数的形式体现,即用户可以将自定义函数以参数形式传入其他低阶函数中。读者可能对函数作为参数难以理解,其实从数学的角度上来说,这是很自然的,下面是一个数学表达式:

括号中的函数f1 = x + b作为参数传给函数f2 = ,这其实是初中的复合函数的用法。相对于高阶函数,函数式语言一般会提供一些低阶函数用于构建整个流程,这些低阶函数都是无副作用的,非常适合并行计算。高阶函数可以让用户专注于业务逻辑,而不需要去费心构建整个数据流。

函数式编程思想因为非常简单,所以特别灵活,用“太极生两仪,两仪生四象,四象生八卦”这句话能很好地反映函数式编程灵活多变的特点,虽然函数式编程语言能显著减少代码行数(其实很多代码由编程语言本身来完成了),但通常让读代码的人苦不堪言。除上述之外,函数式还有很多特性以及有趣之处值得我们去探索。

1.没有变量

在纯粹的函数式编程中,是不存在变量的,所有的值都是不可变(immutable)的,也就是说不允许像命令式编程那样多次给一个变量赋值,比如在命令式编程中我们可以这样写:

x = x + 1

这是因为x本身就是一个可变状态,但在数学家眼中,这个等式是不成立的。

没有了变量,函数就可以不依赖也不修改外部状态,函数调用的结果不依赖于调用的时间和位置,这样更利于测试和调试。另外,由于多个线程之间不共享状态,因此不需要用锁来保护可变状态,这使得函数式编程能更好地利用多核的计算能力。

2.低阶函数与核心数据结构

如果使用低阶函数与高阶函数来完成我们的程序,这时其实就是将程序控制权让位于语言,而我们专注于业务逻辑。这样做的好处还在于,有利于程序优化,享受免费的性能提升午餐,如语言开发者专注于优化低阶函数,而应用开发者则专注于优化高阶函数。低阶函数是复用的,因此当低阶函数性能提升时,程序不需要改一行代码就能免费获得性能提升。此外,函数式编程语言通常只提供几种核心数据结构,供开发者选择,它希望开发者能基于这些简单的数据结构组合出复杂的数据结构,这与低阶函数的思想是一致的,很多函数式编程语言的特性会着重优化低阶函数与核心数据结构。但这与面向对象的命令式编程是不一样的,在OOP中,面向对象编程的语言鼓励开发者针对具体问题建立专门的数据结构。

3.惰性求值

惰性求值(lazy evaluation)是函数式编程语言常见的一种特性,通常指尽量延后求解表达式的值,这样对于开销大的计算可以做到按需计算,利用惰性求值的特性可以构建无限大的集合。惰性求值可以用闭包来实现。

4.函数记忆

由于在函数式编程中,函数本身是无状态的,因此给定入参,一定能得到一定的结果。基于此,函数式语言会对函数进行记忆或者缓存,以斐波那契数列举例,首先用尾递归来实现求斐波那契数列,Python代码如下:

def Fibonacci(n):
   if n == 0 :
      res = 0
   elif num == 1:
      res = 1
   else:
      res = Fibonacci(n - 1) + Fibonacci(n - 2)
return res

当n等于4时,程序执行过程是:

Fibonacci(4)
Fibonacci(3)
Fibonacci(2)
Fibonacci(1)
Fibonacci(0)
Fibonacci(1)
Fibonacci(2)
Fibonacci(1)
Fibonacci(0)

为了求Fibonacci (4),我们执行了1次Fibonacci(3)、2次Fibonacci(2)、3次Fibonacci(1)和2次Fibonacci(0),一共8次计算,在函数式语言中,执行过程是这样的:

Fibonacci(4)
Fibonacci(3)
Fibonacci(2)
Fibonacci(1)
Fibonacci(0)

一共只用4次计算就可求得Fibonacci(4),对于后面执行的Fibonacci(0)、Fibonacci(1),由于函数式语言已经缓存了结果,因此不会重复计算。

5.副作用很少

函数副作用指的是当调用函数时,除了返回函数值之外,还对主调用函数产生附加的影响,例如修改全局变量或修改参数。在函数式编程中,低阶函数本身没有副作用,高阶函数不会(很少)影响其他函数,这对于并发和并行来说非常有用。

函数式编程思想与其他编程思想相比,并没有所谓的优劣之分,还是取决于场景,Spark选择Scala也是由于函数式语言在并行计算下的优势非常契合Spark的使用场景。

Spark的开发语言是Scala,这是Scala在并行和并发计算方面优势的体现,这是微观层面函数式编程思想的一次胜利。此外,Spark在很多宏观设计层面都借鉴了函数式编程思想,如接口、惰性求值和容错等。

本章介绍了Spark的一些背景知识及如何参与社区贡献,另外还提到了函数式编程思想以及在Spark上的体现。读者对函数式编程理解得越深,就越能明白Spark设计的妙处。


在第1章中,读者了解了Spark的背景和思想,在本章中读者将继续深入了解Spark编程相关知识。Spark 1.0于2014年5月发布,标志业界进入了Spark时代。之后,Spark以平均50余天更新一个版本的速度发布,2016年7月,Spark发布2.0版本,而Spark 1.x的版本号最后也定格为1.6.3。说Spark 2.0是对原有Spark的一次革新毫不过分,在整个社区的共同努力下,Spark 2.x在设计思想、编程接口、性能优化方面都有了长足的进步,体现了Spark在数据科学领域的踏实进步与宏大愿景。2019年,Spark 3.x会闪亮登场,百尺竿头,更进一步,在人工智能与机器学习领域开始发力。技术的更迭总是优胜劣汰,本章乃至本书不会对Spark 1.x着墨太多,而是将重点放在Spark的新特性上。

本章包含以下内容:

架构对于技术来说,是技术的灵魂,它体现了技术对于需求的取舍,决定了技术的优点与缺点。Spark的架构也是如此,在分布式技术中,架构无非两种,即主从架构(master-slave)和点对点架构(p2p),Spark采取了前者,也是MapReduce的选择——主从架构,图2-1所示是Spark高层次的架构抽象。

图2-1 Spark架构

Spark程序的入口是Driver中的SparkContext。与Spark 1.x相比,在Spark 2.0中,有一个变化是用SparkSession统一了与用户交互的接口,曾经熟悉的SparkContext、SqlContext、HiveContext都是SparkSession的成员变量,这样更加简洁。SparkContext的作用是连接用户编写的代码与运行作业调度和任务分发的代码。

当用户启动一个Driver程序时,会通过SparkContext向集群发出命令,Executor会遵照指令执行任务。一旦整个执行过程完成,Driver结束整个作业。ClusterManager负责所有Executor的资源管理和调度,根据底层资源管理和调度平台的不同,ClusterManager可以有多种选择,对应了多种资源管理平台,如YARN的ResourceManager与Mesos的ClusterManager,此外Executor也会根据资源管理平台的不同运行在不同的容器中。

Driver解析用户编写的代码,并序列化字节级别的代码,这些代码将会被分发至将要执行的Executor上。当执行Spark作业时,这些计算过程实际上是在每个节点本地计算并完成。实际过程如图2-2所示。

图2-2 Spark执行过程

与图2-1相比,图2-2更像是调大了放大镜的倍数,能让我们将Driver与Executor之间的运行过程看得更加清楚。图2-2中,首先Driver根据用户编写的代码生成一个计算任务的有向无环图(Directed Acyclic Graph,DAG),接着,DAG会根据RDD(弹性分布式数据集)之间的依赖关系被DAGScheduler切分成由Task组成的Stage(TaskSet),TaskScheduler会通过ClusterManager将任务调度到Executor上执行。在DAG中,每个Task的输入就是一个Partition(分区),而一个Executor同时只能执行一个Task,但一个Worker(物理节点)上可以同时运行多个Executor。

从图2-1和图2-2中可以看到,在Spark的架构中,Driver主要负责作业调度工作,Executor主要负责执行具体的作业计算任务,ClusterManager主要负责资源管理和调度,Driver中还有几个重要的组件——SparkContext、DAGScheduler、TaskScheduler。Driver是整个架构中最重要的角色,它通过监控和管理整个执行过程保证了一切按照计划正常运行,此外它还在Spark容错中起到了重要的作用。

在MapReduce这类型的计算框架中,中间结果的传输是整个计算过程中最重要的一个步骤,Spark也是如此,在Spark作业中,这也是Stage划分的依据,我们称之为数据混洗(Shuffle),图2-3是Spark的DAG Scheduler生成的计算任务图。

图2-3 Spark的计算任务DAG图

这个作业被划分成了3个Stage,一共经历了2次数据混洗,数据混洗在图2-3中以分区之间多对多的关系体现,A、B、C、D、E分别代表不同的RDD。在2.5节会详细介绍数据混洗过程。

Spark 2.4是目前Spark的最新版本,通过社区和用户的多年沉淀,Spark已经逐渐形成了自己的思路和方向,变得更加简单、智能、极致。与Spark 1.x相比,从设计上来说,Spark 2.x的新特性主要体现在3个方面,即性能优化(Tungsten项目)、接口优化(统一Dataset和DataFrame接口)和流处理上,下面展开讲解。

Tungsten项目最先产生的原因是由于固态硬盘和万兆交换机(10 Gbit/s、40 Gbit/s)的普及和应用,I/O性能的大幅提升使得CPU和内存成了大数据处理中的新瓶颈。例如一个中等规模的集群(50~100台),在某些大型作业执行过程中,网络I/O和硬盘I/O经常会接近其性能理论值,而CPU的使用率却很难长期维持在一个很高水平。基于此,Spark开发团队希望开发一个新的Spark核心执行引擎来尽可能地压榨出CPU和内存的性能极限。2015年,Tungsten项目诞生了。Tungsten目前分为两个阶段,下面先介绍第一阶段。

1.内存管理与二进制处理

Tungsten旨在利用应用程序语义显式管理内存,消除JVM对象模型和垃圾回收的开销。Spark选择JVM来负责内存管理,JVM的垃圾回收器(Garbage Collector,GC)会不停地监控某个对象是否还有活跃的引用,如果没有,垃圾回收器会回收该对象并释放为其分配的内存。而对对象的引用通常存在于堆中的某些对象里或者作为变量存放于栈里,前者存活时间较长,后者存活时间较短。

另外,JVM对象开销一向是很大的,例如字符串采用UTF-8编码,还有一些对象header等信息,这样就容易引起内存不足,降低CPU访问数据的吞吐量。JVM的垃圾回收器是一个非常复杂的组件,同时它的设计思路和面对的场景通常都是针对在线事务处理(OLTP)系统,如交易系统,而Spark的使用场景则偏向于在线分析处理(OLAP)系统和离线计算系统,这两种场景对于性能需求差别非常大,因此利用JVM的垃圾回收器来应对Spark面对的场景,必然无法令人满意。

Tungsten的目的就是摆脱JVM的垃圾回收器,自己管理内存。尽管在过去十几年中,对于那些普通用途的字节码,JVM的垃圾回收器在预测对象生命周期方面取得了很好的效果,但是Spark比谁都清楚哪些数据需要留在内存中,哪些需要从内存中移除,这种情况下选择JVM管理内存,无疑不是最好的选择,这也是利用应用语义显式管理内存的意义所在,因此,Tungsten绕过了JVM提供的安全内存托管系统,而使用了sun.misc.Unsafe包中的类,它允许Tungsten自主管理其内存。使用Unsafe类构建的数据结构在存储和访问性能上也大大优于JVM对象模型。

通常Spark面对的数据都是结构化的表,Spark 1.4就使用了org.apache.spark.sql.catalyst. expressions.UnsafeRow来表示行对象,如图2-4所示。

图2-4 UnsafeRow数据结构

从图2-4可以看出,一个行对象由空位设置域、定长值域和变长值域3个域组成。

(1)空位设置域(null bit set)。在该域中,对于该行的每一个字段,都用一个比特的空间来表示该字段是否为空。这在过滤数据时非常有用,因为应用可以直接从内存读取该域的信息,而不用直接读取实际值。字段的数量和该域的大小都会作为UnsafeRow的独立属性进行存储。所以,这个域的长度也是可变的,因为字段的数量是可变的。

(2)定长值域(values(fixed length))。该域主要存储两种内容,每一种都符合定长长度:8字节。要么是固定长度的值,例如长整型、双精度或者整型的值,要么是不符合定长长度要求的值的引用或指针。指针指向变长值域中该字段的字节偏移量,此外,该域除了存储指针以外,还会存储该字段的长度(以字节为单位),这样两个整型合并为一个长整型值存储在定长值域中。最后要说明的是,由于字段的数量是已知的,因此该域具有固定长度并为每个字段保留8字节的值域。

(3)变长值域(values(variable length))。UnsafeRow对象最后一个域存储的是可变长度的字段,如字符串。如前所述,包含变长值域中位置偏移的指针被存储在定长值域中。由于已知存储某变长字段在定长值域中的偏移和长度,因此可以容易地定位变长字段的值。可以看到,UnsafeRow在不用反序列化整行的情况下可以很轻易地定位值,这就是我们所说的二进制操作,先来看看定长字段值的定位:

private long getFieldOffset(int ordinal) {
    return baseOffset + bitSetWidthInBytes + ordinal * 8L;
}

该方法比较好理解,baseOffset是该UnsafeRow对象自己的地址,bitSetWidthInBytes指的是空位设置域的长度,最后再加上前面字段数乘以8。而定位可变字段值则有些麻烦,来看看下面这段代码:

public UTF8String getUTF8String(int ordinal) {
    if (isNullAt(ordinal)) return null;
    final long offsetAndSize = getLong(ordinal);
    final int offset = (int) (offsetAndSize >> 32);
    final int size = (int) offsetAndSize;
    return UTF8String.fromAddress(baseObject, baseOffset + offset, size);
}

首先我们通过空位设置域来判断该值是否为空,是则返回空值并停止,接着用getLong方法取得该字段在定长值域中存储的值,通过类型转换和位运算得到指针和字符串长度,接着就可以用fromAddress方法得到该字段变长字符串类型的值了。

java.util.HashMap是Java中常见的数据结构。这种数据结构的优点是快速与便捷的数据访问,我们可以用一个任意类型的Java对象作为键来获取值。但是,我们在实际使用中也发现了java.util.HashMap的一些缺点:

这在顺序扫描的场景下,会导致非常随机和对缓存不友好的内存访问模式。因此Tungsten自己实现了一种新的数据结构,称为BytesToBytesMap,它提升了内存本地性,并且通过避免使用重量级的Java对象来减少内存开销,也很适合顺序扫描。顺序扫描这种操作对缓存是非常友好的,因为对内存的访问往往是连续的。经过测试,在大量数据的压力下,Tungsten的BytesToBytesMap几乎没有性能衰减,而java.util.HashMap则最终被GC压垮。

2.缓存感知计算

现代计算机系统使用64位地址指针指向64位内存块。而Tungsten也总是使用8字节的数据集来和64位内存块对齐。在CPU内核和内存之间,有一个L1、L2和L3高速分层存储,它们随着CPU数量增加而增加。通常,L3在所有核心之间共享。如果你的CPU内核要求将某个主存储器地址加载到CPU内核的寄存器(寄存器是CPU内核中的一个存储区),那么首先会在L1-L3缓存中检查是否包含请求的内存地址。我们将与这种地址相关联的数据称为存储器页。如果是这种情况,则略过主存储器访问,并且从L1、L2或L3高速缓存中直接加载该页。否则,该页从主存储器加载,会导致更高的延迟。延迟太高,CPU内核会等待(或执行其他工作)多个CPU时钟周期,直到主存储器页被传送到CPU内核的寄存器中。此外,该页也被放入所有高速缓存中,并且如果它们是满的,则从高速缓存中删除较不频繁访问的存储器页。因此我们得出两个结论。

基于上述结论,缓存淘汰和预取策略十分关键。当然,现代计算机系统不仅使用最近最少使用算法(Least Recently Use,LRU)从缓存中删除缓存的存储页,还会保留下那些虽然缓存时间长但很有可能被再次请求的存储页。另外,现代CPU还会预测将来的存储页请求,从而将该存储页预取至缓存中。不管怎样,应始终避免随机存储访问模式,通常越顺序存储访问执行得越快。

那么我们应该如何避免随机存储访问模式呢?让我们再来看看java.util.HashMap。顾名思义,键(key)对象的散列值(value)会被用来将对象分组到桶中。散列(hash)的副作用是哪怕键值差别非常细微,散列值也会不一样,并会导致被分组到相应的桶中。每个桶可以被看成指向存储在映射表中的链表指针(pointer)。这些指针指向的是随机内存区域。因此,顺序扫描是不可能的,如图2-5所示。

图2-5 HashMap的存储布局

读者会发现这些指针指向的对象都位于主存储(Java堆)的随机区域中。

为了提升顺序扫描性能,Tungsten采取了不同的办法:指针不仅存储目标值内存地址,还会保存键本身。在前面,我们已经了解了UnsafeRow的概念,8字节的存储区域用来保存两个整型值,例如,键和指向值的指针。这种存储布局如图2-6所示。

图2-6 改进的存储布局

这样,就可以运行具有顺序存储访问模式的排序算法(如快速排序)。当排序时,键和指针的组合存储区域会被到处移动,存储值的地方却不会变。虽然这些值可以随机分布在存储器中,但是键和指针的组合存储区域被以顺序布局,如图2-7所示。

图2-7 键和指针可以顺序遍历

前面介绍的BytesToBytesMap也同样具有缓存友好的特性。

3.代码生成

Tungsten引入了代码推断用于表达式估计。为了好理解一些,先举个小例子,先来看看以下表达式:

val i = 23
val j = 5
var z = i*x+j*y

假设x和y都来自表中的某一行。现在,假设将表达式应用到表中的每一行中,而这个表有数十亿行,JVM只能执行这个表达式数十亿次,这是一个非常大的开销。因此Tungsten实际做的是将这个表达式转换为字节码并将其发送到执行者线程中。

你可能知道,每个类在JVM上执行的都是字节码,这是针对不同微处理器架构的机器代码的一个中间层。这是Java的特点之一。因此JVM的工作流如下:

目前,还没有人想过在运行时直接生成字节码,这就是代码生成想要实现的。Tungsten分析将要被执行的任务,而不是依赖预编译组件,它会生成由人编写的在JVM上执行的特定的高性能字节码。

Tungsten还有助于加速序列化与反序列化对象,JVM提供的原生框架性能较差。而分布式数据处理框架的性能瓶颈通常在Shuffle阶段,在这个阶段中,数据通过网络传输,对象的序列化与反序列化是主要瓶颈(而不是I/O带宽),它同时也增加了CPU负担。因此提高这里的性能有助于消除计算瓶颈。

前面提到的特性主要是Tungsten阶段1(Tungsten Phase 1)带来的成果。Tungsten阶段2(Tungsten Phase 2)的优化主要集中在以下3个方面:

要理解这3点需要用到下一章的知识,我们会在下一章中详细介绍这3点。

作为Apache Spark 1.6发行版的一部分,Spark引入了Dataset API。Dataset的目标是提供类型安全的编程接口。这允许开发人员使用编译时类型安全的半结构化数据(如JSON或键值对,即可以在应用程序运行之前检查错误)。所以Spark Python API不实现Dataset API的原因是Python不是类型安全的语言。

同样重要的是,Dataset API还包含高级域特定的语言操作,如sum、avg、join和group。该特性意味着Dataset具有传统RDD的灵活性,而且代码也更具可读性。类似于DataFrame,Dataset可以通过将表达式和数据字段暴露给查询计划器并利用Tungsten的快速内存编码来从Spark的Catalyst优化器获益。

Spark API从RDD到DataFrame再到Dataset的历史演变如图2-8所示。

图2-8 Spark API的演变

在Spark 1.x中,DataFrame API和Dataset API的统一可能会造成大改动而影响向后兼容性。这其实是Apache Spark 2.0作为重要版本的主要原因之一(而不是一个1.x的次要版本,可以最大限度地减少任何破坏性的更改)。从图2-9可以看出,DataFrame和Dataset都作为Apache Spark 2.0引入的新API的一部分。

图2-9 统一的Spark 2.0 API

如前所述,Dataset API提供了类型安全的面向对象编程接口。Dataset可以通过将表达式和数据字段暴露给查询计划程序和Tungsten的快速内存编码来利用Catalyst优化器。但是,现在DataFrame和Dataset都作为Apache Spark 2.0的一部分,其实DataFrame现在是Dataset Untyped API的别名。更具体地说:

DataFrame = Dataset[Row]

此外,DataFrame API与DataSet API能够充分地享受到Tungsten项目的优化成果,这是由于在DataFrame中,可以获得更多的应用语义。

Spark Commiter、Databricks CTO辛湜在Spark Summit East 2016说过:“执行流分析最简单的方法就是不用去理会流。”

流的作用非常强大,但流的一个关键点是很难被构建和维护。像Uber、Netflix和Pinterest都采用了Spark Streaming,但公司内部还是有专门的团队来保证整个系统高可用。Spark Streaming在很多情况下都无法满足业务的需求,如晚到事件、状态持久化、分布式读写等。

为了改善上述情况,并简化编程接口,让开发人员更专注于业务,Spark在2.0中基于Spark SQL引擎统一了流处理与批处理接口。无论是Dataset、DataFrame还是流处理,运行的都是同样的查询,流处理也能享受到性能优化。

因此,Spark 2.0不仅统一了DataFrame与Dataset接口,还统一了流处理与批处理接口。基于此,我们可以轻易构建持续型应用,所谓持续型应用(continuous application),就是能实时响应数据的端到端的应用,例如:

图2-10展示了如何构建一个持续型应用的流程图,在本书第二部分也会实现一个持续型应用。

图2-10 持续型应用

Spark 2.2正式推出了可用于生产的Structured Streaming套件,标志Spark流处理技术有了质的飞跃,该技术将会在第4章进行详细介绍。

在本书付梓之际,Spark最新的版本为2.4.4,但我们从GitHub上的代码仓库可以一窥端倪:下一个大版本将会是Spark 3.0而不是2.5,图2-11是Spark主分支的一个commit,从图中可以看到,原本Spark下一个版本的编号原定为2.5,而后被修改为3.0,社区在2018年初就开始规划和讨论3.0的蓝图,3.0也是Spark十周年版本,将会在2019年发布,具有一定的纪念意义,也许这也是2.5改为3.0的一个原因。从JIRA官网上可以看到,目前和Spark 3.0有关的issue一共有250余个,例如 GraphX(SPARK-25994:支持属性图、Cphyer和图算法)、Spark SQL(SPARK-2578:添加Avro数据格式支持)、Spark Core(SPARK-21505:动态连接算子)、Spark ML(SARK-25383:支持图像数据采样)、资源调度(SPARK-25678:使Spark运行在HPC集群中)等,其中最重要的要属SPARK-24579,SPARK-24579是一个史诗级issue,在介绍这个issue之前,我们先来看看继Tungsten项目之后Spark最新的黑科技:Hydrogen项目。

图2-11 原本的2.5的编号被修改为3.0

Hydrogen项目与Tungsten项目一样,都是对Spark有巨大提升的前沿探索项目,也分为多个阶段,Hydrogen项目从Spark 2.3开始,历经Spark 2.4以及后续即将发布的Spark 3.0。Hydrogen项目出现的背景是,目前机器学习框架与深度学习框架开始井喷,而Spark的野心在于一统整个数据科学领域,也乐见其成,Spark的对这些框架的态度是“拥抱机器学习生态系统,并将其视为一等公民”,由此,Spark需要将涉及数据预处理以及模型训练等整个流程深度地与这些机器学习、深度学习框架进行集成,这也是Hydrogen项目的目标。

为了实现这个目标,也就是高效地支持绝大部分机器学习框架,Spark面临两大挑战:数据交换与执行模型。我们来看看Hydrogen项目是如何解决的。

1.数据交换

数据交换指的是在Spark与机器学习框架之间高吞吐地传输数据。Spark提出了一种用户自定义函数(UDF)用来执行用户任意的代码,这种UDF通常用来与机器学习框架进行集成,例如使用TensorFlow对测试数据进行预测。UDF支持各种语言,如Scala、Python、R等,UDF可以很方便地使Spark与机器学习框架进行集成,用户可以在UDF中写一段代码来调用机器学习库。在使用UDF时,我们可以采用一次一行的方式执行,如图2-12所示。

图2-12 一次一行的数据交换方式

图2-12中包含一个简单的Python UDF,对输入进行+1操作,它将对每一行的第一列进行+1操作,数据首先被Spark一次一行地读取,并在Spark中进行列切分,将第一列发送给Python进程,Python进程接收到输入以后,对输入进行+1操作,并返回给Spark,Spark得到结果并将其和原来的两列拼接成新的一行,也就是图2-12中右边的那一行。Spark一共要执行3次操作,直到所有数据读取完毕。如果我们深入分析这种交换方式,会发现这种交换方式的性能非常糟糕,原因是大部分的时间花在了Spark将数据传输给Python,Python又把数据传输给Spark上,据统计92%的CPU周期被浪费了。这当然不是我们想要的,来看看下一种交换方式:向量化的数据传输,如图2-13所示。

图2-13 向量化的数据交换方式

与一次一行的数据交换方式不同,我们采取了列式存储的小批量传输,也就是说数据本来就是按列存储,如ORC或者Parquet这种格式,而非按行存储,Spark会选取第一列(需要进行+1的列)的一个切片发送给Python进程,而Python收到的则会是一个numpy数组或者panda序列,在UDF中我们可以直接通过向量化操作对向量进行+1操作,这种计算无疑是高效的,例如numpy底层的数组操作由C语言编写,效率较原生Python大大提升。Spark得到结果会按照固有的列式存储格式发送给下游。

向量化的数据交换方式在两个环节都有提升,其一是与Python进程的数据传输,其二是UDF的执行效率,根据Databricks的测试结果,整体效率较一次一行的数据交换方式有3~240倍的提升,效果极其明显。

2.执行模型

执行模型要解决的是,一旦Spark与机器学习框架进行深度融合,而导致它与计算模型之间天生的不相容性。如果不解决这个问题,那么“一等公民”始终是一句空话。Spark的计算模型是高度并行的,作业被划分为任务,任务与任务之间相互独立,没有依赖,如图2-14所示。

图2-14 Spark的执行模型

而常见的分布式机器学习框架的执行模型通常是统一调度,互相协调的,这是为了优化通信,在模型训练过程中,任务之间通常会有高吞吐量和大带宽的数据交互,如图2-15所示。

图2-15 分布式机器学习框架的执行模型

这两种模式看起来没有什么冲突,但是一旦某个任务失败,Spark只需重新执行该任务即可,但分布式机器学习框架通常会执行所有相关的任务。在Hydrogen项目的第二部分,Spark在一个更高的层次提出了一种带有同步栅的执行模型(及其配套的API),统一了Spark与机器学习框架的执行模型,如图2-16所示。

图2-16 一种带有同步栅的执行模型

在这种模型中,Spark将整个作业切分成3个Stage,其中虚线表示的就是同步栅,在每个Stage中,并行的方式可以不同,以Stage 2的并行方式为例,一旦某个任务失败,将会重新执行所有任务。这种执行模型很好地融合了Spark与机器学习框架。

从Hydrogen项目的这两个部分来看,Hydrogen项目的关键词是融合,数据交换从数据边界的层面进行了融合,而执行引擎在执行逻辑上将两种不同的分布式计算理念进行了融合,从图2-16中可以看出数据交换是执行引擎的基础,Stage之间的数据交换就是利用了Hydrogen的数据交换的能力。与Tungsten项目类似,Hydrogen项目也分为三个阶段。

(1)第一阶段:Spark 2.3(2018年春季),基本的向量化UDF(SPARK-21190)。

(2)第二阶段:Spark 2.4(2018年秋季),同步栅调度(SPARK-24374)并支持更多的UDF(SPARK-22216)。

(3)第三阶段:Spark 3.0(2019年),测试版并支持标准的数据交换格式(SPARK-24579)。

在第三阶段中,Hydrogen项目的主要内容就是SPARK-24579,也就是前面提到的Spark 3.0中最重要的issue。SPARK-24579的主要内容是标准化Spark和人工智能、深度学习框架,如TensorFlow、MXNet之间的数据交换过程,并优化其传输性能。SPARK-24579的出发点在于,目前大数据与人工智能的结合是很多业务与应用成功的关键,而这两个领域的顶级开源社区也多次尝试整合,但由于Spark SQL、DataFrame、Structured Streaming的日趋成熟,Spark仍然是大数据社区的首选,因此人工智能框架如何与Spark进行集成是整合的关键。当然,目前已经存在一些解决方案如TensorFlowOnSpark、TensorFrames等,但是还没有一种标准化传输方案,所以性能优化只能根据具体情况来实现,例如TensorFlowOnSpark使用Hadoop的InputFormat / OutputFormat来加载和保存TensorFlow的TFRecords,并用RDD将数据传输给TensorFlow。SPARK-24579所探讨的正是如何降低整个过程的复杂性:标准化Spark和人工智能、深度学习框架之间的数据交换接口。这样,人工智能、深度学习框架就可以利用Spark从任何地方加载数据,而无须花费额外的精力来构建复杂的数据解决方案,例如从数据仓库或流模型推断中读取某个特征。Spark用户可以使用人工智能、深度学习框架,而无须学习特定的数据API,并且双方的开发人员可以独立地进行性能优化,因为接口本身不会带来很大的开销。SPARK-24579只是Spark在人工智能领域的一个开始,这也和2018年的Spark Summit首次改名为Spark + AI Summit相契合,预示着Spark将在人工智能领域发力。

在JIRA上我们还可以通过Hydrogen项目的标签对issue进行过滤,目前有三个没有关闭的史诗级issue,除了上面的SPARK-24579、SPARK-24374,还有一个SPARK-24615,该issue也是Hydrogen项目的一个重要改进,将会为Spark添加原生的GPU调度支持。目前,GPU已经广泛应用于分布式深度学习与训练加速,但通常用户需要用Spark加载大量数据,最新的Spark版本已经在YARN和Kubernetes中支持GPU了,虽然如此,但是Spark本身并不知道它们暴露的GPU,所以Spark用户无法正常请求和调度,这为Spark统一分析的口号留下了一丝遗憾。SPARK-24615将会为这类训练加速任务添加调度支持,该issue的目标如下。

未来,该issue希望达到的目标如下。

该issue想要做到的是在资源层面实现Spark和人工智能框架的融合和统一,同步栅模型则在执行层面上实现Spark和人工智能框架的融合和统一。

此外,属于Spark 3.0的史诗级issue除了SPARK-24579还有一个SPARK-25994,该issue将在第5章进行介绍。除了以上特性,Spark 3.0及后面的版本还会支持Scala 2.12和Hadoop 3.x、SQL解析、连接提示、新的DataSource API、自适应执行等新特性。

在生产环境中安装Spark并不是一件难事,首先需要了解Spark的部署方式,目前Spark有4种部署方式:

根据部署方式的不同,安装方式也会有所不同。目前分布式数据处理系统的架构主要分为3层,即存储、资源管理与调度、计算,如图2-17所示。

图2-17 3层分布式数据处理系统架构

每一层都有不同的技术选型,通常存储技术选用得较多的是HDFS,而资源管理与调度平台比较成熟且常用的选择通常是YARN与Mesos,这一层的技术选型决定了上层计算框架的部署方式。

Spark on YARN是生产环境中最为常见的部署方式,YARN(Yet Another Resource Negotiator)是Hadoop 2.0区别于之前版本的重大改进,得益于Hadoop的普及,YARN是目前最为普遍的资源管理与调度平台(详见《Hadoop海量数据处理(第2版)》一书第5章)。如果采取这种方式,用户只需要通过Spark客户端来提交作业,而作业的执行在YARN的Container中完成,资源调度由YARN的Application Master完成,计算任务调度由Spark的Driver完成,如图2-18所示。

图2-18 Spark on YARN(yarn-client)

用户首先在客户端(该客户端会部署Spark)提交作业,Driver会向YARN的ResourceManager申请资源,这时YARN会在集群中随机选取一个Container启动Spark Application Master作为二级资源调度器并回应Driver,接着Driver会和Spark Application Master通信,Spark Application Master会调度申请到的资源启动Executor开始计算任务,在作业执行过程中,Driver会和Executor保持通信。在这种模式中,Spark的计算节点可能会运行在YARN中的任意节点,但Spark的Driver不会运行在YARN中,而运行在用户提交任务的节点,因此这种模式也被称为yarn-client。Spark on YARN还有一种模式是yarn-cluster,该模式与yarn-client模式大同小异,区别主要在于Driver会运行在YARN中,而且一般与Application Master运行在同一个节点,如图2-19所示。

图2-19 Spark on YARN(yarn-cluster)

这两种方式读者可以根据需要自行选择,在中小规模的集群中,yarn-client要常用些,而大规模集群中,yarn-cluster要常用些。如果选取yarn-client模式,那么客户端会直接输出Driver日志,利于调试,而yarn-cluster模式则客户端无法输出Driver日志,另外如果想用Spark Shell进行作业调试,只能采用yarn-client模式。

Mesos是AMP实验室开发的资源管理与调度平台,与YARN是直接竞争对手,和Spark结合更加紧密,但目前由于YARN依托于Hadoop的先发优势太大,因此目前使用Mesos的用户不是太多,但可以看到的是,这是一种极具前景的技术。采用这种部署方式,同样是通过Spark客户端提交作业,Driver向Mesos的主节点的集群管理器申请资源,集群管理器为其分配资源并在相应的节点上启动Executor,接着Driver会调度任务在Driver上执行,并在执行过程中与Executor进行通信,如图2-20所示。

图2-20 Spark on Mesos

Spark Standalone模式采用了一个内置的调度器,而没有依赖任何外部资源管理与调度平台(如Mesos与YARN),相当于在Spark Standalone中,Spark集群自己管理资源与调度,在这个模式中,Spark客户端依然存在,作业通过Spark客户端提交到Spark集群,如图2-21所示。

图2-21 Spark Standalone

Driver会与集群管理器节点通信申请资源,平时Worker节点都会与集群管理器节点通信汇报自己的负载与健康状况,Driver拿到分配的资源,就会与各个Worker节点通信,开始启动计算任务。

可以看到,在Mesos与Standalone模式中,Driver承担了计算任务调度与资源调度的工作,而在YARN中,Driver只承担了计算任务调度的工作,其余工作由Application Master完成。造成差异的原因在于,Mesos与Standalone模式可以被认为是Spark原生模式,核心只是Cluster Manager实现不同,其余设计思路都高度一致。但在YARN中,就必须按照YARN的规则来实现,对于一个YARN应用来说,Application Master是不可或缺的,如果读者用过运行在YARN上的MapReduce——MRv2,就会发现在MapReduce作业中,计算任务调度与资源调度都由Application Master完成。此外,Mesos与Standalone这两种模式的最大不同在于,Mesos集群除了Spark应用还可以运行其他几乎所有类型的应用,如Web服务、数据库服务等,甚至是Standalone的集群,而Standalone集群只能运行Spark应用,因此从这个角度上来说,Standalone模式只是Mesos模式的一个精简版。

Kubernetes是Google大规模资源管理和调度平台Brog的开源实现,与Mesos功能类似,Spark 2.3以后开始支持Spark on Kubernetes运行模式。Kubernetes本身也是主从架构,Kubernetes Master是Kubernetes集群的主节点,负责与客户端交互、资源调度和自动控制,Node是从节点,可以运行在虚拟机和物理机上,主要功能是承载Pod的运行,Pod的创建和启停等操作由Node的Kubelet组件控制,Pod是若干容器的组合,同一个Pod的容器运行在同一个宿主机上,Pod是Kubernetes能够进行创建、调度和管理的最小单位。

向Kubernetes集群提交Spark作业如图2-22所示,客户端向Kubernetes Master提交作业,调度器分配资源启动Pod和Spark Driver,Driver创建运行在Pod中的Executor,并开始执行应用代码,当应用终止时,Executor所在的Pod也随之终止并销毁,但Driver会保留日志与“complete”状态,直到最终垃圾回收或者被手动清除。

图2-22 Spark on Kubernetes

前面介绍了Spark的4种部署方式,无论哪一种部署方式,都是“Spark客户端 + 计算资源集群”的模式。本小节主要介绍Spark客户端安装方式,而不会介绍YARN、Mesos、Spark集群的安装方式。安装Spark客户端的核心其实就是通过配置让该客户端找到相应的资源管理与调度平台,因此只需在spark-env.sh文件中添加HADOOP_CONF_DIR环境变量即可,如下:

HADOOP_CONF_DIR=/etc/hadoop/conf/

该路径下部署了YARN、HDFS等相关配置文件,此外如果还需要用到HBase、Hive等组件,可以将这些组件的配置文件放到Spark客户端的conf文件夹下。

预编译的Spark客户端可以在Spark的官网下载。

安装好了Spark以后,通常安装Spark的节点被称为Spark客户端,用户可以通过Spark安装路径bin目录下的spark-summit脚本来向集群提交任务,提交命令格式如下:

./bin/spark-submit \
--class <main-class> \
--master <master-url> \
--deploy-mode <deploy-mode> \
--conf <key>=<value> \
... # 其他配置项
<application-jar> \
[application-arguments]

用户可以通过配置master与deploy-mode选项来指定提交的集群与Driver运行的方式,class选项指定了应用的入口,下面举几个例子:

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://192.184.56.138:7077 \
--deploy-mode cluster \
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
/path/to/examples.jar 1000

该命令是向以Standalone模式部署的集群提交作业,而Driver也运行在集群中,每个Executor的内存大小为20 GB,所有Executor可以使用的CPU资源为100核,应用参数为1000。supervise选项确保当Driver以非零值退出时自动重启。/path/to/examples.jar是用户编写的代码打包成的jar包。

./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master mesos://192.184.56.138:7077 \
--deploy-mode cluster\
--supervise \
--executor-memory 20G \
--total-executor-cores 100 \
path/to/examples.jar 1000

该命令是向Mesos集群提交作业,Driver则运行在客户端节点。其余与Standalone集群一致。

export HADOOP_CONF_DIR=XXX
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master YARN \
--deploy-mode cluster \
--executor-memory 10G \
--num-executors 10 \
/path/to/examples.jar 1000

向YARN集群提交作业则有些不同,由于YARN并没有Master URL这样的配置来定位资源集群,用户需要让客户端找到YARN的配置,因此需要指定HADOOP_CONF_DIR的路径为YARN的配置文件所在路径,这样才能提交成功。此外,提交的master配置项可以简写为yarn-client/yarn-master,它们等同于--master yarn --deploy-mode cluster/client。

在提交命令中,可以通过--conf <key>=<value>的形式可以配置Spark所有参数。下面介绍一些重要的参数。

Spark的所有配置都可以通过提交时以命令行参数的形式进行设置,还可以通过SparkContext在代码中进行设置,以及在Spark客户端配置文件夹下的spark-defaults.conf文件中以键值对形式进行配置。默认代码中的配置会覆盖命令行的配置和配置文件的配置,如果没有配置命令行参数也没有在代码中进行配置,那么就以Spark客户端的配置文件夹下的spark-defaults.conf文件中的配置和默认配置为准。

Spark Shell是Spark提供的一种基于Scala的REPL(Read-Eval-Print-Loop)交互式解释器环境。一般来说,脚本语言(如Python等)都支持REPL,但是编译型的语言大多不支持,在利用Spark进行数据探索时(使用Java、Scala语言),如果先编译再运行,会大大降低开发效率,而Spark Shell的出现使Scala可以以脚本语言的形式来进行编程和运行,这在大规模数据探索时尤其有用,不能不说是一种创新思路,而Spark Shell也以其简洁实用的特性深受开发人员的喜爱。用户可以通过Spark Shell编码、配置并提交作业,Spark Shell本质上是对spark-submit命令的客户端模式的封装,用户可以在执行spark-shell脚本时加上启动参数,如--num-executors等。代码如下:

我们可以在Shell中编写Scala代码。此外,Spark还提供了Spark SQL的Shell工具spark-sql。

要编写Spark作业,初始化SparkSession是第一步,初始化方法如下:

val spark = SparkSession
.builder
.master("yarn-client")
.config("spark.reducer.maxSizeInFlight", "128M")
.appName("your_app_name")
.getOrCreate()

如前所述,在代码中,用户也可以指定提交方式与提交参数,默认代码配置的优先级配置要高于前面的命令行配置。

函数式编程思想一大特点是核心数据结构和低阶函数的运用,那么秉承函数式编程思想的Spark也有自己的实现方式:RDD与算子,RDD类似于Scala语言中的集合,算子相当于Scala语言中的低阶函数,算子构建了或者说描述了Spark作业的数据处理流程,而用户定义的高阶函数则会作为参数传入算子中用来解决和业务相关的问题。

RDD(Resilient Distributed Dataset)全称为弹性分布式数据集,是Spark对数据的核心抽象,也是最关键的抽象,实质上是一组分布式的JVM不可变对象集合,不可变决定了它是只读的,RDD在经过变换产生新的RDD时,原有RDD不会改变。弹性主要表现在两个方面:其一是在面对出错情况(例如任意一台节点宕机)时,Spark能通过RDD之间的依赖关系恢复任意出错的RDD,RDD就像一块海绵一样,无论怎么挤压,都像海绵一样完整;其二,在经过转换算子处理时,RDD中的分区数以及分区所在的位置随时都有可能改变。RDD名字中虽然带有分布式,但是它的目的却是要让用户感觉不到分布式,而像操作本地数据集一样操作在分布式存储中的数据。用户可以将RDD看作一个数组,数组并没有保存数据,而是每个数据分区的引用,数据以分区中的元素的形式分散保存在集群中的各个节点上。从这个角度上来说,RDD存储的是元数据而非数据本身。

每个RDD都有如下几个成员:

如图2-23所示,RDD_0根据HDFS上的块地址生成,块地址集合是RDD_0的成员变量,RDD_1由RDD_0与转换(transform)函数(算子)转换而成,该算子其实是RDD_0内部成员。从这个角度上来说,RDD_1依赖RDD_0,这种依赖关系集合也作为RDD_1的成员变量而保存。

图2-23 RDD的转换过程

在Spark源码中,RDD是一个抽象类,根据具体的情况有不同的实现,比如RDD_0可以是MapPartitionRDD,而RDD_1由于产生了Shuffle,则是ShuffledRDD。来看一下RDD的代码:

// 表示RDD之间的依赖关系的成员变量
@transient private var deps: Seq[Dependency[_]]
// 分区器成员变量
@transient val partitioner: Option[Partitioner] = None
// 该RDD所引用的分区集合成员变量
@transient private var partitions_ : Array[Partition] = null
// 得到该RDD与其他RDD之间的依赖关系
protected def getDependencies: Seq[Dependency[_]] = deps
// 得到该RDD所引用的分区
protected def getPartitions: Array[Partition]
// 得到每个分区地址
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
// distinct算子
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = 
withScope  {
    map(x => (x, null)).reduceByKey((x, y) => x, numPartitions).map(_._1)
}

创建RDD是使用RDD的第一步,它可以由内存中的集合、文件、外部数据源生成或者由其他RDD转换而成,还有一类比较特殊的方式,从流式数据中生成。

1.并行集合

并行集合可以由对Driver中的集合调用parallelize方法得到,sparkcontext是SparkSession对象的成员变量。这时,Driver会将集合切分成分区,并将数据分区分发到整个集群中。代码如下:

val rdd_one = sparkcontext.parallelize(Seq(1, 2, 3))

这里rdd_one类型是ParallelCollectionRDD。

2.从外部数据源读取

这是创建RDD最常见的一种方法,Spark支持几乎所有外部数据源,如HDFS、HBase、Elasticsearch、MySQL等。如果从HDFS读取数据生成RDD,Spark集群中的节点就会去读取分散在HDFS中的数据块,而通常HDFS集群与Spark集群通常会部署在同一个物理集群中,这里就会引出一个数据本地性的问题,我们会在后面介绍。

3.从HDFS中读取

HDFS作为Spark的底层存储,Spark可以直接从HDFS中读取。

val rdd_two = sparkcontext.textFile("/user/me/wiki.txt")

这里rdd_two类型是BlockRDD。

4.从HBase中读取

Spark从HBase读取有两种方式,一种是基于HBase的Scan操作,另一种是直接读取HBase的Region文件(关于HBase详见《Hadoop海量数据处理(第2版)》第8章),Region类似于HDFS数据块,也是切片,且每个切片都有副本,切片是有序二进制文件。从技术上说,当然是后者更为高效。代码如下:

...
val sc = new SparkContext(sparkConf)
val tablename = "your_hbasetable"  
val conf = HBaseConfiguration.create()  
conf.set("hbase.zookeeper.quorum", "zk1,zk2,zk3")  
conf.set("hbase.zookeeper.property.clientPort", "2181")  
conf.set(TableInputFormat.INPUT_TABLE, tablename)  
val rdd_three = sc.newAPIHadoopRDD(conf, classOf[TableInputFormat],  
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],  
classOf[org.apache.hadoop.hbase.client.Result]) 
// 利用HBase API解析出行键与列值
rdd_three.foreach{case (_,result) => {    
    val rowkey = Bytes.toString(result.getRow)  
    val value1 = Bytes.toString(result.getValue("cf".getBytes,"c1".getBytes))
}

rdd_three的类型为newAPIHadoopRDD,其数据类型为Tuple2[mmutableBytesWritable, Result],可以在后续迭代时用HBase API解析得到行键与列值。

5.从Elasticsearch中读取

Elasticsearch是一个分布式的全文搜索引擎,其数据存储模式也是采取的水平分片(sharding)+副本(replication)的形式,很适合Spark集群分布式读取。

...
sparkConf.set("es.index.auto.create", "true")
sparkConf.set("es.nodes", "es1")
sparkConf.set("es.port", "9200")
val sc = new SparkContext(sparkConf)

val rdd_four = EsSpark.esRDD(sc, "spark/docs").foreach(line => {
   val key = line._1
   val value = line._2
})

这段代码需要引用第三方库elasticsearch-spark_2.10-2.1.0.jar。

6.从MySQL读取

Spark从MySQL读取数据与上面3种不太一样,上面3种都是从分布式系统中读取数据,而MySQL是一个单点数据库。Spark从MySQL中读数据的方式很像我们熟悉的一个数据抽取工具Sqoop。代码如下:

...
val lowerBound = 1
val upperBound = 1000
val numPartition = 10
val rdd_five = new JdbcRDD(sc,() => {
       Class.forName("com.mysql.jdbc.Driver").newInstance()
       DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
   },
   "SELECT content FROM mysqltable WHERE ID >= ? AND ID <= ?",
   lowerBound, 
   upperBound, 
   numPartition,
   r => r.getString(1)
)

Spark从MySQL中读取数据返回的RDD类型是JdbcRDD,顾名思义,是基于JDBC读取数据的,这点与Sqoop是相似的,但不同的是JdbcRDD必须手动指定数据的上下界,也就是以MySQL表某一列的最值作为切分分区的依据,且该列数据类型必须为长整型,这就大大限制了JdbcRDD的使用场景。

7.PairRDD

在开始介绍转换算子之前,还有一类RDD,它与其他RDD并无不同,只不过它的数据类型是Tuple2[K,V],即键值对,因此这种RDD也被称为PairRDD,泛型为RDD[(K,V)]。这种数据结构决定了PairRDD可以使用某些基于键的算子,如分组、汇总等。PairRDD可以由普通RDD转换得到:

val a = sc.textFile("/user/me/wiki").map(x => (x,x))

从流式数据生成RDD(DStream)的方式将在第4章介绍。

RDD算子主要分为两类,一类为转换(transform)算子,另一类为行动(action)算子,转换算子主要负责改变RDD中数据、切分RDD中数据、过滤掉某些数据等。转换算子按照一定顺序组合,Spark会将其放入到一个计算的有向无环图中,并不立刻执行,当Driver请求某些数据时,才会真正提交作业并触发计算,而行动算子就会触发Driver请求数据。这种机制与函数式编程思想的惰性求值类似。这样设计的原因首先是避免无谓的计算开销,更重要的是Spark可以了解所有执行的算子,从而设定并优化执行计划。

RDD转换算子有20多个,按照DAG中分区与分区之间的映射关系来分组,有如下3类:

而按照RDD的结构可以分为两种:

本书按照转换算子的用途分为以下4类:

在介绍算子时,并没有刻意区分RDD和Pair RDD,读者可以根据RDD的泛型来做判断,此外往往两个功能相似的算子如groupBy与groupByKey,底层都是先将值型RDD转换成键值型RDD,再直接利用键值型RDD完成转换功能,故不重复介绍。

1.通用类

这一类可以满足绝大多数需要,特别适合通用分析型需求。

图2-24 map算子

图2-25 reduceByKey算子

图2-26 flatMap算子

图2-27 filter算子

图2-28 单分区排序

图2-29 多分区排序

图2-30 groupByKey算子

2.数学/统计类

这类算子实现的是某些常用的数学或者统计功能,如分层抽样等。

3.集合论与关系类

这类算子主要实现的是像连接数据集这种功能和其他关系代数的功能,如交集、差集、并集、笛卡儿积等。

图2-31 cogroup算子

图2-32 union算子

4.数据结构类

这类算子主要改变的是RDD中底层的数据结构,即RDD中的分区。在这些算子中,你可以直接操作分区而不需要访问这些分区中的元素。在Spark应用中,当你需要更高效地控制集群中的分区和分区的分发时,这些算子会非常有用。通常,根据集群状态、数据规模和使用方式有针对性地对数据进行重分区可以显著提升性能。默认情况下,RDD使用散列分区器对集群中的数据进行分区。分区数与集群中的节点数无关,很可能集群中的单个节点有几个数据分区。数据分区数一般取决于数据量和集群节点数。作业中的某个计算任务的输入是否在本地,这称为数据的本地性,计算任务会尽可能地优先选择本地数据。

行动算子从功能上来说作为一个触发器,会触发提交整个作业并开始执行。从代码上来说,它与转换算子的最大不同之处在于:转换算子返回的还是RDD,行动算子返回的是非RDD类型的值,如整数,或者根本没有返回值。

行动算子可以分为Driver和分布式两类。

1.Driver类算子

以下算子为Driver类的行动算子。

2.分布式类算子

以下算子为分布式类的行动算子。

在计算过程中,用户可能会经常使用同一份数据,此时就可以用到Spark缓存技术,也就是利用缓存算子将RDD进行缓存,从而加速Spark作业的执行速度。Spark缓存算子也属于行动算子,也就是说会触发整个作业开始计算,想要缓存数据,你可以使用cache算子或者persist算子,这两个算子是仅有的两个返回值为RDD的行动算子。事实上,Spark缓存技术是加速Spark作业执行的关键技术之一,尤其是在迭代计算的场景,效果非常好。

缓存策略是尽可能地将数据放入内存。如果没有足够的内存,那么驻留在内存的当前数据就有可能被移除,例如LRU策略;如果数据量本身已经超过可用内存容量,这时由于磁盘会代替内存存储数据,性能会下降。

表2-1 cache算子的缓存级别

存储级别

含义

MEMORY_ONLY

将RDD以反序列化Java对象的形式存储在JVM中。如果RDD超过了可用内存容量,那么一部分分区则不会被缓存,而是在需要时进行重算

MEMORY_AND_DISK

将RDD以反序列化Java对象的形式存储在JVM中。如果RDD超过了可用内存容量,那么一部分分区会存储在磁盘上,当需要时会直接从磁盘读取

MEMORY_ONLY_SER
(Java与Scala)

将RDD以序列化Java对象(一个分区为一个字节数组)的形式存储在JVM中。当使用某个快速序列化工具时,这通常更节省空间,但会增加CPU的压力,这是用时间换空间的取舍

MEMORY_ONLY_DISK_SER
(Java与Scala)

与MEMORY_ONLY_SER相似,当内存不够用时,但不会将剩下的分区写到磁盘上,而是在需要时进行重算

DISK_ONLY

将所有RDD分区存到磁盘上

MEMORY_ONLY_2、
MEMORY_AND_DISK_2等

与MEMORY_ONLY级别相同,但是在缓存数据时,会在集群中的两个节点保存一份,也就是说两个副本

OFF_HEAP(实验性质)

与MEMORY_ONLY_SER相似,但是将数据存储在堆外内存,需要开启堆外内存选项,例如可以用Alluxio作为堆外存储

如果内存足够大,使用MEMORY_ONLY无疑是性能最好的选择,想要节省点空间的话,可以采取MEMORY_ONLY_SER,可以序列化对象使其所占空间减少一点。DISK是在重算的代价特别昂贵时的不得已的选择。MEMORY_ONLY_2和MEMORY_AND_DISK_2拥有最佳的可用性,但是会消耗额外的存储空间。

在DAG中,最初的RDD被称为基础RDD,后续生成的RDD都是由算子以及依赖关系生成的,也就是说,无论哪个RDD出现问题,都可以由这种依赖关系重新计算而成。这种依赖关系被称为RDD血统(lineage)。血统的表现形式主要分为宽依赖(wide dependency)与窄依赖(narrow dependency),如图2-33所示。

图2-33 宽依赖和窄依赖(1)

如图2-33所示,窄依赖的准确定义是子RDD中的分区与父RDD中的分区只存在一对一的映射关系,而宽依赖则是子RDD中的分区与父RDD中的分区存在一对多的映射关系,那么从这个角度来说,map、filter、union等就是窄依赖,而groupByKey、cogroup就是典型的宽依赖,如图2-34所示。

图2-34 宽依赖和窄依赖(2)

宽依赖还有个名字,叫Shuffle依赖,也就是说宽依赖必然会发生Shuffle操作,在前面也提到过Shuffle也是划分Stage的依据。而窄依赖由于不需要发生Shuffle,所有计算都是在分区所在节点完成,它类似于MapReduce中的ChainMapper。

当RDD中的某个分区出现故障,那么只需要按照这种依赖关系重新计算即可,窄依赖最简单,只涉及某个节点内的计算,而宽依赖,则会按照依赖关系由父分区计算而得到,如图2-35所示。

图2-35 根据宽依赖重算

如果P1_0分区发生故障,那么按照依赖关系需要P0_0与P0_1的分区重算,如果P0_0与P0_1没有持久化,就会不断回溯,直到找到存在的父分区为止。当计算逻辑复杂时,就会引起依赖链过长,这样重算的代价会极其高昂,因此用户可以在计算过程中,适时调用RDD的checkpoint方法,保存当前算好的中间结果,这样依赖链就会大大缩短。RDD的血统机制就是RDD的容错机制。

Spark的容错主要分为资源管理平台的容错和Spark应用的容错,Spark应用基于资源管理平台运行,因此资源管理平台的容错也是Spark容错的一部分,如YARN的ResourceManager HA机制。在Spark应用执行的过程中,可能会遇到失败的情况有:

Driver执行失败是Spark应用最严重的一种情况,标志整个作业彻底执行失败,需要开发人员手动重启Driver;Executor报错通常是因为Executor所在的机器故障导致,这时Driver会将执行失败的任务调度到另一个Executor继续执行,重新执行的任务会根据RDD的依赖关系继续计算,并将报错的Executor从可用Executor的列表中去掉;Spark会对执行失败的任务进行重试,重试3次后若仍然失败会导致整个作业失败,任务的数据恢复和重新执行都用到了RDD的血统机制。

很多算子都会引起RDD中的数据进行重分区,新的分区被创建,旧的分区被合并或者被打碎。在重分区的过程中,数据跨节点移动被称为Shuffle,在Spark中,Shuffle负责将Map端的处理的中间结果传输到Reduce端供Reduce端聚合,它是MapReduce类型计算框架中最重要的概念也是消耗性能巨大的步骤,它体现了从函数式编程接口到分布式计算框架的实现。与MapReduce的Sort-based Shuffle不同,Spark对Shuffle的实现有两种:Hash Shuffle与Sort-based Shuffle,这其实是一个优化的过程。在较老的版本中,Spark Shuffle的方式可以通过spark.shuffle. manager配置项进行配置,而在新的Spark版本中,已经去掉了该配置,统一为Sort-based Shuffle。

在Spark 1.6.3之前,Hash Shuffle都是Spark Shuffle的解决方案之一。Shuffle的过程一般分为两个部分:Shuffle Write和Shuffle Fetch,前者是Map任务划分分区、输出中间结果,而后者则是Reduce任务获取到这些中间结果。Hash Shuffle的过程如图2-36所示。

图2-36 Hash Shuffle

在图2-36中,Shuffle Write发生在一个节点上,该节点用来执行Shuffle任务的CPU核数为2,每个核可以同时执行两个任务,每个任务输出的分区数与Reducer数相同,即为3,每个分区都有一个缓冲区(bucket)用来接收结果,每个缓冲区的大小由配置spark.shuffle.file.buffer.kb决定。这样每个缓冲区写满后,就会输出到一个文件段(filesegment),而Reducer就会去相应的节点拉取文件。这样的实现很简单,但是问题也很明显。

为了解决第一个问题,Spark推出过File Consolidation机制,旨在通过共用输出文件以降低文件数,如图2-37所示。

图2-37 Consolidation机制

当每次Shuffle任务输出时,同一个CPU核心处理的Map任务的中间结果会输出到同分区的一个文件中,然后Reducer只需一次性将整个文件拿到即可。那么这样的话,Shuffle产生的文件数为C(CPU核数)×R。Spark的FileConsolidation机制默认开启,可以通过spark.shuffle. consolidateFiles配置项进行配置。

在Spark先后引入了Hash Shuffle与FileConsolidation后,还是无法根本解决中间文件数太大的问题,Spark在1.2版之后又推出了与MapReduce一样(读者可参照《Hadoop海量数据处理(第2版)》的5.3.4节)的Shuffle机制——Sort-based Shuffle,才真正解决了Shuffle的问题,再加上Tungsten项目的优化,Spark的Sort-based Shuffle比MapReduce的Sort-based Shuffle更为优异,如图2-38所示。

图2-38 Sort-based Shuffle

每个Map任务最后只会输出两个文件(其中一个是索引文件),其中间过程采用的是与MapReduce一样的归并排序,但是会用索引文件记录每个分区的偏移量,输出完成后,Reducer会根据索引文件得到属于自己的分区,在这种情况下,Shuffle产生的中间结果文件数为2×M(M为Map任务数)。

在Sort-based Shuffle中,Spark还提供了一种折中方案——Bypass Sort-based Shuffle,当Reduce任务小于spark.shuffle.sort.bypassMergeThreshold配置(默认200)时,Spark Shuffle开始按照Hash Shuffle的方式处理数据,而不用进行归并排序,只是在Shuffle Write步骤的最后,将其合并为1个文件,并生成索引文件。这样实际上还是会生成大量的中间文件,只是最后合并为1个文件并省去了排序所带来的开销,该方案准确的说法是Hash Shuffle的Shuffle Fetch优化版。

Spark在1.5版时开始了Tungsten项目,也在1.5.0、1.5.1、1.5.2版的时候推出了一种tungsten-sort的选项,这是一种成果应用,类似于一种实验,该类型Shuffle本质上还是排序的Shuffle,只是用UnsafeShuffleWriter进行Map任务输出,并采用了与前面介绍的BytesToBytesMap相似的数据结构,把对数据的排序转化为对指针数组的排序,能够基于二进制数据进行操作,对GC有了很大提升,但是该方案对数据量有一些限制,随着Tungsten项目的逐渐成熟,该方案在1.6版就消失不见了。

从上面整个过程的变化来看,Spark Shuffle也是经过了一段时间才趋于成熟和稳定,这也正像学习的过程,不能一蹴而就,贵在坚持。

在Spark作业中,用户编写的高阶函数会在集群中的Executor里执行,这些Executor可能会用到相同的变量,这些变量被复制到每个Executor中,而Executor对变量的更新不会传回Driver。在计算任务中支持通用的可读写变量一般是低效的,即便如此,Spark还是提供了两类共享变量:广播变量(broadcast variable)与累加器(accumulator)。

广播变量类似于MapReduce中DistributeFile,通常来说是一份不大的数据集,一旦广播变量在Driver中被创建,整个数据集就会在集群中进行广播,就能让所有正在运行的计算任务以只读方式访问。广播变量支持一些简单的数据类型,如整型、集合类型等,也支持很多复杂数据类型。

广播变量为了保证数据广播到所有节点,使用了很多办法。这其实是一个很重要的问题,我们不能期望100个或者1000个Executor去连接Driver,并拉取数据,这会让Driver不堪重负。Executor采用的是通过HTTP连接去拉取数据,类似于BitTorrent点对点传输。这样的方式更具扩展性,避免了所有Executor都去向Driver请求数据而造成Driver故障。

Spark广播机制运作方式是Driver将已序列化的数据切分成小块,然后将其存储在自己的块管理器BlockManager中,当Executor开始运行时,每个Executor首先从自己的内部块管理器中试图获取广播变量,如果以前广播过,那么直接使用;如果没有,Executor就会从Driver或者其他可用的Executor去拉取数据块。一旦拿到数据块,就会放到自己的块管理器,供自己和其他需要拉取的Executor使用。这就很好地防止了Driver单点的性能瓶颈,如图2-39所示。

图2-39 广播变量

下面来看一下如何在Spark作业中创建、使用广播变量。代码如下:

scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
    scala> val i = 5
    i: Int = 5
scala> val bi = sc.broadcast(i)
bi: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(147)
scala> bi.value
res166: Int = 5
scala> rdd_one.take(5)
res164: Array[Int] = Array(1, 2, 3)
scala> rdd_one.map(j => j + bi.value).take(5)
res165: Array[Int] = Array(6, 7, 8)

在用户定义的高阶函数中,可以直接使用广播变量的引用。下面看一个集合类型的广播变量:

scala> val rdd_one = sc.parallelize(Seq(1,2,3))
    rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[109] at
parallelize at <console>:25
scala> val m = scala.collection.mutable.HashMap(1 -> 2, 2 -> 3, 3 -> 4)
    m: scala.collection.mutable.HashMap[Int,Int] = Map(2 -> 3, 1 -> 2, 3 -> 4)
scala> val bm = sc.broadcast(m)
bm:
org.apache.spark.broadcast.Broadcast[scala.collection.mutable.HashMap[Int,I
nt]] = Broadcast(178)
scala> rdd_one.map(j => j * bm.value(j)).take(5)
res191: Array[Int] = Array(2, 6, 12)

该例中,元素乘以元素对应值得到最后结果。广播变量会持续占用内存,当我们不需要的时候,可以用unpersist算子将其移除,这时,如果计算任务又用到广播变量,那么就会重新拉取数据,如下:

    ...
scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
scala> val k = 5
k: Int = 5
scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)
scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)
scala> bk.unpersist
scala> rdd_one.map(j => j + bk.value).take(5)
res186: Array[Int] = Array(6, 7, 8)

你还可以使用destroy方法彻底销毁广播变量,调用该方法后,如果计算任务中又用到广播变量,则会抛出异常:

scala> val rdd_one = sc.parallelize(Seq(1,2,3))
rdd_one: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[101] at
parallelize at <console>:25
scala> val k = 5
k: Int = 5
scala> val bk = sc.broadcast(k)
bk: org.apache.spark.broadcast.Broadcast[Int] = Broadcast(163)
scala> rdd_one.map(j => j + bk.value).take(5)
res184: Array[Int] = Array(6, 7, 8)
scala> bk.destroy
scala> rdd_one.map(j => j + bk.value).take(5)
17/05/27 14:07:28 ERROR Utils: Exception encountered
org.apache.spark.SparkException: Attempted to use Broadcast(163) after it
was destroyed (destroy at <console>:30)
at org.apache.spark.broadcast.Broadcast.assertValid(Broadcast.scala:144)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$writeObject$1.apply$mc
V$sp(TorrentBroadcast.scala:202)
at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$wri

广播变量在一定数据量范围内可以有效地使作业避免Shuffle,使计算尽可能本地运行,Spark的Map端连接操作就是用广播变量实现的。

与广播变量只读不同,累加器是一种只能进行增加操作的共享变量。如果你想知道记录中有多少错误数据,一种方式是针对这种错误数据编写额外逻辑,另一种方式是使用累加器。用法如下:

    ...
scala> val acc1 = sc.longAccumulator("acc1")
acc1: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355,
name: Some(acc1), value: 0)
scala> val someRDD = tableRDD.map(x => {acc1.add(1); x})
someRDD: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[99] at map at
<console>:29
scala> acc1.value
res156: Long = 0 /*there has been no action on the RDD so accumulator did
not get incremented*/
scala> someRDD.count
res157: Long = 351
scala> acc1.value
res158: Long = 351
scala> acc1
res145: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 10355,
name: Some(acc1), value: 351)

上面这个例子用SparkContext初始化了一个长整型的累加器。LongAccumulator方法会将累加器变量置为0。行动算子count触发计算后,累加器在map函数中被调用,其值会一直增加,最后定格为351。Spark内置的累加器有如下几种。

所有这些累加器都是继承自AccumulatorV2,如果这些累加器还是不能满足用户的需求,Spark允许自定义累加器。如果需要某两列进行汇总,无疑自定义累加器比直接编写逻辑要方便很多,例如:

A B
100 1
200 2
400 4
800 8
1600 16

这个表只有两列,需要统计A列与B列的汇总值。下面来看看根据上面的逻辑如何实现一个自定义累加器。代码如下:

import org.apache.spark.util.AccumulatorV2
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

// 构造一个保存累加结果的类
case class SumAandB(A: Long, B: Long)

class FieldAccumulator extends AccumulatorV2[SumAandB,SumAandB] {

private var A:Long = 0L
private var B:Long = 0L
   // 如果A和B同时为0,则累加器值为0
   override def isZero: Boolean = A == 0 && B == 0L
   // 复制一个累加器
   override def copy(): FieldAccumulator = {
      val newAcc = new FieldAccumulator
      newAcc.A = this.A
      newAcc.B = this.B
      newAcc
   }
   // 重置累加器为0
   override def reset(): Unit = { A = 0 ; B = 0L }
   // 用累加器记录汇总结果
   override def add(v: SumAandB): Unit = {
      A += v.A
      B += v.B
   }
   // 合并两个累加器
   override def merge(other: AccumulatorV2[SumAandB, SumAandB]): Unit = {
      other match {
      case o: FieldAccumulator => {
         A += o.A
         B += o.B}
      case _ =>
      }
   }
   // 当Spark调用时返回结果
   override def value: SumAandB = SumAandB(A,B)
}

凡是有关键字override的方法,均是重载实现自己逻辑的方法。累加器调用方式如下:

package com.spark.examples.rdd

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext

class Driver extends App{

  val conf = new SparkConf
  val sc = new SparkContext(conf)
  val filedAcc = new FieldAccumulator
  sc.register(filedAcc, " filedAcc ")
  // 过滤掉表头
  val tableRDD = sc.textFile("table.csv").filter(_.split(",")(0) != "A")
  tableRDD.map(x => {
     val fields = x.split(",")
     val a = fields(1).toInt
     val b = fields(2).toLong
     filedAcc.add(SumAandB (a, b))
     x
  }).count
}

最后计数器的结果为(3100, 31)。

Spark原生接口只支持Scala和Java,这两种接口对于非计算机专业的用户来说还是不够友好,不利于Spark推广与使用。为了方便使用,Spark提供了两种对数据分析师非常容易上手的语言接口:Python和R。这两种编程接口的出现,极大地拓宽了Spark的使用人群,Python的简单易学和高效率开发等特性使Python逐渐成为Spark主流开发语言之一。为了保证Spark核心实现的独立性,Spark仅在外围包装通过PySpark和SparkR两个中间件实现对Python和R语言的支持。

PySpark基于Spark的Java API,提供了Spark Python应用的运行环境和编程接口。

如图2-40所示,用户提交作业后,会启动一个Python版的SparkSession,其中SparkContext会通过Py4J启动一个JVM并创建一个JavaSparkContext。Py4J只在Driver端用于JavaSparkContext与Python之间的本地通信,而中间结果的传输(Shuffle)则没有采用Py4J,而是使用Spark自己的数据传输机制。在集群中,传输过来的数据以及用户代码会通过管道在Python子进程中处理,结果再通过管道输出回Spark Worker中。以PySpark这种方式提交的作业,计算由Python子进程完成,数据缓存在JVM中,由于Python天生计算性能不足,以这种方式执行的Spark应用速度会慢于同样逻辑的Scala或者Java版Spark应用。

图2-40 PySpark运行时架构

从图2-40中可以看出,PySpark执行Spark应用需要所有Executor节点上都要安装Python运行环境。

SparkR同样提供了R语言运行环境和编程接口,其原理大同小异,如图2-41所示,同样是通过R语言调用Java API初始化生成Spark的JVM Driver,中间结果还是利用Spark本身的机制进行传输。用户代码在Executor所在节点的R子进程中执行,并通过管道将结果输出回Executor。R语言版的Spark应用性能与Python版Spark应用类似。

图2-41 SparkR运行时架构

在了解了如何编写Spark作业后,下一步就是让Spark作业面对海量数据稳定运行,这就需要对Spark进行性能调优。Spark调优是一个持续的过程,随着你对Spark、数据本身、业务场景越发了解,调优的思路也会更加多样,这是一个持续累积的过程。能够有针对性地对Spark作业进行调优是一名有经验的大数据工程师的必备技能。本节将会从硬件、资源管理平台与使用方式3个维度介绍如何对Spark进行性能调优。

构建Spark集群的硬件只需普通的商用PC Server即可,由于Spark作业对内存需求巨大,建议配置高性能CPU、大内存的服务器,以下是建议配置。

该CPU是双路6核心,且具有超线程技术,因此一个CPU相当于有2×6×2 = 24核心。对于交换器的选择,通常如果在生产环境使用,那么无论集群规模大小,都应该直接考虑万兆交换机,对于上千个节点的集群,还需要多台交换机进行堆叠才能满足需求。

Spark基于资源管理平台运行,该平台对Spark来说就像一个资源池一样,那么资源池的大小取决于每个物理节点有多少资源供资源管理平台调度。一般来说,每台节点应预留百分之二十的资源保证操作系统与其他服务稳定运行,对于前面提到的机器配置,加入到资源池的内存为200 GB、CPU为20核。假设使用YARN作为资源管理平台,相关配置如下:

yarn.nodemanager.resource.memory-mb = 200G
yarn.nodemanager.resource.cpu-vcores = 20

假设YARN集群中有10个NodeManager节点,那么总共的资源池大小为2000 GB、200核。在Spark作业运行时,用户可以通过集群监控页面来查看集群CPU使用率,如果发现CPU使用率一直维持在偏低的水平,可以尝试将yarn.nodemanager.resource.cpu-vcores改大。内存与CPU资源设置应该维持一个固定的比例,如1:5,这样在提交作业时,也按照这个比例来申请资源,可以提高集群整体资源利用率。

一般来说,YARN集群中会运行各种各样的作业,这样资源利用率会比较高,但是也经常造成Spark作业在需要时申请不到资源,这时可以采取YARN的新特性:基于标签的调度,在某些节点上打上相应的标签,来实现部分资源的隔离。

本节主要从使用层面来介绍调优,其中会涉及参数调优、应用调优甚至代码调优。

1.提高作业并行度

在作业并行程度不高的情况下,最有效的方式就是提高作业并行程度。在Spark作业划分中,一个Executor只能同时执行一个任务,一个计算任务的输入是一个分区(partition),因此改变并行程度只有一个办法就是提高同时运行Executor的个数。

通常集群的资源总量是一定的,这样Executor数量增加,必然会导致单个Executor所分得的资源减少,这样的话,在每个分区不变的情况下,有可能会引起性能方面的问题,因此,我们可以增大分区数来降低每个分区的大小,从而避免这个问题。

RDD一开始的分区数与该份数据在HDFS上的数据块数量一致,后面可以通过coalesce与repartition算子进行重分区,这其实改变的是Map端的分区数,如果想改变Reduce端的分区数,有两个办法,一个是修改配置spark.default.parallelism,该配置设定所有Reduce端的分区数,会对所有Shuffle过程生效,另一个是直接在算子中将分区数作为参数传入,绝大多数算子都有分区数参数的重载版本,如groupByKey(600)等。在Shuffle过程中,Shuffle相关的算子会构建一个散列表,Reduce任务有时会因为这个表过大而造成内存溢出,这时就可以试着增大并行程度。

2.提高Shuffle性能

Shuffle是Spark作业中的关键一环,也是性能调优的重点,先来看看Spark参数中与Shuffle性能有关的有哪些:

spark.shuffle.file.buffer
spark.reducer.maxSizeInFlight
spark.shuffle.compress

根据2.5节的内容,第一个配置是Map端输出为中间结果的缓冲区大小,默认为32 KB。第二个配置是Map端输出为中间结果的文件大小,默认为48 MB,该文件还会与其他文件进行合并。第三个配置是Map端输出是否开启压缩,默认开启。缓冲区当然越大,写入性能越高,所以有条件可以增大缓冲区大小,提升Shuffle Write的性能,但该参数实际消耗的内存为C×spark.shuffle.file.buffer,其中C为执行该任务的核数。在Shuffle Read的过程中,Reduce任务所在的Executor会按照spark.reducer.maxSizeInFlight的设置大小去拉取文件,这需要创建内存缓冲区来接收,在内存足够大的情况下,可以考虑提高spark.reducer.maxSizeInFlight的值来提升Shuffle Read的效率。spark.shuffle.compress配置项默认为true,表示会对Map端输出进行压缩。

Spark Shuffle会将中间结果写到spark.local.dir配置的目录下,可以将该目录配置多路磁盘目录,以提升写入性能。

3.内存管理

Spark作业中内存主要有两个用途:计算和存储。计算是指在Shuffle、连接、排序和聚合等操作中用于执行计算任务的内存,而存储指的是用于跨集群缓存和传播数据的内存。在Spark中,这两块共享一个统一的内存区域(M),如图2-42所示,用计算内存时,存储部分可以获取所有可用内存,反之亦然,如有必要,计算内存也可以将数据从存储区移出,但会在总存储内存使用量下降到特定阈值(R)时才执行。换句话说,R决定了M内的一个分区,在这个分区中,数据不会被移出。由于实际情况的复杂性,存储区一般不会去占用计算区。

图2-42 Spark Executor内存布局

这样设计是为了使那些不使用缓存的作业可以尽可能地使用全部内存,而需要使用缓存的作业也会有一个区域始终用来缓存数据,这样用户就可以在不需要知道背后其复杂原理就自己根据实际内存需求来调节M与R的值以达到最好效果。下面是决定M与R的两个配置。

上面这两个默认值基本满足绝大多数作业的使用,在特殊情况可以考虑设置spark.memory. fraction的值以适配JVM老年代的空间大小,默认JVM老年代在不经过设置的情况下占JVM的2/3,因此这个值是合理的。

Spark Executor除了堆内存以外,还有非堆内存空间,这部分可以通过参数spark.yarn. executor.memoryoverhead进行配置,最小为384 MB,默认为Executor内存的10%,因此整个Executor JVM消耗的内存为:

spark.yarn.executor.memoryoverhead + spark.executor.memory

其中:

M = spark.executor.memory * spark.memory.fraction
R = M * spark.memory.storageFraction

此外,Spark还有可能会用到堆外内存:

O = spark.memory.offHeap.size

因此,整个Spark的内存管理布局如图2-42所示。

用户需要知道每个部分的大小应如何调节,这样才能针对场景进行调优。这其实是Spark实现的一种比较简化且粗粒度的内存调节方案。如果用户想要更精细地调整内存的每个区域,就需要在参数中spark.executor.extraClassPath配置Java选项了,这种方式只针对富有经验的工程师,对于普通用户来说不太友好。

4.序列化

序列化是以时间换空间的一种内存取舍方式,其根本原因还是内存比较吃紧,我们可以优先选择对象数组或者基本类型而不是那些集合类型来实现自己的数据结构,fastutil包提供了与Java标准兼容的集合类型。除此之外,还应该避免使用大量小对象与指针嵌套的结构。我们可以考虑使用数据ID或者枚举对象来代替字符串键。如果内存小于32 GB,可以设置Java选项-XX:+ UseCompressedOops来压缩指针为4字节,以上是需要用到序列化之前可以做的调优工作以节省内存。

对于大对象来说,可以使用RDD的persist算子并选取MEMORY_ONLY_SER级别进行存储,更好的方式则是以序列化的方式进行存储。这相当于用时间换空间,因为反序列化时会造成访问过慢,如果想用序列化的方式存储数据,推荐使用Kyro格式,它比原生的Java序列化框架性能优秀(官方介绍性能提升10倍)。Spark 2.0已经开始用Kyro序列化Shuffle中传输的字符串等基础类型数据了。

要将需要序列化的类在Kyro序列化库中注册方可使用Kyro序列化库。具体使用步骤如下。

(1)编写一个注册器,实现KyroRegister接口,在Kyro中注册你需要使用的类:

public static class YourKryoRegistrator implements KryoRegistrator {
   public void registerClasses(Kryo kryo) {
      // 在Kryo序列化库中注册自定义的类
      kryo.register(YourClass.class, new FieldSerializer(kryo, YourClass.class));
   }
}

(2)设置序列化工具并配置注册器:

...
spark.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
spark.config("spark.kryo.registrator", YourKryoRegistrator.class.getName())

5.JVM垃圾回收(GC)调优

通常来说,那种只读取RDD一次,然后对其进行各种操作的作业不太会引起垃圾回收(GC)问题。当Java需要将老对象释放而为新对象腾出空间时,需要追踪所有Java对象,然后在其中找出没有被使用的那些对象。GC的成本与Java对象数量成正比,因此使用较少对象的数据结构会大大减轻GC压力,如直接使用整型数组,而不选用链表。通常在出现GC问题的时候,序列化缓存是首先应该尝试的方法。

由于执行计算任务需要的内存和缓存RDD的内存互相干扰,因此GC也可能成为问题。这可以控制分配给RDD缓存空间来缓解这个问题。

GC调优的第一步是搞清楚GC的频率和花费的时间,这可以通过添加Java选项来完成:

-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps

在Spark运行时,一旦发生GC,就会被记录到日志里。

为了进一步调优JVM,先来看看JVM如何管理内存。Java的堆空间被划分为两个区域:年轻代、老年代。顾名思义,年轻代会保存一些短生命周期对象,而老年代会保存长生命周期对象。年轻代又被划分为3个区域:一个Eden区,两个Supervisor区。简单来说,GC过程是这样的:当Eden区被填满后,会触发minor GC,然后Eden区和Supervisor 1区还存活的对象被复制到Supervisor 2区,如果某个对象太老或者Supervisor2区已满,则会将对象复制到老年代中,当老年代快满时,会触发full GC。

在Spark中,GC调优的目的是确保只有长生命周期的对象才会被保存到老年代中,年轻代有充足的空间来存储短生命周期对象。这会有助于避免执行full GC来回收任务执行期间生成的临时对象,有以下几个办法。

可以通过在作业中设置spark.executor.extraJavaOptions选项来指定执行程序的GC选项以及JVM内存各个区域的精确大小,但不能设置JVM堆大小,该项只能通过--executor-memory或者spark.executor.memory来进行设置。

6.数据本地性的取舍

对于分布式计算框架,通常都有数据本地性问题。如果数据所在的节点与计算任务(代码所在)节点相同,那么结果肯定会快,反之则需要将远端数据移动过来,这样就会慢。通常情况下,由于代码体积通常比数据小得多,因此一般Spark的调度准则会优先考虑分发代码。

数据本地性衡量的是数据与处理它的代码之间的远近,基于数据当前的位置有以下几个级别。

Spark当然希望每个计算任务都具有最佳的数据本地性,但这不一定总是满足的。如果没有空闲的节点处理数据,这时就会有两种选择,一种情况是等待数据所在节点完成计算,另一种是切换到远端节点开始计算。

Spark默认会等待一小会儿(3 s),希望有节点完成计算,一旦超时,则只好退而求其次,切换到下一个本地性级别,每个级别的超时时间都可以配置,可以都配置到spark.locality.wait中,或者在以下配置中按级别分别进行配置:

spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack

7.将经常被使用的数据进行缓存

如果某份数据经常会被使用,可以尝试用cache算子将其缓存,有时效果极好。

8.使用广播变量避免Hash连接操作

在进行连接操作时,可以尝试将小表通过广播变量进行广播,从而避免Shuffle,这种方式也被称为Map端连接。

9.聚合filter算子产生的大量小分区数据

在使用filter算子后,通常数据会被打碎成很多个小分区,这会影响后面的执行操作,可以先对后面的数据用coalesce算子进行一次合并。

10.根据场景选用高性能算子

很多算子都能达到相同的效果,但是性能差异却比较大,例如在聚合操作时,选择reduceByKey无疑比groupByKey更好;在map函数初始化性能消耗太大或者单条记录很大时,mapPartition算子比map算子表现更好;在去重时,distinct算子比groupBy算子表现更好。

11.数据倾斜

数据倾斜是数据处理作业中的一个非常常见也是非常难以处理的问题。正常情况下,数据通常都会出现数据倾斜的问题,只是轻重不同而已。数据倾斜的症状是大量数据集中到一个或者几个任务里,导致这几个任务会拖慢整个作业的执行速度,严重的甚至会导致整个作业执行失败,如图2-43所示。

图2-43 数据倾斜

可以看到任务A处理了绝大多数数据,其他任务执行完成后需要等待此任务执行完成后,作业才算完成。对于这种情况,可以采取以下几种办法处理。

图2-44 不均匀的数据单独进行处理

这种方式可以将倾斜的数据打散,从而避免数据倾斜。

对于那种分组统计的任务,可以通过两阶段聚合的方案来解决数据倾斜的问题,首先将数据打上一个随机的键值,并根据键的散列值进行分发,将数据均匀地分散到多个任务中去,然后在每个任务中按照真实的键值做局部聚合,最后再按照真实的键值分发一次,得到最后的结果,如图2-45所示,这样,最后一次分发的数据已经是聚合过后的数据,就不会出现数据倾斜的情况。这种方法虽然能够解决数据倾斜的问题但只适合聚合计算的场景。

图2-45 两阶段聚合

数据科学在很多方面,与传统的软件工程的开发过程有很多不同之处,它需要数据科学家们先要感知数据、再了解数据并探索数据,如此循环往复,这就需要快速实现算法、快速得到结果并快速修正。REPL的这种交互式解释器无疑非常适合用来调试、开发Spark作业,而Spark Shell也是Spark为Scala程序员准备的一个开发利器,但如果你是一位Python开发者,你还有更好的选择——Jupyter Notebook。

Jupyter Notebook是一个开源的Web应用(其标识如图2-46所示),用户可以用它来创建、分享那些包含代码、公式、可视化和文本的文档。它可以用于数据清洗、数据转换、数值模拟、统计建模、数据可视化、机器学习等。简单来说,Jupyter Notebook是一个交互式的笔记本应用,用户可以在其中编写代码、文档等,编写的代码立即运行,并很快产生结果,如HTML、表格、图片或者LaTeX表达式等,这对于数据探索方面尤其有用。笔记本电脑中的任何内容可以轻松地分享给别人,编写的内容也会自动保存在Jupyter Notebook中,这也非常利于用户学习和协作。

图2-46 Jupyter Notebook

Jupyter Notebook的核心是IPython Notebook,与IPython Notebook一样,Jupyter Notebook可以根据自身Kernel的不同,支持不同的编程语言,如Python 2、Python 3、R、Scala等,目前支持的语言数量多达40多种。此外,它还能与Spark进行整合,这一特性使Jupyter Notebook成为大数据应用开发的利器。它在交互式输出方面的优势,可以使数据探索、数据调研的效率大幅提升。

通过浏览器访问Jupyter Notebook,界面如图2-47所示,其中目前的Kernel为Python 2,其中有3个代码段,每个代码段都可以直接运行,并返回结果。

图2-47 Jupyter Notebook编程环境

安装Jupyter Notebook非常简单,用户只需要安装Anaconda即可,Anaconda是一个包管理器,它集成了数据科学领域几乎所有的软件包、库和工具,如图2-48所示,我们可以通过Anaconda的conda命令管理包、依赖和环境,默认的Anaconda的版本是包含了Jupyter Notebook应用的,因此在安装好Anaconda后,直接启动Jupyter Notebook即可。

图2-48 Anaconda包管理器

首先需要在Anaconda官方网站下载Anaconda包。

执行命令:

bash Anaconda2-5.0.1-Linux-X86_64.sh

按照提示,即可安装成功。

安装成功之后,就可以在Anaconda的安装路径的bin文件下,用jupyter-notebook命令启动Jupyter Notebook:

./jupyter-notebook --ip=10.28.200.196 --notebook-dir=~/notebook

启动成功后,就可以在浏览器中用地址localhost:8888进入Jupyter Notebook应用,如图2-49所示。

图2-49 浏览器访问Jupyter Notebook

进入Jupyter Notebook后,可以新建一个Python 2笔记本开始编写代码,如图2-50所示。

图2-50 新建一个笔记本

但是这样还不能直接编写Spark大数据应用,与Spark集成只需执行一条命令:

PYSPARK_DRIVER_PYTHON=/opt/cloudera/parcels/Anaconda/bin/ipython 
PYSPARK_DRIVER_PYTHON_OPTS="notebook --ip 11.21.210.16" 
/opt/cloudera/parcels/CDH/lib/spark/bin/pyspark --master  yarn-client

这条命令包含两个环境变量,第一个环境变量需配置用户安装Anaconda路径下的IPython命令,最后一行是启动PySpark,这3行要在一条命令中执行,执行后,以地址10.28.200.196:8888访问Jupyter Notebook,这时,已经可以在Jupyter Notebook中直接使用SparkSession实例spark和SparkContext实例sc,如图2-51所示。

图2-51 使用初始化好的SparkContext(sc)

蒙特卡罗方法(Monte Carlo method),也称统计模拟方法,是20世纪40年代中期由于科学技术的发展和电子计算机的发明,而被提出的一种以概率统计理论为指导的一类非常重要的数值计算方法,原理是通过大量随机样本去了解一个系统,进而得到所要计算的值。它非常强大和灵活,又相当简单易懂,很容易实现。对于许多问题来说,它往往是最简单的计算方法,有时甚至是唯一可行的方法。蒙特卡罗方法在金融工程学、宏观经济学、计算物理学(如粒子输运计算、量子热力学计算、空气动力学计算)等领域应用广泛。

本节将运用蒙特卡罗方法预测股票价格,这里我们认为影响股票价格的因子(解释变量)有原油价格(WTI)、30年期国债价格(Bond)、标准普尔500指数(GSPC)、纳斯达克指数(NDAQ),它们之间是一个线性关系,例如:

y =b0 + b1xWTI + b2xBond + b3xGSPC + b4xNDAQ

其中y就是股票价格,而我们有的就是历史原油价格、30年期国债价格、标准普尔500指数、纳斯达克指数与对应的股票价格(本例中的数据来源于财经网站,已经做好预处理)。这里假设xWTI、xBond、xGSPC、xNDAQ都服从正态分布。

用蒙特卡罗方法预测股票价格的方法大致如下。

(1)首先读取数据并对数据进行转换。

(2)用最小二乘法进行多元线性回归,得到每个因子的系数。

(3)通过随机生成各个因子的值,来得到股票价格的分布,随机生成的因子需服从正态分布。

(4)计算VaR与CVaR,评估最坏情况下的股票价格。

一开始在Jupyter Notebook中的第一个Cell中,先将引用的模块列出来:

import sys

import datetime

import os

import math
import numpy as np

from pyspark.ml.feature import VectorAssembler
from pyspark.ml.regression import LinearRegression
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, DoubleType

reload(sys)
sys.setdefaultencoding("utf-8")

接下来的一个Cell,读取数据,并由原来的4个因子扩展为12个因子,扩展规则为每个因子增加自己的平方和平方根作为扩展因子,也就是说股票价格与因子之间的关系为:

y =b0 + b1xWTI+ b1xWTI1+ b2xWTI2 + b3xBond + b4xBond1 + b5xBond2 + b6xGSPC

+ b7•xGSPC1 + b8xGSPC2 + b9xNDAQ+ b10xNDAQ1+ b11xNDAQ2

WTI1为原油价格的平方根;WTI2为原油价格的平方,以此类推。代码如下:

# 读全部因子数据文件,命名为data
data = spark.read.csv("./data/data.csv", header=True)
# 注册表,以便下文进行数据类型转换
data.registerTempTable("data")
# 使用Spark SQL将字符串数据类型转换成double数据类型
df = spark.sql('select cast(label as double) label, cast(WTI as double) WTI, cast(WTI1 as double) WTI1, cast(WTI2 as double) WTI2, cast(Bond as double) Bond, cast(Bond1 as double) Bond1, cast(Bond2 as double) Bond2, cast(GSPC as double) GSPC, cast(GSPC1 as double) GSPC1, cast(GSPC2 as double) GSPC2, cast(NDAQ as double) NDAQ, cast(NDAQ1 as double) NDAQ1, cast(NDAQ2 as double) NDAQ2 from data')
# 由于计算矩阵方差和均值还要用到WTI、Bond、GSPC、NDAQ因子数据,因此提取这4列数据,并转换成列表数据
factor = spark.sql('select cast(WTI as double) WTI, cast(Bond as double) Bond, cast(GSPC as double) GSPC, cast(NDAQ as double) NDAQ from data')
list_factor = []
list_factor.append(factor.select('WTI').rdd.map(lambda x: x[0]).collect())
list_factor.append(factor.select('Bond').rdd.map(lambda x: x[0]).collect())
list_factor.append(factor.select('GSPC').rdd.map(lambda x: x[0]).collect())
list_factor.append(factor.select('NDAQ').rdd.map(lambda x: x[0]).collect())
print "全部因子数据预览:"
df.show(5)
print "提取出的WTI,Bond,GSPC,NDAQ因子数据预览"
print list_factor
print "数据导入完毕!"

数据准备好后,在下面一个Cell里用最小二乘法进行多元线性回归:

vecAssembler = VectorAssembler(inputCols=["WTI", "WTI1", "WTI2", "Bond", "Bond1", "Bond2", "GSPC", "GSPC1", "GSPC2", "NDAQ", "NDAQ1", "NDAQ2"], outputCol="features")
vecDF = vecAssembler.transform(df)
# 用最小二乘法进行多元线性回归
lr = LinearRegression(maxIter=5, regParam=0.0, solver="normal")
model = lr.fit(vecDF)
factorWeights = []
# 将模型系数coefficients存到factorWeights列表中
factorWeights.append(model.coefficients)

在下一个Cell中,进行蒙特卡罗模拟:

# 将因子数据转换为矩阵形式,这里用到了Python的numpy包
# 方便利用已有函数计算均值和协方差,以及进行矩阵运算等
data = np.array(list_factor)
# 通过自定义函数进行特征分解和抽样,并返回均值means、协方差cov和抽样矩阵samplingMatrix
def multiVar(data):
   covarianceMatrix = np.mat(np.cov(data))
   means = data.mean(axis=1)
   dim = len(means)
   # Eigenvalues and vectors of the covariance matrix
   covMatEigenvalues, covMatEigenvectors = np.linalg.eig(covarianceMatrix)
   # transpose of matrix
   transposeMatrix = covMatEigenvectors.T
   tmpMatrix = np.copy(transposeMatrix)
   # Scale each eigenvector by the square root of its eigenvalue.
   for row in range(0, dim):
      factor = np.sqrt(covMatEigenvalues[row])
      for col in range(0, dim):
         tmpMatrix[row, col] = tmpMatrix[row, col]*factor
   samplingMatrix = np.multiply(tmpMatrix, covMatEigenvectors)
   return means, covarianceMatrix, samplingMatrix
# 计算均值和协方差矩阵
means, cov, samplingMatrix = multiVar(data)
# 定义实验次数变量numTrials,并初始化值100
numTrials = 100
# 将权重矩阵设为广播变量
bFactorWeights = sc.broadcast(factorWeights)
# 定义分区数变量parallelism,并初始化为100
parallelism = 100
# 定义随机种子,一般为当前时间 
baseSeed = 1496
# 对每个分区都设置不同的种子,保存在seeds列表变量seeds中
seeds = []
for i in range(baseSeed, baseSeed+parallelism):
seeds.append(i)
# 将种子列表变量seeds转为rdd变量seedRdd,种子数即为分区数
seedRdd = sc.parallelize(seeds, parallelism)
# 将模拟的因子代入模型中,模拟股票价格(y)
trials = seedRdd.flatMap(lambda seed: trialReturns(seed, numTrials/parallelism, bFactorWeights.value, means, cov))
# 计算实验次数,并触发模拟开始
size = trials.count()

在最后一个Cell里,计算VaR与CVaR,来评估模拟的股票价格:

# VaR和CVaR
# 找出最差5%的数据,采用max(,)控制至少返回1个数据
test = trials.takeOrdered(max(size / 20, 1))
# VaR是期望损失,计算方法为选取数据中最差5%的数据中最大的数据,此处选取数据集列表中最后一个数据
VaR = test[-1]
# CVaR是平均期望损失,计算方法为选取数据中最差5%的数据,并求其平均值
# 因此计算test之和再除以数据长度得到该数据集平均值
CVaR = sum(test)/len(test)
print VaR
print CVaR

以上就是如何用蒙特卡罗模拟来预测股票价格,读者掌握了这种方法后,可以试着在Jupyter Notebook中用蒙特卡罗方法试求圆周率、定积分等。

本章主要介绍了Spark的架构、Spark 2.x与Spark 3.x的新特性(Tungsten项目和Hydrogen项目)、Spark基础编程和Spark计算中最重要的过程Shuffle,此外还介绍了一些作业调优经验。作为一名数据科学家,选取一个方便的IDE可以大大加强数据探索的效率,Jupyter Notebook不失为一个好的选择。


相关图书

Spark分布式处理实战
Spark分布式处理实战
Apache Spark大数据分析:基于Azure Databricks云平台
Apache Spark大数据分析:基于Azure Databricks云平台
Spark大数据实时计算:基于Scala开发实战
Spark大数据实时计算:基于Scala开发实战
Spark和Python机器学习实战:预测分析核心方法(第2版)
Spark和Python机器学习实战:预测分析核心方法(第2版)
图解Spark 大数据快速分析实战
图解Spark 大数据快速分析实战
精通Spark数据科学
精通Spark数据科学

相关文章

相关课程