Kafka入门与实践

978-7-115-46957-1
作者: 牟大恩
译者:
编辑: 杨海玲

图书目录:

详情

本书先从Kafka基础切入,由浅入深对Kafka技术内幕进行深入剖析,由点到线再到面的思路对Kafka核心进行分析,接着详细讲解Kafka基本操作,基本操作也是由简单到复杂,先介绍如何应用Kafka自带工具进行相关操作,然后通过编码实现详细介绍Kafka主要Api接口实现。最后结合应用实例来搭建Kafka相关应用。

图书摘要

版权信息

书名:Kafka入门与实践

ISBN:978-7-115-46957-1

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。

• 著    牟大恩

  责任编辑 杨海玲

• 人民邮电出版社出版发行  北京市丰台区成寿寺路11号

  邮编 100164  电子邮件 315@ptpress.com.cn

  网址 http://www.ptpress.com.cn

• 读者服务热线:(010)81055410

  反盗版热线:(010)81055315


本书以Kafka 0.10.1.1版本以基础,对Kafka的基本组件的实现细节及其基本应用进行了详细介绍,同时,通过对Kafka与当前大数据主流框架整合应用案例的讲解,进一步展现了Kafka在实际业务中的作用和地位。本书共10章,按照从抽象到具体、从点到线再到面的学习思维模式,由浅入深,理论与实践相结合,对Kafka进行了分析讲解。

本书中的大量实例来源于作者在实际工作中的实践,具有现实指导意义。相信读者阅读完本书之后,能够全面掌握Kafka的基本实现原理及其基本操作,能够根据书中的案例举一反三,解决实际工作和学习中的问题。此外,在阅读本书时,读者可以根据本书对Kafka理论的分析,再结合Kafka源码进行定位学习,了解Kafka优秀的设计和思想以及更多的编码技巧。

本书适合应用Kafka的专业技术人员阅读,包括但不限于大数据相关应用的开发者、运维者和爱好者,也适合高等院校、培训结构相关专业的师生使用。


Kafka 由于高吞吐量、可持久化、分布式、支持流数据处理等特性而被广泛应用。但当前关于Kafka原理及应用的相关资料较少,在我打算编写本书时,还没有见到中文版本的Kafka相关书籍,对于初学者甚至是一些中高级应用者来说学习成本还是比较高的,因此我打算在对Kafka进行深入而系统的研究基础上,结合自己在工作中的实践经验,编写一本介绍Kafka原理及其基本应用的书籍,以帮助Kafka初、中、高级应用者更快、更好地全面掌握Kafka的基础理论及其基本应用,从而解决实际业务中的问题。同时,一直以来我都考虑在技术方面写点什么,将自己所学、所积累的知识沉淀下来。

通过编写本书,我最大收获有如下两点。

第一,凡事不是要尽力而为,而是要全力以赴,持之以恒。写书和阅读源码其实都是很枯燥的事,理工科出身的我,在文字表达能力上还是有所欠缺的,有些知识点可能在脑海里十分清晰,然而当用文字表述出来时,就显得有些“力不从心”了。对于纯技术的东西要用让读者阅读时感觉轻松的文字描述出来更是不易,因此看似简短的几行文字,我在编写时可能斟酌和修改了很久。我真的很钦佩那些大师们,他们写出来的东西总让人很轻松地就能够掌握,“路漫漫其修远兮,吾将上下而求索”,向大师们致敬!虽然有很多客观或主观的因素存在,但我依然没有放弃。还记得2016年10月的一天,当我决定编写本书时,我告诉妻子:“我要写一本书作为送给我们未来宝宝的见面礼!”带着这份动力我利用下班时间、周末时间,在夜深人静时默默地进行着Kafka相关内容的研究、学习、实战,妻子对我的鼓励、陪伴更是激励我要坚持本书的编写。带着这份动力,带着这份爱,我终于完成了本书。

第二,通过对Kafka源码的阅读,我除了对很多原来在实践中只知其然而不知其所以然的问题有了更深入的理解以外,还对Kafka优秀的设计思想及其编码技巧有所了解。

本书共10章,各章主要内容具体描述如下。

第1章对Kafka的基本概念进行简要介绍,方便读者对Kafka有一个大致的了解。

第2章详细介绍Kafka安装环境的配置及Kafka源码的编译,这一章为后续各章的Kafka原理讲解及基本操作进行准备。

第3章对Kafka基本组件的实现原理、实现细节进行了分析。如果只想了解Kafka的相关应用,而不关注Kafka的实现原理,在阅读时可以直接跳过这一章。但我觉得,如果想真正掌握Kafka及其实现细节,这一章是值得花时间仔细阅读的。

第4章对Kafka核心流程进行分析,主要从Kafka启动流程到创建一个主题、生产者发送消息、消费者消费消息的过程进行了简要介绍。这一章是Kafka运行机制的缩影,如果跳过了第3章关于组件实现原理的讲解,那么建议一定要阅读这一章,因为通过阅读这一章可以更进一步地了解Kafka运行时的主要角色及其职责,为后面的Kafka实战部分打下坚实基础。

第5章开始就进入了Kafka实战部分。这一章通过Kafka自带脚本演示,详细介绍了Kafka基本应用的操作步骤,基本覆盖了Kafka相关操作,因此请读者在阅读时要跟随本书所讲内容进行实战。

第6章对Kafka的API应用进行了详细介绍。如果读者在实践工作中不会用到调用Kafka的相关API,在阅读时也可以跳过这一章。

第7章对Kafka Streams进行了介绍。Kafka Streams是Kafka新增的支持流数据处理的Java库。如果读者不希望使用此功能,也可以跳过这一章。

第8章介绍Kafka在数据采集方面的应用,主要包括与Log4j、Flume和HDFS的整合应用。

第9章对Kafka与ELK(Elasticsearch、Logstash和Kibana)整合实现日志采集平台相关应用进行介绍。

第10章通过两个简单的实例,介绍了Spark以及Kafka与Spark整合在离线计算、实时计算方面的应用。

本书的结构安排上,各章的内容相互独立,因此读者可以首先选择自己最感兴趣的章进行阅读,之后再阅读其他章。例如,读者可以先阅读第5章及其之后的几章,先通过实践操作对Kafka有一个感性的认识,然后再阅读第3章和第4章的相关原理及运行机制的内容,逐步加深对Kafka实现细节的理解。而第8章至第10章则是Kafka与当前大数据处理主流框架的整合应用,属于Kafka高级应用部分,可以帮助读者解决实际业务问题。

我建议读者一定要阅读第2章。通过第2章介绍的环境配置,读者能自己在本地搭建Kafka运行环境,阅读本书时,可跟随本书所讲解的操作进行实践。

本书的目标读者定位是应用Kafka的初、中、高级开发人员及运维工程师。

从事Kafka应用开发的技术人员读完本书,可以学习到Kafka原理的分析及相关API应用以及结合当前主流大数据框架整合的应用,应该能够全面掌握Kafka的基本原理和整体结构,并为实际业务实现提供思路,从而能够更加快速地解决一些问题。

从事Kafka或数据运维的技术人员,读完本书详细的Kafka基本操作以及Kafka与其他大数据框架的整合应用案例,应该可以快速搭建、运维和管理Kafka及相应的系统平台。

从事Kafka相关应用的资深开发或架构人员,读完本书对Kafka原理的分析有助于对Kafka性能进行调优,可以更好地开发和设计与Kafka相关的应用。

对于初学者,通过阅读本书可以全面掌握Kafka的知识,同时可以通过Kafka与其他框架整合的案例来拓宽视野,为学习分布式相关知识打下基础。

在阅读本书之前,读者需要具备以下基础。

在写作过程当中,我除阅读了Kafka源码之外,还从网络上阅读了大量参考资料,从中获得了很多帮助,在此对这些前辈的无私奉献精神表示由衷的钦佩和衷心的感谢。本书参考的资料如下。

非常高兴能将这本书分享给大家,也十分感谢大家购买和阅读本书。在编写本书时,虽然我精益求精,尽了最大的努力,但由于能力有限,加之时间仓促,书中难免存在不足甚至错误,敬请读者给予指正。如果有任何问题和建议,读者可发送邮件至moudaen@163.com。

在编写本书时得到了很多人的帮助。

首先我要感谢我的妻子,在我编写本书时你承担了所有家务,让我过着饭来张口、衣来伸手的生活,使我能够全身心投入到写作当中,这本书能够完成有你一半的功劳。也要感谢我的家人,家永远是我心灵的港湾,家人的爱永远是我奋斗的动力。同时也将本书献给我即将出生的宝宝,愿你健康成长,在未来的日子里我会给你更多的惊喜。

然后我特别要感谢人民邮电出版社的杨海玲老师,感谢你一直以来给予我的支持和鼓励,感谢你在本书编写、出版整个过程当中的辛勤付出。也要感谢人民邮电出版社所有参与本书编辑和出版的老师们,正是由于你们的辛勤付出和一丝不苟的工作态度才让本书出版成为可能。

