Scala并发编程 第2版

978-7-115-55834-3
作者: 亚历山大 ▪ 普罗科佩茨(Aleksandar Prokopec)
译者: 王文涛
编辑: 陈聪聪

图书目录:

详情

本书是一本关于并发编程技术的教程,书中详细介绍了并发编程中的主要概念和基本数据结构,包括传统并发模型、基于Future 和Promise的异步编程、数据并行容器、基于响应式扩展的并发编程、软件事务性内存、角色模型、并发编程实践和反应器编程模型等。本书基于Scala语言编写,实例丰富,可操作性很强。 本书面向的用户群体以Scala用户为主,因为书中所有的示例都是基于Scala代码的。但其他语言用户也可以从中获益良多,因为书中介绍的并发编程概念是普遍适用的,并不局限于特定编程语言,只不过Scala比较适用于并发编程而已。

图书摘要

Scala并发编程(第2版)

[瑞士]亚历山大·普罗科佩茨(Aleksandar Prokopec) 著

王文涛 译

人民邮电出版社

北京

图书在版编目(CIP)数据

Scala并发编程:第2版/(瑞士)亚历山大·普罗科佩茨(Aleksandar Prokopec)著;王文涛译.--北京:人民邮电出版社,2021.5

ISBN 978-7-115-55834-3

Ⅰ.①S… Ⅱ.①亚…②王… Ⅲ.①JAVA语言—程序设计 Ⅳ.①TP312.8

中国版本图书馆CIP数据核字(2020)第266839号

版权声明

Copyright © Packt Publishing 2020. First published in the English language under the title Learning Concurrent Programming in Scala - Second Edition.

All Rights Reserved.

本书由英国Packt Publishing公司授权人民邮电出版社有限公司出版。未经出版者书面许可,对本书的任何部分不得以任何方式或任何手段复制和传播。

版权所有,侵权必究。

◆著 [瑞士]亚历山大·普罗科佩茨(Aleksandar Prokopec)

译 王文涛

责任编辑 陈聪聪

责任印制 王郁 焦志炜

◆人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 https://www.ptpress.com.cn

北京鑫正大印刷有限公司印刷

◆开本:800×1000 1/16

印张:21.75

字数:425千字  2021年5月第1版

印数:1-2000册  2021年5月北京第1次印刷

著作权合同登记号  图字:01-2018-7654号

定价:99.90元

读者服务热线:(010)81055410 印装质量热线:(010)81055316

反盗版热线:(010)81055315

广告经营许可证:京东市监广登字20170147号

内容提要

本书是一本关于并发编程技术的教程,书中详细介绍了并发编程中的主要概念和基本数据结构,包括传统并发模型、基于Future 和Promise的异步编程、数据并行容器、基于响应式扩展的并发编程、软件事务性内存、角色模型、并发编程实践和反应器编程模型等。本书基于Scala语言编写,实例丰富,可操作性很强。

本书面向的用户群体以Scala用户为主,因为书中所有的示例都是基于Scala代码的。但其他语言用户也可以从中获益良多,因为书中介绍的并发编程概念是普遍适用的,并不局限于特定编程语言,只不过Scala比较适用于并发编程而已。

关于作者

亚历山大·普罗科佩茨(Aleksandar Prokopec)是一名并发和分布式编程研究人员,他也是本书第1版的作者。亚历山大是瑞士洛桑联邦理工学院(EPFL)的计算机科学博士。他曾在Google公司工作过,现在是Oracle实验室的研究员。

作为EPFL Scala团队的一员,亚历山大积极投身于Scala编程语言研究,他主要从事基于Scala的并发计算抽象、数据并行编程和并发数据结构方面的工作。他创造了Scala并行容器(parallel collection)框架,它可用于Scala中的高层数据并行编程。他还参与了一些Scala并发库的开发,比如Future、Promise和ScalaSTM。亚历山大是分布式计算反应器编程(reactor programming)模型的主要创造者。

致谢

首先,感谢本书的所有审校人员,包括萨米拉·塔沙罗菲(Samira Tasharofi)、卢卡斯·莱茨(Lukas Rytz)、多米尼克·格伦茨(Dominik Gruntz)、米歇尔·申茨(Michel Schinz)、李震和弗拉基米尔·科斯秋科夫(Vladimir Kostyukov)等人,他们为本书提供了大量的反馈信息和意见。还要感谢Packt出版社的编辑,包括凯文·科拉科(Kevin Colaco)、斯鲁蒂·库蒂(Sruthi Kutty)、卡皮尔·埃姆纳尼(Kapil Hemnani)、瓦伊巴耶夫·帕瓦尔(Vaibhav Pawar)和塞巴斯蒂安·罗德里格斯(Sebastian Rodrigues)等,感谢他们对本书的付出,能与他们共事是我的荣幸。

本书中描述的并发框架是许多人共同努力的结果,要不是他们,这些框架也许永远不会出现在世人面前。有很多人直接或间接参与了这些框架的开发,他们是 Scala 并发编程真正的英雄,也是Scala的优秀并发编程能力的“源泉”。我会尽力将谢意传达给每一个人,但限于篇幅,这里无法将所有人一一列出。如果有人觉得致谢中有遗漏的情况,请及时联系我,以便在下一版中补充进来。

毫无疑问,需要特别感谢 Scala 语言的作者马丁·奥德斯基(Martin Odersky),因为Scala是本书中列出的那些并发框架的承载平台。还要特别感谢的是EPFL的Scala团队在最近十余年的付出,以及Lightbend公司的员工为使Scala成为最好的通用语言所做出的努力。

大部分 Scala 并发框架或多或少依赖于道格·利(Doug Lea)的工作成果。他的Fork/Join框架是Akka角色库、Scala并行容器库,以及Future和Promise库的基础。本书中描述的很多Java开发工具包(Java Development Kit,JDK)并发数据结构就是由他本人实现的,甚至很多Scala并发库都受到他的建议的影响。

Scala 的 Future 和 Promise 库最初的设计团队人员包括来自 EPFL 的菲利普·哈勒(Philipp Haller)、希瑟·米勒(Heather Miller)、沃因·约万诺维奇(Vojin Jovanović)和我,来自Akka团队的维克托·克朗(Viktor Klang)和罗兰·库恩(Roland Kuhn),来自Twitter 公司的马里乌斯·埃里克森(Marius Eriksen);另外,哈沃克·彭宁顿(Havoc Pennington)、里奇·多尔蒂(Rich Dougherty)、贾森·佐格(Jason Zaugg)、道格·利等人也做出了贡献。

虽然我是Scala并行容器库的主要作者,但这个库受到过很多人的影响,包括菲尔·巴格韦尔(Phil Bagwell)、蒂亚克·龙普夫(Tiark Rompf)、内森·布朗森(Nathan Bronson)、马丁·奥德斯基和道格·利。后来,德米特里·彼得拉什科(Dmitry Petrashko)和我一起改进并行和标准容器操作,这是通过Scala Macros来实现优化的。尤金·布尔马科夫(Eugene Burmako)和德尼斯·沙巴林(Denys Shabalin)是Scala Macros项目的主要贡献者。

Rx项目上的工作始于埃里克·梅耶尔(Erik Meijer)、韦斯·戴尔(Wes Dyer)和Rx团队的其他人。自从.NET得以实现,Rx框架又被移植到了很多其他语言上,包括Java、Scala、Groovy、JavaScript和PHP,并且在本·克里斯滕森(Ben Christensen)、塞缪尔·格吕特(Samuel Grütter)、朱世雄、唐娜·马拉耶里(Donna Malayeri)等人的贡献和维护下,Rx框架得到越来越广泛的使用。

内森·布朗森是 ScalaSTM 项目的主要贡献者,此项目的默认实现是基于内森的CCSTM项目的。ScalaSTM的应用程序接口(Application Program Interface,API)是由ScalaSTM专家组设计的,其成员包括内森·布朗森、乔纳斯·博纳(Jonas Bonér)、盖伊·科兰(Guy Korland)、克里希纳·桑卡尔(Krishna Sankar)、丹尼尔·斯皮瓦克(Daniel Spiewak)和彼得·维恩杰尔(Peter Veentjer)。

Scala 角色库的最初版本受 Erlang 的角色模型启发,并由菲利普·哈勒开发。这个库促使乔纳斯·博纳创建了Akka角色框架。Akka项目的贡献者包括维克托·巴生、亨里克·恩斯特伦(Henrik Engström)、彼得·维恩杰尔、罗兰·库恩、帕特里克·努德瓦尔(Patrik Nordwall)、比昂·安东松(Björn Antonsson)、里奇·多尔蒂、约翰内斯·鲁道夫(Johannes Rudolph)、马赛厄斯·德尼茨(Mathias Doenitz)和菲利普·哈勒等人。

最后,我还要感谢整个Scala社区的成员,是他们让Scala成为一门如此优秀的编程语言。

关于审校者

维卡什·夏尔马(Vikash Sharma)是来自印度的一名软件开发人员和开源技术推广员,他是 Infosys 公司的助理顾问,也是一名 Scala 开发者。他追求“大道至简”,能够写出简洁而可管理的代码。他还制作了一个有关Scala的视频教程。

感谢并不足以回馈我的母亲、父亲和哥哥对我的支持。我要感谢那些在我最需要帮助的时候在我身边的那些人,特别感谢维贾伊·阿西克萨万(Vijay Athikesavan)向我传授编程理念。

多米尼克·格伦茨是苏黎世联邦理工学院(ETH Zürich)的博士,自2000年以来在瑞士西北应用科学大学任计算机科学教授。除了研究项目,他还教授并发编程课程。几年前,这门课还在告诉学生并发程序太难写对了。但随着Java和Scala中高抽象层次框架的出现,这个观点发生了变化,而本书是极好的并发编程教材,可为所有希望编写正确、可读和高效的并发程序的程序员提供正确的方向。

感谢邀请我参加该书的审校。

李震在小学第一次接触 Logo 语言时开始对计算产生热情。在获得中国复旦大学软件工程学位和爱尔兰都柏林大学计算机科学学位之后,她又在美国佐治亚大学攻读博士学位。她研究的是程序员学习行为的心理因素,特别是程序员理解并发程序的方式。基于这项研究,她致力于设计有效的软件工程方法和教学方法,来帮助程序员“拥抱”并发程序。

李震在计算机科学多个领域拥有本科生教学经验,包括系统和网络编程、建模和仿真以及人机交互等。她对计算机编程教学的主要贡献是设计教学大纲,并提供多种编程语言以及多种并发编程形态的课程,鼓励学生积极取得自己的软件设计哲学和理解并发编程。

李震还有很多工业创新经验,10年来,她在多个 IT 公司工作过,包括 Oracle、Microsoft和Google,她参与开发了一些尖端产品、平台以及企业核心基础设施。李震对编程和教学充满热情。读者可以通过电子邮件janeli@uga.edu联系她。

卢卡斯·莱茨是一个编译器工程师,就职于Lightbend公司的Scala团队。他于2013年获得EPFL的博士学位,曾师从于马丁·奥德斯基,即Scala的作者。

米歇尔·申茨是EPFL的讲师。

萨米拉·查菲(Samira Charfi)在伊利诺伊大学获得软件工程博士学位。她在多个领域从事研究工作,比如测试并发程序(特别是基于角色的程序)、并行编程模式、基于组件的系统的验证等。萨米拉审校过多本图书。她还是多个软件工程会议的审稿人,多个会议的程序委员会成员。

我要感谢我的丈夫和母亲,感谢他们的爱和支持。

序言

并发计算和并行计算发展迅猛,或许它们曾经是纯粹的理论知识,或者局限于内核计算和高性能计算等少数几个领域,但如今,它们已经走向千家万户,成为每名优秀程序员必备的技能之一。因为并行和分布式计算系统已经是行业标配,大部分应用需要用并发计算来提高性能以及处理异步事件。

这是一场“革命”,然而,目前大部分开发者还没有准备好。也许有些人在学校学过传统的并发计算模型,采用的是线程和锁机制,但这种模型已经不足以高效、可靠地实现大规模并发计算。实际上,线程和锁机制较难用,更容易出错。因此,人们迫切需要对并发计算进行进一步抽象,构造出更高层次的组件。

15年前,我用过一门实验性语言 Funnel,它是 Scala 的先驱,在内核采用了并发语义。在这门语言中,所有编程概念都是基于函数式网络(functional net)的“语法糖”,这是一种面向对象版本的连接演算(join calculus)。虽然连接演算理论优美,但人们在试验后发现,并发计算其实涉及多方面的问题,难以在单一的形式化体系中很好地表达出来。也许并不存在解决所有并发问题的高招,对不同的需求需要采用不同的方案。比如,利用异步计算来响应事件和数据流,在消息通信时使用自发而独立的实体,为状态可变的数据中心定义事务,或者用并发计算提高性能。每一个任务都有着相应的更为合适的抽象方式:Future、响应式流(reactive stream)、角色、事务性内存或并行容器。

于是有了 Scala 和本书。并发计算中有用的抽象模型如此之多,将它们拼凑到一门语言中似乎并不明智。不过,Scala的目标就是让用户能在编码时更方便地定义各种高层抽象,并以此来构建代码库。于是,Scala程序员可以定义出能够处理不同并发编程问题的模块,而这样的模块都基于宿主系统提供的底层内核。回顾过去,这种编程方式已然成功。现如今,Scala已经拥有一些强大而优雅的并发编程库。本书将为读者呈现其中非常重要的几种,并介绍每一种的使用案例和应用模式。