同时要感谢我的工作单位海通证券,公司为我提供了一个非常优越的工作、学习和生活环境。在此要特别感谢部门领导和同事在我编写本书过程中提出很多宝贵的建议,我很荣幸能够与大家成为同事,共同奋斗。

最后我要感谢所有培养过我的老师们,是你们教会了我用知识改变命运,用学习成就未来。

牟大恩

2017年9月于上海


Kafka是一个高吞吐量、分布式的发布—订阅消息系统。据Kafka官方网站介绍,当前的Kafka已经定位为一个分布式流式处理平台(a distributed streaming platform),它最初由LinkedIn公司开发,后来成为Apache项目的一部分。Kafka核心模块使用Scala语言开发,支持多语言(如Java、C/C++、Python、Go、Erlang、Node.js等)客户端,它以可水平扩展和具有高吞吐量等特性而被广泛使用。目前越来越多的开源分布式处理系统(如Flume、Apache Storm、Spark、Flink等)支持与Kafka集成,本书第8章至第10章将通过具体案例详细介绍Kafka与当前一些流行的分布式处理系统的集成应用。接下来我们将对Kafka相关知识做进一步深入介绍。

随着信息技术的快速发展及互联网用户规模的急剧增长,计算机所存储的信息量正呈爆炸式增长,目前数据量已进入大规模和超大规模的海量数据时代,如何高效地存储、分析、处理和挖掘海量数据已成为技术研究领域的热点和难点问题。当前出现的云存储、分布式存储系统、NoSQL数据库及列存储等前沿技术在海量数据的驱使下,正日新月异地向前发展,采用这些技术来处理大数据成为一种发展趋势。而如何采集和运营管理、分析这些数据也是大数据处理中一个至关重要的组成环节,这就需要相应的基础设施对其提供支持。针对这个需求,当前业界已有很多开源的消息系统应运而生,本书介绍的Kafka就是当前流行的一款非常优秀的消息系统。

Kafka 是一款开源的、轻量级的、分布式、可分区和具有复制备份的(Replicated)、基于ZooKeeper 协调管理的分布式流平台的功能强大的消息系统。与传统的消息系统相比,Kafka能够很好地处理活跃的流数据,使得数据在各个子系统中高性能、低延迟地不停流转。

据Kafka官方网站介绍,Kafka定位就是一个分布式流处理平台。在官方看来,作为一个流式处理平台,必须具备以下3个关键特性。

Kafka能够很好满足以上3个特性,通过Kafka能够很好地建立实时流式数据通道,由该通道可靠地获取系统或应用程序的数据,也可以通过Kafka方便地构建实时流数据应用来转换或是对流式数据进行响应处理。特别是在0.10版本之后,Kafka推出了Kafka Streams,这让Kafka对流数据处理变得更加方便。

Kafka已发布多个版本。截止到编写本书时,Kafka的最新版本为0.10.1.1,因此本书内容都是基于该版本进行讲解。

通过前面对Kafka背景知识的简短介绍,我们对Kafka是什么有了初步的了解,本节我们将进一步介绍Kafka作为消息系统的基本结构。我们知道,作为一个消息系统,其基本结构中至少要有产生消息的组件(消息生产者,Producer)以及消费消息的组件(消费者,Consumer)。虽然消费者并不是必需的,但离开了消费者构建一个消息系统终究是毫无意义的。Kafka消息系统最基本的体系结构如图1-1所示。

图1-1 Kafka消息系统最基本的体系结构

生产者负责生产消息,将消息写入Kafka集群;消费者从Kafka集群中拉取消息。至于生产者如何将生产的消息写入 Kafka,消费者如何从 Kafka 集群消费消息,Kafka 如何存储消息,Kafka 集群如何管理调度,如何进行消息负载均衡,以及各组件间如何进行通信等诸多问题,我们将在后续章节进行详细阐述,在本节我们只需对Kafka基本结构轮廓有个清晰认识即可。随着对Kafka相关知识的深入学习,我们将逐步对Kafka的结构图进行完善。

在对Kafka基本体系结构有了一定了解后,本节我们对Kafka的基本概念进行详细阐述。

1.主题

Kafka将一组消息抽象归纳为一个主题(Topic),也就是说,一个主题就是对消息的一个分类。生产者将消息发送到特定主题,消费者订阅主题或主题的某些分区进行消费。

2.消息

消息是Kafka通信的基本单位,由一个固定长度的消息头和一个可变长度的消息体构成。在老版本中,每一条消息称为Message;在由Java重新实现的客户端中,每一条消息称为Record。

3.分区和副本

Kafka将一组消息归纳为一个主题,而每个主题又被分成一个或多个分区(Partition)。每个分区由一系列有序、不可变的消息组成,是一个有序队列。

每个分区在物理上对应为一个文件夹,分区的命名规则为主题名称后接“—”连接符,之后再接分区编号,分区编号从0开始,编号最大值为分区的总数减1。每个分区又有一至多个副本(Replica),分区的副本分布在集群的不同代理上,以提高可用性。从存储角度上分析,分区的每个副本在逻辑上抽象为一个日志(Log)对象,即分区的副本与日志对象是一一对应的。每个主题对应的分区数可以在Kafka启动时所加载的配置文件中配置,也可以在创建主题时指定。当然,客户端还可以在主题创建后修改主题的分区数。

分区使得Kafka在并发处理上变得更加容易,理论上来说,分区数越多吞吐量越高,但这要根据集群实际环境及业务场景而定。同时,分区也是Kafka保证消息被顺序消费以及对消息进行负载均衡的基础。

Kafka只能保证一个分区之内消息的有序性,并不能保证跨分区消息的有序性。每条消息被追加到相应的分区中,是顺序写磁盘,因此效率非常高,这是Kafka高吞吐率的一个重要保证。同时与传统消息系统不同的是,Kafka并不会立即删除已被消费的消息,由于磁盘的限制消息也不会一直被存储(事实上这也是没有必要的),因此Kafka提供两种删除老数据的策略,一是基于消息已存储的时间长度,二是基于分区的大小。这两种策略都能通过配置文件进行配置,在这里不展开探讨,在3.5.4节将详细介绍。

4.Leader副本和Follower副本

由于Kafka副本的存在,就需要保证一个分区的多个副本之间数据的一致性,Kafka会选择该分区的一个副本作为Leader副本,而该分区其他副本即为Follower副本,只有Leader副本才负责处理客户端读/写请求,Follower副本从Leader副本同步数据。如果没有Leader副本,那就需要所有的副本都同时负责读/写请求处理,同时还得保证这些副本之间数据的一致性,假设有n个副本则需要有n×n条通路来同步数据,这样数据的一致性和有序性就很难保证。

引入Leader副本后客户端只需与Leader副本进行交互,这样数据一致性及顺序性就有了保证。Follower副本从Leader副本同步消息,对于n个副本只需n−1条通路即可,这样就使得系统更加简单而高效。副本Follower与Leader的角色并不是固定不变的,如果Leader失效,通过相应的选举算法将从其他Follower副本中选出新的Leader副本。

5.偏移量

任何发布到分区的消息会被直接追加到日志文件(分区目录下以“.log”为文件名后缀的数据文件)的尾部,而每条消息在日志文件中的位置都会对应一个按序递增的偏移量。偏移量是一个分区下严格有序的逻辑值,它并不表示消息在磁盘上的物理位置。由于Kafka几乎不允许对消息进行随机读写,因此Kafka并没有提供额外索引机制到存储偏移量,也就是说并不会给偏移量再提供索引。消费者可以通过控制消息偏移量来对消息进行消费,如消费者可以指定消费的起始偏移量。为了保证消息被顺序消费,消费者已消费的消息对应的偏移量也需要保存。需要说明的是,消费者对消息偏移量的操作并不会影响消息本身的偏移量。旧版消费者将消费偏移量保存到ZooKeeper当中,而新版消费者是将消费偏移量保存到Kafka内部一个主题当中。当然,消费者也可以自己在外部系统保存消费偏移量,而无需保存到Kafka中。

6.日志段

一个日志又被划分为多个日志段(LogSegment),日志段是Kafka日志对象分片的最小单位。与日志对象一样,日志段也是一个逻辑概念,一个日志段对应磁盘上一个具体日志文件和两个索引文件。日志文件是以“.log”为文件名后缀的数据文件,用于保存消息实际数据。两个索引文件分别以“.index”和“.timeindex”作为文件名后缀,分别表示消息偏移量索引文件和消息时间戳索引文件。

7.代理

在Kafka基本体系结构中我们提到了Kafka集群。Kafka集群就是由一个或多个Kafka实例构成,我们将每一个Kafka实例称为代理(Broker),通常也称代理为Kafka服务器(KafkaServer)。在生产环境中Kafka集群一般包括一台或多台服务器,我们可以在一台服务器上配置一个或多个代理。每一个代理都有唯一的标识id,这个id是一个非负整数。在一个Kafka集群中,每增加一个代理就需要为这个代理配置一个与该集群中其他代理不同的id,id值可以选择任意非负整数即可,只要保证它在整个Kafka集群中唯一,这个id就是代理的名字,也就是在启动代理时配置的broker.id对应的值,因此在本书中有时我们也称为brokerId。由于给每个代理分配了不同的brokerId,这样对代理进行迁移就变得更方便,从而对消费者来说是透明的,不会影响消费者对消息的消费。代理有很多个参数配置,由于在本节只是对其概念进行阐述,因此不做深入展开,对于代理相关配置将穿插在本书具体组件实现原理、流程分析及相关实战操作章节进行介绍。

8.生产者

生产者(Producer)负责将消息发送给代理,也就是向Kafka代理发送消息的客户端。

9.消费者和消费组

消费者(Comsumer)以拉取(pull)方式拉取数据,它是消费的客户端。在Kafka中每一个消费者都属于一个特定消费组(ConsumerGroup),我们可以为每个消费者指定一个消费组,以groupId代表消费组名称,通过group.id配置设置。如果不指定消费组,则该消费者属于默认消费组test-consumer-group。同时,每个消费者也有一个全局唯一的id,通过配置项client.id指定,如果客户端没有指定消费者的id,Kafka会自动为该消费者生成一个全局唯一的id,格式为${groupId}-${hostName}-${timestamp}-${UUID前8位字符}。同一个主题的一条消息只能被同一个消费组下某一个消费者消费,但不同消费组的消费者可同时消费该消息。消费组是Kafka用来实现对一个主题消息进行广播和单播的手段,实现消息广播只需指定各消费者均属于不同的消费组,消息单播则只需让各消费者属于同一个消费组。

10.ISR

Kafka在ZooKeeper中动态维护了一个ISR(In-sync Replica),即保存同步的副本列表,该列表中保存的是与Leader副本保持消息同步的所有副本对应的代理节点id。如果一个Follower副本宕机(本书用宕机来特指某个代理失效的情景,包括但不限于代理被关闭,如代理被人为关闭或是发生物理故障、心跳检测过期、网络延迟、进程崩溃等)或是落后太多,则该Follower副本节点将从ISR列表中移除。

11.ZooKeeper

这里我们并不打算介绍ZooKeeper的相关知识,只是简要介绍ZooKeeper在Kafka中的作用。Kafka利用ZooKeeper保存相应元数据信息,Kafka元数据信息包括如代理节点信息、Kafka集群信息、旧版消费者信息及其消费偏移量信息、主题信息、分区状态信息、分区副本分配方案信息、动态配置信息等。Kafka在启动或运行过程当中会在ZooKeeper上创建相应节点来保存元数据信息,Kafka通过监听机制在这些节点注册相应监听器来监听节点元数据的变化,从而由ZooKeeper负责管理维护Kafka集群,同时通过ZooKeeper我们能够很方便地对Kafka集群进行水平扩展及数据迁移。

通过以上Kafka基本概念的介绍,我们可以对Kafka基本结构图进行完善,如图1-2所示。

图1-2 Kafka的集群结构

Kafka的设计初衷是使Kafka能够成为统一、实时处理大规模数据的平台。为了达到这个目标,Kafka必须支持以下几个应用场景。

(1)具有高吞吐量来支持诸如实时的日志集这样的大规模事件流。

(2)能够很好地处理大量积压的数据,以便能够周期性地加载离线数据进行处理。

(3)能够低延迟地处理传统消息应用场景。

(4)能够支持分区、分布式,实时地处理消息,同时具有容错保障机制。

满足以上功能的Kafka与传统的消息系统相比更像是一个数据库日志系统。了解了Kafka的设计动机之后,在下一节我们将看看Kafka发展至今已具有哪些特性。

上一节对Kafka的设计动机进行了介绍。随着Kafka的不断更新发展,当前版本的Kafka又增加了一些新特性,下面就来逐个介绍Kafka的这些新特性。

1.消息持久化

Kafka高度依赖于文件系统来存储和缓存消息。说到文件系统,大家普遍认为磁盘读写慢,依赖于文件系统进行存储和缓存消息势必在性能上会大打折扣,其实文件系统存储速度快慢一定程度上也取决于我们对磁盘的用法。据Kafka官方网站介绍:6块7200r/min SATA RAID-5阵列的磁盘线性写的速度为600 MB/s,而随机写的速度为100KB/s,线性写的速度约是随机写的6000多倍。由此看来磁盘的快慢取决于我们是如何去应用磁盘。加之现代的操作系统提供了预读(read-ahead)和延迟写(write-behind)技术,使得磁盘的写速度并不是大家想象的那么慢。同时,由于Kafka是基于JVM(Java Virtual Machine)的,而Java对象内存消耗非常高,且随着Java对象的增加JVM的垃圾回收也越来越频繁和繁琐,这些都加大了内存的消耗。鉴于以上因素,使用文件系统和依赖于页缓存(page cache)的存储比维护一个内存的存储或是应用其他结构来存储消息更有优势,因此Kafka选择以文件系统来存储数据。

消息系统数据持久化一般采用为每个消费者队列提供一个 B 树或其他通用的随机访问数据结构来维护消息的元数据,B树操作的时间复杂度为O(log n),O(log n)的时间复杂度可以看成是一个常量时间,而且B树可以支持各种各样的事务性和非事务性语义消息的传递。尽管B树具有这些优点,但这并不适合磁盘操作。目前的磁盘寻道时间一般在10ms以内,对一块磁盘来说,在同一时刻只能有一个磁头来读写磁盘,这样在并发IO能力上就有问题。同时,对树结构性能的观察结果表明:其性能会随着数据的增长而线性下降。鉴于消息系统本身的作用考虑,数据的持久化队列可以建立在简单地对文件进行追加的实现方案上。因为是顺序追加,所以Kafka在设计上是采用时间复杂度O(1)的磁盘结构,它提供了常量时间的性能,即使是存储海量的信息(TB级)也如此,性能和数据的大小关系也不大,同时Kafka将数据持久化到磁盘上,这样只要磁盘空间足够大数据就可以一直追加,而不会像一般的消息系统在消息被消费后就删除掉,Kafka提供了相关配置让用户自己决定消息要保存多久,这样为消费者提供了更灵活的处理方式,因此Kafka能够在没有性能损失的情况下提供一般消息系统不具备的特性。

正是由于Kafka将消息进行持久化,使得Kafka在机器重启后,已存储的消息可继续恢复使用。同时Kafka能够很好地支持在线或离线处理、与其他存储及流处理框架的集成。

2.高吞吐量

高吞吐量是Kafka设计的主要目标,Kafka将数据写到磁盘,充分利用磁盘的顺序读写。同时,Kafka在数据写入及数据同步采用了零拷贝(zero-copy)技术,采用sendFile()函数调用,sendFile()函数是在两个文件描述符之间直接传递数据,完全在内核中操作,从而避免了内核缓冲区与用户缓冲区之间数据的拷贝,操作效率极高。Kafka还支持数据压缩及批量发送,同时Kafka将每个主题划分为多个分区,这一系列的优化及实现方法使得Kafka具有很高的吞吐量。经大多数公司对Kafka应用的验证,Kafka支持每秒数百万级别的消息。

3.扩展性

Kafka要支持对大规模数据的处理,就必须能够对集群进行扩展,分布式必须是其特性之一,这样就可以将多台廉价的PC服务器搭建成一个大规模的消息系统。Kafka依赖ZooKeeper来对集群进行协调管理,这样使得Kafka更加容易进行水平扩展,生产者、消费者和代理都为分布式,可配置多个。同时在机器扩展时无需将整个集群停机,集群能够自动感知,重新进行负责均衡及数据复制。

4.多客户端支持

Kafka核心模块用Scala语言开发,但Kafka支持不同语言开发生产者和消费者客户端应用程序。0.8.2之后的版本增加了Java版本的客户端实现,0.10之后的版本已废弃Scala语言实现的Producer及Consumer,默认使用Java版本的客户端。Kafka提供了多种开发语言的接入,如Java、Scala、C、C++、Python、Go、Erlang、Ruby、Node.js等,感兴趣的读者可以自行参考https://cwiki.apache.org/confluence/display/KAFKA/Clients。同时,Kafka支持多种连接器(Connector)的接入,也提供了Connector API供开发者调用。Kafka与当前主流的大数据框架都能很好地集成,如Flume、Hadoop、HBase、Hive、Spark、Storm等。

5.Kafka Streams

Kafka在0.10之后版本中引入Kafak Streams。Kafka Streams是一个用Java语言实现的用于流处理的jar文件,关于Kafka Streams的相关内容将在第7章中进行讲解。

6.安全机制

当前版本的Kafka支持以下几种安全措施:

7.数据备份

Kafka可以为每个主题指定副本数,对数据进行持久化备份,这可以一定程度上防止数据丢失,提高可用性。

8.轻量级

Kafka的代理是无状态的,即代理不记录消息是否被消费,消费偏移量的管理交由消费者自己或组协调器来维护。同时集群本身几乎不需要生产者和消费者的状态信息,这就使得Kafka非常轻量级,同时生产者和消费者客户端实现也非常轻量级。