本书的作者亚历山大·普罗科佩茨是Scala语言方面的专家。他编写了多个流行的Scala并发和并行库。他还发明了一些精巧的数据结构和算法。本书既是阅读性强的教程,也是本书作者所在的并发计算领域的重要参考资料。我相信本书会成为 Scala 并发和并行编程相关人员的必备读物。同时,如果读者只是希望了解一下这个快速发展、令人着迷的计算领域,本书也值得一读。

马丁·奥德斯基 EPFL的教授,Scala的创造者

前言

并发计算无处不在。随着消费者市场中多核处理器的崛起,人们对并发计算的需求已经在开发者世界中掀起巨大波澜。曾几何时,并发编程还只是一个学术名词,常常被解释为程序和计算机系统中的异步计算。现在,并发编程已经成为软件开发中被广泛遵循的方法论之一。于是,高级的并发框架和软件库如雨后春笋般大量涌现,让我们见证了并发计算领域的复兴。

随着现代程序语言和并发模型的抽象层次不断提高,确定它们的使用场合和时机就显得比较关键了。仅仅了解经典的并发和同步等基础原语(比如线程、锁和监控器等)已经不够。高层次的并发框架可解决很多传统并发计算面临的难题,而且可以针对具体任务进行裁剪,于是其逐渐占领了并发计算市场。

本书用 Scala 描述高层次的并发编程,详细解释了不同的并发计算主题,并覆盖了并发编程的基础理论。同时,本书还描述了几种现代并发计算框架,详细介绍了它们的语义及用法。在介绍重要的并发抽象概念的同时,本书也讲解了它们在实际场景中的应用。有理由相信,读者通过本书既可以获得对并发编程理论的扎实理解,也能学会如何编写正确而高效的并发程序。掌握这些实用技能,读者将能走出成为一名现代并发计算专家的第一步。本书的编写过程是令人愉快的,希望读者也能享受同样愉悦的阅读过程。

组织结构

本书各章分别介绍并发编程的不同的主题。其中包含 Scala 运行时中的基础并发计算API,以及更复杂的并发原语,还有一些高层次的并发抽象模型。

● 第1章:概述。本章介绍并发编程的必要性及背景。同时,本章介绍Scala语言基础知识,这对读者理解本书后面的内容是必要的。

● 第2章:JVM和JMM上的并发性。本章介绍并发编程的基础知识,包含如何使用线程、如何保护共享内存,以及JMM。

● 第3章:并发编程的传统构造模块。本章介绍经典并发编程的一些工具,比如线程池、原子性变量、并发容器,重点介绍它们在Scala语言中的体现。本书关注的是现代高层次并发编程框架。因而,本章只回顾传统并发编程技术,并不会深入展开讲解。

● 第4章:基于Future和Promise的异步编程。本章专门针对Scala并发框架进行讲解,介绍Future和Promise的API,以及它们在异步编程中的正确使用方法。

● 第5章:数据并行容器。本章描述Scala的并行容器框架,介绍如何将容器操作并行化,以及如何评估性能的提升。

● 第6章:基于响应式扩展的并发编程。本章介绍如何在基于事件和异步的编程中使用响应式扩展框架,介绍事件流操作和容器操作之间的对应关系、如何让事件在线程之间传递,以及如何使用事件流设计响应式用户界面。

● 第7章:软件事务性内存。本章介绍用于事务性编程的ScalaSTM库,它提供了一种更安全、更直观的共享内存模型。在本章中,读者将学习如何通过可扩展内存事务来保护共享数据,同时减少死锁和竞态条件发生的风险。

● 第8章:角色模型。本章介绍角色模型和Akka框架。在本章中,读者将学习如何透明地在多个机器上构建消息传递分布式程序。

● 第9章:并发编程实践。本章总结前面介绍的不同并发库。在本章中,读者将学习如何在解决实际问题时选择正确的并发抽象模型,以及如何在设计大型并发应用时结合使用多个并发抽象模型。

● 第10章:反应器编程模型。本章介绍反应器编程模型,重点介绍如何更好地在并发和分布式程序中实现模拟化组合。这种新的模型将并发和分布式编程模式分解为模块化组件,称为协议。

推荐读者依次阅读本书这些章节的内容,也可以不必完全如此。如果读者已经对第2章的内容很熟悉,可以直接跳过这一章。唯有第9章依赖于前面的内容,因为这一章是对前面章节的内容的总结。第10章则用于帮助读者理解角色和事件流的工作方式。

阅读本书所需的条件

下面主要介绍阅读和理解本书所需的必要条件,包括JDK(这是运行Scala所必需的)的安装和如何使用简单构建工具(Simple Build Tool,SBT)运行示例程序。

本书中并不要求使用集成开发环境(Integrated Development Environment,IDE),使用什么工具编写代码完全由读者自行决定。原则上,任何文本编辑器都是可以的,包括但不限于Vim、Emacs、Sublime Text、Eclipse、IntelliJ IDEA和Notepad++等。

安装JDK

Scala程序并不会被直接编译成本地机器码,所以无法在硬件平台上作为可执行程序来运行。Scala 编译器会生成一种称为 Java 字节码的中间代码,这种中间代码需要运行在Java虚拟机(Java Virtual Machine,JVM)软件上。下面将介绍如何下载和安装JDK, JDK中就包含了JVM和其他一些有用的工具。

JDK的软件实现有很多,它们来自不同的软件厂商,本书建议使用Oracle JDK。下载和安装JDK的步骤如下。

1.进入官网下载。

2.如果官网打不开,则可以在搜索引擎中搜索关键字“JDK下载”。

3.找到Java SE的下载链接,在官网上选择正确的版本并下载,比如操作系统可以是Windows、Linux或macOS,系统架构可以是32位也可以是64位。

4.如果使用Windows,则直接运行安装包。如果使用macOS,则打开.dmg文件并安装JDK。如果使用Linux,先将压缩包解压到某个目录,比如XYZ,然后将其加到环境变量PATH中。

export PATH=XYZ/bin:$PATH

5.现在,读者应该可以在终端中运行java和javac命令了。在终端中试一试javac命令,看看系统能不能找得到JDK(本书中不会直接使用这个命令,这里只是用它来验证JDK是否已经装好)。读者的操作系统中有可能已经安装过JDK了,验证方式同样是使用javac命令。

安装和使用SBT

SBT是Scala工程所用的命令行构建工具。它的用途包括编译Scala代码、管理依赖项、持续性编译和测试、部署等。纵观全书,示例代码都是用SBT来管理依赖项和运行的。

安装SBT的步骤如下。

1.进入Scala官网。

2.下载读者所用操作系统对应的安装文件。如果使用Windows,这是个.msi安装包;如果使用Linux或macOS,这是一个.zip或.tgz压缩包。

3.安装SBT。在Windows下可直接运行安装包;在Linux或macOS下,将压缩包解压到用户主目录下即可。

安装好SBT之后,按照下面的步骤可生成一个新的SBT工程。

1.打开Windows下的命令提示符窗口,或Linux/macOS下的终端窗口。

2.创建一个名称为scala-concurrency-examples的目录(在Linux中)。

    $ mkdir scala-concurrency-examples

3.进入scala-concurrency-examples目录。

    $ cd scala-concurrency-examples

4.创建本示例中的唯一的源码目录。

    $ mkdir src/main/scala/org/learningconcurrency/

5.用选定的文本编辑器创建一个构建定义文件build.sbt。此文件定义了工程属性。在工程根目录(scala-concurrency-examples)中创建这个文件,并加入如下定义(注意,空行是必要的)。

    name := "concurrency-examples"
    version := "1.0"
    scalaVersion := "2.11.1"

6.最后,回到终端,从工程根目录运行SBT。

    $ sbt

7.SBT会打开一个交互式命令行界面,可以输入一些SBT构建命令。

现在,读者可以开始编写Scala程序了。打开编辑器,在目录src/main/scala/org/learningconcurrency中创建一个源码文件HelloWorld.scala。并在HelloWorld. scala中加入如下内容。

    package org.learningconcurrency
    object HelloWorld extends App {
      println("Hello, world!")
    }

回到终端窗口的SBT交互式命令行界面,运行如下命令。

    > run

会得到如下输出。

    Hello, world!

上述步骤对于本书中大部分示例来说已经够用。不过,偶尔还会用到外部依赖库,SBT会从标准软件仓库中自动解析和下载这些库。有时候,还需要指定其他的软件仓库,所以可以在build.sbt中加入如下内容。

    resolvers ++= Seq(
      "Sonatype OSS Snapshots" at
        "https://oss.sonatype.org/content/repositories/snapshots",
      "Sonatype OSS Releases" at
        "https://oss.sonatype.org/content/repositories/releases",
      "Typesafe Repository" at
        "http://repo.typesafe.com/typesafe/maven-releases/"
    )

这样,所有必需的软件仓库都已经就绪,然后就可以添加具体的库了。在 build.sbt文件中加入下列内容,它表示添加Apache Commons IO库。

    libraryDependencies += "commons-io" % "commons-io" % "2.4"

修改完build.sbt之后,有必要重新加载正在运行的SBT实例。在SBT交互式命令行中,执行如下命令。

    > reload

这样,SBT就能检测到构建定义文件中的任何改动,并加载必要的外部依赖库。

不同的 Scala 库保存在不同的命名空间中,称为包。为获得某个包的内容,需要使用import语句。本书中的示例在第一次使用某个并发库时,会给出其import语句,但后续再次出现该并发库时,就不会重复同样的语句了。

类似地,为了更简练,本书也会在代码示例中避免出现重复的包声明。只需要让每一章使用同一个包命名规则即可。比如,第2章所有代码都放在名为org. learningconcurrency. ch2的包中。这一章中的代码示例都以如下代码开头。

    package org.learningconcurrency
    package ch2

本书是关于并发计算和异步编程的书。许多示例启动的是并发计算,它们会在主程序停止之后仍然继续执行。为确保这些并发计算总能结束,本书中的示例都是在 SBT 本身的JVM实例上运行的。所以,需要在build.sbt文件中加入下面一行内容。

    fork := false

如果某个示例需要用到另一个JVM进程,书中会给出明确的提示。

使用Eclipse、IntelliJ IDEA或其他IDE

使用诸如Eclipse或IntelliJ IDEA的IDE的一个好处是用户可以一气呵成地编写、编译和运行Scala代码。这时就不再需要安装SBT了。虽然本书建议读者使用SBT运行示例代码,但使用IDE也是没问题的。不过,使用IDE运行本书的示例代码会有一个问题,即Eclipse和IntelliJ IDEA等IDE会在单独的JVM进程中运行程序。前面提过,某些并发计算会在主程序结束之后仍然继续运行,为了确保它们总能正常结束,用户有时需要在主程序后面添加 sleep 语句,以延缓主程序的退出。本书的大部分示例代码会加上sleep语句,但有时候读者需要自行加上。

本书的目标读者

本书主要面向学过串行 Scala 程序且希望了解如何编写并发程序的开发者。本书假设读者对 Scala 程序语言有基本的了解。本书的目的是展示如何编写并发程序,因此会坚持只使用Scala中的简单功能。即使只对Scala有初步了解,读者也应该能容易地理解书中的各种并发编程主题。

但这并不是说本书只面向Scala开发者。不管是Java、.NET,还是其他的程序语言的爱好者,都能够通过阅读本书有所收获。从这一方面看,读者具备基本的面向对象编程或函数式编程经验应该就足够了。

广义来讲,本书其实是一本介绍现代并发编程的图书。即使只是略懂多线程计算或JVM并发模型,读者也能从本书中学到很多关于现代高层次并发编程工具的知识。书中提到的很多并发库也只是刚开始进入主流编程语言行列,有些还算得上是前沿技术。

资源与支持

本书由异步社区出品,社区(https://www.epubit.com/)为您提供相关资源和后续服务。

配套资源

本书提供如下资源:

● 本书源代码。

要获得以上配套资源,请在异步社区本书页面中单击,跳转到下载界面,按提示进行操作即可。注意:为保证购书读者的权益,该操作会给出相关提示,要求输入提取码进行验证。

提交勘误

作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。

当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,单击“提交勘误”,输入勘误信息,单击“提交”按钮即可。本书的作者和编辑会对您提交的勘误进行审核,确认并接受后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。

扫码关注本书

扫描下方二维码,您将会在异步社区微信服务号中看到本书信息及相关的服务提示。

与我们联系

我们的联系邮箱是chencongcong@ptpress.com.cn。

如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。

如果您有兴趣出版图书、录制教学视频,或者参与图书翻译、技术审校等工作,可以发邮件给我们。

如果您所在的学校、培训机构或企业,想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。

如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接发邮件给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值的内容的动力之源。

关于异步社区和异步图书

异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT技术图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT技术图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。

异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社近30年的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、AI、测试、前端、网络技术等。

异步社区

微信服务号

第1章 概述

10年前就有人预言一台计算机的能力已经达到极限,只有通过多台计算机互联才能真正实现突破。

——吉恩·阿姆达尔(Gene Amdahl)

虽然并发编程这个领域已经有很长的发展历史了,但直到多核处理器出现才受到关注。计算机硬件近年来的发展不仅“复兴”了一些经典并发技术,也开启了并发编程范式的一次重大“变迁”。现如今,并发编程已经不可或缺,理解并发编程甚至已经成为每个软件开发者的核心技能之一。

本章将解释并发计算的一些基本概念,并介绍阅读本书所需的Scala语言基础知识。具体而言,包含如下几个方面。

● 概述并发编程。

● 阐述用Scala开展并发编程的好处。

● 介绍阅读本书所需的Scala语言基础知识。

首先,本章将介绍什么是并发编程及其重要性。

1.1 并发编程

在并发编程中,程序被描述为一些并发计算的集合,这些计算在时间上重叠,在执行过程中互相协调。实现正确运行的并发程序比实现串行程序困难多了。串行程序中的所有陷阱在并发程序中同样存在,而并发程序本身还有很多问题。于是有人会说:“何必呢?继续写串行程序不好吗?”

这是因为并发编程的好处还是很多的。首先,提高并发度可以改进程序性能。在一个处理器上运行整个程序会很慢,让不同的处理器同时运行多个子任务可以提高速度。随着多核处理器的流行,性能因素成为并发编程获得关注的主要原因。

其次,并发编程模型可以实现更快的I/O操作。纯串行程序必须周期性地查看I/O,以检测是否有来自键盘、网卡或其他设备的数据输入。而并发程序可以立即响应 I/O 请求。对于 I/O 密集型操作,这会提高数据吞吐量,这也是在多核处理器出现之前不少程序语言中就已经支持并发编程的原因。因而,并发性可保证提高程序与环境交互时的响应性(responsiveness)。

最后,并发性可简化计算机程序的实现和提高可维护性。有些应用程序用并发性来表述时会更为简洁。与其将所有计算过程嵌入一个较大的工程中,将它分到更小且独立的计算过程中可能会更方便一些。用户接口、网络服务器以及游戏引擎都属于这类应用。

本书假设并发程序通过共享内存的方式互相通信,且所有程序都在同一台计算机上运行。相比而言,分散在不同计算机上,且各自拥有独立内存的程序,称为分布式程序。编写分布式程序的领域称为分布式编程。一般而言,分布式程序假设每台计算机都可能失效,因而提供了应对这种情况的保护措施。本书重点关注并发程序,但也会涉及一些分布式程序。

1.1.1 传统并发计算概述

在一个计算机系统中,并发性具有多个层次,它可以存在于计算机硬件层面、操作系统层面,也可以存在于编程语言层面。本书主要关注编程语言层面的并发性。

在并发计算系统中,多个执行(execution)之间的协调称为同步,这是实现并发性的关键要素。同步涉及维持并发程序时序性的机制。此外,同步指定了并发执行之间的通信方式,即如何交互信息。在并发程序中,不同的执行通过修改计算机的共享内存子系统实现通信。这种通信称为共享内存通信。在分布式程序中,不同执行之间通过交换消息实现通信,这种通信称为消息传递通信。

在底层,并发执行称为进程和线程的实体表示,详情参见第2章。进程和线程使用一些传统实体(比如锁和监控器)来维护相互之间的运行次序。在线程之间建立运行次序保证了前一个线程对内存的修改在后一个线程中是可见的。

一般而言,单独用线程和锁来表达并发程序是很烦琐的。于是,产生了一些更复杂的并发工具用于解决这个问题,比如通信通道、并发容器、同步栅栏(barrier)、计数闩(countdown latch)、线程池等。这些工具可以更好地表达特定类型的并发编程模式,第3章会介绍其中一部分。

相比而言,传统的并发编程更底层一些,且容易出错,比如死锁、饥饿、数据争用和竞态条件(race condition)。使用Scala编写并发程序时,一般不会使用底层并发原语。不过,对底层并发编程有基本了解也是有价值的,这对进一步理解高层次的并发概念很有帮助。

1.1.2 现代并发编程范式

现代并发编程范式比传统方法更高级,关键的区别在于高层次的并发框架更关心如何表述目标,而不是如何实现目标。

在实践中,底层和高层并发之间没那么泾渭分明,而且一些并发框架进一步填充了两者之间的空白,形成了一个并发框架的谱系。不过,并发编程目前的发展趋势更倾向于声明式和函数式风格。

在第2章中会看到,并发计算一个值需要用到一个线程,该线程要定制其run方法,并用start方法启动,启动之后需要等待线程结束,然后到指定内存区域读取最后的结果。总而言之,并发计算过程就是并发地计算,然后等待结束通知。

但更好的并行计算方案是选择一种编程模型,将并发计算时的通信细节隐藏。这样,用户不会感觉到自己在等待,也不需要手动去内存中取结果,仿佛计算结果自然而然就产生了。异步编程中的一种称为Future的范式特别适用于这种声明式并发计算场合,这在第4章中会介绍。类似地,响应式编程(reactive programming)使用事件流,它以一种声明式的方式表达多个值的并发计算,详情参见第6章。

声明式编程风格在串行编程中也越来越普遍。像Python、Haskell、Ruby和Scala这样的语言使用函数式操作处理容器数据结构,并支持“filter all negative integers from this collection”这样的声明式语句。这种语句表述的是目标,而不是底层的实现方式,因此后台也就有了很大的并行优化空间。第5章将描述 Scala 中的数据并行容器(parallel collection)框架,用于在多核环境下对容器操作加速。

另一个高层次并发框架的发展趋势是专用化。软件事务性内存技术专门用于表达内存事务,却丝毫不关心如何启动并发执行。内存事务是指一连串内存操作,这些操作要么都执行,要么都不执行,这个概念类似于数据库事务。使用内存事务的好处是,避免了底层并发计算的常见错误。第7章详细解释了事务性内存。

一些高层次并发框架还致力于实现分布式计算的透明化。对数据并行框架和消息传递并发框架而言尤其如此,比如第8章中的角色模型(actor model)。

1.2 Scala的优势

虽然Scala的发展势头很好,但其应用范围还没法和Java这样的“主流”语言相提并论。不过 Scala 的并发编程生态圈非常丰富而强大。几乎所有的并发编程样式都能在Scala 的并发编程生态圈中找到,且处于活跃的发展之中。Scala 在不断扩展其并发计算领域,提供更多现代化的高层次的应用程序接口(Application Programming Interface, API)。Scala在并发计算领域获得成功的原因有很多,主要的一个原因在于众多现代并发框架都能从Scala原生的灵活语法特性中获益,包括头等函数(first-class function)、传名参数、类型推理以及模式匹配等,这些特性在本书中都有介绍。通过这些语言特性,可以定义看起来具有原生语言特性的新API。这样的API可以将不同的编程模型伪装成嵌入宿主语言Scala的领域语言,比如角色模型、软件事务性内存和Future,它们看起来具有基础的语言特性,实际上只不过是第三方库。这样,Scala成功将众多现代并发框架吸收进来,无须再为每一种并发编程模型设计一种新的语言。而且,相对于其他语言,较小的语法负担也吸引了不少用户。

Scala发展良好的第二个原因在于它是一种安全的语言。原子垃圾回收、自动边界检测以及避免指针运算,让Scala避免出现内存泄露、缓冲区溢出等内存问题。此外,Scala的类型安全性也消除了很多早期的编程错误。当涉及并发计算时,因为其本身问题可能就不少,所以少一个语言层面的问题,也就少了一些烦恼。

第三个重要原因在于 Scala 的互操作性。Scala 程序被编译为 Java 字节码,在 Java虚拟机(Java Virtual Machine,JVM)上运行。Scala程序可以与Java库无缝集成,从而充分利用 Java 庞大的生态圈。通常情况下,切换到不同的语言是很痛苦的,但对 Scala而言,从Java切换过来则平缓和容易得多。这也是Scala的市场占有率越来越高的原因,而且一些Java兼容的框架也愿意选择Scala作为实现语言。

重要的是,Scala运行在JVM上意味着Scala程序是跨平台的。不仅如此,鉴于JVM拥有定义良好的线程和内存模型,这也保证了Scala可以在不同类型的计算机上以相同的方式运行。为实现语义一致性,可移植性对串行程序非常重要,对并发计算更是如此。

说了Scala的不少优点,后文将介绍本书涉及的Scala语言特性。

1.3 准备工作

本书假设读者对串行编程有基本的了解。虽然建议读者至少熟悉一些 Scala 语言的知识,但对于本书来说,理解类似的语言(比如 Java 或 C#)也足够了。如果对面向对象编程中的概念有基本的了解,比如类、对象、接口(interface)等,则阅读本书也会更容易一些。同样,对函数式编程原则有基本理解,比如头等函数、纯洁性和类型多态性等,对阅读本书也有帮助,但这不是必需的。

1.3.1 执行一个Scala程序

为了更好地理解 Scala 程序的执行模型,先考虑使用一个简单的程序。在此程序中用square方法来计算数字5的平方,然后将结果输出到标准输出上。

    object SquareOf5 extends App {
      def square(x: Int): Int = x * x
      val s = square(5)
      println(s"Result: $s")
    }

使用简单构建工具(Simple Build Tool,SBT)运行这个程序时,JVM运行时会分配程序所需的内存。这里考虑两种重要的内存区域:调用栈和堆对象。调用栈存储了程序的局部变量信息和当前方法的参数信息。堆对象中保存了程序分配的对象。为理解这两个区域的区别,考虑上述程序的执行过程,如图1.1所示。

如图1.1中第1步所示,程序在调用栈中为局部变量s分配一个条目,在第2步中调用square方法计算局部变量s的值。程序将值5放在调用栈上,作为x参数的值。程序还保留了调用栈中的一个条目,用于存放方法的返回值。到这里为止,程序可以开始执行square方法了,让x参数与它自己相乘,并将返回值25放在调用栈中,如图1.1第3步所示。

square方法返回之后,结果25被复制到局部变量s所在调用栈的位置上,如图1.1第4步所示。现在,程序必须为println语句创建一个字符串。在Scala中,字符串为String类的对象实例,程序会在堆对象上分配一个新的String对象,如图1.1第5步所示。如图1.1第6步所示,程序将对分配对象的引用保存在调用栈中的x位置上,然后调用println方法。

虽然这个过程被严重简化了,但它展示了Scala程序的基本执行模型。在第2章中,我们将了解到,每个执行的线程都会维护自己独立的调用栈,并且线程之间主要通过修改堆对象进行通信,而堆对象和局部调用栈之间的不一致造成了并发程序中的大部分错误。

了解了Scala程序的典型执行过程,现在就可以看一看Scala有哪些语言特性了。本章只介绍理解本书所必需的那些内容。

1.3.2 初识Scala

本节简单描述和本书示例相关的 Scala 语言特性,以快速、粗略的介绍为主,并不是Scala的完整指南。

通过本节,读者可以回想起一些 Scala 语言特性,并能将其和自己熟悉的语言进行比较。如果想更深入地了解Scala,请参考本章小结中提到的参考书目。

下面的示例代码定义了 Printer 类,它有一个 greeting 参数和两个方法:printMessage和printNumber。

    class Printer(val greeting: String) {
      def printMessage(): Unit = println(greeting + "!")
      def printNumber(x: Int): Unit = {
        println("Number: " + x)
      }
    }

上述代码中,printMessage 方法没有参数,只有一个 println 语句。printNumber有一个Int类型的参数x。两个方法都没有返回值,因此标识为Unit类型。下面的代码将此类实例化,并调用其方法。

    val printy = new Printer("Hi")
    printy.printMessage()
    printy.printNumber(5)

Scala 支持单例对象的声明。这就像声明一个类,然后将其实例化。之前介绍过的SquareOf5就是一个单例对象,它适用于声明一个简单的Scala程序。下面的单例对象Test声明了字段Pi,并将其初始化为3.14。

    object Test {
      val Pi = 3.14
    }

在其他类似语言中,供类扩展的实体称为接口,Scala 中相似的概念则称为特质(trait)。Scala 的类可以扩展特质,而且 Scala 特质还支持具体的字段和方法实现。在下面的示例中,定义了Logging特质,它通过抽象的log方法输出自定义错误和警告信息,然后将此特质加入PrintLogging类中。

    trait Logging {
      def log(s: String): Unit
      def warn(s: String) = log("WARN: " + s)
      def error(s: String) = log("ERROR: " + s)
    }
    class PrintLogging extends Logging {
      def log(s: String) = println(s)
    }

类定义中可以有类型参数(type parameter)。下面的泛型Pair类有两个类型参数P和Q,其决定了两个参数的类型。

    class Pair[P, Q](val first: P, val second: Q)

Scala支持头等函数对象,其也称为匿名函数。在下列代码中,声明了一个匿名函数twice,它用于将参数乘以2。

    val twice: Int => Int = (x: Int) => x * 2

在上述代码中,(x: Int)为匿名函数的参数部分,而x * 2则是函数体。=>符号必须位于匿名函数的参数和函数体之间。=>符号还用于表示匿名函数的类型,这里是Int => Int,可念成“从Int到Int”。在前面的示例中,函数类型标记Int => Int是可以省略的,因为编译器可以自动推理twice函数的类型,如下所示。

    val twice = (x: Int) => x * 2

在一种更简洁的语法中,可以忽略匿名函数声明中的参数类型标记,如下所示。

    val twice: Int => Int = x => x * 2

如果匿名函数的参数只在函数体中出现一次,甚至还可以表示得更简单一些,如下所示。

    val twice: Int => Int = _ * 2

Scala对头等函数的支持表现在可以将代码块作为参数传给函数,从而得到一种更轻量级的简洁语法。在下面的示例中,使用传名参数(byname parameter)声明了runTwice方法,此方法将代码块执行两次。

    def runTwice(body: =>Unit) = {
      body
      body
    }

在传名参数的声明中,=>符号被置于类型之前。RunTwice方法每引用一次body参数,这个代码块中的语句就会被重新执行,如下所示。

    runTwice { // 将Hello输出两次
      println("Hello")
    }

Scala的for表达式可以对容器进行遍历和变换。下面的for循环输出0~10的数字(不包含10)。

    for (i <- 0 until 10) println(i)

上述代码中,区间由表达式0 until 10创建,它等价于0.until(10),即调用值0的until方法。在Scala中,当调用对象的方法时,句点符号可以忽略。每个for循环都等价于一个foreach语句。上述for循环会被Scala编译器编译成下链表达式。

    (0 until 10).foreach(i => println(i))

Scala的for推导式(comprehension)语句可实现数据的变换。下面的for推导式将0~10的数字都乘以-1。

    val negatives = for (i <- 0 until 10) yield -i

negatives中的值为−10~0的负数。这个for推导式等价于下列map调用。

    val negatives = (0 until 10).map(i => -1 * i)

for推导式还支持多个输入数据的变换。下面的for推导式语句创建0~4的整数的所有二元组。

    val pairs = for (x <- 0 until 4; y <- 0 until 4) yield (x, y)

上述for推导式等价于下列表达式。

    val pairs = (0 until 4).flatMap(x => (0 until 4).map(y => (x, y)))

for 推导式中支持嵌入任意多个生成器表达式。Scala编译器会将它们翻译成多个嵌套flatMap,然后在最里层调用map。

常用的Scala容器包括序列(sequence),记为Seq[T]类型;映射(map),记为Map[K, V]类型;集合(set),记为Set[T]类型。在下面的示例中,创建了字符串的一个序列。

    val messages: Seq[String] = Seq("Hello", "World.", "!")

本书使用了大量的字符串模板(string interpolation)功能。一般来说,Scala字符串使用双引号。而字符串模板前面则多了一个 s 字符,字符串中间可以用$符号引用任何当前作用域中的标识符,如下所示。

    val magic = 7
    val myMagicNumber = s"My magic number is $magic"

模式匹配是另一个重要的Scala 语言特性。对Java、C#或C 用户而言,理解 Scala的match语句的一种办法是将其类比于switch语句。match语句可以分解为任意多个子句,并支持用户在程序中简洁地表达不同的匹配情况。

在下面的示例中,声明了一个 Map 容器,名为 successors,它将整数映射到自己的直接后继。然后调用get方法来获得数字5的后继。get方法返回了一个对象,类型为Option[Int],表示结果要么属于Some类(表示5在此映射中存在),要么属于None类(表示5不是此映射的一个键)。Option对象上的模式匹配支持逐个情况的比对,如下所示。

    val successors = Map(1 -> 2, 2 -> 3, 3 -> 4)
    successors.get(5) match {
      case Some(n) => println(s"Successor is: $n")
      case None => println("Could not find successor.")
    }

在 Scala 中,大部分操作符可重载。操作符重载不同于重新声明一个方法。在下面的示例中,定义了一个Position类,它有一个+操作符。

    class Position(val x: Int, val y: Int) {
      def +(that: Position) = new Position(x + that.x, y + that.y)
    }

Scala还支持定义包对象(package object),用于存储一个包的最外层方法和值定义。在下面的示例中,声明了org.learningconcurrency中的一个包对象。其中实现了最外层的log方法,用于输出指定的字符串和当前线程名称。

    package org
    package object learningconcurrency {
      def log(msg: String): Unit =
        println(s"${Thread.currentThread.getName}: $msg")
    }

本书后面会一直使用这个log方法,它用于追踪并发程序的执行过程。

本节的 Scala 语言特性就介绍到这里了。如果想更深入地理解这门语言,建议参考Scala的串行编程入门图书。

1.4 小结

本章讨论了什么是并发编程和在并发编程中选择 Scala 的原因,并简单列举了本书的主要内容和内容组织,而且为了帮助读者理解后续章节,还特意介绍了必要的 Scala基础知识。如果读者想要深入学习 Scala的串行编程,建议阅读由马丁·奥德斯基(Martin Odersky)等人编写的图书《Scala编程》。

第2章将介绍JVM上的并发编程基础知识,包括并发编译的基本概念、JVM提供的底层并发编译工具,以及Java内存模型(Java Memory Model,JMM)。

1.5 练习

下面的练习用于测试读者对 Scala 编程知识的掌握程度,其覆盖了本章的内容和其他Scala特性。练习8和9对比了并发编程和分布式编程的区别。读者并不需要编写完整的Scala程序,可以用伪代码来解答练习中的问题。

1.实现一个具有如下类型声明的compose方法。此方法必须返回一个函数h,它是输入函数f和g的组合。

    def compose[A, B, C]
    (g: B => C, f: A => B): A => C = ???

2.实现一个具有如下类型声明的fuse方法,若a和b都非空,则返回的Option对象应该包含Option对象a和b中的值的二元组(使用for推导式)。

    def fuse[A, B]
    (a: Option[A], b: Option[B]): Option[(A, B)] = ???

3.实现一个check方法,其参数是类型为T的值的序列和类型为T => Boolean的函数,当且仅当 pred 函数对 xs 中所有的值都为真,并且不会抛出异常时,check才返回真。

    def check[T](xs: Seq[T])(pred: T => Boolean): Boolean = ???

check的使用方法如下。

    check(0 until 10)(40 / _ > 0)

check方法使用的是柯里化定义,即两个参数没有用一个链表表示,而是用两个链表表示。柯里化定义让函数调用语法更优雅,而且语义上等价于单个参数链表的定义方式。

4.修改本章的Pair类,使它能够用于模式匹配。

如果读者从未做过,可以先熟悉一下Scala中的模式匹配。

5.实现一个permutations函数,它将一个字符串变换为一个字符串序列,结果中每一个字符串都是输入字符串字母顺序的变换结果。

    def permutations(x: String): Seq[String]

6.实现一个combinations函数,输入是一个元素序列,输出是长度为n的所有可能组合的遍历器。组合指的是从一个元素集合中选出一个子集的方式,每个元素只被选择一次,只不过不关心元素子集中的次序。比如,给定序列 Seq(1, 4, 9, 16),长度为2的组合包括Seq(1, 4)、Seq(1, 9)、Seq(1, 16)、Seq(4, 9)、Seq(4, 16)和 Seq(9, 16)。combinations 函数的定义声明如下(参见标准库文档中Iterator的API)。

    def combinations(n: Int, xs: Seq[Int]): Iterator[Seq[Int]]

7.实现一个方法,输入是一个正则表达式,输出是一个部分函数。部分函数将一个字符串映射为此字符串中的匹配链表。

    def matcher (regex: String): PartialFunction[String, List[String]]

如果没有找到匹配项,则这个部分函数不需要定义;否则此函数使用正则表达式输出匹配链表。

8.假设读者和同事们同处一个办公室,各自有一个隔间,大家互相看不见对方,且不能说话(会吵到别人)。因为都被限制在隔间中,所以每个人都无法确认传递聚会消息的纸条是否已经到达目的地。在某一时刻,其中一人被叫到老板办公室,被永久“扣留”在那里。请设计一个算法,使大家能够确定何时能够聚会。除被老板叫走的那位之外,所有人都需要同时做出决定。如果有些纸条被递到目的地时发生随机性失误,则该如何改进算法?

9.在练习8中,假设读者和同事们所在的办公室旁的大厅中有一个白板。每个人可以偶尔穿过大厅,并在白板上写字,但无法保证哪两个人能同时出现在大厅中。在这种设定下设计一个算法解决同时确定时间的问题,即用白板替代练习8中的纸条。

第2章JVM和JMM上的并发性

在某种程度上,所有非平凡的抽象都会被泄露。

——杰夫·阿特伍德(Jeff Atwood)

从诞生之日起,Scala 程序就主要运行在 JVM 上,这种设计也成就了众多的 Scala并发库。Scala 的内存模型、多线程能力以及线程间同步,都继承自 JVM。绝大部分高层次的 Scala 并发构造都基于本章介绍的底层原语。这些原语是实现并发计算的基本方式,可以说,本章介绍的 API和同步原语是 JVM上的并发编程的基石。在大部分情况下,用户需要避免直接使用底层并发技术,而应该使用后文会介绍的高层次并发抽象构造技术。不过,用户还需要掌握一些基础知识,比如什么是线程、为什么保护块优于忙等待、内存模型有何用处等。有理由相信,明白这些问题有助于理解高层次并发抽象构造。尽管有人说需要了解实现细节的抽象不是好的抽象,但是理解抽象背后的基础知识还是很有益处的。实际上,所有抽象在某种程度上都是会泄露底层细节的。

接下来,本章不仅会介绍JVM的并发计算基础,还会讨论它们和Scala中的具体特性是如何交互的。具体而言,本章涉及如下主题。

● 创建和启动线程,并等待其结束。

● 通过对象监控器和同步语句在线程之间进行通信。

● 利用卫式代码块避免忙等待。

● 易失变量的语义。

● JMM的规范及其重要性。

本章将介绍如何使用线程,这也是最基础的并发计算。

2.1 进程和线程

在现代抢占式多任务操作系统中,程序员基本无法指定一个处理器来运行程序。实际上,同一个程序可能同时运行在多个处理器上。在程序运行过程中,不同部分的可执行代码被分配到不同的处理器上,这个分配机制称为多任务,它是由操作系统负责的,计算机用户是看不见的。

历史上,在操作系统中采用多任务是为了改善用户体验,这样多个用户或程序就可以同时使用同一台计算机上的资源了。在合作式多任务操作系统中,程序可以决定何时停止使用处理器,并将控制权移交给其他程序。不过,这对程序员提出了更高的要求,且很容易造成程序无法及时响应。比如,某个下载管理器开始下载一个文件,它就需要及时移交控制权,否则,如果等到文件下载完成再移交控制权,用户体验就被破坏了。现在大部分操作系统依赖于抢占式多任务机制,这样每个程序都会不断被分配到某个处理器上的一小段处理时间。这些时间片断被称为时间片。因而,对应用开发者和用户而言,多任务的调度过程是不可见的。

同一个计算机程序可以被启动多次,这些被重复启动的程序甚至可以在同一个操作系统中同时运行。进程是正在运行的计算程序实例。当一个进程启动之后,操作系统会为它分配一部分内存和其他计算资源。然后,操作系统将进程指定给一个处理器,让进程在处理器的一个时间片上运行。一个时间片用完之后,操作系统可能会将处理器的下一个时间片分配给其他进程。需要强调的是,进程的内存和其他计算资源是私有的,进程之间不能直接访问对方的内存,也不能同时使用私有的大部分资源。大部分程序只有一个进程,有些程序则可能有多个进程。对于多进程的程序,程序内的不同任务分别由独立的进程处理。因为进程无法直接访问其他进程的内存,所以基于多进程的多任务程序的实现过程往往是非常烦琐的。

在多核计算机成为主流之前,多任务处理就已经非常重要。网页浏览器之类的大型程序需要将功能划分为多个逻辑子模块。浏览器的下载管理器用于下载文件,它和网页刷新或文档对象模型(Document Object Model,DOM)更新之间应该相互独立。当用户访问一个社交网站时,文件下载过程应该在后台运行,只不过这些独立的计算隶属于同一个进程。进程中独立的计算过程称为线程。在典型的操作系统中,线程数目是要大于进程数目的。

每个线程都描述了程序执行过程中的程序栈和程序计数器的当前状态。程序栈包含当前执行的一系列方法调用,以及每个方法的局部变量和参数。程序计数器描述了当前方法中指令的位置。处理器运行线程的方式是操作其程序栈的状态或程序对象的状态,然后执行当前程序计数器上的指令。当我们说一个线程执行一个操作(如将数据写入一个内存位置),指的是该线程所在的处理器执行了那个操作。在抢占式多任务操作系统中,线程的执行是由操作系统来调度的。程序员只能假设分配到每个线程的处理器时间是均等的。操作系统的线程是一种编程设施,通常表现为一种与操作系统相关的编程接口。和独立的进程不同,同一进程内的不同线程是可以共享内存的,因而通过内存的读写就可以实现线程之间的通信。另一种进程的定义方式是将它视为一个线程集合,再加上这些线程共享的内存和其他资源。

基于上述关于进程和线程之间关系的讨论,典型的操作系统可以通过一个简化版示意图展现出来,如图2.1所示。

图2.1中的操作系统有多个进程同时在执行,图中只显示了前3个进程,每个进程都指定了确定的计算机内存区域。实际的操作系统的内存系统要复杂得多,这里只是简化的概念模型。

每个进程都可以包含多个操作系统线程,图2.1中的每个进程至少各包含两个线程。当前时刻,进程2的线程1在CPU内核1上执行,而进程3的线程2则在CPU内核2上执行。操作系统会周期性地将不同的操作系统线程指定到不同的CPU内核上,从而让所有进程都能持续执行。

介绍完了操作系统线程和进程的关系,接下来将要介绍JVM的相关概念。Scala程序都是在JVM这种运行时上运行的。

启动一个新的JVM实例总会创建一个新进程。在这个JVM进程中,多个线程同时运行。JVM将该进程的线程表示为java.lang.Thread类。和其他语言的运行时不同,比如相比 Python,JVM 并没有实现定制的线程。相反,每个 Java 线程都直接被映射为一个操作系统线程。这意味着Java线程与操作系统线程非常类似,而且JVM也需要遵守操作系统相关的限制条件。

Scala是一种程序语言,在默认情况下其程序会被编译为JVM字节码。从JVM的角度看,Scala的编译结果和其他Java程序的编译结果没什么区别。所以,Scala程序可以直接调用其他Java库,在很多情况下,反过来操作也是可以的。Scala重用了Java中的线程API,理由有很多,首先是Scala可以直接和已有的Java线程模型交互,这就已经足够强大。其次,采用同样的线程API是出于兼容性考虑的,实际上,在Java线程API之外,Scala也不会引入新东西了。

本章后文将介绍如何用Scala创建JVM线程、如何执行线程及如何实现线程通信。这些内容会通过多个具体的实例来讲解。当然,Java专家完全可以忽略本章剩下的内容。

2.1.1 线程的创建和启动

每当创建新的JVM进程时,会默认创建几个线程。其中,非常重要的线程为主线程,它执行的是Scala程序的main方法。下面的程序将获得当前线程的名称,并将其输出到标准输出中。

    object ThreadsMain extends App {
      val t: Thread = Thread.currentThread
      val name = t.getName
      println(s"I am the thread $name")
    }

在JVM上,线程对象由Thread类表示。上面的程序使用静态的currentThread方法来获得当前线程对象的引用,并将其存储到局部变量t中。然后调用getName方法,获得此线程的名称。如果用SBT中的run命令运行这个程序,可得到如下输出。

    [info] I am the thread run-main-0

正常情况下,主线程的名称应该就是main方法的名称。这里之所以不同,是因为SBT在SBT进程内的另一个线程中运行了这个程序。为确保程序运行在独立的JVM进程中,需要设置SBT的fork选项。

    > set fork := true

重新执行SBT的run命令,可以看到如下输出。

    [info] I am the thread main

每个线程都会经历多种线程状态。当一个Thread对象被创建时,它的初始状态为new。当新线程对象开始执行时,它进入runnable状态。当线程对象完成执行时,它会变成terminated状态,并且无法再次执行。

启动一个独立的线程包含两步。第一步,需要创建一个Thread对象,它为线程栈和线程状态分配了内存。第二步,线程的启动需要调用此对象的start方法。下面示例中的ThreadsCreation应用展示了这个过程。

    object ThreadsCreation extends App {
      class MyThread extends Thread {
        override def run(): Unit = {
          println("New thread running.")
        }
      }
      val t = new MyThread
      t.start()
      t.join()
      println("New thread joined.")
    }

当一个JVM应用程序启动时,它会创建一种被称为主线程的特殊线程,此线程会调用指定类中的main方法,即本例中的ThreadsCreation对象。当App类被继承时,其main方法会自动出现在新的类中。在本例中,主线程首先创建MyThread类型的一个线程,然后将其赋值给t。

接下来,主线程通过调用t的start方法启动了这个新线程。调用start方法会进一步执行新线程的run方法。首先,操作系统会获悉t必然已经开始执行。当操作系统决定将新线程指定到某个处理器时,后面的事情程序员就管不了了,不过操作系统必须确保线程一定会被执行。主线程启动新线程t之后,它会调用新线程的join方法。此方法会让主线程暂停执行,直到 t 完成执行。换一句话说,join操作让主线程进入等待(waiting)状态,直到t终止。需强调的是,等待中的线程会将控制权交还给处理器,然后操作系统会将这个处理器指定给其他线程。

等待中的线程通知操作系统它正在等待某个条件,并且停止消耗中央处理器(Central Processing Unit,CPU)时钟,而不会不停地检查条件是否满足。

同时,操作系统会找到一个可用的处理器,并让它运行子线程。一个线程执行的指令来自run方法,因此需要重载这个方法。MyThread类的实例t在标准输出上输出字符串"New thread running.",并终止。然后操作系统被通知 t 结束了,于是主线程得以继续执行。操作系统将主线程重新设置为 running 状态,而且主线程会输出字符串"New thread joined."。这个过程如图2.2所示。

需要注意的是,"New thread running."和"New thread joined."总是依次先后输出。这是因为 join 方法会保证线程 t 在执行 join 之前结束。当运行ThreadsCreation时,速度太快了,以至于两个println语句几乎同时执行。那有没有可能使 println 语句的执行顺序取决于操作系统执行线程的策略呢?为了确认主线程确实在等t而不论操作系统怎么取舍,下面可以做一个实验。实验之前,先实现一个工具方法 thread,用于创建和启动一个新线程,因为当前的语法还是太烦琐了。这个thread方法的作用是在一个新创建的线程里执行一段代码。这一次,新线程将使用一个匿名的线程类,它在定义的同时就实例化了。

    def thread(body: => Unit): Thread = {
      val t = new Thread {
        override def run() = body
      }
      t.start()
      t
    }

这个thread方法的输入是一个代码块,它会创建一个新线程,启动线程,并在run方法中执行这个代码块,然后返回新线程的一个引用,这样,在后续的代码中还可以继续调用其join方法。

使用thread方法来创建和启动线程让代码变得简洁多了。为了让本章后文的示例更加精简,后文会一直使用此方法。不过,在生产环境中,用户在使用thread方法之前就需要三思了。在高性能和代码简洁之间需要进行权衡;简单操作往往没有必要使用轻量级语法,特别是要避免像创建线程这样相对耗时的操作。

下面开始实验,首先要确保操作系统中所有处理器都是空闲的。为实现这一点,可以使用 Thread 类的静态 sleep 方法,此方法会让当前执行的线程延缓执行指定的毫秒数,即让当前线程进入定时等待(timed waiting)的状态。然后,操作系统在调用sleep之后将处理器分配给其他线程使用。当然,延缓的时间要比操作系统典型的时间片长一些,比如10~100 ms。下面的代码展示了这个过程。

    object ThreadsSleep extends App {
      val t = thread {
        Thread.sleep(1000)
        log("New thread running.")
        Thread.sleep(1000)
        log("Still running.")
        Thread.sleep(1000)
        log("Completed.")
      }
      t.join()
      log("New thread joined.")
    }

ThreadsSleep应用的主线程创建并启动了一个新线程t,它先睡眠1s,然后输出一些文本,重复这个过程几次,最后结束。主线程仍和之前一样调用join方法,并输出字符串"New thread joined."。

注意,这里用到了第1章定义的log方法,此方法在输出一个字符串的同时,还会输出调用log的线程的名称。

不管重复执行多少次ThreadsSleep,最后输出的总是"New thread joined."。这个结果是确定的,同样的输入总是得到同样的输出,这和操作系统的调度策略无关。

不过,不是所有使用线程的程序都是确定性的。下面就是一个非确定性的应用程序。

    object ThreadsNondeterminism extends App {
      val t = thread { log("New thread running.") }
      log("...")
      log("...")
      t.join()
      log("New thread joined.")
    }

上述代码中,主线程中的log("...")语句和t线程中的log调用之间的顺序是不确定的。在一个多核处理器上多次执行这个程序,"New thread running."有可能出现在两个"..."之前、中间或之后。其执行结果可能如下。

    run-main-46: ...
    Thread-80: New thread running.
    run-main-46: ...
    run-main-46: New thread joined.

也有可能是另外一种顺序,如下所示。

    Thread-81: New thread running.
    run-main-47: ...
    run-main-47: ...
    run-main-47: New thread joined.

大部分多线程程序都是非确定性的,因此多线程编程是非常难的,原因涉及多个方面。规模太大的程序会让程序员难以推断其确定性属性,多个线程之间的交互过程往往过于复杂。而且有些程序在逻辑上就是非确定性的。比如,一个网络服务器不可能预先知道哪个客户端会发来第一个请求,它只能假设这些请求发来的次序是任意的,并且要尽快进行处理。而客户端请求即使内容不变,次序不同也会表现出不同的行为。

2.1.2 原子执行

前面提到了一种线程间的通信方式,即等到某一时刻同时终止。被连接(被调用join)的线程发出了自己已经运行结束的信息。不过,这个运行结束的信息实际上没太大用处,大部分时候,线程需要知道其他线程运行过程中的信息。比如一个线程在网页浏览器中渲染一个页面,它必须通知其他线程哪个统一资源定位系统(Uniform Resource Locator,URL)被访问了,这样其他线程就可以将访问过的URL渲染成不同的颜色。

事实上,线程的join方法还有另外一个属性。当一个线程被调用join方法时,它所有的内存写操作都会在join返回之前发生,而且这些写操作对调用join的那个线程是可见的。这个性质可由下面的示例展示出来。

    object ThreadsCommunicate extends App {
      var result: String = null
      val t = thread { result = "\nTitle\n" + "=" * 5 }
      t.join()
      log(result)
    }

在这个示例中,主线程永远不会输出null,因为join调用总是在log调用之前发生,而线程中的赋值操作又发生在join返回之前。这种使用线程结果进行通信的模式是一种非常基础的线程间通信方式。不过,这个模式非常受限,它只支持单向通信,而且不能在执行过程中互相通信。而无限制的双向通信是非常普遍的。比如让多个线程并发生成互不相同的唯一标识符。下面是一个错误示例的前半部分。

    object ThreadsUnprotectedUid extends App {
      var uidCount = 0L
      def getUniqueId () = {
        val freshUid = uidCount + 1
        uniqueUid = freshUid
        freshUid
      }
    }

在上述代码中,首先声明了一个uidCount变量,它存储着那些线程生成的最后一个唯一标识符。这些线程都要调用getUniqueId方法来计算另一个未使用过的标识符,然后更新uidCount变量。在这个例子中,读取uidCount来初始化freshUid,并将freshUid重新赋值给uniqueUid,这两个操作并不一定一起发生。更准确地讲,它们不一定是原子性执行的,因为随时都可能有其他线程插入进来,从而打乱原来的节奏。接下来要定义一个 printUniqueIds 方法,输入为一个数字 n,该方法会生成 n个唯一标识符,然后输出。这里使用了Scala的for推导式语句,将0~n的数字映射为唯一的标识符。最后,主线程会启动一个新线程 t,t 线程会调用 printUniqueIds方法,主线程也会并发调用printUniqueIds方法。代码如下所示。

    def printUniqueIds(n: Int): Unit = {
      {
      val uids = for (i<- 0 until n) yield getUniqueId()
        log(s"Generated uids: $uids")
      }
      val t = thread { printUniqueIds(5) }
      printUniqueIds(5)
      t.join()
    }

多次运行这个程序之后,会发现两个线程产生的标识符不一定是唯一的;有时候结果正确,输出 Vector(1, 2, 3, 4, 5)和 Vector(1, 6, 7, 8, 9),有时候则结果完全错误!这个程序的输出结果取决于各线程的运行时机。

竞态条件指的是并发程序的执行结果依赖于该程序中代码的执行调度的现象。

竞态条件不一定是错误的程序行为。不过,如果某个执行调度引起了意外的输出,则其竞态条件将被视为程序错误。上述示例中的竞态条件就是一种典型的程序错误,因为getUniqueId方法不是原子性的。假设t线程和main线程并发调用getUniqueId。在第一行中,它们同时读取uidCount,其初始值为0,于是它们都认定自己的freshUid变量应该为1。freshUid变量是一个局部变量,所以它被分配在线程栈上,每个线程都各自存有这个变量的实例。然后,两个线程都决定将值1写回到uidCount中,并且写入的顺序是不确定的。最终的结果是,两个线程都得到了一个不唯一的标识符1。过程如图2.3所示。

大部分程序员习惯于串行编程,所以在使用 getUniqueId 方法时就容易犯错误,显然串行思维和并发思维的差异极大。这种差异来自对getUniqueId方法原子性的假设。代码块的原子性执行意味着,当一个线程执行此代码块时,不能有其他线程插入进来。在原子性执行中,代码块中的表达式只能串行执行,从而保证uidCount得到正确的更新。getUniqueId方法内部的代码依次对值进行读、改和写操作,这部分代码在 JVM 上不是原子性的。因此,需要构造一些语言结构来确保代码的原子性。Scala 中支持这种原子性执行的基础的结构称为同步语句(synchronized statement),这种结构可作用于任何对象。于是,getUniqueId可以用同步语句重新实现,如下所示。

    def getUniqueId() = this.synchronized {
      val freshUid = uidCount + 1
      uidCount = freshUid
      freshUid
    }

synchronized确保了只有在没有其他线程同时执行这个代码块,或同一个this对象上没有其他同步代码块(synchronized block)被调用时,该代码块才会被执行。在这里,this对象是外围的单例对象,即ThreadsUnprotectedUid。但是在一般情况下,this 也可能是外围的类或特质对象。getUniqueId 的并发调用过程如图2.4所示。

也可以直接调用 synchronized,而省略前面的 this,编译器会自动推断出外围的对象。但是这种做法一般不推荐,因为在错误的对象上进行同步会造成难以检测的并发错误。

显式地声明同步语句的作用对象是一个好习惯,这样做可以避免让程序出现奇怪而难以检测的错误。

JVM保证了一个线程在某个对象x上执行synchronized语句时,该线程是x对象上唯一在执行synchronized语句的线程。如果线程T要在x上调用synchronized语句,而另一个线程S正在x上调用其他synchronized语句,那么线程T会进入阻塞(blocked)状态。一旦线程S完成synchronized语句的执行,那么JVM会让线程T开始执行它的synchronized语句。

JVM内创建的每个对象都附带有一个特殊的实体,称为内蕴锁(intrinsic lock)或监控器(monitor),其作用是确保同时只有一个线程在该对象上执行某个synchronized代码块。当一个线程开始执行synchronized代码块时,我们称该线程获得了x的监控器的所有权,换句话说,获得了x的监控权。当一个线程执行完synchronized代码块后,我们称它释放了监控器。synchronized语句是Scala在JVM上进行线程间通信的基本机制。只要在有可能出现多个线程同时访问并修改某个对象的字段时,就应该使用synchronized语句执行这些操作。

2.1.3 重排序

使用synchronized 语句也是有代价的,比如说修改变量 uidCount 时使用synchronized语句进行保护,其代价会比通常的无保护写操作更大。synchronized语句的性能代价取决于JVM的实现,但通常不会很大。有些程序员会误以为多个线程交替执行一些简单的程序代码并不会有什么负面作用,因而会避免使用synchronized。但这是错误的,就像上述的唯一标识符的例子一样。下面是非同步代码导致严重错误的一个示例。

假设下面一个程序中有两个线程,即t1和t2,它们访问两个布尔型变量a和b以及两个整型变量x和y。线程t1将变量a设置为true,然后读取b的值。如果b的值为true,则线程t1将y赋值为0,否则赋值为1。线程t2则相反,它首先将b赋值为true,然后当a为true时将x赋值为0,否则赋值为1。重复这个过程100000次,如下面代码所示。

    object ThreadSharedStateAccessReordering extends App {
      for (i <- 0 until 100000) {
        var a = false
        var b = false
        var x = -1
        var y = -1
        val t1 = thread {
          a = true
          y = if (b) 0 else 1
        }
        val t2 = thread {
          b = true
          x = if (a) 0 else 1
        }
        t1.join()
        t2.join()
        assert(!(x == 1 && y == 1), s"x = $x, y = $y")
      }
    }

这个程序不太好理解,需要仔细地讨论各种可能。通过分析线程t1和t2的可能出现的交替执行情况,可以得出结论:如果两个线程同时对a和b赋值,那么x和y都会被赋值为0。

这种结果表明两个线程几乎同时执行,如图2.5(a)所示。

另一种情况是假设线程t2执行得更快。在这种情况下,线程t2会将变量b赋值为true,并继续读取a的值。若访问a发生在线程t1对a赋值之前,那么t2读取的值将为false,这时会将x赋值为1。然后线程t1执行,它发现b为true,于是将y赋值为0。这些事件的发生如图2.5(b)所示。注意,若t1先启动,则结果类似,得到x=0和y=1,所以图2.5中并没有列出这种情况。

可以确定的是,无论两个线程执行代码的顺序如何,x=1和y=1应该不可能在最后结果中同时出现。因而,最后的断言也应该永远不会抛出异常。

不过,执行这个程序几次之后,会得到如下输出,竟然出现x和y同时为1的情况。

    [error] Exception in thread "main": assertion failed: x = 1, y = 1

这种结果违背了常识,无论如何也推理不出这种结果。问题出在 JMM 规范上,如果在某个特定的线程内代码语句的顺序不影响程序的串行语义,那么执行过程中JVM是允许这类重排序的。这是因为某些处理器并不会总以程序代码的顺序执行指令。此外,程序并不需要将所有的更新立即写到主存中,而是会将它们临时保存在处理器的寄存器中。这会最大程度地提升程序的执行效率,从而实现更好的编译优化。

既然这样,那么对于多线程程序要怎么样才能正确推理呢?刚才犯的错来自一个假设,就是一个线程的写操作会立刻反映到内存中并对另一个线程可见,但这种同步并不会自然而然发生。synchronized语句就是实现正确的同步的基本方式。在对象x上执行 synchronized 语句产生的写操作不仅是原子性的,而且对所有在 x 上执行synchronized语句的线程而言都是可见的。将每个赋值语句都放在线程t1和t2的synchronized语句中,程序的行为就恢复正常了。

当多线程访问(读或写)某个共享状态时,记得在某个对象x上使用synchronized语句。这样,可确保任意时刻至多只有一个线程在x上执行synchronized语句。这还保证了线程T在对象x上进行的所有内存写操作,对所有其他随后也在x上执行synchronized语句的线程而言都是可见的。

在本章后文以及第3章的内容中,将会看到其他的同步机制,比如易失变量(volatile variable)和原子性变量。2.2节会介绍其他 synchronized 语句的使用案例,以及对象监控器。

2.2 监控器和同步

本节将会详细探讨如何用 synchronized 语句进行线程间通信。如前文所述,synchronized语句既保证了不同线程的写操作的可见性,还限制了共享内存的并发访问。一般来说,对共享资源的访问进行限制的同步机制称为锁。锁还被用于保证没有两个线程同时执行同一段代码,即这两个线程对这段代码的执行是互斥的。

前文提到过,JVM上的每个对象都有一个特殊的内置监控器锁,也称为内蕴锁。当一个线程访问对象x上的synchronized语句时,若没有其他线程拥有x上的监控器,那么它就会获得此监控器。否则,该线程会等待监控器被释放。在获得监控器的同时,该线程也能看到释放该监控器的前一线程的所有内存写操作。

synchronized 语句的一个自然而然的性质是它可以嵌套。一个线程可以同时拥有属于不同对象的多个监控器。这对基于简单组件的大型系统而言是很有用的。用户并不能提前预知不同软件组件使用了哪些监控器。比如设计一个记录资金流水的在线银行系统,在系统中维护一个资金流水链表,用可变ArrayBuffer来实现。这个银行系统不会直接修改流水,但是会用一个logTransfer方法来添加新消息,此方法的调用与资金变化同步。ArrayBuffer 是针对单线程而设计的容器,所以需要对它进行并发写保护。下面的代码定义了一个logTransfer方法。

    object SynchronizedNesting extends App {
      import scala.collection._
      private val transfers = mutable.ArrayBuffer[String]()
      def logTransfer(name: String, n: Int) = transfers.synchronized {
        transfers += s"transfer to account '$name' = $n"
      }
    }

银行系统中除了日志模块之外,还用Account类来表示账户。Account对象中保存了账户所有者name和资金数目money。为了往账户中存钱,系统使用add方法来获得Account对象的监控器,并修改其money字段。银行的业务流程要求对大宗交易进行特殊处理,即如果转账数目超过10个货币单元,就需要记录日志。下面的代码定义了Account类及其add方法,add方法用于在Account对象上添加数目为n的货币单元。

    class Account(val name: String, var money: Int)
    def add(account: Account, n: Int) = account.synchronized {
      account.money += n
      if (n > 10) logTransfer(account.name, n)
    }

add方法在synchronized语句内部调用logTransfer,而logTransfer会首先获得 transfers 的监控器。值得注意的是,这个过程不需要释放account 的监控器。如果transfers的监控器由另一个线程所有,当前线程将进入阻塞状态,而且它不会释放之前获得的监控器。

下面的示例中,应用程序创建两个独立的账户,由3个线程执行转账操作。一旦所有线程完成转账操作,主线程就将在日志中记录。

    // 银行系统示例代码(续)
    val jane = new Account("Jane", 100)
    val john = new Account("John", 200)
    val t1 = thread { add(jane, 5) }
    val t2 = thread { add(john, 50) }
    val t3 = thread { add(jane, 70) }
    t1.join(); t2.join(); t3.join()
    log(s"--- transfers ---\n$transfers")

此例中的synchronized语句避免了线程t1和t3并发修改Jane的账户。线程t2和t3还会访问transfers日志。这个简单的例子表明了嵌套的好处,因为用户并不知道银行系统中还有哪些组件可能使用了transfers日志。为了封装代码并提高代码的可重用性,独立的软件组件不应该显式地对银行转账日志操作进行同步;相反,同步应该隐藏在logTransfer方法内。

2.2.1 死锁

在上述银行系统的示例中,一个比较好的地方是 logTransfer 方法绝不会尝试获取transfers的监控器之外的其他锁。一旦获得了监控器,线程就开始修改transfers日志,然后释放监控器;在这种嵌套锁的栈中,transfers总是最后出现。由于logTransfer方法是唯一对transfers进行同步的方法,因此它在同步transfers时不会无限阻塞其他线程。

死锁是一种经常出现的情况,两个或多个执行过程互相等待对方完成各自的操作。等待的原因在于每个执行过程都获得了某个资源的唯一访问权,而其他执行过程又恰好需要对方占有的那个资源。以日常生活为例,假设两位同事坐在咖啡馆中开始吃午餐(需要同时使用刀和叉子),一个同事拿叉子,另一个同事拿刀。双方都在等对方吃完饭,但又不交出自己的餐具,于是陷入了死锁,两个人都无法吃完午餐。至少,在领导来之前这个问题是无解的。

在并发编程中,两个线程同时获得两个不同的监控器,然后尝试获得对方的监控器时,死锁就发生了。双方都不释放自己的监控器,于是这两个线程进入阻塞状态,直到其中一个监控器被释放。

使用logTransfer方法绝不会造成死锁,因为多个线程在处理多个账户时也只会尝试获得同一个监控器,而这个监控器终究是会被释放掉的。现在,扩展介绍前面的银行系统示例,支持两个账户之间转账,代码如下。

    object SynchronizedDeadlock extends App {
      import SynchronizedNesting.Account
      def send(a: Account, b: Account, n: Int) = a.synchronized {
        b.synchronized {
          a.money -= n
          b.money += n
        }
      }
    }

这里从前文的示例中导入了Account类。send方法是原子性的,它将数目为n的钱从账户 a 转给账户 b。要实现这一点,需要同时在两个账户上触发 synchronized语句,以确保没有其他线程可以并发地修改其中任意一个账户。代码如下所示。

    val a = new Account("Jack", 1000)
    val b = new Account("Jill", 2000)
    val t1 = thread { for (i<- 0 until 100) send(a, b, 1) }
    val t2 = thread { for (i<- 0 until 100) send(b, a, 1) }
    t1.join(); t2.join()
    log(s"a = ${a.money}, b = ${b.money}")

现在,假设有两个新的银行客户 Jack和 Jill,他们在开户之后很喜欢新的电子银行平台,于是登录之后互相转账小笔金额进行测试,他们“狂按”了100次转账键。很快,问题就来了。线程t1和t2分别执行Jack和Jill的请求,同时触发send方法,只不过转账的方向是反的。比如线程t1锁住账户a,而t2锁住账户b,但都不能锁住对方的账户。让Jack和Jill惊讶的是,新的转账系统并没有看上去那么美好。读者如果运行这个示例,也只能以关闭终端而告终,然后重启SBT。

当两个或多个线程获得资源控制权,在不释放自己的资源的同时却又循环申请对方的资源时,死锁就发生了。

那么,如何防止死锁发生呢?回忆一下,在银行系统的最初版本中,申请监控器的顺序是良定义的。单个账户的监控器被一个线程获得之后,transfers 的监控器才有可能被其他线程获得。有理由相信,只要资源的访问存在确定的顺序,就不会有发生死锁的风险。在线程S获得资源X之后,线程T要想访问X就只能等待,而此时S绝不会尝试访问T已经获得的任何资源Y,因为Y < X,所以S只会尝试获取资源Z(Z > X)。资源之间的访问顺序打破了潜在的死循环,这是避免死锁的必要条件。

因此,需要在所有资源之间建立一个全序,这可以保证不会出现几个线程循环互相等待其他线程已经获得的资源的情况。

在上面的例子中,同样需要在不同账户之间建立顺序。一种方法是使用之前定义的getUniqueId方法。

    import SynchronizedProtectedUid.getUniqueId
    class Account(val name: String, var money: Int) {
      val uid = getUniqueId()
    }

这个新定义的Account类保证了没有两个账户拥有同样的uid字段,不论账户是哪个线程创建的。下面的send方法就是根据uid字段的顺序来申请资源的,这样就可以避免造成死锁。

    def send(a1: Account, a2: Account, n: Int) {
      def adjust() {
        a1.money -= n
        a2.money += n
      }
      if (a1.uid < a2.uid)
        a1.synchronized { a2.synchronized { adjust() } }
      else a2.synchronized { a1.synchronized { adjust() } }
    }

经过银行软件工程师的快速改进之后,Jack 和 Jill 又可以开心地互相转账了,阻塞的线程循环再也没有出现。在任何并发系统中,只要多个线程不释放自己已经获得的资源却又无限等待其他资源,就不可避免会造成死锁。不过,虽然死锁需要尽量避免,但是死锁并没有想象中那么可怕。从死锁的定义来看,值得安慰的是,出现死锁的系统不会再进一步执行了。开发者可以通过保存运行中的JVM实例的堆数据,分析线程的栈,然后快速解决Jack和Jill的问题;至少,死锁问题是很容易发现的,即使是在生产环境中也是如此。但竞态条件下的错误就不一样了,系统运行很长时间之后,其影响才会逐渐显现出来。

2.2.2 保护块

创建新线程的代价比创建 Account 之类的轻量级对象要大得多。高性能的银行系统要求能够快速响应,若对每个请求都创建新线程会拖慢系统的运行速度,特别在需要1 s内同时处理数千个请求时更是如此。同一线程应该能够被多个请求重用,这种可重用的线程的集合通常称为线程池。

在下列示例中,将定义一种特殊的称为工作(worker)线程的特殊线程,它将响应其他线程的请求,执行一个代码块。这里使用Scala标准库collection包中的可变类Queue来存储被调度的代码块。

    import scala.collection._
    object SynchronizedBadPool extends App {
      private val tasks = mutable.Queue[() => Unit]()

这里的代码块被表示为() => Unit类型的函数。worker线程会反复执行poll方法,它会对tasks进行同步,以检查队列是否为空。从poll方法的定义可以看出,synchronized语句也可以有返回值。在本例中,若还有任务未完成,就返回一个Some类型的可选值,否则返回None。Some对象包含了待执行的代码块。

    val worker = new Thread {
      def poll(): Option[() => Unit] = tasks.synchronized {
        if (tasks.nonEmpty) Some(tasks.dequeue()) else None
      }
      override def run() = while (true) poll() match {
        case Some(task) => task()
        case None =>
      }
    }
    worker.setName("Worker")
    worker.setDaemon(true)
    worker.start()

在上面的代码中,worker线程在启动之前被设置为守护线程。一般而言,JVM进程并不会在主线程终止时结束,而是会等所有守护线程全部结束。当 asynchronous方法向tasks中发送任务后,worker线程会执行tasks中未完成的代码块,所以要将worker线程设置为守护线程。

    def asynchronous(body: => Unit) = tasks.synchronized {
      tasks.enqueue(() => body)
    }
    asynchronous { log("Hello") }
    asynchronous { log(" world!")}
    Thread.sleep(5000)

执行上面的示例,可以看到worker线程会输出Hello和world!。同时读者可以听一听自己计算机的风扇的声音,这会儿应该开始响一会儿了。打开Windows操作系统的任务管理器,或在UNIX操作系统的终端中执行top命令。可以发现一个CPU几乎被一个java进程占满了。原因应该很清楚了,等到worker线程完成任务,它会继续检查队列中是否有任务。我们称worker线程这样的状态为忙等待。忙等待是不必要的,因为这会无止境地占用处理器资源。不过,主线程结束时这些守护进程难道不应该也终止吗?一般情况下是这样的,但是本示例是在SBT所在的JVM进程中执行的,而SBT本身并未终止。而且SBT也有自己的非守护进程,所以这里的worker线程不会结束。为了让SBT在新进程中执行run命令,输入下列命令。

    set fork := true

再次执行上述示例,这次主线程结束时worker线程也会跟着结束。但是忙等待的问题仍然存在,因为在大型系统中主线程不会很快结束。重复创建新线程是比较浪费资源的,而忙等待线程只会更浪费资源。只需几个这样的线程就能很快降低系统性能。忙等待只在极少数情况下是合理的,如果读者还是不确定它是否一定很危险,可以在自己的笔记本上执行上述示例,然后观察一下电池的耗尽速度。在这么做之前,记得保存好正在编辑中的文件,因为突然断电会导致数据丢失。

worker 线程更好的一种状态是休眠状态,类似于调用 join 之后线程的状态。worker只会在tasks队列不为空时才需要被唤醒。

Scala对象(以及一般的JVM对象)支持两个特殊的方法,称为wait和notify,它们分别用于让线程休眠和唤醒休眠线程。当前线程只有拥有对象x的监控器,才允许执行x的这两个方法。换句话说,当线程T调用某对象的wait方法时,它会释放x的监控器,然后进入休眠状态,直到另一线程S调用同一对象的notify方法。线程S通常用于为 T 准备数据。如下面的示例,主线程传递 Some 类型的消息,然后 greeter线程将其输出。

    object SynchronizedGuardedBlocks extends App {
      val lock = new AnyRef
      var message: Option[String] = None
      val greeter = thread {
        lock.synchronized {
          while (message == None) lock.wait()
          log(message.get)
        }
      }
      lock.synchronized {
        message = Some("Hello!")
        lock.notify()
      }
      greeter.join()
    }

上面的代码中出现了一种新的 AnyRef 类型的锁 lock(映射到 java.lang. Object类),线程使用这个锁的监控器。线程greeter首先会申请获得这个锁的监控器,并检查message是否被设置为None。如果为None,则什么也不需要输出,然后线程greeter会调用lock上的wait方法,此时lock的监控器被释放。而主线程(之前在synchronized语句中被阻塞)则获得lock的监控器的所有权,它会设置message的值,然后调用notify方法。当主线程离开synchronized代码块时,它会释放lock。这会导致greeter被唤醒、获得lock,并检查是否又有消息了,如果有的话就输出。因为greeter尝试获得的监控器就是主线程之前释放掉的,主线程对message的设置发生在greeter线程查看消息之前。于是,可以看出线程greeter将会看到主线程设置的消息。在此例中,无论哪个线程先执行synchronized代码块,线程greeter都将输出Hello!。

wait的一个重要性质是它会引起虚假唤醒。有时候,JVM允许在没有调用notify的情况下唤醒一个执行了wait的休眠线程。为了防止出现这种情况,需要用一个while循环反复检查状态,然后结合 wait 使用,如上面代码所示。使用一个 if 语句是不够的,因为即使message的值为None,一个虚假唤醒也将允许线程执行message.get。

当线程发现满足唤醒条件时,它会获得监控器的所有权,这样就可以保证检查操作的原子性。注意,检查条件的那个线程必须获得监控器才能被唤醒。如果没有立即得到监控器,它会进入阻塞状态。

若synchronized语句在调用wait之前反复检查条件,那这个synchronized语句称为保护块。下面,就可以用保护块来提前避免Worker线程进入忙等待状态。使用监控器的Worker线程的完整代码如下所示。

    object SynchronizedPool extends App {
      private val tasks = mutable.Queue[() => Unit]()
      object Worker extends Thread {
        setDaemon(true)
        def poll() = tasks.synchronized {
          while (tasks.isEmpty) tasks.wait()
          tasks.dequeue()
        }
        override def run() = while (true) {
          val task = poll()
          task()
        }
      }
      Worker.start()
      def asynchronous(body: =>Unit) = tasks.synchronized {
        tasks.enqueue(() => body)
        tasks.notify()
      }
      asynchronous { log("Hello ") }
      asynchronous { log("World!") }
      Thread.sleep(500)
    }

在上面的示例中,声明的 Worker 线程是应用程序中的一个单例对象。和之前不一样的是,poll方法在tasks对象上调用了wait,然后一直等到主线程在asynchronous方法中往tasks中加了一个代码块,并调用notify。执行这个示例,再看一看CPU使用情况。如果在执行忙等待的示例后被迫重启了 SBT(假设现在电池还有电),则可以看到Java进程的CPU使用量是0。

2.2.3 线程中断和平滑关闭

在上一个示例中,Worker线程在它的run方法中无穷循环,永不终止。读者可能会不以为意,反正Worker在休眠时并没有使用CPU,而且Worker是一个守护线程,总会在程序退出时结束。

不过,守护线程的栈空间在程序退出之前都会一直存在。如果休眠线程太多,内存就会被用完。结束休眠线程的一种方法是将它中断,代码如下所示。

    Worker.interrupt()

当一个线程等待或计时等待时,调用其 interrupt 方法会抛出一个 InterruptedExption异常。此异常可以被捕获和处理,但在这里,它的作用是终止线程Worker。不过,如果对运行中的线程调用这个方法,这个异常就不会产生,而是设置线程的interrupt 标志。对于不阻塞的线程必须周期性地用 isInterrupted 方法查询interrupt标志。

另一种结束线程的方法称为平滑关闭。在平滑关闭中,一个线程设置终止条件,然后调用notify来唤醒工作线程。然后,工作线程会释放它所有的资源,并顺利地结束。定义一个称为terminated的变量,如果其值为true,就需要结束线程。在等待tasks之前,poll方法会额外地检查此变量,如果Worker线程应该继续运行,poll方法会选择性地返回一个任务。代码如下所示。

    object Worker extends Thread {
      var terminated = false
      def poll(): Option[() => Unit] = tasks.synchronized {
        while (tasks.isEmpty && !terminated) tasks.wait()
        if (!terminated) Some(tasks.dequeue()) else None
      }
    }

下面重新定义run方法,它会在模式匹配中检查poll方法是否返回Some(task)。在此run方法中,不再使用while循环,而是在poll返回Some(task)时使用尾递归调用run方法。

    import scala.annotation.tailrec
    @tailrec override def run() = poll() match {
      case Some(task) => task(); run()
      case None =>
    }
    def shutdown() = tasks.synchronized {
      terminated = true
      tasks.notify()
    }

然后,主线程就可以在Worker线程上调用同步化的shutdown方法,从而发送终止线程的请求。这里不再需要将Worker线程设置为守护线程,Worker总是会自己结束运行的。

为了确保这些工具线程能够不进入竞态条件,并正确地结束,可以使用平滑关闭的思想。

如果出现无法用notify唤醒线程的情况,则应该使用interrupt方法,而不用平滑关闭这一方法。比如,线程在一个InterruptibleChannel对象上阻塞I/O,这时,被此线程调用wait方法的那个对象是隐藏的。

Thread类还定义一个 stop方法,这个方法不推荐使用,它会立刻终止线程,并抛出ThreadDeath异常。用户要避免使用这个方法,因为它会中断线程在任意点的运行,容易让程序数据处于不一致的状态。

2.3 易失变量

JVM还提供了一种比synchronized代码块更轻量级的同步形式,称为易失变量。易失变量可供于原子性读写,常用于表示状态标志。比如,标记某个计算是否完成、是否取消。这种方法有两个好处,第一是单个线程中易失变量的读操作和写操作不会重排序,第二是易失变量的写操作对其他线程是立即可见的。

被标记为volatile的变量的读操作和写操作绝不会被重排序。如果对一个易失变量v进行写操作W,这被另一个线程通过对变量v的读操作R观察到,那么在W之前该变量的所有写操作都可以在R之后观察到。

在下面的示例中,对多个页面的文本搜索至少一个感叹号字符。这些文本也许来源于某位流行科幻小说家,多个线程同时开始在各个页面中寻找感叹号字符。一旦有一个线程找到感叹号了,其他线程将停止搜索。

    class Page(val txt: String, var position: Int)
    object Volatile extends App {
      val pages = for (i<- 1 to 5) yield
        new Page("Na" * (100 - 20 * i) + " Batman!", -1)
      @volatile var found = false
      for (p <- pages) yield thread {
        var i = 0
        while (i < p.txt.length && !found)
          if (p.txt(i) == '!') {
            p.position = i
            found = true
          } else i += 1
      }
      while (!found) {}
      log(s"results: ${pages.map(_.position)}")
    }

每个页面都由一个Page类来表示,包含一个特殊的position字段,其用于保存搜索到的感叹号的位置。found标志表示已经由某个线程找到了感叹号。found标志之前添加了@volatile 注解,即将其声明为易失变量。当某个线程在一个页面中找到感叹号了,该页面的position会保存搜索到的感叹号的位置,而found标志会被设置为 true,这样其他线程就会提前结束搜索。不过,所有线程搜索完整个页面文本的情况是完全可能出现的,只是在这之前,它们发现found被设置为true的概率更大。因而,至少会有一个线程保存了感叹号的位置。

就这个示例而言,主线程会在找到感叹号(found为true)之前保持忙等待状态。当其找到之后,主线程会输出找到的感叹号的位置。注意,在那些线程中,position的写操作发生在found的写操作之前,从而也发生在主线程发现found为true之前。这意味着,主线程总是能够检测到是否有哪个线程设置了found,然后会输出至少一个感叹号的位置,而不会出现所有位置都是-1的情况。

本章的 ThreadSharedStateAccessReordering 示例可通过将所有变量声明为volatile来修复。在2.4节中可以看到,这么做可以保证a和b上读写操作的正确顺序。和Java不一样,Scala支持声明局部(本示例指的是for循环的闭包)易失变量。对于闭包或嵌套类中的每个局部易失变量,Scala都会创建一个带易失字段的堆对象。于是,可以称这些局部易失变量被提升为对象。

对易失变量进行读操作的代价通常是非常小的。不过,在大部分情况下,用户还是应该采用 synchronized 语句;易失变量的语义比较隐晦,且使用其时容易出错。另外,一个易失变量无法正确地实现 getUniqueId,而多个易失变量的读写操作不是原子性的,这时仍然需要同步。

2.4 JMM

虽然本章并没有明确说明,但是实际上已经定义 JMM 的大部分内容。那么,到底什么是内存模型呢?

一个语言的内存模型指的是一种规范,它描述了变量的写操作在什么情况下对其他线程可见。读者可以认为一个处理器修改了变量v之后就会立刻改变其相应的内存,然后其他处理器可瞬时发现v的新值。这种内存一致性模型称为顺序一致性。

但从ThreadSharedStateAccessReordering示例中可以看到,顺序一致性实际上只是用户的“一厢情愿”,处理器和编译器实际上并不是这么做的。写操作极少会立即作用于主存;在计算机系统中,处理器与主存之间存在着一个多级缓存结构,利用缓存可以提高性能,但只保证数据会最终写入主存。为了在不改变串行语义的情况下获得最优性能,编译器会利用缓存来推迟或避免主存写操作。这么做是合理的。虽然本书中的示例中有很多同步原语,但是在实际的程序中,每个线程用于实际计算的时间远远多于通信时间。

为了既保证并发程序的行为的可预测性,又让编译器最大程度地优化程序,内存模型实际上是一种权衡的产物。并不是每门编程语言或每个平台都有内存模型。比如,纯函数式编程语言就不支持变量修改,所以根本就不需要内存模型。

不同处理架构会导致不同的内存模型;如果不定义好 synchronized 语句或易失读写等操作的精确语义,则几乎无法正确编写出对所有平台都适用的 Scala 并发程序。Scala 继承了 JVM 的内存模型,该内存模型定义了一系列程序中各种行为的前发生(happens-before)关系。

在JVM中,这些行为包括易失读写、对象监控器的获取与释放、启动线程和等待线程结束等。如果一个行为A发生于行为B之前,则行为B可以发现行为A的内存写操作。不管程序运行在哪种机器上,这些前发生关系都是合法的;JVM有义务确保这些关系的正确性。虽然前面已经提到这些规则的部分内容,但这里还是有必要给出一个概述。

程序指令顺序(program order):在线程中,程序指令顺序决定了它的各个行为之间的先后发生的顺序。

监控器锁定(monitor locking):对一个监控器的解锁发生在此监控器后续被锁定之前。

易失字段(volatile fields):易失字段的写操作发生在此易失字段后续的读操作之前。

线程启动(thread start):线程的start方法发生在此线程中所有行为之前。

线程终止(thread termination):一个线程中的任意行为发生在另一个线程完成其join方法之前。

传递性(transitivity):如果行为A发生在行为B之前,而行为B发生在行为C之前,则行为A发生在行为C之前。

虽然“前发生关系”这个名字有些奇怪,但这种机制确保了线程之间能够发现对方的写操作。但它并不用于建立程序中不同语句之间的时序关系。当我们说写操作A发生于读操作B之前,那么写操作A的结果对读操作B一定是可见的。而A是否在B之前发生则取决于程序的执行过程。

前发生关系描述了不同线程之间的写操作的可见性。

此外,JMM还保证易失读写操作,以及监控器锁定和解锁都不会重排序。前发生关系确保了非易失读写操作也不能任意重排序。具体而言,前发生关系进行了如下保证。

● 非易失读操作不能通过重排序出现在程序指令顺序更靠前的程序易失读操作(或监控器锁定)之前。

● 非易失写操作不能通过重排序出现在程序指令顺序更靠后的易失写操作(或监控器解锁)之后。

还有一些高层的构造也构成了前发生关系,它们是基于上述规则而实现的。比如,interrupt 的调用发生在被中断的线程检测到此调用信号之前,这是因为在传统的实现方式中,interrupt的调用是通过监控器来唤醒线程的。

后文介绍的Scala并发API也在各种方法调用之间建立了前发生关系。在这些情况下,程序员需要自己保证一个变量的写操作与所有此变量的新值的读操作构成前发生关系。如果做不到这一点,就会出现所谓的数据争用。

不可变对象和终态字段

前面介绍了使用前发生关系避免数据争用的必要性,但凡事有例外。如果一个对象只包含终态字段,而且对外围对象的引用在构造函数完成之前对其他线程不可见,那么,此对象可视为不可变的,共享时就无须使用同步了。

在Java中,终态字段通过final关键字来标识。在Scala中,将一个对象字段声明为final表示此字段的getter方法不能在子类中重载。如果一个字段被声明为val,则它本身也是终态的。它们的区别如下面代码所示。

    class Foo(final val a: Int, val b: Int)

上面的类定义对应于下列Scala编译器编译出来的Java类。

    class Foo { // 以下为Java代码
      final private int a$;
      final private int b$;
      final public int a() { return a$; }
      public int b() { return b$; }
      public Foo(int a, int b) {
        a$ = a;
        b$ = b;
      }
    }

注意,a 和 b 字段在 JVM 层面都是终态的,因此在共享时无须同步。区别在于 a的getter方法无法在Foo的子类中重载。Scala中的重新赋值意义下的终态和重载意义下的终态完全是两码事。

因为,Scala同时采用了函数式编程和面向对象编程,它的很多语言特性实际上对应于不可变对象。一个匿名函数可以捕捉到外围类或被提升对象的引用,如下面代码所示。

    var inc: () => Unit = null
    val t = thread { if (inc != null) inc() }
    private var number = 1
    inc = () => { number += 1 }

局部变量number被匿名函数所捕捉,所以需要进一步提升。最后一行代码会被编译成如下匿名Function0类的实例。

    number = new IntRef(1)   // 捕捉到的局部变量提升成了对象
    inc = new Function0 {
      val $number = number  // 注意,val声明表示终态字段
      def apply() = $number.elem += 1
    }

这里的inc赋值和线程t对inc的读操作之间并不存在前发生关系。不过,如果线程t发现inc不为null,调用inc仍然是正确的。这是因为$number字段已经正确初始化,它存储在不可变的匿名函数对象中。Scala编译器保证匿名函数值中只包含正确初始化的终态字段。匿名类、自动装箱(auto-boxed)原语以及值类(value class)都有同样的理念。

不过,在本书使用的 Scala 中,某些容器虽然号称不可变,但是不能在不同步的情况下共享,比如List和Vector。虽然它们的外部API不允许对其进行修改,但是它们包含了非终态字段。

即使一个对象貌似不可变,稳妥起见,也应尽量通过正确的同步机制实现线程间的共享。

2.5 小结

本章介绍了关于并发编程中的常用概念,包括如下内容。

● 如何创建线程、启动线程以及等待线程终止。

● 如何通过修改共享内存和通过synchronized语句来实现线程间通信,注意,通信会导致线程陷入阻塞状态。

● 研究了利用锁的排序来避免死锁,以及利用卫式代码块来避免忙等待。

● 如何通过平滑关闭来终止线程。

● 何时用易失变量来通信。

● 如何避免竞态条件和数据争用等不必要的线程交互,这些情况是由于没有同步而产生的。

最重要的是,本章最后总结了理解多线程程序语义的正确方式是使用 JMM 定义的前发生关系。

本章介绍的语言原语和API都是比较底层的,其是在JVM上用Scala并发编程的基石,但用户只在少数情况下需要直接使用这些工具。一种情况是用户想要设计自己的并发编程库;另一种情况是不得不继续使用基于这些工具的旧版本的 API。虽然读者应尽量使用后文介绍的并发框架来编写并发 Scala 应用程序,但是本章介绍的思想对理解高层工具是很有帮助的,因为读者对后台的运行机理有更深入的见解。

如果读者想了解更多关于JVM和JMM的并行机制,推荐阅读布赖恩·格茨(Brian Goetz)等人编写的《Java 并发编程实战》。而对于进程、线程以及操作系统内部机制,推荐图书为由亚伯拉罕·西尔伯沙茨(Abraham Silberschatz)等人编写的《操作系统概念》。

第3章将介绍并发程序编程中更高级的工具,我们将学习利用执行器(executor)来避免直接创建线程,使用并发容器来实现线程安全的数据访问,以及用于防止死锁同步的原子性变量。这些高级抽象构造将解决本章中的基础并发原语存在的一些问题。

2.6 练习

在下列练习中,读者需要基于基本的JVM并发原语来实现高级并发抽象。有些练习提到了串行编程抽象概念的并发版本,并强调了串行编程和并行编程之间的重要区别。虽然读者不需要依次完成这些练习,但有些练习有先后依赖关系。

1.实现一个parallel方法,它的参数是两个计算代码块a和b,此方法用两个新线程来执行这两个代码块,并用一个二元组返回两者的计算结果,其声明如下。

    def parallel[A, B](a: =>A, b: =>B): (A, B)

2.实现一个 periodically 方法,它的参数是一个以 ms 为单位的时间参数duration,以及一个计算代码块b。此方法每隔duration ms用线程执行b。此方法的声明如下。

    def periodically(duration: Long)(b: =>Unit): Unit

3.实现一个SyncVar类,接口如下。

    class SyncVar[T] {
      def get(): T = ???
      def put(x: T): Unit = ???
    }

SyncVar对象用于在两个或多个线程之间交换数据。刚创建的SyncVar是空的,满足如下要求。

● 调用空SyncVar对象的get方法会抛出异常。

● 调用put方法可为空SyncVar对象添加值。

当在SyncVar对象中添加了一个值之后,其被称为非空对象,满足如下要求。

● 调用get方法返回当前值,然后又变回空对象。

● 调用put方法会抛出异常。

4.练习3中的SyncVar对象使用起来有诸多不便,因为状态不对时其总会抛出异常。为SyncVar实现两个方法,isEmpty和nonEmpty。然后,实现一个生产者线程和一个消费者线程,由生产者线程将区间0 until 15中的数字传递给消费者线程并输出。

5.使用isEmpty和nonEmpty方法会导致忙等待,为SyncVar类添加如下方法。

    def getWait(): T
    def putWait(x: T): Unit

这些方法的语义类似于对应的 get 或 put 方法,只不过调用时不会抛出异常,而是进入等待状态,它们分别在SyncVar对象为空或不为空时立刻返回。

6.SyncVar对象一次最多只能存一个值。实现一个SyncQueue类,它的接口与SyncVar相同,但最多可存n个值。参数n是由SyncQueue的构造函数指定的。

7.2.2.1节中的send方法用于在两个账户之间汇款。而sendAll方法用于将多个账户的钱汇到同一个目标账户。实现sendAll方法,确保不会发生死锁。sendAll方法的声明如下。

    def sendAll(accounts: Set[Account], target: Account): Unit

8.2.2.2节中介绍的asynchronous方法使用先入先出(First Input First Output, FIFO)队列存储多个任务;在一个提交的任务被执行之前,所有之前提交的任务都需要被执行。在某些情况下,需要为不同任务指定不同优先级,比如让高优先级任务在提交到任务池之时就立刻执行。实现一个 PriorityTaskPool 类,它有一个 asynchronous方法,声明如下。

    def asynchronous(priority: Int)(task: =>Unit): Unit

只有一个工作线程从任务池中选择任务并执行。当此工作线程从任务池中选择了一个新的任务时,此任务必须具有最高优先级。

9.扩展PriorityTaskPool类,使它支持多个工作线程p,p由PriorityTaskPool的构造函数指定。

10.扩展PriorityTaskPool类,使它支持如下shutdown方法。

    def shutdown(): Unit

当shutdown方法被调用时,优先级比important更高的任务必须要完成,而剩下的任务必须放弃。整型参数important由PriorityTaskPool类的构造函数指定。

11.实现一个ConcurrentBiMap容器,它是一个并发的双向映射。在此映射中,每个键对应唯一的值,反之亦然。映射上的操作必须是原子性的。这个并发双向映射的接口如下。

    class ConcurrentBiMap[K, V] {
      def put(k: K, v: V): Option[(K, V)]
      def removeKey(k: K): Option[V]
      def removeValue(v: V): Option[K]
      def getValue(k: K): Option[V]
      def getKey(v: V): Option[K]
      def size: Int
      def iterator: Iterator[(K, V)]
    }

读者需要确保这个并发双向映射不会出现死锁。

12.为练习11中的并发双向映射添加一个replace方法。此方法应该能够原子性地将一个键值二元组换成另一个键值二元组。

    def replace(k1: K, v1: V, k2: K, v2: V): Unit

13.测试练习12中的并发双向映射,在测试中,多个线程并发地插入上百万个键值二元组。当所有键值二元组插入完成时,使用另一个批次的线程将映射中的键、值反转,即将键值二元组(k1,k2)变成(k2,k1)。

14.实现一个cache方法,将任意函数转换成带记忆功能的版本。当结果函数以任意参数被第一次调用时,其调用与原函数无异。只不过结果被记下来了,下一次调用时,如果还使用同样的参数,那么返回的将是之前记忆的结果。

    def cache[K, V](f: K => V): K => V

读者需确保在多个线程并发调用时cache方法也能正常工作。

相关图书

GPU编程实战(基于Python和CUDA)
GPU编程实战(基于Python和CUDA)
C++并发编程实战(第2版)
C++并发编程实战(第2版)
Android 并发开发
Android 并发开发
OpenCL实战
OpenCL实战
C++ AMP:用Visual C++加速大规模并行计算
C++ AMP:用Visual C++加速大规模并行计算
Intel Xeon Phi协处理器高性能编程指南
Intel Xeon Phi协处理器高性能编程指南

相关文章

相关课程