9.消息压缩

Kafka支持Gzip、Snappy、LZ4这3种压缩方式,通常把多条消息放在一起组成MessageSet,然后再把MessageSet放到一条消息里面去,从而提高压缩比率进而提高吞吐量。

消息系统或是说消息队列中间件是当前处理大数据一个非常重要的组件,用来解决应用解耦、异步通信、流量控制等问题,从而构建一个高效、灵活、消息同步和异步传输处理、存储转发、可伸缩和最终一致性的稳定系统。当前比较流行的消息中间件有Kafka、RocketMQ、RabbitMQ、ZeroMQ、ActiveMQ、MetaMQ、Redis等,这些消息中间件在性能及功能上各有所长。如何选择一个消息中间件取决于我们的业务场景、系统运行环境、开发及运维人员对消息中件间掌握的情况等。我认为在下面这些场景中,Kafka是一个不错的选择。

(1)消息系统。Kafka作为一款优秀的消息系统,具有高吞吐量、内置的分区、备份冗余分布式等特点,为大规模消息处理提供了一种很好的解决方案。

(2)应用监控。利用Kafka采集应用程序和服务器健康相关的指标,如CPU占用率、IO、内存、连接数、TPS、QPS等,然后将指标信息进行处理,从而构建一个具有监控仪表盘、曲线图等可视化监控系统。例如,很多公司采用Kafka与ELK(ElasticSearch、Logstash和Kibana)整合构建应用服务监控系统。

(3)网站用户行为追踪。为了更好地了解用户行为、操作习惯,改善用户体验,进而对产品升级改进,将用户操作轨迹、内容等信息发送到Kafka集群上,通过Hadoop、Spark或Strom等进行数据分析处理,生成相应的统计报告,为推荐系统推荐对象建模提供数据源,进而为每个用户进行个性化推荐。

(4)流处理。需要将已收集的流数据提供给其他流式计算框架进行处理,用Kafka收集流数据是一个不错的选择,而且当前版本的Kafka提供了Kafka Streams支持对流数据的处理。

(5)持久性日志。Kafka可以为外部系统提供一种持久性日志的分布式系统。日志可以在多个节点间进行备份,Kafka为故障节点数据恢复提供了一种重新同步的机制。同时,Kafka很方便与HDFS和Flume进行整合,这样就方便将Kafka采集的数据持久化到其他外部系统。

本书在结构编排上,先介绍Kafka基础知识,接着介绍Kafka应用环境搭建,然后对Kafka核心组件实现原理进行简要讲解。在核心组件原理讲解之后,又将相应组件应用串起来分析Kafka核心流程,之后从Kafka基本脚本操作实战开始,结合Kafka在实际工作中应用案例详细介绍Kafka与当前主流大数据处理框架的应用。同时,将Kafka Streams独立成一章进行详细介绍,基本上覆盖了Kafka Streams的核心及重要知识的讲解。

为了编写和讲解方便,本书有以下几点约定说明。

(1)本书所讲Kafka版本为0.10.1.1,书中提及的当前版本Kafka均指这一版本。

(2)在Kafka基本组件实现原理讲解时,为了指明方法所属的对象,本书简单地以“类名.方法名()”的形式说明,这并不表示对类静态方法的调用。同时,鉴于篇幅考虑也省去了方法参数列表,但不代表该方法无参数。

(3)读者在阅读本书时经常会看到“${属性字段}”表达式,本书以此表示该属性字段对应的值。

本章首先对Kafka背景及一个简单的Kafka消息系统基本结构进行了简单介绍,然后对Kafka涉及的基本概念进行了阐述,最后就Kafka的设计思想、特性及应用场景进行了归纳。


本章将详细介绍Kafka运行环境的搭建,包括在Linux系统和Windows系统中搭建Kafka运行环境。

由于Kafka是用Scala语言开发的,运行在JVM上,因此在安装Kafka之前需要先安装JDK。

最新版本的Kafka需要运行在JDK 1.7以上,Kafka官方网站推荐使用JDK 1.8,因此本书所应用的JDK环境采用JDK 1.8。下面将详细介绍JDK 1.8安装步骤。

1.Windows安装JDK

(1)下载并安装。首先在Oracle官方网站http://www.oracle.com/technetwork/java/javase/ downloads/jdk8-downloads-2133151.html下载JDK 1.8安装文件,根据操作系统类型选择相应的JDK版本。我使用的是64位操作系统,因此下载jdk-8u111-windows-x64.exe安装文件。下载完成后,双击运行安装。在安装时可以选择安装路径,这里在安装时全使用默认路径。

(2)环境变量配置。在系统变量中新增变量名JAVA_HOME,变量值为JDK 1.8安装路径。由于Java默认安装在Program Files目录下,这个目录名之间有空格,有可能在运行某些应用时因JDK安装路径有空格而报错。例如,我在安装JDK后,运行Kafka时报如下错误:

错误: 找不到或无法加载主类 Files\Java\jdk1.8.0_111\lib\dt.jar;C:\Program

为了避免出现类似的错误,在Windows系统上,若JDK安装在Program File目录下,在设置JAVA_HOME时,用该目录别名PROGRA~1,因此将JAVA_HOME设置为C:\PROGRA~1\Java\jdk1.8.0_111。新增变量名CLASSPATH,变量值为.;% JAVA_HOME%\lib\dt.jar;% JAVA_HOME%\ lib\tools.jar。本步操作JDK环境变量配置如表2-1所示。

表2-1 JDK环境变量配置

变 量 类 型

变 量 名

变 量 值

系统变量

JAVA_HOME

C:\PROGRA~1\Java\jdk1.8.0_111

用户变量

CLASSPATH

.;% JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar

(3)验证。环境变量配置完成后,在Windows的cmd终端输入查看Java版本的命令,以此来验证JDK安装配置是否成功。命令如下:

Java –version

若输出为以下JDK版本信息,则表示JDK 1.8已安装成功,且为系统默认JDK。

Java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

2.Linux安装JDK

一些Linux的发行版默认已安装了JDK,如OpenJDK,这里所用的Linux操作系统默认已经安装了OpenJDK。输入下面的命令查看JDK版本信息:

Java  –version

输出JDK版本信息如下:

Java version "1.7.0_45"
OpenJDK Runtime Environment (rhel-2.4.3.3.el6-x86_64 u45-b15)
OpenJDK 64-Bit Server VM (build 24.45-b08, mixed mode)

然而有些Linux系统并没有安装JDK,因此本小节将详细讲解如何在Linux中安装JDK。这里我们讲解JDK 1.8的安装。

(1)下载并安装。进入Oracle官方网站http://www.oracle.com/technetwork/Java/Javase/downloads/ jdk8-downloads-2133151.html下载Linux版本的JDK 1.8安装包。这里我们下载的安装包版本为jdk-8u111-linux-x64.gz,并将安装包解压到/usr/local/software/Java路径下。

tar -xzvf jdk-8u111-linux-x64.gz      # 解压jdk安装包

将安装包解压后,即完成JDK的安装。

(2)配置环境变量。在/etc/profile文件中添加JDK和JRE的路径,并添加到Path中,操作命令如下:

vi /etc/profile      # 编辑profile文件

在文件中添加以下内容:

export JAVA_HOME=/usr/local/software/Java/jdk1.8.0_111
export JRE_HOME=/usr/local/software/Java/jdk1.8.0_111/jre
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin

保存退出。若系统默认安装了OpenJDK,则用户可以选择将其删除,也可以修改配置用最新安装的版本替换OpenJDK。这里选择用新安装的JDK替换系统自带的OpenJDK,则按序执行以下命令。

    update-alternatives --install /usr/bin/Java Java /usr/local/software/Java/jdk1.8.0_111 300
    update-alternatives --install /usr/bin/Javac Javac /usr/local/software/   
         Java/jdk1.8.0_111/bin/Javac 300
    update-alternatives --config Java

执行以上命令会出现JDK版本选择界面,如图2-1所示。这里我们选择新安装的JDK 1.8,即输入序号3,按回车键。

图2-1 Linux控制台展示的JDK版本选择命令行界面

环境变量配置好后执行以下命令,让刚才的修改操作立即生效:

    source /etc/profile           # 让对/etc/profile的修改立即生效

(3)验证。输入查看JDK版本命令,查看环境变量配置是否成功,执行以下命令:

Java –version      # 查看jdk版本

输出以下JDK版本信息:

Java version "1.8.0_111"
Java(TM) SE Runtime Environment (build 1.8.0_111-b14)
Java HotSpot(TM) 64-Bit Server VM (build 25.111-b14, mixed mode)

由以上JDK版本信息可知,JDK版本已替换为新安装的JDK 1.8版本。至此,JDK安装完成。

对Kafka集群本身来讲,配置SSH免密钥登录并不是必需的步骤,但作为分布式系统,一般会由多台机器构成。为了便于操作管理,如通过SSH方式启动集群代理等,这里对SSH安装配置进行介绍。

(1)在根目录下查看是否存在一个隐藏文件夹.ssh。若没有该文件夹,则在确保机器联网条件下执行以下命令安装ssh:

sudo apt-get install ssh      # 安装ssh

(2)进入.ssh目录,生成密钥对,执行命令如下:

ssh-keygen -t rsa      # 产生密钥

在执行以上命令时一路回车即可。ssh-keygen用于生成认证密钥,-t用来指定密钥类型,这里选择rsa密钥。执行完毕后会在~/.ssh目录下生成id_rsa和id_rsa.pub两个文件,其中id_rsa为私钥文件,id_rsa.pub为公钥文件。依次在集群其他机器上完成步骤1和步骤2。

(3)将id_rsa.pub文件内容追加到授权的key文件中,命令如下:

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys      # 追加公钥到授权文件中

若是单机环境,则至此已完成ssh配置。

(4)将第一台机器的authorized_keys文件复制到第二台机器上,并将第二台机器的公钥也追加到authorized_keys文件中,依次执行以下命令:

scp authorized_keys root@172.117.12.62:~/.ssh/   # 复制第一台机器的授权文件到第二台机器
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys  # 在第二台机器上执行此命令,将第二台机器的公钥追加到授权文件中

(5)将第二台机器的authorized_keys文件复制到第三台机器上,并将第三台机器的公钥追加到authorized_keys文件中,执行命令如下:

scp authorized_keys root@172.117.12.63:~/.ssh/   # 复制第二台机器的授权文件到第三台机器
cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys   # 在第三台机器上执行此命令,将第三台机器的公钥追加到授权文件中

若集群还有更多机器,则依此类推完成授权文件合并。至此 ssh 配置完成,在已配置 ssh的任何一台机器上均可免密登录到其他机器。例如,在第一台机器上执行以下ssh命令,输出如下(首次登录会让输入密码):

[root@rhel65 .ssh]# ssh 172.117.12.61
The authenticity of host '172.117.12.61 (172.117.12.61)' can't be established.
RSA key fingerprint is a3:5b:a9:29:ed:00:74:f4:ce:51:e5:7c:42:5b:8d:44.
Are you sure you want to continue connecting (yes/no)? yes
Warning: Permanently added '172.117.12.61' (RSA) to the list of known hosts.
root@172.117.12.61's password: 
Last login: Wed Feb  8 17:30:11 2017 from server-1
[root@rhel65 ~]# ssh 172.117.12.61
Last login: Wed Feb  8 17:32:04 2017 from server-3
[root@rhel65 ~]# ssh 172.117.12.62
Last login: Wed Feb  8 17:26:09 2017 from server-1

ZooKeeper是一个分布式应用程序协调服务框架,分布式应用程序可以基于ZooKeeper来实现同步服务、配置维护、命名服务等,ZooKeeper能提供基于类似于文件系统的目录节点树方式的数据存储,通过监控各节点数据状态的变化,达到基于数据的集群管理。ZooKeeper主要由表2-2所示的几个角色构成。

表2-2 ZooKeeper集群主要角色说明

角  色

描  述

Leader

集群的领导者,负责投票的发起和决议及更新系统状态

Learner

Follower

跟随者,接受客户端的请求并返回结果给客户端,参与投票

Observer

接受客户端的请求,将写的请求转发给Leader,不参与投票。Observer目的是扩展系统,提高读的速度

关于ZooKeeper的原理及其他相关知识,读者可查阅ZooKeeper官方网站(http://mirrors. cnnic.cn/apache/zookeeper/)及相关书籍进行深入了解。

Kafka依赖ZooKeeper,通过ZooKeeper来对代理、消费者上下线管理、集群、分区元数据管理等,因此ZooKeeper也是Kafka得以运行的基础环境之一。

进入ZooKeeper官方网站http://mirrors.cnnic.cn/apache/zookeeper/下载ZooKeeper(本书所用ZooKeeper版本为zookeeper-3.4.8),然后将下载文件解压到指定目录。对ZooKeeper的安装,下面按Windows和Linux分别进行讲解。

1.Windows安装ZooKeeper

一般会选择在Linux操作系统上安装和部署分布式服务,因此这里并不打算讲解Windows环境下ZooKeeper集群环境搭建,只是简单介绍Windows环境下ZooKeeper单机模式的安装。

(1)解压安装。首先将ZooKeeper安装包zookeeper-3.4.8.tar.gz解压到相应目录,这里将ZooKeeper解压到D:\software\zookeeper-3.4.8目录下。然后进入ZooKeeper安装路径conf目录下,会看到ZooKeeper提供了一个zoo_sample.cfg的配置模板,将该文件重命名为zoo.cfg。zoo.cfg文件中只需修改dataDir和dataLogDir配置,其他配置使用默认值(其他配置及其含义将在下面的“Linux搭建ZooKeeper环境”小节详细介绍)。这里对dataDir和dataLogDir配置如下:

dataDir=F:\\zookeeper\\data
dataLogDir=F:\\zookeeper\\logs

至此,Windows环境下ZooKeeper安装配置完成。下面进入ZooKeeper安装路径bin目录下,启动及验证ZooKeeper安装是否成功。

(2)验证。执行启动ZooKeeper命令:

zkServer.cmd    # windows下启动ZooKeeper

若输出没有任何错误,通过jps命令可以看到ZooKeeper相关进程。输入命令:

jps  # 查看Java进程命令

输出结果中至少包括以下进程名:

12008 QuorumPeerMain
11596 Jps

还可以进入ZooKeeper的安装路径bin目录下,通过ZooKeeper客户端连接到ZooKeeper服务,执行以下命令进一步验证ZooKeeper是否安装成功:

zkCli.cmd -server 127.0.0.1:2181   # 登录到ZooKeeper服务器

在输出信息中会看到“Welcome to ZooKeeper!”,同时显示接受命令输入界面。

在客户端输入:

ls /      # 查看ZooKeeper服务器目录结构

此时ZooKeeper服务器中仅有一个zookeeper节点,信息显示如下:

[zk: 127.0.0.1:2181(CONNECTED) 0] ls /
[zookeeper]

至此,Windows环境下安装ZooKeeper讲解完毕。

2.Linux搭建ZooKeeper环境

在Linux环境下ZooKeeper单机模式配置与上一小节介绍的Windows环境下ZooKeeper安装配置的操作步骤基本相同,因此本小节直接介绍ZooKeeper分布式环境搭建。下面将讲解在Linux环境下如何配置由3台机器构成的ZooKeeper集群环境,这3台机器的IP地址分别为172.117.12.61、172.117.12.62和172.117.12.63。

(1)解压安装。首先在3台机器上分别将zookeeper-3.4.8.tar.gz解压到/usr/local/software/ zookeeper目录。进入解压后的zookeeper-3.4.8 /conf目录,将zoo.sample.cfg重命名为zoo.cfg。关于ZooKeeper配置文件中几个基础配置项的说明如表2-3所示。

表2-3 ZooKeeper基础配置说明

配 置 项

默 认 值

说  明

tickTime

2000ms

ZooKeeper中的一个时间单元。ZooKeeper中所有时间都以这个时间单元为基准,进行整数倍配置,默认是2 s

initLimit

10

Follower在启动过程中,会从Leader同步所有最新数据,确定自己能够对外服务的起始状态。当Follower在initLimt个tickTime还没有完成数据同步时,则Leader仍为Follower连接失败

syncLimit

5

Leader与Follower之间通信请求和应答的时间长度。若Leader在syncLimit个tickTime还没有收到Follower应答,则认为该Leader已下线

dataDir

/tmp/zookeeper

存储快照文件的目录,默认情况下,事务日志也会存储在该目录上。由于事务日志的写性能直接影响ZooKeeper性能,因此建议同时配置参数dataLogDir

dataLogDir

/tmp/zookeeper

事务日志输出目录

clientPort

2181

ZooKeeper对外端口

请读者根据自已服务器环境,修改zoo.cfg文件中表2-3提及参数的配置。这里只修改了以下两个配置项,其他几个基础配置沿用默认值。

dataDir=/opt/data/zookeeper/data
dataLogDir=/opt/data/zookeeper/logs

若是单机模式,操作至此完成。接下来配置将3台机器构成一个分布式集群。

(2)集群配置。首先在3台机器的/etc/hosts文件中加入3台机器的IP与机器域名映射,域名自定义,这里分别命名为server-1、server-2、server-3,3台机器IP与机器域名映射关系如下:

172.117.12.61 server-1
172.117.12.62 server-2
172.117.12.63 server-3

然后进入其中一台机器的ZooKeeper安装路径conf目录。这里我们选择在IP为172.117.12.61的机器上进行配置,编辑conf/zoo.cfg文件,在该文件中添加以下配置:

server.1=server-1:2888:3888
server.2=server-2:2888:3888
server.3=server-3:2888:3888

为了便于讲解以上配置,在这里抽象一个公式,即server.n=n-server-domain:port1:port2。这个公式中的n是一个数字类型常量,这里配置的1、2和3用于表示第几台ZooKeeper服务器;n-server-domain表示第n台ZooKeeper服务器的IP所映射的域名,当然这里也可以是第n台机器的IP;port1表示该服务器与集群中的Leader交换信息的端口,默认是2888;port2表示选举时服务器相互通信的端口。

接着在${dataDir}路径下创建一个myid文件。myid里存放的值就是服务器的编号,即对应上述公式中的n,在这里第一台机器myid存放的值为1。ZooKeeper在启动时会读取myid文件中的值与zoo.cfg文件中的配置信息进行比较,以确定是哪台服务器。

在zoo.cfg文件中我们同时配置了3台机器,因此接下来通过scp命令将本台机器的zoo.cfg文件复制到另外两台机器相应目录进行替换。

scp zoo.cfg root@172.117.12.62:/usr/local/software/zookeeper/zookeeper-3.4.8/conf/
scp zoo.cfg root@172.117.12.63:/usr/local/software/zookeeper/zookeeper-3.4.8/conf/

然后分别修改另外两台机器的myid。同时,为了操作方便,我们将ZooKeeper相关环境变量添加到/etc/profile文件当中。

设置ZooKeeper安装路径,在/etc/profile相关环境变量配置中添加以下信息:

export ZOOKEEPER_HOME=/usr/local/software/zookeeper/zookeeper-3.4.8

在该文件的Path配置项最后加上:$ZOOKEEPER_HOME/bin。注意,在$ZOOKEEPER_HOME前有一个冒号。然后执行source/etc/profile命令使所做的修改操作立即生效。其他两台机器也进行同样的环境设置。至此,由3台机器构成的分布式ZooKeeper环境搭建步骤介绍完毕。下面启动ZooKeeper进行验证。

(3)验证。由于配置了ZooKeeper环境变量,因此无需进入ZooKeeper安装路径bin目录下。在这3台机器上分别启动ZooKeeper:

zkServer.sh start    # 启动ZooKeeper服务

输出如下信息:

ZooKeeper JMX enabled by default
Using config: /usr/local/software/zookeeper/zookeeper-3.4.8/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

查看这3台ZooKeeper服务器状态,依次在这3台机器上执行以下命令:

zkServer.sh status    # 查询zookeeper状态

执行上述启动命令,其中有两台机器输出以下信息:

ZooKeeper JMX enabled by default
Using config: /usr/local/software/zookeeper/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: follower

另外一台机器输出信息如下:

ZooKeeper JMX enabled by default
Using config: /usr/local/software/zookeeper/zookeeper-3.4.8/bin/../conf/zoo.cfg
Mode: leader

可以看到,这3台机器中,一台机器作为Leader,其他两台服务器作为Follower。同时,可以查看zookeeper.out文件内容,通过启动日志进一步了解ZooKeeper运行过程。至此,ZooKeeper集群环境搭建讲解完毕。

Kafka安装较简单,不同操作系统下安装步骤基本相同,针对大多数用户来讲,在生产环境使用Kafka一般选择Linux服务器,本书Kafka实战操作也是基于Linux环境进行讲解的。下面分别介绍Kafka在Windows操作系统以及Linux操作系统的安装步骤,Mac操作系统的安装步骤与Linux操作系统的安装步骤基本类似,不再介绍。同时,后面几节中的Kafka集群环境搭建也只介绍在Linux环境下Kafka集群环境的部署。

Windows下安装Kafka只需将下载的Kafka安装包解压到相应目录即可。

(1)下载及安装。进入Kafka官方网站http://kafka.apache.org/downloads下载当前最新版本的Kafka,Kafka安装包并没有区分Windows安装包还是Linux安装包,仅在bin目录下将Windows环境执行Kafka的相关脚本放在/bin/windows目录下。当前Kafka最新版本为kafka_2.11-0.10.1.1.tgz,其中2.11代表Scala版本,0.10.1.1表示Kafka的版本。这里将下载的安装包解压到D:\software\kafka_2.11-0.10.1.1目录下,为了便于讲解,这里记Kafka安装路径为$KAFKA_HOME。至此,Windows下Kafka完成安装。当然我们也可以像安装JDK一样配置Kafka环境变量,感兴趣的读者可以自行配置,这步操作不是必需的步骤,因此不再阐述。

(2)启动KafkaServer验证。安装好Kafka后,启动KafkaServer。在启动Kafka之前,需要启动Zoookeeper。若ZooKeeper服务不是本地服务,应修改Kafka安装目录下/config/server.properties文件zookeeper.connect配置项,然后在Windows的cmd下进入$KAFKA_HOME/bin/windows目录,执行以下命令,启动KafkaServer。

kafka-server-start.bat ../../config/server.properties   # windows下启动kafak server

若在启动过程中没有报任何异常信息,同时在控制台最后输出打印内容如图2-2所示,则表示Kafka在Windows环境下安装成功。

图2-2 KafkaServer启动日志

在Linux系统上安装Kafka与在Windows系统上安装操作基本相同,将安装包解压到相应目录,这里依然将Kafka安装目录记为$KAFKA_HOME,修改$KAFKA_HOME/config/server.properties文件相关配置即可。这里安装Kafka所用机器与安装ZooKeeper的机器相同,但在生产环境,一般将ZooKeeper集群与Kafka机器分机架部署。在讲解Kafka单机版本安装时,我们选择3台机器中的一台,IP为172.117.12.61。

(1)解压安装。先将Kafka安装包kafka_2.11-0.10.1.1.tgz解压到指定目录下,这里将Kafka解压到/usr/local/software/kafka目录下。进入/usr/local/software/kafka目录执行以下命令解压Kafka安装包。

tar -xzvf kafka_2.11-0.10.1.1.tgz      # 解压安装Kafka

由于后续对Kafka的讲解都是在Linux环境下,因此为了操作方便我们对Kafka的环境变量进行设置。在/etc/profile文件中加入Kafka安装路径,并将Kafka的bin目录添加进Path中。这一步操作并非Kafka安装必需的设置,读者可根据情况选择是否需要对Kafka环境变量进行配置。打开/etc/profile文件添加以下配置。

    export KAFKA_HOME=/usr/local/software/kafka/kafka_2.11-0.10.1.1
    export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin:$KAFKA_HOME/bin

保存文件退出,执行source /etc/profile命令让刚才新增的Kafka环境变量设置生效。再在任一路径下输入kafka然后按Tab键,会提示补全Kafka运行相关脚本.sh文件,表示Kafka环境变量配置成功,但一般Kafka脚本运行时会加载/config路径下的相关配置文件,因此当不在Kafka安装目录bin下执行相关脚本时,需要指定配置文件绝对路径。

(2)修改配置。修改$KAFKA_HOME/config目录下的server.properties文件,为了便于后续集群环境搭建的配置,需要保证同一个集群下broker.id要唯一,因此这里手动配置broker.id,直接保持与ZooKeeper的myid值一致,同时配置日志存储路径。server.properties修改的配置如下:

broker.id=1                      # 指定代理的id
log.dirs=/opt/data/kafka-logs    # 指定Log存储路径

其他配置保持不变,由于172.117.12.61这台机器本地已安装了ZooKeeper,因此在Kafka单机版本安装讲解时,我们暂不对zookeeper.connect配置进行修改,其他配置文件也暂不进行修改。

(3)验证

    kafka-server-start.sh -daemon ../config/server.properties    # 启动Kafka

执行jps命令查看Java进程,此时进程信息至少包括以下几项:

    15976 Jps
    14999 QuorumPeerMain
    15906 Kafka

可以看到ZooKeeper进程和Kafka进程名,同时进入$KAFKA_HOME/logs目录下,查看server.log会看到KafkaServer启动日志,在启动日志中会记录KafkaServer启动时加载的配置信息。

    zkCli.sh -server server-1:2181    # 登录ZooKeeper 
    ls /                              # 查看ZooKeeper目录结构

在Kafka启动之前ZooKeeper中只有一个zookeeper目录节点,Kafka启动后目录节点如下:

    [cluster, controller, controller_epoch, brokers, zookeeper, admin, isr_change_   
        notification, consumers, config]

执行以下命令,查看当前已启动的Kafka代理节点:

    ls /brokers/ids      # 查看已启动的代理节点
     [1]                  # 已启动的代理节点对应的brokerId

输出信息显示当前只有一个Kafka代理节点,当前代理的brokerId为1。至此,Kafka单机版安装配置介绍完毕,相关操作我们将在第5章进行详细介绍。

KafkaServer启动时需要加载一个用于KafkaServer初始化相关配置的server.properties文件,当然文件名可以任意,一个server.properties对应一个KafkaServer实例。Kafka伪分布式就是在一台机器上启动多个KafkaServer来达到多代理的效果,因此要保证broker.id及port在同一台机器的多个server.properities中唯一。

本节在上一节的Linux安装Kafka基础配置之上,将server.properties文件复制一份并命名为server-0.properties,在server-0.properties文件中修改配置如下:

broker.id=0
log.dirs=/opt/data/kafka-logs/broker-0
port=9093

由于代理默认端口是9092,server.properties没有设置端口则采用默认设置,因此在server-0.properties将port设置为9093。这个端口可以自定义,只要新端口没有被占用即可。执行以下命令,分别启动brokerId为0和1的两个KafkaServer:

kafka-server-start.sh -daemon ../config/server-0.properties      # 启动代理0
kafka-server-start.sh -daemon ../config/server.properties        # 启动代理1

再次执行jps命令查看Java进程信息,打印输出如下信息:

19453 Kafka
18036 ZooKeeperMain
18228 QuorumPeerMain
19169 Kafka
19504 Jps

从输出的进程信息可以看到有两个Kafka进程存在,即代表刚才启动的broker.id为0和1的两个代理。此时登录ZooKeeper客户端,再查看ZooKeeper的/brokers/ids目录,会看到该目录下有两个节点:

[zk: 172.117.12.61(CONNECTED) 0] ls /brokers/ids
[0, 1]

这样,一台机器上启动多个代理的伪分布式环境安装配置介绍完毕。

2.2.2节已经讲解了Kafka单机版安装配置,因此对Kafka集群环境配置时只需将单机版安装的Kafka配置进行相应修改,然后复制到另外两台机器即可。这里只需修改server.properties文件中Kafka连接ZooKeeper的配置,将Kafka连接到ZooKeeper集群,配置格式为“ZooKeeper服务器IP:ZooKeeper的客户端端口”,多个ZooKeeper机器之间以逗号分隔开。

zookeeper.connect=server-1:2181,server-2:2181,server-3:2181

进入172.117.12.61服务器/usr/local/software目录下,执行以下两条命令将本机安装的Kafka分别复制到另外两台服务器上:

scp -r kafka_2.11-0.10.1.1  root@172.117.12.62:/usr/local/software/kafka
scp -r kafka_2.11-0.10.1.1  root@172.117.12.63:/usr/local/software/kafka

复制完成后,分别登录另外两台机器,修改server.properties文件中的broker.id依次为2和3。当然可以设置任一整数,只要保证一个集群中broker.id唯一即可。同时在3台机器的server.properties文件中设置host.name为本机的IP。例如,对主机名为server-1的机器上的Kafka节点,在server.properties文件中增加host.name=172.117.12.61。本书所用版本的Kafka若不设置host.name,则会在创建主题及向主题发送消息时发生NOT_LEADER_FOR_PARTITION这样的异常。

配置完毕后,分别启动3台机器的KafkaServer,通过ZooKeeper客户端查看Kafka在ZooKeeper中的相应元数据信息,其中查看/brokers/ids节点信息如下:

[zk: 172.117.12.61(CONNECTED) 1] ls /brokers/ids
[1, 2, 3]

由/brokers/ids节点存储的元数据可知,3台机器的Kafka均已正常已启动。至此,Kafka分布式环境搭建过程介绍完毕。

在实际应用中,我们经常需要了解集群的运行情况,如查看集群中代理列表、主题列表、消费组列表、每个主题对应的分区列表等,抑或是希望通过简单的Web界面操作来创建一个主题或是在代理负载不均衡时,手动执行分区平衡操作等。为了方便对Kafka集群的监控及管理,目前已有开源的Kafka监控及管理工具,如Kafka Manager、Kafka Web Console、KafkaOffsetMonitor等,读者也可以根据自己业务需要进行定制开发。本节只简单讲解Kafka Manager的安装应用。

Kafka Manager由yahoo公司开发,该工具可以方便查看集群主题分布情况,同时支持对多个集群的管理、分区平衡以及创建主题等操作。读者可访问https://github.com/yahoo/kafka- manager进行深入了解。

(1)下载编译Kafka Manager。进入GitHut官网搜索关键词“kafka-manager”即可查询到Kafka Manager的下载地址,具体地址为https://github.com/yahoo/kafka-manager/,直接点击“Clone or download”按钮进行下载。将下载的kafka-manager-master.zip文件上传到Linux服务器。用户也可以在Linux机器上执行以下命令在线下载Kafka Manager源码:

git clone https://github.com/yahoo/kafka-manager  # 从GitHub上下载Kafka Manager源码

Kafka Manager是用Scala语言开发的,通过sbt(Simple Build Tool)构建,sbt是对Scala或Java语言进行编译的一个工具,它类似于Maven,Gradle。截止到编写本书时,Kafka Manager是基于0.9.0.1版本的Kafka开发的,鉴于Kafka 0.9与Kafka-0.10版本的实现,该版本的Kafka Manager也能作为0.10.+版本的Kafka管理及监控工具,在Kafka Manager管理界面添加集群管理配置时,Kafka Version选0.9.0.1即可。待源码下载之后,进入Kafka Manager源码目录,会有一个sbt文件,执行以下命令进行Kafka Manager源码编译。

./sbt clean dist      # 编译Kafka Manager源码

编译过程会下载相关的jar文件,因此有些耗时。等源码编译完成后,在控制台输出的编译日志的最后几行信息如下:

[info] Your package is ready in /home/morton/.sbt/0.13/staging/17dfe5a6b216985c290a/   
kafka-manager-master/target/universal/kafka-manager-1.3.2.1.zip
[info] [success] Total time: 170 s, completed 2017-1-15 14:23:45

从控制台输出的编译日志信息可以看到,在编译时会在/home/用户名/路径下创建一个.sbt目录,编译后的文件存放在该目录相应子目录里,编译日志信息中的 morton 为编译 Kafka Manager源码的机器名。在编译过程中出现:

Download failed. Obtain the jar manually and place it at /home/morton/.sbt/launchers/   
0.13.9/sbt-launch.jar

表示在编译过程下载sbt-launch.jar文件遇到问题,请读者单独下载sbt-launch.jar相应版本并上传到/home/用户名/.sbt/launchers/0.13.9/目录下,再次执行编译命令。最终会在/home/用户名/.sbt/0.13/staging相应子目录下生成kafka-manager-1.3.2.1.zip文件,该文件就是用来对Kafka进行监控和管理的工具。若读者在编译时由于个人网络环境原因无法编译,可以直接在网络上下载该文件然后复制到服务器。将编译好的kafka-manager-1.3.2.1.zip文件解压到指定位置(这里解压到/usr/local/software/kafka-manager目录下)即完成安装。

(2)修改配置。进入Kafka Manager安装路径下的conf目录,打开application.conf文件,修改以下配置。将kafka-manager.zkhosts="kafka-manager-zookeeper:2181"配置项,修改为实际的ZooKeeper连接地址,例如这里修改为:

kafka-manager.zkhosts="172.117.12.61:2181,172.117.12.62:2181,172.117.12.63:2181"

(3)启动Kafka Manager。进入bin目录下执行以下启动命令:

nohup ./kafka-manager -Dconfig.file=../conf/application.conf &   # 启动Kafka Manager

Kafka Manager默认请求端口是9000,在浏览器中输入安装Kafka Manager服务地址及9000端口访问Kafka Manager,如访问http://172.117.12.62:9000。Kafka Manager启动初始化界面如图2-3所示。

图2-3 Kafka Manager启动初始化界面

通过修改配置文件application.conf里http.port的值,或是通过命令行参数传递可以修改Kafka Manager访问端口。例如,在启动时指定端口为9001,启动命令如下:

nohup ./kafka-manager -Dhttp.port=9001 -Dconfig.file=../conf/application.conf &
# 修改Kafka Manager外部访问端口号为9001

(4)关闭Kafka Manager。Kafka Manager没有提供关闭操作的执行脚本及命令,当希望关闭Kafka Manager时,可直接通过kill命令强制杀掉Kafka Manager进程。

查看Kafka Manager进程,输入jps命令,输出以下进程信息:

767 ProdServerStart
12422 QuorumPeerMain
13348 Kafka
895 Jps

其中ProdServerStart即为Kafka Manager进程。通过kill命令关闭Kafka Manager:

kill -9 767             # 关闭Kafka Manager进程

同时,由于Kafka Manager运行时有一个类似锁的文件RUNNING_PID,位于Kafka Manager安装路径bin同目录下,为了不影响下次启动,在执行kill命令后同时删除RUNNING_PID文件,命令如下:

rm –f  RUNNING_PID      # 删除Kafka Manager运行时的PID文件

否则,在下次启动时会由于以下错误而导致Kafka Manager无法启动。错误信息如下:

This application is already running (Or delete /usr/local/software/kafka-manager/ 
RUNNING_PID file).

若想在Kafka Manager监控中能展示更多的信息,则在Kafka启动时启动JMX。至此,Kafka Manager安装讲解完毕,对于Kafka Manager的相关操作将在5.8节进行介绍。

要研究Kafka,阅读Kafka源码是必不可少的环节。因此,本节将介绍Kafka源码编译及将编译后的源码导入Eclipse的具体步骤。当然也可以将Kafka源码导入其他IDE(如Intellij Idea、STS等)中,大家选用自己惯用的IDE即可。源码导入步骤与导入Eclipse操作基本类似,本书不再做详细介绍。这里只讲解在Windows操作系统下Kafka源码的编译,在其他操作系统上对Kafka源码的编译操作基本类似,只不过添加环境变量操作有所不同,这里不做讲解,读者可以查阅相关资料进行了解。由于Kafka核心模块是用Scala语言开发,用Gradle编译和构建的,因此下面先介绍相关环境的安装配置。

由于0.10.1.1版本的Kafka需要Scala版本在2.10以上,因此这里选择scala-2.11.8版本进行安装。

(1)下载并安装。先进入Scala官方网站http://www.scala-lang.org/download/下载相应的安装包,下载图2-4所示版本的Scala。

图2-4 Scala安装包下载列表

下载完成后,直接将安装包解压到指定目录即完成安装,安装时解压到D:\software\scala- 2.11.8目录下。

(2)环境变量配置。安装完成后,配置Scala运行环境变量,在系统变量中新增Scala安装路径配置,编辑系统变量配置如图2-5所示。

然后将;%SCALA_HOME%\bin添加到用户变量path中。与JDK环境安装配置一样,直接添加至自定义的用户环境变量CLASSPATH中,如图2-6所示。

图2-5 新建SCALA_HOME变量指定Scala安装路径

图2-6 修改CLASSPATH添加Scala环境变量

(3)验证。Scala安装及环境变量配置完成后,在Windows下打开一个cmd命令行终端。输入查看Scala版本信息的命令:

scala  -version      # 查询Scala版本

若输出以下信息则表示Scala安装配置成功:

Scala code runner version 2.11.8 -- Copyright 2002-2016, LAMP/EPFL

进入Gradle官方网站https://gradle.org/releases/下载Gradle安装包。本书编写时Gradle的最新版本为gradle-3.3,这里下载的就是这个版本,读者可以根据自己需要选择不同版本进行下载。下载后将Gradle文件解压到相应目录,这里将Gradle解压到D:\software\gradle-3.3目录下,安装及环境变量配置与Scala操作一样,新增系统环境GRADLE_HOME,指定gradle安装路径,并将;%GRADLE_HOME%\bin添加到path中,这里依然是添加到CLASSPATH之中。

Gradle安装及环境变量配置完成之后,打开Windows的cmd命令窗口,输入gradle –version,若输出如图2-7所示信息,则表示Gradle安装配置成功。

图2-7 Gradle安装验证结果

先进入http://kafka.apache.org/downloads.html下载Kafka源码文件。本书编写时Kafka的最新版本为kafka-0.10.1.1,这里我们下载的是kafka-0.10.1.1-src.tgz,将下载的源码包放在F:\kafka-0.10.1.1目录下,解压后如图2-8所示。

图2-8 Kafka源码解压后的文件目录

进入kafka-0.10.1.1-src,Kafka源码包括图2-9所示的目录及文件。

图2-9 Kafka源码包括的目录及文件

Kafka源码对应目录及文件说明如表2-4所示。

表2-4 Kafka源码对应目录及文件说明

名  称

描  述

bin

包括Windows和Linux平台下Kafka相关操作的执行脚本,如启动和关闭KafkaServer、创建主题、分区管理、模拟生产者和消费者基本操作的脚本等

clients

Kafka客户端,包括KafkaProducer和KafkaConsumer,用Java语言开发

config

Kafka运行相关配置文件,如在启动代理时需要加载的server.properties文件

connect

0.9版本之后新增加的特性,提供了Kafka与其他系统整合进行数据导入、导出操作的统一接口,为Kafka能够与其他系统整合构建可水平扩展、高可靠的数据流处理平台提供了一个简单模型,用Java语言开发

core

Kafka的核心代码,包括消息协议定义、日志管理、各组件之间通信、安全协议等

docs

Kafka官方网站相关文档

examples

Kafka实例代码

streams

Kafka 0.10版本之后增加的新特性,是一个用来构建流处理程序的库,用Java语言开发

tools

Kafka提供的工具类,用于查看生产者性能、吞吐量等

tests

系统测试脚本

由于在Kafka源代码的gradle子目录中没有wrapper类库,因此在Kafka根目录下执行gradlew eclipse命令时会报图2-10所示的错误。

图2-10 Kafka源码编译出错信息

接下来安装wrapper类库。由于本地安装的Scala版本为2.11.8,在安装wrapper类库之前,先修改Kafka源码目录下的gradle.properties文件,将Scala版本设置为2.11.8。gradle.properties文件内容如图2-11所示。

图2-11 gradle.properties文件内容

然后进入Kafka源码根目录下,执行gradle wrapper命令来下载wrapper包,如图2-12所示。

图2-12 wrapper安装过程输出信息

在该命令执行过程中会下载相应的jar文件,待完成相应文件下载后,若在控制台打印输出“BUILD SUCCESSFUL”字样则表示安装wrapper类库成功。执行成功后会在Kafka源码的gradle目录下生成wrapper目录,如图2-13所示。

图2-13 wrapper安装过程创建的wrapper目录

进入wrapper目录,在该目录下已创建了一个gradle-wrapper.jar文件,如图2-14所示。

图2-14 wrapper安装过程生成的文件

最后在Kafka源码根目录执行gradlew eclipse命令,对Kafka源码进行编译。这个过程由于要下载一系列依赖包,因此有些耗时,若出现“BUILD SUCCESSFUL”字样,则表示编译完成,如图2-15所示。

图2-15 Kafka源码成功编译输出日志信息

若读者在编译时输入gradlew eclipse命令后控制台打印日志输出:

Downloading https://services.gradle.org/distributions/gradle-3.3-bin.zip  
.........................

一直卡在下载gradle-3.3-bin.zip时,可通过下载工具先下载gradle-3.3-bin.zip文件,然后复制到C:\Users\用户名.gradle\wrapper\dists\gradle-3.3-bin\37qejo6a26ua35lyn7h1u9v2n目录下,接着再次运行gradlew eclipse命令进行编译。

通过前面的步骤已完成了Kafka源码的编译,现在介绍如何将Kafka源码导入Eclipse。在Eclipse视图中选择“import”,在弹出对话框中选择“Existing Projects into Workspace”,指定Kafka源码路径,依次导入Kafka源码中的core和client工程。导入项目后,若Eclipse的编码方式不是UTF-8,会有错误提示,读者在导入Kafka源码时要确保Eclipse已设置workspase的编码方式为UTF-8,同时建议修改Scala使用的JVM版本为1.8,如图2-16所示。

Eclipse工作空间环境配置完毕后,导入Kafka的core和client工程,如图2-17所示。

图2-16 Eclipse设置工程Scala运行的JVM版本界面

图2-17 Kafka源码导入Eclipse效果

若在Eclipse中看到core工程有错误提示信息,则在core工程上右键配置“build path”,在Libraries视图下可以看到缺失如图2-18所示的两个文件,这两个文件都是core工程测试代码所依赖的文件,并不影响core工程本身的运行。这里为了简单,直接将这两个文件从Libraries中移除。

图2-18 Kafka core报错所缺失的文件

若直接运行core工程,kafka.kafka.scala会报出如图2-19所示的错误信息。

图2-19 Eclipse启动Kafka时在控制台输出的错误信息

图2-19所示的错误是由于Kafka启动时需要加载server.properties文件,用于初始化KafkaServer,因此在运行kafka.kafka.scala启动KafkaServer时,需要指定一个配置文件。KafkaServer初始化的配置这里暂不进行详细介绍,将穿插在第3章至第6章对Kafka相关知识的讲解中进行介绍。现在,在Eclipse中设置运行参数,指定server.properties文件路径,配置如图2-20所示。由于Kafka依赖ZooKeeper,因此要保证在启动KafkaServer之前先启动ZooKeeper。

图2-20 Eclipse设置Kafka启动加载配置文件界面

为了在控制台输出启动日志,需要将Kafka源码config目录下的log4j.properties文件复制到Eclipse core工程src/main/scala目录下,运行kafka.scala启动KafkaServer,Eclipse控制台输出启动日志信息如图2-21所示。

图2-21 Eclipse启动KafkaServer输出结果

图2-21所示的日志信息表明:Kafka源码已成功在Eclipse中运行起来。接下来就可以调试Kafka,深入了解Kafka运行机制了。

本章详细讲解了Kafka运行环境安装部署的步骤,包括在Windows操作系统、Linux操作系统安装部署Kafka,以及Kafka可视化管理工具的安装和Kafka源码的编译等。


相关图书

推荐系统:产品与算法解析
推荐系统:产品与算法解析
程序员的制胜技
程序员的制胜技
面向电子鼻的复合光气体传感方法
面向电子鼻的复合光气体传感方法
程序设计竞赛专题挑战教程
程序设计竞赛专题挑战教程
Serverless核心技术和大规模实践
Serverless核心技术和大规模实践
深入浅出Windows API程序设计:编程基础篇
深入浅出Windows API程序设计:编程基础篇

相关文章

相关课程