Kafka Streams实战

978-7-115-50739-6
作者: [美] 小威廉 · P. 贝杰克(William P. Bejeck Jr.)
译者: 牟大恩
编辑: 杨海玲

图书目录:

详情

本书教读者在Kafka平台上实现流式处理。在这本易于理解的书中,读者将通过实际的例子来收集、转换和聚合数据、使用多个处理器处理实时事件,甚至可以使用KSQL深入研究流式SQL。本书最后以Kafka Streams应用程序的测试和运维方面的内容(如监控和调试)结束。

图书摘要

版权信息

书名:Kafka Streams实战

ISBN:978-7-115-50739-6

本书由人民邮电出版社发行数字版。版权所有,侵权必究。

您购买的人民邮电出版社电子书仅供您个人使用,未经授权,不得以任何方式复制和传播本书内容。

我们愿意相信读者具有这样的良知和觉悟,与我们共同保护知识产权。

如果购买者有侵权行为,我们可能对该用户实施包括但不限于关闭该帐号等维权措施,并可能追究法律责任。

著    [美]小威廉 • P. 贝杰克(William P. Bejeck Jr.)

译    牟大恩

责任编辑 杨海玲

人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

读者服务热线:(010)81055410

反盗版热线:(010)81055315


Original English language edition, entitled Kafka Streams in Action: Real-time Apps and Microservices with the Kafka Streams API by William P. Bejeck Jr. published by Manning Publications Co., 209 Bruce Park Avenue, Greenwich, CT 06830. Copyright © 2018 by Manning Publications Co.

Simplified Chinese-language edition copyright © 2018 by Posts & Telecom Press. All rights reserved. 本书中文简体字版由Manning Publications Co.授权人民邮电出版社独家出版。未经出版者书面许可,不得以任何方式复制或抄袭本书内容。

版权所有,侵权必究。


Kafka Streams是Kafka提供的一个用于构建流式处理程序的Java库,它与Storm、Spark等流式处理框架不同,是一个仅依赖于Kafka的Java库,而不是一个流式处理框架。除Kafka之外,Kafka Streams不需要额外的流式处理集群,提供了轻量级、易用的流式处理API。

本书包括4部分,共9章,从基础API到复杂拓扑的高级应用,通过具体示例由浅入深地详细介绍了Kafka Streams基础知识及使用方法。本书的主要内容包含流式处理发展历程和Kafka Streams工作原理的介绍,Kafka基础知识的介绍,使用Kafka Streams实现一个具体流式处理应用程序(包括高级特性),讨论状态存储及其使用方法,讨论表和流的二元性及使用场景,介绍Kafka Streams应用程序的监控及测试方法,介绍使用Kafka Connect将现有数据源集成到Kafka Streams中,使用KSQL进行交互式查询等。

本书适合使用Kafka Streams实现流式处理应用的开发人员阅读。


当我在2015年从领英的流数据架构组离职加入Confluent的时候,与Jay和Neha两人有过一次长时间的交流。当时公司刚刚成立,一切都还是从零起步。Jay问我,接下来想要开展哪些工作,我回答说,我已经在流式存储层面,也就是Kafka Core做了两年多的时间,接下来我的兴趣是在存储上,也就是计算层面寻求一些新的挑战。大数据这个提法叫了这么多年了,可是一直以来我们都致力在数据的大规模(volume)上,比如数据系统的可延展性等;我觉得接下来大数据的趋势会向第二个“V”,也就是快速率(velocity)发展,因为越来越多的人已经不满意批处理带来的时间延迟,他们需要的是就在下一秒,从收集的数据中获得信息,产生效益。

所以,接下来我想做流式数据处理。这个想法和他们一拍即合,从那时候开始我投入到Kafka Streams的开发中来。

从写下第一行Kafka Streams的代码到今天已经快4年的时间了,在这期间我有幸目睹了流式数据处理和流事件驱动架构在硅谷的互联网行业,进而在全世界的各个商业领域中突飞猛进的发展。越来越多的人开始从请求/响应以及批处理的应用编程模式向流式处理转移,越来越多的企业开始思考实时计算如何能够给他们的产品或者服务带来信息收益,而Apache Kafka作为当今流数据平台的事实标准,正在被越来越多的人注意和使用。而Kafka Streams作为Apache Kafka项目下原生的流式处理库,也越来越多地被投入到生产环境中,并且得到了大量社区贡献者的帮助。这对我本人而言,是莫大的喜悦和欣慰。

在今年上半年,我的同事Bill Bejeck完成了这本《Kafka Streams实战》,本书是Bill通过总结自身开发并维护真实生产环境下的Kafka Streams的经验完成的,对于想要学习并掌握Kafka Streams以及流事件驱动架构的读者来说是最好的方式之一。本书的译者牟大恩对Kafka源代码了解颇深,此前已著有《Kafka入门与实践》一书,我相信一定能够准确还原Bill在书中想要带给大家的关于流式数据处理应用实践的思维模式。

祝各位读者在探索Kafka Streams的路上不断有惊喜的发现!

——王国璋(Guozhang Wang)

Confluent流数据处理系统架构师

Apache Kafka PMC,Kafka Streams作者之一


Kafka在0.10版本中引入了Kafka Streams,它是一个轻量级、简单易用的基于Kafka实现的构建流式处理应用程序的Java库。虽然它只是一个Java库,但具备了流式处理的基本功能,同时它利用Kafka的分区特性很容易实现透明的负载均衡以及水平扩展,从而达到高吞吐量。

一年前我在写《Kafka入门与实践》一书时,用了专门一章讲解Kafka Streams,由于那是一本关于Kafka的书,因此对Kafka Streams的讲解并没有面面俱到。巧合的是,本书作为一本关于Kafka Streams的书,也是用专门一章来介绍Kafka。就我个人而言,我觉得这两本书中的内容在某种程度上可以互为补充,大家可以根据自己的偏好选择适合自己的Kafka书籍。

我很荣幸有机会翻译本书。通过翻译本书,无论是Kafka Streams知识本身还是本书作者的写作编排方式,都使我收获颇多。Kafka Streams的诸多设计优点在本书中都有详细介绍,并结合具体示例对相关API进行讲解。本书通过模拟近乎真实的场景,从场景描述开始,逐步对问题进行剖析,然后利用Kafka Streams解决问题。阅读本书,读者不仅能够全面掌握Kafka Streams相关的API,而且能够轻松学会如何使用Kafka Streams解决具体问题。

在翻译本书的过程当中,我理解最深的是,国外的技术书籍不是直接给出解决问题的完整代码,而是在场景描述、问题分析、技术选型等方面给予更多的篇幅,这种方式更能够帮助读者真正深入地掌握相关技术的要领,正所谓“授人以鱼,不如授人以渔”。

在此特别感谢人民邮电出版社的杨海玲编辑及其团队,正是他们一丝不苟、认真专业的工作态度,才使本书得以圆满完成。借此机会,我还要感谢我公司信息技术部副总经理、开发中心总经理王洪涛和部门经理熊友根对我的培养,以及同事给予我的帮助。同时还要感谢我的妻子吴小华,姐姐屈海林、尚立霞,妹妹石俊豪,感谢她们在我翻译本书时对我和我儿子的照顾,正是有了她们的帮助,才使我下班回到家时可以全身心投入到翻译工作中。同时,将本书送给我的宝贝儿子牟经纬,作为宝宝周岁的生日礼物,祝他健康、茁壮成长!

虽然在翻译过程中我力争做到“信、达、雅”,但本书许多概念和术语目前尚无公认的中文翻译,加之译者水平有限,译文中难免有不妥或错误之处,恳请读者批评指正。

牟大恩

2018年10月


牟大恩,武汉大学硕士研究生毕业,曾先后在网易杭州研究院、掌门科技、优酷土豆集团担任高级开发工程师和资深开发工程师职务,目前就职于海通证券总部。有多年的Java开发及系统设计经验,专注于互联网金融及大数据应用相关领域。著有《Kafka 入门与实践》,已提交技术发明专利两项,发表论文一篇。


我相信以实时事件流和流式处理为中心的架构将在未来几年变得无处不在。像Netflix、Uber、Goldman Sachs、Bloomberg等技术先进的公司已经建立了这种大规模运行的大型事件流平台。虽然这是一个大胆的断言,但我认为流式处理和事件驱动架构的出现将会对公司如何使用数据产生与关系数据库同样大的影响。

如果你还处在请求/响应风格的应用程序以及使用关系型数据库的思维模式,那么围绕流式处理的事件思维和构建面向事件驱动的应用程序需要你改变这种思维模式,这就是本书的作用所在。

流式处理需要从命令式思维向事件思维的根本性转变——这种转变使响应式的、事件驱动的、可扩展的、灵活的、实时的应用程序成为可能。在业务中,事件思维为组织提供了实时、上下文敏感的决策和操作。在技术上,事件思维可以产生更多自主的和解耦的软件应用,从而产生伸缩自如和可扩展的系统。

在这两种情况下,最终的好处是更大的敏捷性——在业务以及促进业务的技术方面。将事件思维应用于整个组织是事件驱动架构的基础,而流式处理是实现这种转换的技术。

Kafka Streams是原生的Apache Kafka流式处理库,它用Java语言实现,用于构建事件驱动的应用程序。使用Kafka Streams的应用程序可以对数据流进行复杂转换,这些数据流能够自动容错,透明且弹性地分布在应用程序的实例上。自2016年在Apache Kafka的0.10版本中首次发布以来,许多公司已经将Kafka Streams投入生产环境,这些公司包括P站(Pinterest)、纽约时报(The New York Times)、拉博银行(Rabobank)、连我(LINE)等。

我们使用Kafka Streams和KSQL的目标是使流式处理足够简单,并使流式处理成为构建响应事件的事件驱动应用程序的自然方式,而不仅是处理大数据的一个重量级框架。在我们的模型中,主要实体不是用于数据处理的代码,而是Kafka中的数据流。

这是了解Kafka Streams以及Kafka Streams如何成为事件驱动应用程序的关键推动者的极好方式。我希望你和我一样喜欢本书!

——Neha Narkhede

Confluent联合创始人兼首席技术官

Apache Kafka联合创作者


在我作为软件开发人员期间,我有幸在一些令人兴奋的项目上使用了当前软件。起初我客户端和后端都做,但我发现我更喜欢后端开发,因此我扎根于后端开发。随着时间的推移,我开始从事分布式系统相关的工作,从Hadoop开始(那时还是在1.0版本之前)。快进到一个新项目,我有机会使用了Kafka。我最初的印象是使用Kafka工作起来非常简单,也带来很多的强大功能和灵活性。我发现越来越多的方法将Kafka集成到交付项目数据中。编写生产者和消费者的代码很简单,并且Kafka提升了系统的性能。

然后我学习Kafka Streams相关的内容,我立刻意识到:“我为什么需要另一个从Kafka读取数据的处理集群,难道只是为了回写?”当我查看API时,我找到了我所需的流式处理的一切——连接、映射值、归约以及分组。更重要的是,添加状态的方法比我在此之前使用过的任何方法都要好。

我一直热衷于用一种简单易懂的方式向别人解释概念。当我有机会写关于Kafka Streams的书时,我知道这是一项艰苦的工作,但是很值得。我希望为本书付出的辛勤工作能证明一个事实,那就是Kafka Streams是一个简单但优雅且功能强大的执行流式处理的方法。


本书由异步社区出品,社区(https://www.epubit.com/)为您提供相关资源和后续服务。

本书提供源代码下载,要获得以上配套资源,请在异步社区本书页面中点击,跳转到下载界面,按提示进行操作即可。注意:为保证购书读者的权益,该操作会给出相关提示,要求输入提取码进行验证。

作者和编辑尽最大努力来确保书中内容的准确性,但难免会存在疏漏。欢迎您将发现的问题反馈给我们,帮助我们提升图书的质量。

当您发现错误时,请登录异步社区,按书名搜索,进入本书页面,点击“提交勘误”,输入勘误信息,点击“提交”按钮即可。本书的作者和编辑会对您提交的勘误进行审核,确认并接受后,您将获赠异步社区的100积分。积分可用于在异步社区兑换优惠券、样书或奖品。

我们的联系邮箱是contact@epubit.com.cn。

如果您对本书有任何疑问或建议,请您发邮件给我们,并请在邮件标题中注明本书书名,以便我们更高效地做出反馈。

如果您有兴趣出版图书、录制教学视频,或者参与图书翻译、技术审校等工作,可以发邮件给我们;有意出版图书的作者也可以到异步社区在线提交投稿(直接访问www.epubit.com/ selfpublish/submission即可)。

如果您是学校、培训机构或企业,想批量购买本书或异步社区出版的其他图书,也可以发邮件给我们。

如果您在网上发现有针对异步社区出品图书的各种形式的盗版行为,包括对图书全部或部分内容的非授权传播,请您将怀疑有侵权行为的链接发邮件给我们。您的这一举动是对作者权益的保护,也是我们持续为您提供有价值的内容的动力之源。

异步社区”是人民邮电出版社旗下IT专业图书社区,致力于出版精品IT技术图书和相关学习产品,为作译者提供优质出版服务。异步社区创办于2015年8月,提供大量精品IT技术图书和电子书,以及高品质技术文章和视频课程。更多详情请访问异步社区官网https://www.epubit.com。

异步图书”是由异步社区编辑团队策划出版的精品IT专业图书的品牌,依托于人民邮电出版社近30年的计算机图书出版积累和专业编辑团队,相关图书在封面上印有异步图书的LOGO。异步图书的出版领域包括软件开发、大数据、AI、测试、前端、网络技术等。

异步社区

微信服务号


首先,我要感谢我的妻子Beth,感谢她在这一过程中给予我的支持。写一本书是一项耗时的任务,没有她的鼓励,这本书就不会完成。Beth,你太棒了,我很感激你能成为我的妻子。我也要感谢我的孩子们,他们在大多数周末都忍受整天坐在办公室里的爸爸,当他们问我什么时候能写完的时候,我总模糊地回答“很快”。

接下来,我要感谢Kafka Streams的核心开发者Guozhang Wang、Matthias Sax、Damian Guy和Eno Thereska。如果没有他们卓越的洞察力和辛勤的工作,就不会有Kafka Streams,我也就没机会写这个颠覆性的工具。

感谢本书的编辑,Manning出版社的Frances Lefkowitz,她的专业指导和无限的耐心让写书变得很有趣。我还要感谢John Hyaduck提供的准确的技术反馈,以及技术校对者Valentin Crettaz对代码的出色审查。此外,我还要感谢审稿人的辛勤工作和宝贵的反馈,正是他们使本书更高质量地服务于所有读者,这些审稿人是Alexander Koutmos、Bojan Djurkovic、Dylan Scott、Hamish Dickson、James Frohnhofer、Jim Manthely、Jose San Leandro、Kerry Koitzsch、László Hegedüs、Matt Belanger、Michele Adduci、Nicholas Whitehead、Ricardo Jorge Pereira Mano、Robin Coe、Sumant Tambe和Venkata Marrapu。

最后,我要感谢Kafka的所有开发人员,因为他们构建了如此高质量的软件,特别是Jay Kreps、Neha Narkhede和Jun Rao,不仅是因为他们当初开发了Kafka,也因为他们创办了Confluent公司——一个优秀而鼓舞人心的工作场所。


William P. Bejeck Jr.(本名Bill Bejeck),是Kafka的贡献者,在Confluent公司的Kafka Streams团队工作。他已从事软件开发近15年,其中有6年专注于后端开发,特别是处理大量数据,并在数据提炼团队中,使用Kafka来改善下游客户的数据流。他是Getting Started with Google Guava(Packt,2013)的作者和“编码随想”(Random Thoughts on Coding)的博主。


我写本书的目的是教大家如何开始使用Kafka Streams,更确切地说,是教大家总体了解如何进行流式处理。我写这本书的方式是以结对编程的视角,我假想当你在编码和学习API时,我就坐在你旁边。你将从构建一个简单的应用程序开始,在深入研究Kafka Streams时将添加更多的特性。你将会了解到如何对Kafka Streams应用程序进行测试和监控,最后通过开发一个高级Kafka Streams应用程序来整合这些功能。

本书适合任何想要进入流式处理的开发人员。虽然没有严格要求,但是具有分布式编程的知识对理解Kafka和Kafka Streams很有帮助。Kafka本身的知识是有用的,但不是必需的,我将会教你需要知道的内容。经验丰富的Kafka开发人员以及Kafka新手将会学习如何使用Kafka Streams开发引人注目的流式处理应用程序。熟悉序列化之类的Java中、高级开发人员将学习如何使用这些技能来构建Kafka Streams应用程序。本书源代码是用Java 8编写的,大量使用Java 8的lambda语法,因此具有lambda(即使是另一种开发语言)程序的开发经验会很有帮助。

本书有4部分,共9章。第一部分介绍了一个Kafka Streams的心智模型,从宏观上向你展示它是如何工作的。以下章节也为那些想学习或想回顾的人提供了Kafka的基础知识。

第二部分继续讨论Kafka Streams,从基础API开始,一直到更复杂的特性,第二部分各章介绍如下。

第三部分将从开发Kafka Streams应用程序转到对Kafka Streams的管理知识的讨论。

第四部分是本书的压轴部分,在这里你将深入研究使用Kafka Streams开发高级应用程序。

本书包含了很多源代码的例子,包括书中编号的代码清单所标明的代码,以及内联在普通文本中的代码。在这两种情况下,源代码都采用固定宽度字体的格式,以便与普通文本区分开。

在很多情况下,原始源代码已经被重新格式化了。我们增加了断行以及重新缩进,以适应书中可用的页面空间。在极少数情况下,甚至空间还不够,代码清单中包括续行标识()。此外,当在文本中描述代码时,源代码中的注释常常从代码清单中删除。代码清单中附带的许多代码注释,突出显示重要的概念。

最后,需要注意的是:许多代码示例并不是独立存在的,它们只是包含当前讨论的最相关部分代码的节选。你在本书附带的源代码中将会找到所有示例的完整代码。

本书的源代码是使用Gradle工具构建的一个包括所有代码的项目。你可以使用合适的命令将项目导入IntelliJ或Eclipse中。在附带的README.md文件中可以找到使用和导航源代码的完整说明。

购买本书可以免费访问一个由Manning出版社运营的私人网络论坛,可以在论坛上对本书进行评论、咨询技术问题、接受本书作者或者其他用户的帮助。要访问该论坛,请访问Manning出版社官方网站本书页面。你还可以从Manning出版社官方网站了解更多关于Manning论坛及其行为规则。

Manning的论坛承诺为我们的读者提供一个可以在读者之间,以及读者与作者之间进行有意义对话的地方,但并不承诺作者的参与程度,作者对论坛的贡献是自愿的(并没有报酬)。建议你试着问他一些有挑战性的问题,以免他对你的问题没有兴趣!只要本书在印刷中,论坛和之前所讨论的问题归档就会从出版社的网站上获得。


本书封面上的图片描述的是“18世纪一位土耳其绅士的习惯”,这幅插图来自Thomas Jefferys的A Collection of the Dresses of Different Nations, Ancient and Modern(共4卷),于1757年和1772年之间出版于伦敦。扉页上写着:这些是手工着色的铜版雕刻品,用阿拉伯胶加深了颜色。Thomas Jefferys(1719—1771)被称为“乔治三世的地理学家”。他是一位英国制图师,是当时主要的地图供应商。他为政府和其他官方机构雕刻和印刷地图,制作了各种商业地图和地图集,尤其是北美地区的。作为一名地图制作者,他在所调查和绘制的地区激起了人们对当地服饰习俗的兴趣,这些都在这本图集中得到了很好的展示。向往远方、为快乐而旅行,在18世纪后期还是相对较新的现象,类似于这套服饰集的书非常受欢迎,把旅行者和神游的旅行者介绍给其他国家的居民。Jefferys卷宗中绘画的多样性生动地说明了200多年前世界各国的独特性和个性。从那时起,着装样式已经发生了变化,各个国家和地区当时非常丰富的着装多样性也逐渐消失。现在仅依靠衣着很难把一个大陆的居民和另一个大陆的居民区分开来。或许我们已经用文化和视觉上的多样性换取了个人生活的多样化——当然是更为丰富和有趣的文化和艺术生活。

在一个很难将计算机书籍区分开的时代,Manning以两个世纪以前丰富多样的地区生活为基础,通过以Jefferys的图片作为书籍封面来庆祝计算机行业的创造性和首创精神。


在本书第一部分,我们将论述大数据时代的起源,以及它是如何从最初为了满足处理大量数据的需求,到最终发展成为流式处理——当数据到达时立即被处理。本部分还会讨论什么是Kafka Streams,并向大家展示一个没有任何代码的“心智模型” [1](mental model)是如何工作的,以便大家可以着眼于全局。我们还将简要介绍Kafka,让大家快速了解如何使用它。

[1] 心智模型(mental model)又叫心智模式。心智模型的理论是基于一个试图对某事做出合理解释的个人会发展可行的方法的假设,在有限的领域知识和有限的信息处理能力上,产生合理的解释。心智模型是对思维的高级建构,心智模型表征了主观的知识。通过不同的理解解释了心智模型的概念、特性、功用。(引自百度百科)——译者注


本章主要内容

虽然这是一本关于Kafka Streams的书,但是要研究Kafka Streams不可能不探讨Kafka,毕竟,Kafka Streams是一个运行在Kafka之上的库。

Kafka Streams设计得非常好,因此即使具有很少或者零Kafka经验的人都可以启动和运行Kafka Streams。但是,你所取得的进步和对Kafka调优的能力将是有限的。掌握Kafka的基础知识对有效使用Kafka Streams来说是必要的。

注意

 

本章面向的读者是对Kafka Streams有兴趣,但对Kafka本身具有很少或零经验的开发者。如果读者对Kafka具备很好的应用知识,那么就可以跳过本章,直接阅读第3章。

Kafka是一个很大的话题,很难通过一章进行完整论述。本章将会覆盖足以使读者很好地理解Kafka的工作原理和一些核心配置项设置的必备知识。要想更深入了解Kafka的知识,请看Dylan Scott写的Kafka in Action(Manning,2018)

如今,各组织都在研究数据。互联网公司、金融企业以及大型零售商现在比以往任何时候都更善于利用这些数据。通过利用数据,既能更好地服务于客户,又能找到更有效的经营方式(我们要对这种情况持积极态度,并且在看待客户数据时要从好的意图出发)。

让我们考虑一下在ZMart数据管理解决方案中的各种需求。

在第1章中,已介绍过大型零售公司ZMart。那时,ZMart需要一个流式处理平台来利用公司的销售数据,以便更好地提供客户服务并提升销售总额。但在那时的6个月前,ZMart期待了解它的数据情况,ZMart最初有一个定制的非常有效的解决方案,但是很快就发现该解决方案变得难以驾驭了,接下来将看到其原因。

最初,ZMart是一家小公司,零售销售数据从各分离的应用程序流入系统。这种方法起初效果还是不错的,但随着时间的推移,显然需要一种新的方法。一个部门的销售数据不再只是该部门所感兴趣的,公司的其他部门也可能感兴趣,并且不同的部门对数据的重要性和数据结构都有不同的需求。图2-1展示了ZMart原始的数据平台。

图2-1 ZMart原始数据架构简单,足够使每个信息源流入和流出信息

随着时间的推移,ZMart通过收购其他公司以及扩大其现有商店的产品而持续增长。随着应用程序的添加,应用程序之间的连接变得更加复杂,由最初的少量的应用程序之间的通信演变成了一堆名副其实的意大利面条。如图2-2所示,即使只有3个应用程序,连接的数量也很烦琐且令人困惑。可以看到,随着时间的推移,添加新的应用程序将使这种数据架构变得难以管理。

图2-2 随着时间的推移,越来越多的应用程序被添加进来,连接所有这些信息源变得非常复杂

一个解决ZMart问题的方案是创建一个接收进程来控制所有的交易数据,即建立一个交易数据中心。这个交易数据中心应该是无状态的,它以一种方式接受交易数据并存储,这种方式是任何消费应用程序可以根据自己的需要从数据中心提取信息。对哪些数据的追踪取决于消费应用程序,交易数据中心只知道需要将交易数据保存多久,以及在什么时候切分或删除这些数据。

也许你还没有猜到,我们有Kafka完美的用例。Kafka是一个具有容错能力、健壮的发布/订阅系统。一个Kafka节点被称为一个代理,多个Kafka服务器组成一个集群。Kafka将生产者写入的消息存储在Kafka的主题之中,消费者订阅Kafka主题,与Kafka进行通信以查看订阅的主题是否有可用的消息。图 2-3 展示了如何将Kafka想象为销售交易数据 中心。

现在大家已经对Kafka的概况有了大致的了解,在下面的几节中将进行仔细研究。

图2-3 使用Kafka作为销售交易中心显著简化了ZMart数据架构,现在每台服务器不需要知道其他的信息来源,它们只需要知道如何从Kafka读取数据和将数据写入Kafka

在接下来的几个小节中,我们将介绍Kafka体系架构的关键部分以及Kafka的工作原理。如果想尽早地体验运行Kafka,可以直接跳到2.6节,安装和运行Kafka。等Kafka安装之后,再回到这里来继续学习Kafka。

在前一节中,我曾说过Kafka是一个发布/订阅系统,但更精确地说法是Kafka充当了消息代理。代理是一个中介,将进行互利交换或交易但不一定相互了解的两部分汇聚在一起。图2-4展示了ZMart数据架构的演化。生产者和消费者被添加到图中以展示各单独部分如何与Kafka进行通信,它们之间不会直接进行通信。

Kafka将消息存储在主题中,并从主题检索消息。消息的生产者和消费者之间不会直接连接。此外,Kafka并不会保持有关生产者和消费者的任何状态,它仅作为一个消息交换中心。

Kafka主题底层的技术是日志,它是Kafka追加输入记录的文件。为了帮助管理进入主题的消息负载,Kafka使用分区。在第1章我们讨论了分区,大家可以回忆一下,分区的一个应用是将位于不同服务器上的数据汇集到同一台服务器上,稍后我们将详细讨论分区。

图2-4 Kafka是一个消息代理,生产者将消息发送到Kafka,这些消息被存储,并通过主题订阅的方式提供给消费者

Kafka底层的机制就是日志。大多数软件工程师都对日志很熟悉,日志用于记录应用程序正在做什么。如果在应用程序中出现性能问题或者错误,首先检查的是应用程序的日志,但这是另一种类型的日志。在Kafka(或者其他分布式系统)的上下文中,日志是“一种只能追加的,完全按照时间顺序排列的记录序列”[1]

图2-5展示了日志的样子,当记录到达时,应用程序将它们追加到日志的末尾。记录有一个隐含的时间顺序,尽管有可能不是与每条记录相关联的时间戳,因为最早的记录在左边,后达到的记录在右端。

日志是具有强大含义的简单数据抽象,如果记录按时间有序,解决冲突或确定将哪个更新应用到不同的机器就变得明确了:最新记录获胜。

Kafka中的主题是按主题名称分隔的日志,几乎可以将主题视为有标签的日志。如果日志在一个集群中有多个副本,那么当一台服务器宕机后,就能够很容易使服务器恢复正常:只需重放日志文件。从故障中恢复的能力正是分布式提交日志具有的。

图2-5 日志是追加传入记录的文件——每条新到达的记录都被立即放在接收到的最后一条记录之后,这个过程按时间顺序对记录进行排序

我们只触及了关于分布式应用程序和数据一致性的深入话题的表面,但到目前为止所讲解的知识应该能让读者对Kafka涉及的内容有了一个基本的了解。

当安装Kafka时,其中一个配置项是log.dir,该配置项用来指定Kafka存储日志数据的路径。每个主题都映射到指定日志路径下的一个子目录。子目录数与主题对应的分区数相同,目录名格式为“主题名_分区编号”(将在下一节介绍分区)。每个目录里面存放的都是用于追加传入消息的日志文件,一旦日志文件达到某个规模(磁盘上的记录总数或者记录的大小),或者消息的时间戳间的时间间隔达到了所配置的时间间隔时,日志文件就会被切分,传入的消息将会被追加到一个新的日志文件中(如图2-6所示)。

图2-6 logs目录是消息存储的根目录,/logs目录下的每个目录代表一个主题的分区,目录中的文件名以主题的名称打头,然后是下划线,后面接一个分区的编号

可以看到日志和主题是高度关联的概念,可以说一个主题是一个日志,或者说一个主题代表一个日志。通过主题名可以很好地处理经由生产者发送到Kafka的消息将被存储到哪个日志当中。既然已经讨论了日志的概念,那么我们再来讨论Kafka另一个基本概念——分区。

分区是Kafka设计的一个重要部分,它对性能来说必不可少。分区保证了同一个键的数据将会按序被发送给同一个消费者。图2-7展示了分区的工作原理。

图2-7 Kafka使用分区来实现高吞吐量,并将一个主题的消息在集群的不同服务器中传播

对主题作分区的本质是将发送到主题的数据切分到多个平行流之中,这是Kafka能够实现巨大吞吐量的关键。我们解释过每个主题就是一个分布式日志,每个分区类似于一个它自己的日志,并遵循相同的规则。Kafka将每个传入的消息追加到日志末尾,并且所有的消息都严格按时间顺序排列,每条消息都有一个分配给它的偏移量。Kafka不保证跨分区的消息有序,但是能够保证每个分区内的消息是有序的。

除了增加吞吐量,分区还有另一个目的,它允许主题的消息分散在多台机器上,这样给定主题的容量就不会局限于一台服务器上的可用磁盘空间。

现在让我们看看分区扮演的另一个关键角色:确保具有相同键的消息最终在一起。

Kafka处理键/值对格式的数据,如果键为空,那么生产者将采用轮询(round-robin)方式选择分区写入记录。图2-8展示了用非空键如何分配分区的操作。

如果键不为空,Kafka会使用以下公式(如下伪代码所示)确定将键/值对发送到哪个分区:

HashCode.(key) % number of partitions

通过使用确定性方法来选择分区,使得具有相同键的记录将会按序总是被发送到同一个分区。默认的分区器使用此方法,如果需要使用不同的策略选择分区,则可以提供自定义的分区器。

图2-8 “foo”被发送到分区0,“bar”被发送到分区1。通过键的
字节散列与分区总数取模来获得数据被分配的分区

为什么要编写自定义分区器呢?在几个可能的原因中,下面将举一个简单的例子——组合键的使用。

假设将购买数据写入Kafka,该数据的键包括两个值,即客户ID和交易日期,需要根据客户ID对值进行分组,因此对客户ID和交易日期进行散列是行不通的。在这种情况下,就需要编写一个自定义分区器,该分区器知道组合键的哪一部分决定使用哪个分区。例如,/src/main/java/ bbejeck/model/PurchaseKey.java中的组合键,如代码清单2-1所示。

代码清单2-1 组合键PurchaseKey类

public class PurchaseKey {

    private String customerId;
    private Date transactionDate;

    public PurchaseKey(String customerId, Date transactionDate) {
        this.customerId = customerId;
        this.transactionDate = transactionDate;
    }

    public String getCustomerId() {
        return customerId;
    }

    public Date getTransactionDate() {
        return transactionDate;
    }
}

当提及分区时,需要保证特定用户的所有交易信息都会被发送到同一个分区中。但是整体作为键就无法保证,因为购买行为会在多个日期发生,包括交易日期的记录对一个用户而言就会导致不同的键值,就会将交易数据随机分布到不同的分区中。若需要确保具有相同客户ID的交易信息都发送到同一个分区,唯一的方法就是在确定分区时使用客户ID作为键。

代码清单2-2所示的自定义分区器的例子就满足需求。PurchaseKeyPartitioner类(源代码见src/ main/java/bbejeck/chapter_2/partitioner/PurchaseKeyPartitioner.java)从键中提取客户ID来确定使用哪个分区。

代码清单2-2 自定义分区器PurchaseKeyPartitioner类

public class PurchaseKeyPartitioner extends DefaultPartitioner {

    @Override
    public int partition(String topic, Object key,
                        byte[] keyBytes, Object value,
                         byte[] valueBytes, Cluster cluster) {
        Object newKey = null;
        if (key != null) {  ←--- 如果键不为空,那么提取客户ID
           PurchaseKey purchaseKey = (PurchaseKey) key;
            newKey = purchaseKey.getCustomerId();
            keyBytes = ((String) newKey).getBytes();  ←--- 将键的字节赋值给新的值

        }
        return super.partition(topic, newKey, keyBytes, value,   ←--- 返回具有已被更新键的分区,并将其委托给超类
        valueBytes, cluster);
   }
}

该自定义分区器继承自DefaultPartitioner类,当然也可以直接实现Partitioner接口,但是在这个例子中,在DefaultPartitioner类中有一个已存在的逻辑。

请注意,在创建自定义分区器时,不仅局限于使用键,单独使用值或与键组合使用都是有效的。

注意

 

Kafka API提供了一个可以用来实现自定义分区器的Partitioner接口,本书不打算讲解从头开始写一个分区器,但是实现原则与代码清单2-2相同。

已经看到如何构造一个自定义分区器,接下来,将分区器与Kafka结合起来。

既然已编写了一个自定义分区器,那就需要告诉Kafka使用自定义的分区器代替默认的分区器。虽然还没有讨论生产者,但在设置Kafka生产者配置时可以指定一个不同的分区器[2],配置如下:

partitioner.class=bbejeck_2.partitioner.PurchaseKeyPartitioner

通过为每个生产者实例设置分区器的方式,就可以随意地为任何生产者指定任何分区器类。在讨论Kafka生产者时再对生产者的配置做详细介绍。

警告

 

在决定使用的键以及选择键/值对的部分作为分区依据时,一定要谨慎行事。要确保所选择的键在所有数据中具有合理的分布,否则,由于大多数数据都分布在少数几个分区上,最终导致数据倾斜。

在创建主题时决定要使用的分区数既是一门艺术也是一门科学。其中一个重要的考虑因素是流入该主题的数据量。更多的数据意味着更多的分区以获得更高的吞吐量,但与生活中的任何事物一样,也要有取舍。

增加分区数的同时也增加了TCP连接数和打开的文件句柄数。此外,消费者处理传入记录所花费的时间也会影响吞吐量。如果消费者线程有重量级处理操作,那么增加分区数可能有帮助,但是较慢的处理操作最终将会影响性能。

我们已经讨论了日志和对主题进行分区的概念,现在,花点时间结合这两个概念来阐述分布式日志。

到目前为止,我们讨论日志和对主题进行分区都是基于一台Kafka服务器或者代理,但典型的Kafka生产集群环境包括多台服务器。故意将讨论集中单个节点上,是因为考虑一个节点更容易理解概念。但在实践中,总是使用包括多台服务器的Kafka集群。

当对主题进行分区时,Kafka不会将这些分区分布在一台服务上,而是将分区分散到集群中的多台服务器上。由于Kafka是在日志中追加记录,因此Kafka通过分区将这些记录分发到多台服务器上。图2-9展示了这个过程。

让我们通过使用图2-9作为一个向导来完成一个快速实例。对于这个实例,我们假设有一个主题,并且键为空,因此生产者将通过轮询的方式分配分区。

生产者将第1条消息发送到位于Kafka代理1上的分区0中[3],第2条消息被发送到位于Kafka代理1上的分区1中,第3条消息被发送到位于Kafka代理2上的分区2中。当生产者发送第6条消息时,消息将会被发送到Kafka代理3上的分区5中,从下一条消息开始,又将重复该步骤,消息将被发送到位于Kafka代理1上的分区0中。以这种方式继续分配消息,将消息分配到Kafka集群的所有节点中。

图2-9 生产者将消息写入主题的分区中,如果消息没有关联键,那么生产者就会通过轮询方式选择一个分区,否则通过键的散列值与分区总数取模来决定分区

虽然远程存储数据听起来会有风险,因为服务器有可能会宕机,但Kafka提供了数据冗余。当数据被写入Kafka的一个代理时,数据会被复制到集群中一台或多台机器上(在后面小节会介绍副本)。

到目前为止,我们已经讨论了主题在Kafka中的作用,以及主题如何及为什么要进行分区。可以看到,分区并不都位于同一台服务器上,而是分布在整个集群的各个代理上。现在是时候来看看当服务器故障时Kafka如何提供数据可用性。

Kafka代理有领导者(leader)和追随者(follower)的概念。在Kafka中,对每一个主题分区(topic partition),会选择其中一个代理作为其他代理(追随者)的领导者。领导者的一个主要职责是分配主题分区的副本给追随者代理服务器。就像Kafka在集群中为一个主题分配分区一样,Kafka也会在集群的多台服务器中复制分区数据。在深入探讨领导者、追随者和副本是如何工作之前,先来介绍Kafka为实现这一点所使用的技术。

如果你是个Kafka菜鸟,你可能会问自己:“为什么在Kafka的书中会谈论Apache ZooKeeper?”Apache ZooKeeper是Kafka架构不可或缺的部分,正是由于ZooKeeper才使得Kafka有领导者代理,并使领导者代理做诸如跟踪主题副本的事情,ZooKeeper官网对其介绍如下:

ZooKeeper是一个集中式服务,用于维护配置信息、命名、提供分布式同步和组服务。这些类型的所有服务都是通过分布式应用程序以某种形式使用。

既然Kafka是一个分布式应用程序,那么它一开始就应该知道ZooKeeper在其架构中的作用。在这里的讨论中,我们只考虑两个或多个Kafka服务器的安装问题。

在Kafka集群中,其中一个代理会被选为控制器。在2.3.4节我们介绍了分区以及如何在集群的不同服务器之间分配分区。主题分区有一个领导者分区和一到多个追随者分区(复制的级别决定复制的程度[4]),当生成消息时,Kafka将记录发送到领导者分区对应的代理上。

Kafka使用ZooKeeper来选择代理控制器,对于其中涉及的一致性算法的探讨已超出本书所讲内容的范围,因此我们不做深入探讨,只声明ZooKeeper从集群中选择一个代理作为控制器。

如果代理控制器发生故障或者由于任何原因而不可用时,ZooKeeper从与领导者保持同步的一系列代理(已同步的副本[ISR])中选出一个新的控制器,构成该系列的代理是动态的[5],ZooKeeper只会从这个代理系列中选择一个领导者[6]

Kafka在代理之间复制记录,以确保当集群中的节点发生故障时数据可用。可以为每个主题(正如前面介绍的消息发布或消费实例中的主题)单独设置复制级别也可以为集群中的所有主题设置复制级别[7]。图2-10演示了代理之间的复制流。

Kafka复制过程非常简单,一个主题分区对应的各代理从该主题分区的领导者分区消费消息,并将消息追加到自己的日志当中。正如2.3.12节所论述的,与领导者代理保持同步的追随者代理被认为是ISR,这些ISR代理在当前领导者发生故障或者不可用时有资格被选举为领导者。[8]

图2-10 代理1和代理3是一个主题分区的领导者,同时也是另外一个分区的追随者,而代理2只是追随者,追随者代理从领导者代理复制数据

代理控制器的职责是为一个主题的所有分区建立领导者分区和追随者分区的关系,如果一个Kafka节点宕机或者没有响应(与ZooKeeper之间的心跳),那么所有已分配的分区(包括领导者和追随者)都将由代理控制器重新进行分配。图2-11演示了一个正在运行的代理控制器。[9]

图2-11展示了一个简单的故障情景。第1步,代理控制器检测到代理3不可用。第2步,代理控制器将代理3上分区的领导权重新分配给代理2。

ZooKeeper也参与了Kafka以下几个方面的操作。

图2-11 代理控制器负责将其他代理分配为某些主题/分区的领导者代理和另一些主题/分区的追随者代理, 当代理不可用时,代理控制器将已分配给不可用代理的重新分配给集群中的其他代理

现在可知Kafka为什么依赖于Apache ZooKeeper了,正是ZooKeeper使得Kafka有了一个带着追随者的领导者代理,领导者代理的关键角色是为追随者分配主题分区,以便进行复制,以及在代理成员出现故障时重新分配主题分区。

对追加日志已进行了介绍,但还没有谈到随着日志持续增长如何对其进行管理。一个集群中旋转磁盘的空间是一个有限的资源,因此对Kafka而言,随着时间的推移,删除消息是很重要的事。在谈到删除Kafka中的旧数据时,有两种方法,即传统的日志删除和日志压缩。

日志删除策略是一个两阶段的方法:首先,将日志分成多个日志段,然后将最旧的日志段删除。为了管理Kafka不断增加的日志,Kafka将日志切分成多个日志段。日志切分的时间基于消息中内置的时间戳。当一条新消息到达时,如果它的时间戳大于日志中第一个消息的时间戳加上log.roll.ms配置项配置的值时,Kafka就会切分日志。此时,日志被切分,一个新的日志段会被创建并作为一个活跃的日志段,而以前的活跃日志段仍然为消费者提供消息检索[10]

日志切分是在设置Kafka代理时进行设置的[11]。日志切分有两个可选的配置项。

随着时间的推移,日志段的数据也将不断增加,为了为传入的数据腾出空间,需要将较旧的日志段删除。为了删除日志段,可以指定日志段保留的时长。图2-12说明了日志切分的过程。

图2-12 左边是当前日志段,右上角是一个已被删除的日志段,在其下面是最近切分的仍然在使用的日志段

与日志切分一样,日志段的删除也基于消息的时间戳,而不仅是时钟时间或文件最后被修改的时间,日志段的删除基于日志中最大的时间戳。用来设置日志段删除策略的3个配置项按优先级依次列出如下,这里按优先级排列意味着排在前面的配置项会覆盖后面的配置项。

提出这些设置的前提是基于大容量主题的假设,这里大容量是指在一个给定的时间段内保证能够达到文件最大值。另一个配置项log.retention.bytes,可以指定较长的切分时间阈值,以控制I/O操作。最后,为了防止日志切分阈值设置得相对较大而出现日志量显著增加的情况,请使用配置项log.segment.bytes来控制单个日志段的大小。

对于键为空的记录以及独立的记录[12],删除日志的效果很好。但是,如果消息有键并需要预期的更新操作,那么还有一种方法更适合。

假设日志中已存储的消息都有键,并且还在不停地接收更新的消息,这意味着具有相同键的新记录将会更新先前的值。例如,股票代码可以作为消息的键,每股的价格作为定期更新的值。想象一下,使用这些信息来展示股票的价值,并出现程序崩溃或者重启,这就需要能够让每个键恢复到最新数据[13]

如果使用删除策略,那么从最后一次更新到应用程序崩溃或重启之间的日志段就可能被去除,启动时就得不到所有的记录[14]。一种较好的方式是保留给定键的最近已知值,用与更新数据库表键一样的方式对待下一条记录[15]

按键更新记录是实现压缩主题(日志)的表现形式。与基于时间和日志大小直接删除整个日志段的粗粒度方式不同,压缩是一种更加细粒度的方式,该方式是删除日志中每个键的旧数据。从一个很高的层面上来说,一个日志清理器(一个线程池)运行在后台,如果后面的日志中出现了相同的键,则日志清理器就会重新复制日志段文件并将该键对应的旧记录去除。图2-13阐明了日志压缩是如何为每个键保留最新消息的。

这种方式保证了给定键的最后一条记录在日志中。可以为每个主题指定日志保留策略,因此完全有可能某些主题使用基于时间的保留,而其他主题使用压缩。

默认情况下,日志清理功能是开启的。如果要对主题使用压缩,那么需要在创建主题时设置属性log.cleanup.policy=compact

在Kafka Streams中使用应用状态存储时就要用到压缩,不过并不需要我们自己来创建相应的日志或主题——框架会处理。然而,理解压缩的原理是很重要的,日志压缩是一个宽泛的话题,我们仅谈论至此。如果想了解压缩方面的更多信息,参见Kafka官方文档。

注意

 

当使用cleanup.policy为压缩时,你可能好奇如何从日志中去除一条记录。对于一个压缩的主题,删除操作会为给定键设置一个null值,作为一个墓碑标记。任何值为null的键都确保先前与其键相同的记录被去除,之后墓碑标记自身也会被去除。

图2-13 左边是压缩前的日志,可以看到具有不同值的重复键,这些值是用来更新给定键的。右边是压缩后的日志,保留了每个键的最新值,但日志变小了

本节的关键内容是:如果事件或消息是独立、单独的,那么就使用日志删除,如果要对事件或消息进行更新,那就使用日志压缩。

我们已经花了很多时间介绍Kafka内部是如何处理数据的,现在,让我们转移到Kafka外部,探讨如何通过生产者向Kafka发送消息,以及消费者如何从Kafka读取消息。

[1] Jay Kreps, “The Log: What Every Software Engineer Should Know About Real-time Data’s Unifying Abstraction”(日志:每个软件工程师都应该知道实时数据的统一抽象)。

[2] 这里说的不同的分区器,是指不使用默认分区器,这里指定自定义分区器来覆盖默认分区器。 ——译者注

[3] 代理1是指代理服务器对应的broker.id为1,分区0表示分区编号为0。 ——译者注

[4] 这里的级别是指分区是领导者分区还是追随者分区。——译者注

[5] 代理是动态的是指根据代理的存活情况动态地将代理从ISR集合中移除或将代理加入ISR集合中。——译者注

[6] Kafka官方文档“Replicated Logs: Quorums, ISRs, and State Machines (Oh my!)”。

[7] 复制级别也就是我们通常说的副本数。——译者注

[8] Kafka官方文档“Replication”。

[9] 本节的一些信息来自Gwen Shapira在Qurora上的回答:“What is the actual role of ZooKeeper in Kafka? What benefits will I miss out on if I don’t use ZooKeeper and Kafka together?”。(ZooKeeper在Kafka中的实际角色是什么?如果我们不将ZooKeeper和Kafka一起使用会错失哪些好处?)

[10] Kafka总是将消息追加到活跃日志段的末尾。——译者注

[11] Kafka官方文档“Broker Configs”。

[12] 独立的记录是指若消息有键时,各消息的键都不相同。——译者注

[13] Kafka官方文档“Log Compaction”。

[14] 由于采用删除策略,位于被删除日志段中的数据被删除了,因此在重启后这些数据就丢失了,所以说在启动后就得不到所有的记录。——译者注

[15] 数据库中存在该键对应的记录时就做更新,否则就在数据库中插入一条记录。——译者注

回到ZMart对集中销售交易数据中心的需求,看看如何将购买交易数据发送到Kafka。在Kafka中,生产者是用于发送消息的客户端。图2-14重述ZMart的数据结构,突出显示生产者,以强调它们在数据流中适合的位置。

尽管ZMart有很多的销售交易,但现在我们只考虑购买一个单一物品:一本10.99美元的书。当消费者完成销售交易时,交易信息将被转换为一个键/值对并通过生产者发送到Kafka。

键是客户ID,即123447777,值是一个JSON格式的值,即"{\"item\":\"book\",\ "price\":10.99}"(这里已把双引号转义了,这样JSON可以被表示为Java中的字符串)。有了这种格式的数据,就可以使用生产者将数据发送到Kafka集群。代码清单2-3所示的示例代码可以在源代码/src/main/java/bbejeck.chapter_2/producer/SimpleProducer.java类中找到。

图2-14 生产者用于向Kafka发送消息,它们并不知道哪个消费者会读取消息,也不知道消费者在什么时候会读取消息

代码清单2-3 SimpleProducer示例

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer","org.apache.kafka.common.serialization.
➥   StringSerializer");
properties.put("value.serializer",
➥   "org.apache.kafka.common.serialization.StringSerializer");
properties.put("acks", "1");
properties.put("retries", "3");
properties.put("compression.type", "snappy");
properties.put("partitioner.class",
➥  PurchaseKeyPartitioner.class.getName());  ←--- 生产者属
性配置

PurchaseKey key = new PurchaseKey("12334568", new Date());

try(Producer<PurchaseKey, String> producer = 
➥   new KafkaProducer<>(properties)) {  ←--- 创建一个KafkaProducer
   ProducerRecord<PurchaseKey, String> record = 
➥   new ProducerRecord<>("transactions", key, "{\"item\":\"book\",
     \"price\":10.99}");  ←--- 实例化ProducerRecord

   Callback callback = (metadata, exception) -> {
             if (exception != null) {
                System.out.println("Encountered exception "
➥   + exception);   ←--- 构造一个回调
            }
       };

    Future<RecordMetadata> sendFuture =
➥   producer.send(record, callback);   ←--- 发送记录,并将返回的Future赋值给一个变量
}

Kafka生产者是线程安全的。所有消息被异步发送到Kafka——一旦生产者将记录放到内部缓冲区,就立即返回Producer.send。缓冲区批量发送记录,具体取决于配置,如果在生产者缓冲区满时尝试发送消息,则可能会有阻塞。

这里描述的Producer.send方法接受一个Callback实例,一旦领导者代理确认收到记录,生产者就会触发Callback.onComplete方法,Callback.onComplete方法中仅有一个参数为非空。在本例中,只关心在发生错误时打印输出异常堆栈信息,因此检验异常对象是否为空。一旦服务器确认收到记录,返回的Future就会产生一个RecordMetadata对象。

定义

 

在代码清单2-3中,Producer.send方法返回一个Future对象,一个Future对象代表一个异步操作的结果。更重要的是,Future可以选择惰性地检索异步结果,而不是等它们完成。更多信息请参考Java文档“Interface Future”(接口Future)。

当创建KafkaProducer实例时,传递了一个包含生产者配置信息的java.util. Properties参数。KafkaProducer的配置并不复杂,但在设置时需要考虑一些关键属性,例如,可以在配置中指定自定义的分区器。这里要介绍的属性太多了,因此我们只看一下代码清单2-3中使用的属性。

更多生产者相关的配置信息请参见Kafka官方文档。

当创建一个ProducerRecord对象时,可以选择指定分区、时间戳或者两者都指定。在代码清单2-3中实例化ProducerRecord时,使用了4个重载构造方法中的一个。其他构造方法允许设置分区和时间戳,或者只设置分区,代码如下:

ProducerRecord(String topic, Integer partition, String key, String value)
ProducerRecord(String topic, Integer partition, 
               Long timestamp, String key, 
               String value)

在2.3.5节中,我们讨论了Kafka分区的重要性。我们也讨论了DefaultPartitioner的工作原理以及如何提供一个自定义分区器。为什么要显式设置分区?可能有多种业务上的原因,下面是其中一个例子。

假设传入的记录都有键,但是记录被分发到哪个分区并不重要,因为消费者有逻辑来处理该键包含的任何数据。此外,键的分布可能不均匀,但你希望确保所有的分区接收到的数据量大致相同,代码清单2-4给出的是一个粗略的实现方案。

代码清单2-4 手动设置分区

AtomicInteger partitionIndex = new AtomicInteger(0);    ←--- 创建一个AtomicInteger实例变量
int currentPartition = Math.abs(partitionIndex.getAndIncrement())%
➥    numberPartitions;   ←--- 获取当前分区并将其作为参数
ProducerRecord<String, String> record = 
➥    new ProducerRecord<>("topic", currentPartition, "key", "value");

上面的代码调用Math.abs,因此对于Math.abs求得的整型值,如果该值超出Integer. MAX_VALUE,也不必关注。

定义

 

AtomicInteger属于java.util.concurrent.atomic包,该包包含支持对单个变量进行无锁、线程安全的操作的类。若需要更多信息,请参考Java官方文档关于java.util.concurrent.atomic包的介绍。

Kafka从0.10版本开始在记录中增加了时间戳,在创建ProducerRecord对象时调用以下重载的构造函数设置了时间戳。

ProducerRecord(String topic, Integer partition,
➥   Long timestamp, K key, V value)

如果没有设置时间戳,那么生产者在将记录发送到Kafka代理之前将会使用系统当前的时钟时间。时间戳也受代理级别的配置项log.message.timestamp.type的影响,该配置项可以被设置为CreateTime(默认类型)和LogAppendTime中的一种。与许多其他代理级别的配置一样,代理级别的配置将作为所有主题的默认值,但是在创建主题时可以为每个主题指定不同的值[16]。如果时间戳类型设置为LogAppendTime,并且在创建主题时没有覆盖代理级别对时间戳类型的配置,那么当将记录追加到日志时,代理将使用当前的时间覆盖时间戳,否则,使用来自ProducerRecord的时间戳。

两种时间戳类型该如何选择呢?LogAppendTime被认为是“处理时间”,而CreateTime被认为是“事件时间”,选择哪一种类型取决于具体的业务需求。这就要确定你是否需要知道Kafka什么时候处理记录,或者真实的事件发生在什么时候。在后面的章节,将会看到时间戳对于控制Kafka Streams中的数据流所起的重要作用。

[16] 主题级别的配置将会覆盖代理级别的配置。 ——译者注

我们已经知道了生产者的工作原理,现在是时候来看看Kafka的消费者。假设你正在构建一个原型应用程序用于展示ZMart最近的销售统计数据。对于这个示例,将消费先前生产者示例中发送的消息。因为这个原型处于早期阶段,所以此时要做的就是消费消息并将消息打印到控制台。

注意

 

因为本书所探讨的Kafka Streams的版本要求Kafka的版本为0.10.2或者更高版本,所以我们仅讨论新的消费者,它是在Kafka 0.9版本中发布的。

KafkaConsumer是用来从Kafka消费消息的客户端。KafkaConsumer类很容易使用,但是有一些操作事项需要重视。图2-15展示了ZMart的体系架构,突出了消费者在数据流中所起的作用。

图2-15 这些是从Kafka读取消息的消费者,正如生产者不知道消费者一样,消费者从Kafka读取消息时也不知道是谁生产的消息

KafkaProducer基本上是无状态的,然而KafkaConsumer需要周期性地提交从Kafka消费的消息的偏移量来管理一些状态。偏移量唯一标识消息,并表示消息在日志中的起始位置。消费者需要周期性地提交它们已接收到的消息的偏移量。

对一个消费者来说,提交一个偏移量有两个含义。

如果创建了一个新消费者实例或者发生了某些故障,并且最后提交的偏移量不可用,那么消费者从何处开始消费取决于具体的配置。

从图2-16可以看到选择不同的auto.offset.reset设置的影响。如果设置为earliest,那么收到消息的起始偏移量是0;如果设置为latest,那么取得消息的起始偏移量为11。

图2-16 将auto.offset.reset设置为earliestlatest的图形对比表示。设置为earliest,消费者将会得到所有未被删除的消息;设置为latest意味着消费者需要等待下一条可用消息到达

接下来,我们需要讨论偏移量提交的选项,你可以自动提交也可以手动提交。

默认情况下,消费者使用的是自动提交偏移量,通过enable.auto.commit属性进行设置。还有一个与enable.auto.commit配合使用的配置项auto.commit.interval.ms,用来指定消费者提交偏移量的频率(默认值是5秒)。调整这个频率值要谨慎,如果设置太小,将会增加网络流量;如果设置太大,可能会导致在发生故障或重启时消费者收到大量重复数据。

手动提交偏移量有两种方式——同步和异步。同步提交方式的代码如下:

consumer.commitSync()
consumer.commitSync(Map<TopicPartition, OffsetAndMetadata>)

无参的commitSync()方法在上一次检索(轮询)成功返回所有的偏移量之前会一直阻塞,此方法适用于所有订阅的主题和分区。另一个方法需要一个Map<TopicPartitonOffsetAndMetadata>类型的参数,它只会提交Map中指定的偏移量、分区和主题。

异步提交也有与同步提交类似的方法,consumer.commitAsync()方法是完全异步的,提交后立即返回。其中一个重载方法是无参的,两个consumer.commitAsync方法都可选择地提供一个OffsetCommitCallback回调对象,它在提交成功或者失败时被调用。通过提供回调实例可以实现异步处理或者异常处理。使用手工提交的好处是可以直接控制记录何时被视为已处理。

创建一个消费者与创建一个生产者类似,提供一个以java.util.Properties形式的Java对象的配置,然后返回一个KafkaConsumer实例。该实例订阅由主题名称列表提供或者由正则表达式指定的主题。通常,会在一个循环中以指定毫秒级的间隔周期性地运行消费者轮询。

轮询的结果是一个ConsumerRecords<KV>对象,ConsumerRecords实现了Iterable接口,每次调用next()方法返回一个包括消息的元数据以及实际的键和值的ConsumerRecord对象。

在处理完上一次轮询调用返回的所有ConsumerRecord对象之后,又会返回到循环的顶部,再次轮询指定的同期。实际上,期望消费者以这种轮询方式无限期地运行,除非发生错误或者应用程序需要关闭和重启(这就是提交的偏移量要发挥作用的地方——在重启时,消费者从停止的地方继续消费)。

通常需要多个消费者实例——主题的每个分区都有一个消费者实例。可以让一个消费者从多个分区中读取数据,但是通常的做法是使用一个线程数与分区数相等的线程池,每个线程运行一个消费者,每个消费者被分配到一个分区。

这种每个分区一个消费者的模式最大限度地提高了吞吐量,但如果将消费者分散在多个应用程序或者服务器上时,那么所有实例的线程总数不要超过主题的分区总数。任何超过分区总数的线程都将是空闲的。如果一个消费者发生故障,领导者代理将会把分配给该故障消费者的分区重新分配给另一个活跃的消费者。

注意

 

这个例子展示了一个消费者订阅一个主题的情况,但是这种情况仅是为了阐述的目的。大家可以让一个消费者订阅任意数量的主题。

领导者代理将主题的分区分配给具有相同group.id的所有可用的消费者,group.id是一个配置项,用来标示消费者属于哪一个消费者组——这样一来,消费者就不需要位于同一台机器上。事实上,最好让消费者分散在几台机器上。这样,当一台服务器发生故障时,领导者代理可以将主题分区重新分配给一台正常运行的机器上的消费者。

在2.5.5节中描述的向消费者添加和移除主题分区(topic-partition)分配的过程被称为再平衡。分配给消费者的主题分区不是静态的,而是动态变化的。当添加一些具有相同消费者组ID的消费者时,将会从活跃的消费者中获取一些当前的主题分区,并将它们分配给新的消费者。这个重新分配的过程持续进行,直到将每个分区都分配给一个正在读取数据的消费者。

在达到这个平衡点之后[17],任何额外的消费者都将处于空闲状态。当消费者不管由于什么原因离开消费者组时,分配给它们的主题分区被重新分配给其他消费者。

在2.5.5节中,我们描述了使用线程池及多个消费者(在同一个消费者组)订阅同一个主题。尽管Kafka会平衡所有消费者的主题分区负载,但是主题和分区的分配并不是确定性的,你并不知道每个消费者将收到哪个主题分区对。

KafkaConsumer有一个允许订阅特定主题和分区的方法,代码如下:

TopicPartition fooTopicPartition_0 = new TopicPartition("foo", 0);
TopicPartition barTopicPartition_0 = new TopicPartition("bar", 0);

consumer.assign(Arrays.asList(fooTopicPartition_0, barTopicPartition_0));

在手动进行主题分区分配时,需要权衡以下两点。

代码清单2-5给出的是ZMart原型消费者的代码,该消费者消费交易数据并打印到控制台。完整代码可以在源代码src/main/java/bbejeck.chapter_2/consumer/ThreadedConsumerExample.java类中找到。

代码清单2-5  ThreadedConsumerExample示例

public void startConsuming() {
        executorService = Executors.newFixedThreadPool(numberPartitions);
        Properties properties = getConsumerProps();
        for (int i = 0; i < numberPartitions; i++) {
           Runnable consumerThread = getConsumerThread(properties);   ←--- 创建一个消费者线程
           executorService.submit(consumerThread);
        }
    }

    private Runnable getConsumerThread(Properties properties) {
        return () -> {
            Consumer<String, String> consumer = null;
            try {
                consumer = new KafkaConsumer<>(properties);
               consumer.subscribe(Collections.singletonList(  ←--- 订阅主题
➥   "test-topic"));
                while (!doneConsuming) {
                   ConsumerRecords<String, String> records =   ←--- 5秒钟轮询一次
➥   consumer.poll(5000);
                    for (ConsumerRecord<String, String> record : records) {
                        String message = String.format("Consumed: key = 
➥   %s value = %s with offset = %d partition = %d",
                                record.key(), record.value(),
                                record.offset(), record.partition());
                        System.out.println(message);   ←--- 打印格式化的消息
                    }

                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                if (consumer != null) {
                   consumer.close();  ←--- 关闭消费者,否则会导致资源泄露
                }
            }
        };
    }

这个例子省略了类的其他代码——它不会独立存在。可以在本章的源代码中找到完整的示例。

[17] 这个平衡点是指同一个消费组下的消费者已将主题分区分配完毕。——译者注

当我写本书时,Kafka的最新版本是1.0.0。因为Kafka是一个Scala项目,所以每次发布有两个版本:一个用于Scala 2.11;另一个用于Scala 2.12。本书使用Scala 2.12版本的Kafka。尽管大家可以下载发行版,本书源代码中也包括Kafka的二进制发行版,它将与本书阐述和描述的Kafka Streams一起工作。要安装Kafka,从本书repo管理的源代码中提取.tgz文件,放到自己机器上的某个目录中。

注意

 

Kafka的二进制版本包括Apache ZooKeeper,因此不需要额外的安装工作。

如果接受Kafka的默认配置,那么本地运行Kafka需要配置的地方就很少。默认情况下,Kafka使用9092端口,ZooKeeper使用2181端口。假设本地没有应用程序使用这些端口,那么一切就绪了。

Kafka将日志写入/tmp/kafka-logs目录下,ZooKeeper使用/tmp/zookeeper目录存储日志。根据自身服务器情况,可能需要更改这些目录的权限或所有权,抑或是需要修改写日志的位置。

为了修改Kafka日志目录,cd命令进入Kafka安装路径的config目录,打开server. properties文件,找到log.dirs配置项,修改该配置项的值为任何你想使用的路径。在同一个目录下,打开zookeeper.properties文件,可以修改dataDir配置项的值。

稍后我们将会在本书中详细介绍Kafka的配置,但现在所需要做的配置仅此而已。需要注意的是,这些说的“日志”是Kafka和ZooKeeper的真实数据,并不是用于跟踪应用行为的应用层面的日志。应用日志位于Kafka安装目录的logs目录下。

Kafka启动很简单,由于ZooKeeper对于Kafka集群正确运行(ZooKeeper决定领导者代理、保存主题信息、对集群中各成员执行健康检查等)是必不可少的,因此在启动Kafka之前需要先启动ZooKeeper。

注意

 

从现在开始,所有对目录的引用均假设当前工作在Kafka安装目录下。如果使用的是Windows机器,目录是Kafka安装目录下的/bin/windows。

1.运行ZooKeeper

要启动ZooKeeper,打开命令提示符,输入以下命令:

bin/zookeeper-server-start.sh  config/zookeeper.properties

该命令执行后,在屏幕上会看到很多信息,但结尾会看到与图2-17所示类似的信息。

图2-17 当ZooKeeper启动时,在控制台可以看到的输出信息

2.启动Kafka

打开另一个命令提示符,输入以下命令,启动Kafka:

bin/Kafka-server-start.sh  config/server.properties

同样,会在屏幕上看到滚动的文本。当Kafka完全启动时,会看到与图2-18所示类似的信息。

图2-18 Kafka启动时的输出信息

提示

 

ZooKeeper对Kafka运行必不可少,因此在关闭时要调换顺序:先关闭Kafka,再关闭ZooKeeper。要关闭Kafka,可以在Kafka运行终端按下Ctrl+C,或在另一个终端执行kafka-server-stop.sh脚本。除了关闭脚本是zookeeper-server-stop.sh,关闭ZooKeeper的操作与关闭Kafka的操作相同。

既然Kafka已启动并开始运行了,现在是时候使用Kafka来发送消息和接收消息了。但是,在发送消息前,需要先为生产者定义一个发送消息的主题。

1.第一个主题

在Kafka中创建一个主题很简单,仅需要运行一个带有一些配置参数的脚本。配置很简单,但是这些配置的设置有广泛的性能影响。

默认情况下,Kafka被配置为自动创建主题,这意味着如果尝试向一个不存在的主题发送或读取消息,那么Kafka代理就会创建一个主题(使用server.properties文件中的默认配置)。即使在开发中,依靠代理创建主题也不是一个好的做法,因为第一次尝试生产或消费会失败,这是由于需要时间来传播关于主题存在的元数据信息。需要确保总是主动地创建主题。

2.创建一个主题

要创建主题,需要运行kafka-topics.sh脚本。打开一个终端窗口,运行以下命令:

bin/kafka-topics.sh --create --topic first-topic --replication-factor 1 
➥   --partitions 1 --zookeeper localhost:2181

当脚本执行后,在终端控制台应该会看到类似如图2-19所示的信息。

图2-19 这是创建主题的结果,事先创建主题很重要,可以提供特定主题的配置。否则,自动创建主题将使用默认配置或者server.properties文件中的配置

前面命令中的大多数配置标记的含义都显而易见,但还是让我们快速了解一下其中的两个配置。

3.发送一条消息

在Kafka中发送消息通常需要编写一个生产者客户端,但Kafka也自带了一个名为kafka- console-producer的方便脚本,允许从终端窗口发送消息。在这个例子中我们将使用控制台生产者,但是在2.4.1节中,我们已经介绍了如何使用KafkaProducer

运行以下命令(图2-20中展示的也是)发送第一条消息:

# 假设在bin目录下运行该命令
./kafka-console-producer.sh --topic first-topic --broker-list localhost:9092

配置控制台生产者有几个选项,但这里我们仅使用必需的配置:消息送达的主题以及连接到Kafka的一个Kafka代理列表(对于本例,只是本地一台机器)。

启动控制台生产者是一个“阻塞脚本”,因此在执行前面的命令之后,输入一些文本并按回车键。可以发送你想要发送的任何数量的消息。但本例为了演示,可以输入一条消息“the quick brown fox jumped over the lazy dog.”,并按回车键,然后按Ctrl+C让生产者退出。

图2-20 控制台生产者是用来快速测试配置和确保端到端功能的一个很好工具

4.读取一条消息

Kafka也提供了一个控制台消费者用来从命令行读取消息。控制台消费者类似于控制台生产者:一旦启动,将持续从主题中读取消息直到脚本被终止(通过Ctrl+C)。

运行以下命令,启动控制台消费者:

bin/kafka-console-consumer.sh --topic first-topic 
➥   --bootstrap-server localhost:9092 --from-beginning

在启动控制台消费者之后,在终端控制台可以看到与图2-21所示类似的信息。

图2-21 控制台消费者是一个方便的工具,可以快速地感知数据
是否正在流动以及消息是否包含预期的信息

--from-beginning参数指定将会收到来自那个主题的任何未被删除的消息。控制台消费者还没有提交偏移量,因此若没有设置--from-beginning,那么只会收到控制台消费者启动之后所发送的消息。

我们已完成了Kafka的旋风之旅,并生产和消费了第一条消息。如果你还没有阅读本章第一部分,现在是时候回到本章起始处去学习Kafka工作原理的细节。

[18] 我们并不能给出一个确切的分区数,这要根据实际应用场景。 ——译者注

下一章,我们将以零售业中的一个具体的例子开始讨论Kafka Streams。尽管Kafka Streams将处理所有生产者和消费者实例的创建,但你能够看到我们在这里介绍的概念所发挥的作用。


本部分的内容基于前面内容的基础之上,在开发第一个Kafka Streams应用程序时将Kafka Streams的心智模型转化为实践应用。一旦付诸实践,我们将介绍Kafka Streams的一些重要API。 首先将会介绍如何将状态应用到流式应用程序中,以及如何使用状态来执行连接操作,就像运行SQL查询时所执行的连接一样。然后将对Kafka Streams一个新的抽象进行介绍——KTable API。本部分从高级DSL开始讨论,但是我们最终将通过讨论低级处理器API以及如何使用它来让Kafka Streams做任何你需要其做的事情来总结。


本章主要内容

在第1章中,我们已经了解了Kafka Streams库,学会了构造一个处理节点构成的拓扑或在数据流入Kafka时将它们转换为一张图。在本章,将会学习如何通过Kafka Streams API来构造这个数据处理的拓扑。

Kafka Streams API是用于构建Kafka Streams应用程序的接口。我们会学到如何组装Kafka Streams应用程序,但更重要的是,我们会更深入地理解各组件如何协同工作以及如何使用Kafka Streams API达到流式处理的目的。

Kafka Streams DSL是一种用于快速构建Kafka Streams应用程序的高级API。高级API设计得比较好,包括能够处理大多数流式处理需求的开箱即用的方法,这样就可以毫不费力地创建一个复杂的流式处理程序。高级API的核心是KStream对象,该对象代表流键/值对记录。

Kafka Streams DSL的大部分方法都返回一个KStream对象的引用,允许一种连贯接口(fluent interface[1])的编程风格。此外,很大一部分KStream方法接受由单方法接口组成的类型,允许使用Java 8的lambda表达式。考虑到以上这些因素,可以想象构建一个Kafka Streams程序非常简单而容易。

早在2005年,Martin Fowler和Eric Evans就提出了连贯接口的概念——接口的方法调用返回值与起初的调用方法是同一个实例。这种方式在构造具有多个参数的对象如Person. builder().firstName("Beth").withLastName ("Smith").withOccupation ("CEO")时非常有用。在Kafka Streams中有一个小但很重要的区别:返回的KStream对象是一个新实例,而不是起初调用方法的实例。

还有一个低级API,即处理器API,它没有Kafka Streams DSL那么简明,但是允许更多的控制,第6章中将介绍处理器API。介绍完这些之后,让我们深入Kafka Streams必备的Hello World程序中[2]

[1] “fluent interface”也有人将这个词翻译为流畅界面。——译者注

[2] 我们学习一门语言或一种框架总是从简单的Hello World着手。——译者注

对于第一个Kafka Streams例子,我们将偏离第1章中概述的问题,直接举一个更简单的用例,这样可以很快地了解Kafka Streams是如何工作的。在稍后3.2.1节我们以一个更加实际和具体的示例回到第1章提出的问题。

第一个Kafka Streams程序将是一个玩具应用程序,它接收传入的消息并将它们转换成大写字符,有效地对读取消息的任何人大喊大叫,我们称之为“Yelling App”。

在深入研究代码之前,让我们先看看为这个应用程序组装的处理拓扑。遵循与第1章相同的模式,构造一个处理图拓扑,图中的每个节点都具有一个特定的功能。主要的区别在于这个图更简单,如图3-1所示。

图3-1 “Yelling App”的图(拓扑)

正如在图3-1中看到的,我们正在构建一个简单的处理图——它非常简单,以至于与典型的树状图结构相比,它更像一个节点链表。但是该图已提供了关于代码中预期内容的足够线索。有一个源节点,一个将传入文本转化为大写字符的处理器节点,以及一个将结果写入Kafka主题的接收器处理器(sink processor)。

这是一个简单的例子,但是这里显示的代码却代表了将在其他Kafka Streams程序中看到的内容。在大多数例子中将会看到类似下面这样的结构。

(1)定义配置项。

(2)创建自定义或预定义的Serde实例。

(3)创建处理器拓扑。

(4)创建和启动KStream

记住这一点:当我们接触更高级的例子时,主要区别在于处理器拓扑的复杂性。现在是时候构建第一个Kafka Streams应用程序了。

创建任何Kafka Streams应用程序的第一步是创建一个源节点。源节点负责从一个主题中消费记录,这些记录将流经应用程序。图3-2突出显示了图中的源节点。

图3-2 创建“Yelling App”的源节点

代码清单3-1中的代码创建了图3-2中的源节点或父节点。

代码清单3-1  定义流的源

KStream<String, String> simpleFirstStream = builder.stream("src-topic",
➥   Consumed.with(stringSerde, stringSerde));

代码清单3-1中的simpleFirstStream实例被设置为从写入主题src-topic的消息中消费消息。除了指定主题名,还提供了Serde类型对象(通过Consumed实例指定)用于将Kafka中的记录进行反序列化。每当在Kafka Streams中创建一个源节点时,都将使用Consumed类来指定任何可选参数。

现在,应用程序已经有了一个源节点,但是还需要增加一个处理节点来使用数据,如图3-3所示。用于增加处理器(源节点的子节点)的代码如代码清单3-2所示,通过这行代码,将会创建另一个KStream实例,它是父节点的子节点。

图3-3  为“Yelling App”添加大写字符处理器

代码清单3-2 将传入文本映射为大写字符

KStream<String, String> upperCasedStream =
➥   simpleFirstStream.mapValues(String::toUpperCase);

调用KStream.mapValues函数将会创建一个新的处理节点。该节点的输入是通过调用mapValues函数得到的结果。

特别需要记住的是,不应该修改ValueMapper提供给mapValues函数的原始值。upperCasedStream实例接收由simpleFirstStream实例调用mapValues方法将原始值进行转换后的副本。对于本例,就是大写字符的文本。

mapValues()方法接受ValueMapper<V, V1>接口的实例,该接口只定义了一个ValueMapper.apply方法,非常合适使用Java 8的lambda表达式。如本例的String:: toUpperCase,它是一个方法引用,是Java 8更短形式的lambda表达式。

注意

 

许多Java 8的教程都有关于lambda表达式和方法引用的介绍,一个不错的参考资料是Oracle的Java文档“Lambda Expressions”(Lambda表达式)和“Method References”(方法引用)。

你可能使用了s→s.toUpperCase()形式,但是由于toUpperCase()方法是String类的实例方法,因此可以使用方法引用。

在本书中,大家将会一而再再而三地看到对于流式处理器的API使用的是lambda表达式而不是具体实现。因为大多数方法需要的类型都是单个方法的接口,所以可以轻松地使用Java 8的lambda表达式。

至此,Kafka Streams应用程序正在从Kafka消费记录并将这些记录转换为大写字符。最后一步是添加一个将结果写入Kafka主题的接收器处理器。图3-4展示了构建拓扑所处的阶段。

图3-4 增加一个处理器,用于写入“Yelling App”的结果

代码清单3-3所示的一行代码用于在图中增加最后一个处理器。

代码清单3-3 创建一个接收器节点

upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde));

KStream.to方法在拓扑中创建一个接收器处理节点,接收器处理器将记录写回到Kafka。本例的接收器节点从upperCasedStream处理器获取记录,并将这些记录写入一个名为out-topic的主题中。同样,需要提供Serde实例,这次是将写入Kafka主题的记录进行序列化。但在这种情况下,可以使用Produced类型的实例,该实例提供用于在Kafka Streams中创建一个接收器节点的可选参数。

注意

 

不必总要为ConsumedProduced对象提供Serde对象,如果没有提供,那么应用程序就会使用配置中列出的序列化器/反序列化器。此外,通过ConsumedProduced类,可以为键或值中的任何一个指定一个Serde

前面的示例使用3行代码来构建拓扑,代码如下:

KStream<String,String> simpleFirstStream = 
➥   builder.stream("src-topic", Consumed.with(stringSerde, stringSerde));
KStream<String, String> upperCasedStream =
➥   simpleFirstStream.mapValues(String::toUpperCase);
    upperCasedStream.to("out-topic", Produced.with(stringSerde, stringSerde));

每一步在代码中对应单独一行,用来说明拓扑构建过程的不同阶段。但是,KStream API中的所有方法不会创建终端节点(方法返回类型为void),而是返回一个新的KStream实例,这就允许使用前面提到的连贯接口的编程风格。为了说明这个观点,这里有另一种构建“Yelling App”拓扑结构的方式,代码如下:

builder.stream("src-topic", Consumed.with(stringSerde, stringSerde))
➥   .mapValues(String::toUpperCase)
➥   .to("out-topic", Produced.with(stringSerde, stringSerde));

这种方式将代码从3行缩短到1行,又不失清晰度和目的。从现在开始,所有的示例都将使用连贯接口风格编写,除非这样做会影响程序的清晰度。

现在已经构建了第一个Kafka Streams拓扑,但是我们忽略了一个重要步骤——配置和Serde的创建,现在来介绍下这两部分内容。

尽管Kafka Streams是高度可配置的,只需根据特定需求调整几个参数配置,但第一个示例我们仅使用两个配置项APPLICATION_ID_CONFIGBOOTSTRAP_SERVERS_CONFIG,配置如下:

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "yelling_app_id");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

这两个设置是必需的,因为没有提供默认值。试图启动一个没有定义这两个属性的Kafka Streams程序,将会抛出ConfigException

属性StreamsConfig.APPLICATION_ID_CONFIG用于标识Kafka Streams应用程序,在一个集群中该配置的值必须唯一。如果没有设置任何值,那么会以客户端ID作为前缀,后接组ID作为默认值。客户端ID是用户自定义的值,用于唯一标识连接到Kafka的客户端。组ID用来管理从同一个主题读取消息的一组消费者成员,确保组中的所有消费者都能够有效地读取订阅的主题。

属性StreamsConfig.BOOTSTRAP_SERVERS_CONFIG可以是单个的主机名:端口对,也可以是多个以逗号分隔的主机名:端口对。设置的这个值将Kafka Streams应用程序指向其在Kafka集群的位置。当我们在本书中探讨更多的例子时,我们将讨论更多的配置项。

在Kafka Streams中,Serdes类为创建Serde实例提供了便捷的方法,如下:

Serde<String> stringSerde = Serdes.String();

这一行代码使用Serdes类来创建序列化/反序列化所需的Serde实例。这里,创建一个变量引用Serde以便在拓扑中重复使用。Serdes类提供以下类型的默认实现:

Serde接口的实现非常有用,因为它们包含了序列化器和反序列化器,使得不必每次需要在KStream方法中提供一个Serde时指定4个参数(键序列化器、值序列化器、键反序列器和值反序列化器)。在接下来的示例中,将创建一个Serde的实现来处理类型更加复杂的序列化/反序列化。

让我们看一下刚刚组合起来的整个程序,如代码清单3-4所示。这段代码可以在本书源代码中找到,位于src/main/java/bbejeck/chapter_3/KafkaStreamsYellingApp.java。

代码清单3-4 Hello World: the Yelling App

public class KafkaStreamsYellingApp {

    public static void main(String[] args) {


        Properties props = new Properties();

        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "yelling_app_id");  ←--- 用于配置Kafka Streams程序的属性
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        StreamsConfig streamingConfig = new StreamsConfig(props);   ←--- 创建用于构建处理器拓扑的StreamsBuilder实例

        Serde stringSerde = Serdes.String();  ←--- 使用给定属性创建
StreamsConfig对象

        StreamsBuilder builder = new StreamsBuilder();  ←--- 创建用于键和值进行序列化/反序列化的Serdes对象
![..\新建文件夹\86b.tif{5%}](/api/storage/getbykey/original?key=19043ba2a515b7128b64)
        KStream simpleFirstStream = builder.stream("src-topic",
➥   Consumed.with(stringSerde, stringSerde));   ←--- 使用数据源对应的主题创建实际流(图中的父节点)

        KStream upperCasedStream =
➥   simpleFirstStream.mapValues(String::toUpperCase);   ←--- 使用Java 8方法处理的处理器(图中的第一个子节点)

       upperCasedStream.to( "out-topic", 
➥   Produced.with(stringSerde, StringSerde));   ←--- 将转换后的输出写入另一个主题当中(图中的接收器节点)

        KafkaStreams kafkaStreams = new KafkaStreams(builder.build(),streamsConfig);

       kafkaStreams.start();  ←--- 启动Kafka Streams线程
        Thread.sleep(35000);[3]
        LOG.info("Shutting down the Yelling APP now");
        kafkaStreams.close();

    }
}

现在已构建了第一个Kafka Streams应用程序,让我们快速回顾一下涉及的步骤,因为这是在大多数Kafka Streams应用程序中可以看到的一般模式,步骤如下。

(1)创建一个StreamsConfig实例。

(2)创建一个Serde对象。

(3)构造一个处理拓扑。

(4)启动Kafka Streams程序。

除了Kafka Streams应用程序的一般结构,这里还有一个要点是尽可能地使用lambda表达式,让程序更简明。

现在我们转到一个更复杂的示例,它将允许我们探索更多流式处理器的API。这是一个新示例,但场景大家已经熟悉了:ZMart数据处理的目标。

[3] 在实现时需要对该方法做异常处理,Thread.sleep方法会抛出InterruptedException。——译者注

在第1章中,我们讨论过ZMart对处理客户数据的新需求,旨在帮助ZMart更有效地开展业务。我们演示了如何构建一个处理器拓扑,这里处理器会对从ZMart商店交易流入的购买记录进行处理。图3-5再一次展示了完整的图。

让我们简要回顾一下流式处理程序的需求,这也可以很好地描述这个程序将要做什么。

(1)所有记录需要保护信用卡号码,本示例是将信用卡号前12位数字屏蔽。

图3-5 ZMart Kafka Streams程序的拓扑结构

(2)需要提取购买的物品以及客户的邮编以确定购买模式,这些数据将被写入Kafka的一个主题中。

(3)需要获取客户在ZMart的会员号和所花费的金额,并将这些信息写入一个主题中。消费者从该主题消费数据进行处理以确定客户的奖励。

(4)需要将完整的交易数据写入主题中,一个存储引擎从该主题消费消息进行特定分析。

正如“Yelling App”程序一样,在构建应用程序时会把连贯接口方式与Java 8的lambda表达式结合起来。虽然有时很明确一个方法调用返回的是一个KStream对象,但是有时并非如此。记住,KStreamAPI中的大多数方法都会返回一个新的KStream实例。现在,让我们构建一个能够满足ZMart业务需求的流式应用程序。

现在将深入研究构建处理拓扑。为了将这里编写的代码与第1章中的处理拓扑图联系起来,我们将在图中突出显示当前正进行的部分。

1.构造源节点

首先,通过链式调用KStream API的两个方法来创建拓扑图的源节点和第一个处理器(图3-6中的深色部分)。到目前为止,起始节点的作用应该是相当明显的。拓扑中第一个处理器将负责屏蔽信用卡号以保护客户的隐私。

图3-6 源处理器从Kafka主题中消费信息,只提供给屏蔽处理器,使屏蔽处理器成为拓扑其余部分的源

代码清单3-5通过调用StreamsBuilder.stream方法创建一个源节点,该方法使用一个默认的String类型的Serde、一个自定义的用于Purchase对象的Serde和流的消息源对应主题的名称。本例中仅指定了一个主题,但是也可以提供一个以逗号分隔的主题名称列表或者正则表达式所匹配的主题名称。

代码清单3-5 创建源节点和第一个处理器

KStream<String,Purchase> purchaseKStream =
➥   streamsBuilder.stream("transactions",
➥   Consumed.with(stringSerde, purchaseSerde))
➥   .mapValues(p -> Purchase.builder(p).maskCreditCard().build());

代码清单3-5通过Consumed实例提供Serdes,但是这里也可不指定Serdes,仅指定主题的名称,让程序依赖配置参数中提供的默认的Serdes

紧接着下一步调用的是KStream.mapValues方法,该方法以ValueMapper<V,V1>实例作为参数。值映射器(value mapper)接受一种类型(本例是一个Purchase对象)的单个参数,并将该参数对应的对象映射成一个新值,可能是另一种类型。本示例中,KStream.mapValues方法返回与其类型相同的对象(Purchase),但是该对象的信用卡号已做了屏蔽。

注意,当使用KStream.mapValues方法时,原始的键并不会变化,不会被分解映射成一个新值。如果想要生成一个新的键/值对或者在生成新值时包括键,那么可以使用KStream.map方法,该方法接受一个KeyValueMapper<K,V, KeyValue<K1,V1>>实例。

2.关于函数式编程的几点建议

对于mapmapValues函数需要记住的一个重要概念是:它们的操作不应该有副作用,这意味着函数不会修改作为参数表示的对象或值。这是由于KStream API中的函数式编程方面的原因。函数式编程是一个深层话题,对它进行充分探讨已超出了本书的范围,但这里我们会简要地介绍一下函数式编程的两个主要原则。

第一个原则是避免状态修改。如果对象需要变更或更新,那么就将该对象传递给一个函数,该函数创建一个包含所需变更或更新的原对象的副本或者一个全新的实例。在代码清单3-5中,传递给KStream.mapValues函数的lambda表达式用于将Purchase对象用屏蔽的信用卡号进行更新,原始Purchase对象的信用卡号保持不变。

第二个原则是通过组合几个较小的、用途单一的函数来构建复杂的操作。函数组合是一种模式,在使用KStream API时会经常看到。

定义

 

就本书而言,函数式编程(functional programming)被定义为一种编程方式,该方式中函数作为第一类对象[4]。此外,函数应该避免产生副作用,如修改状态或变更对象。

3.创建第二个处理器

现在将构建第二个处理器,它负责从一个主题中抽取模式数据,这里说的模式数据是指ZMart用来确定不同地区购买模式的数据。同时,也会添加一个接收器节点,负责将模式数据写入Kafka的主题。图3-7演示了如何构建这些组件。

在代码清单3-6中可以看到purchaseKStream处理器调用熟悉的mapValues方法来创建一个KStream实例。这个新的KStream实例将开始接收由调用mapValues方法而创建的PurchasePattern对象。

代码清单3-6 第二个处理器和向Kafka写入的接收器节点

KStream<String, PurchasePattern> patternKStream = 
➥   purchaseKStream.mapValues(purchase ->
➥   PurchasePattern.builder(purchase).build());
patternKStream.to("patterns",
➥   Produced.with(stringSerde,purchasePatternSerde));

在代码清单3-6中,声明了一个变量用来保存新的KStream实例的引用,因为将会使用该变量调用print方法将流的结果打印到控制台,这在开发和调试时非常有用。购买-模式处理器将它接收到的记录进一步传递给自己的一个子节点,该节点由KStream.to方法定义,将模式数据写入patterns主题。注意,使用的是Produced对象来提供先前创建的Serde

图3-7 第二个处理节点构建购买-模式信息,接收器节点将PurchasePattern对象写入Kafka主题

KStream.to方法是KStream.source方法的镜像。不是为拓扑设置一个读取数据的源,而是通过KStream.to方法定义一个接收器节点,该节点将从KStream实例接收到的数据写入Kafka主题。KStream.to方法也提供了重载的版本,通过重载方法可以忽略Produced参数,而使用配置中定义的默认SerdesProduced类可设置的一个可选参数是流分区器(StreamPartitioner),这将在后面介绍。

4.创建第三个处理器

拓扑中的第三个处理器是客户奖励累加器节点,如图3-8所示,这将使ZMart能够追踪其优质客户群的购买情况。奖励累加器将数据发送到ZMart总部应用程序消费的主题中,以确定客户完成购买后的奖励。对应代码如代码清单3-7所示。

代码清单3-7 第三个处理器和向Kafka写入的终端节点

KStream<String, RewardAccumulator> rewardsKStream = 
➥   purchaseKStream.mapValues(purchase ->
➥   RewardAccumulator.builder(purchase).build());
rewardsKStream.to("rewards",
➥   Produced.with(stringSerde,rewardAccumulatorSerde));

图3-8 第三个处理器从购买数据中创建一个RewardAccumulator对象,终端节点将结果写入Kafka主题

从代码清单3-7可知,使用现在应该熟悉的模式来构建奖励累加器处理器:创建一个新的KStream实例,该实例用于将包含在记录中的原始购买数据映射为一个新的对象类型。同时还添加了一个接收器节点到奖励累加器中,以便可以将奖励KStream的结果写入Kafka的一个主题,并用于确定客户的奖励等级。

5.创建最后一个处理器

最后,将使用创建的第一个KStream实例对象purchaseKStream,并附加一个接收器节点将原始购买记录(当然信用卡号已被屏蔽)写入一个名为purchases的主题中。purchases主题中的数据将被存储到一个NoSQL存储中,诸如Cassandra、Presto或者Elasticsearch,用来做特殊分析。图3-9展示了最后一个处理器,对应代码如代码清单3-8所示。

代码清单3-8 最后一个处理器

purchaseKStream.to("purchases", Produced.with(stringSerde, purchaseSerde));

我们已经一步一步地构建了应用程序,现在来看看完整的程序代码(src/main/java/bbejeck/ chapter_3/ZMartKafkaStreamsApp.java)。你很快就会注意到,它比之前的Hello World(那个“Yelling App”)示例要复杂得多,如代码清单3-9所示。

图3-9 最后一个节点将整个购买交易写入一个主题中,该主题的消费者是一个NoSQL数据存储

代码清单3-9 ZMart客户购买行为的KStream程序

public class ZMartKafkaStreamsApp {

    public static void main(String[] args) {
        // some details left out for clarity

        StreamsConfig streamsConfig = new StreamsConfig(getProperties());

        JsonSerializer<Purchase> purchaseJsonSerializer = new 
➥   JsonSerializer<>();
       JsonDeserializer<Purchase> purchaseJsonDeserializer =
➥   new JsonDeserializer<>(Purchase.class);   ←--- 创建Serde,数据格式是JSON
        Serde<Purchase> purchaseSerde = 
➥   Serdes.serdeFrom(purchaseJsonSerializer, purchaseJsonDeserializer);
        //Other Serdes left out for clarity

        Serde<String> stringSerde = Serdes.String();

        StreamsBuilder streamsBuilder = new StreamsBuilder();

       KStream<String,Purchase> purchaseKStream = 
➥   streamsBuilder.stream("transactions",
➥   Consumed.with(stringSerde, purchaseSerde))
➥   .mapValues(p -> Purchase.builder(p).maskCreditCard().build());  ←--- 创建源和第一个处理器
       KStream<String, PurchasePattern> patternKStream = 
➥   purchaseKStream.mapValues(purchase -> 
➥   PurchasePattern.builder(purchase).build());  ←--- 创建PurchasePattern
处理器

        patternKStream.to("patterns", 
➥   Produced.with(stringSerde,purchasePatternSerde));

       KStream<String, RewardAccumulator> rewardsKStream = 
➥   purchaseKStream.mapValues(purchase -> 
➥   RewardAccumulator.builder(purchase).build());  ←--- 创建RewardAccumulator处理器

        rewardsKStream.to("rewards", 
➥   Produced.with(stringSerde,rewardAccumulatorSerde));

       purchaseKStream.to("purchases", 
➥   Produced.with(stringSerde,purchaseSerde));   ←--- 创建存储接收器,一个被存储消费者使用的主题

        KafkaStreams kafkaStreams = 
➥   new KafkaStreams(streamsBuilder.build(),streamsConfig);
        kafkaStreams.start();
    }
}

注意

 

清晰起见,代码清单3-9中省略了一些细节。书中的示例代码并不一定是完整独立的,本书配套源代码提供了完整的例子。

正如所看到的,这个例子比“Yelling App”程序要稍微复杂一点,但是它有一个类似的流程。具体来说就是仍然执行了以下几个步骤。

(1)创建一个StreamsConfig实例。

(2)构建一个或多个Serde实例。

(3)构建处理拓扑。

(4)组装所有组件并启动Kafka Streams程序。

在这个应用程序中,提到过使用Serde,但并没有解释为什么或者如何创建它们。现在,让我们花一点时间来讨论Serde在Kafka Streams应用程序中的作用。

Kafka以字节数组的格式传输数据。因为数据格式是JSON,所以需要告诉Kafka如何先将一个对象转换成JSON,然后当要将数据发送到主题时再转换成一个字节数组。反之,需要指定如何将消费的字节数组转换成JSON,然后再转换成处理器使用的对象类型。将数据进行不同格式之间的转换就是为什么需要Serde的原因。有些SerdeStringLongInteger等)是由Kafka客户端依赖项提供的,但是需要为其他对象创建自定义的Serde

在第一个示例中,“Yelling App”应用程序仅需要一个字符串序列化器/反序列化器,由Serdes.String()工厂方法提供实现。然而,在ZMart示例中,就需要自定义一个Serde实例,因为对象类型是任意的。我们来看看为Purchase类创建一个Serde的过程,我们不讨论其他Serde实例,因为它们遵循相同的模式,只是类型不同而已。

创建一个Serde需要实现Deserializer<T>Serializer<T>接口,我们将使用代码清单3-10和代码清单3-11实现的整个例子。此外,将使用谷歌公司的Gson库将对象与JSON之间进行互相转换。代码清单3-10给出的是序列化器,代码见src/main/java/bbejeck/util/serializer/ JsonSerializer.java。

代码清单3-10 通用序列化器

public class JsonSerializer<T> implements Serializer<T> {

    private Gson gson = new Gson();  ←--- 创建Gson对象

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, T t) {
       return gson.toJson(t).getBytes(Charset.forName("UTF-8"));  ←--- 将一个对象序列化为字节
    }

    @Override
    public void close() {

    }
}

对于代码清单3-10中的序列化操作,首先将对象转换成JSON字符串,然后获取该字符串的字节。为了将对象转换成JSON字符串或者将JSON字符串转换成对象,本例使用Gson。

对于反序列化过程,采取不同的步骤:用一个字节数组来创建一个字符串,然后使用Gson将JSON字符串转换成一个Java对象。通用反序列化代码如代码清单3-11所示,参见src/main/ java/bbejeck/util/serializer/JsonDeserializer.java。

代码清单3-11 通用反序列化器

public class JsonDeserializer<T> implements Deserializer<T> {

   private Gson gson = new Gson();  ←--- 创建一个Gson
对象
    private Class<T> deserializedClass;   ←--- 实例化反序列化类的变量

   public JsonDeserializer(Class<T> deserializedClass) {
        this.deserializedClass = deserializedClass;
    }

    public JsonDeserializer() {
    }

    @Override
    @SuppressWarnings("unchecked")
    public void configure(Map<String, ?> map, boolean b) {
        if(deserializedClass == null) {
            deserializedClass = (Class<T>) map.get("serializedClass");
        }
    }

    @Override
    public T deserialize(String s, byte[] bytes) {
        if(bytes == null){
            return null;
        }

       return gson.fromJson(new String(bytes),deserializedClass);   ←--- 将字节反序列化为预期类的实例

   }

    @Override
    public void close() {

    }
}

现在,让我们再回顾一下代码清单3-9中的以下几行代码:

JsonDeserializer<Purchase> purchaseJsonDeserializer = 
➥   new JsonDeserializer<>(Purchase.class);   ←--- 为Purchase类创建反序列化器
JsonSerializer<Purchase> purchaseJsonSerializer = 
➥   new JsonSerializer<>();  ←--- 为Purchase类创建序列化器
Serde<Purchase> purchaseSerde = 
➥   Serdes.serdeFrom(purchaseJsonSerializer,purchaseJsonDeserializer);   ←--- 为Purchase对象创建Serde

正如所看到的,Serde对象是很有用的,因为它是给定对象的序列化器和反序列化器的容器。

到目前为止,我们已经介绍了很多关于开发一个Kafka Streams应用程序的背景知识,仍然还有很多内容要讲,但我们暂停一下,来谈谈开发过程本身,以及如何在开发Kafka Streams应用程序时让自己更轻松。

[4] 第一类对象(first-class object)在计算机科学中指可以在执行期创造并作为参数传递给其他函数或存入一个变量的实体。(摘自百度百科)——译者注

我们已经构建了以流的方式处理来自ZMart的购买记录的图,并创建了3个分别将结果输出到各自对应的主题的处理器。在开发过程中,当然可以运行控制台消费者以查看结果,但是最好有一个更方便的解决方案,比如能够在控制台监视数据在拓扑中的流动情况,如图3-10所示。

KStream接口提供了一个在开发时很有用的方法——KStream.print方法,该方法的入参是一个Printed<K,V>类的实例对象。Printed提供了两个静态方法:打印到标准输出的Printed.toSysOut()方法;将结果写入文件的Printed.toFile(filePath)方法。

此外,还可以通过链式调用withLabel()方法对打印结果进行标识,允许为每条记录打印一个初始的消息头,这对于处理来自不同处理器的消息很有用。重要的是,无论是将流打印到控制台还是文件,都需要对象重写toString()方法,在该方法中创建有意义的输出结果。

图3-10 在开发时一个很好的工具,它具有将各节点输出数据打印到控制台的能力。为了启用打印到控制台,只需调用print方法来替代其他任何的to方法

最后,如果不想使用toString方法,或者想自定义Kafka Streams如何打印记录,那么就使用Printed.withKeyValueMapper方法,该方法的入参是一个KeyValueMapper实例,因此可以用任何想要的方式格式化记录。之前提到过同样的告诫——不应该修改原始记录,在这里同样适用。

在本书中,对所有示例我们只关注打印到控制台。以下代码片段是代码清单3-11中使用KStream.print的一些示例。

patternKStream.print(Printed.<String, PurchasePattern>toSysOut()
➥   .withLabel("patterns"));  ←--- 设置将PurchasePattern转换打印到控制台

rewardsKStream.print(Printed.<String, RewardAccumulator>toSysOut()
➥   .withLabel("rewards"));  ←--- 设置将RewardAccumulator转换打印到控制台

purchaseKStream.print(Printed.<String, Purchase>toSysOut()
➥   .withLabel("purchases"));   ←--- 将购买数据打印到控制台

让我们快速看一下屏幕上的输出(如图3-11所示),以及它是如何在开发过程中提供帮助的。通过启用打印功能,当对应用程序进行修改、关闭和启动时可以直接通过IDE来运行Kafka Streams应用程序,并确认输出结果是所预期的。这不能替代单元测试和集成测试,但是在开发过程中直接查看流结果是一个很好的工具。

使用print()方法的一个缺点是它创建了一个终端节点,这意味着不能将它嵌入处理器链中,而是需要一个单独语句。然而,还有一个KStream.peek方法,它以ForeachAction实例为参数,并返回一个新的KStream实例。ForeachAction接口有一个apply()方法,该方法返回类型为void,因此KStream.peek方法不会向下游传递任何东西,这使得该方法成为诸如打印等操作的理想选择。使用KStream.peek方法就能够将它嵌入处理器链中而不必编写一条单独的打印语句。在本书其他示例中将会看以这种方式使用KStream.peek方法。

图3-11 这是屏幕上数据的详细视图。启用打印到控制台功能后,将很快能够看到处理器是否正常工作

至此,你已经将使用Kafka Streams实现的购买分析(purchase-analysis)程序正常运行了。用于消费消息并将结果写入patternsrewardspurchases主题的其他的程序也开发了,对ZMart效果也不错。但是,好处从来都是有代价的,现在ZMart的管理层看到你的流式处理程序能提供什么,又向你提了一堆新的需求。

现在对前面生成的3类结果都提出了新的需求,好在仍将使用相同的源数据。新需求要求将所提供的数据进行细化,在某些情况还得将数据进一步分解。当前的主题可能适用于新需求,也有可能需要创建全新的主题。

更多新需求将不可避免地出现在你面前,但是现在可以开始处理当前的新需求了。在查看KStreamAPI时,你会很欣慰地看到一些已经定义的方法将能够很容易实现这些新需求。

注意

 

从现在开始,所有示例代码都精简到必不可少的部分,以最大限度地提高清晰度。除非有新的内容需要介绍,否则可以假定配置和设置代码保持不变。这些被删减的例子并不是孤立的——本示例完整代码见src/main/java/bbejeck/chapter_3/ZMartKafkaStreamsAdvancedReqsApp.java。

1.过滤购买信息

让我们从过滤掉没有达到最小阈值的购买信息开始。为了移除低价购买信息,需要在KStream实例和接收器节点之间插入一个过滤处理器节点。需要更新处理器拓扑图如图3-12所示。

图3-12 在屏蔽处理器和向Kafka写入的终端节点之间放置了一个处理器,这个过滤处理器会将给定额度以下的购买信息过滤掉

可以使用KStream方法,该方法以一个Predicate<K,V>实例为参数。虽然在这里采用链式方法调用,但是在拓扑中创建了一个新处理节点,如代码清单3-12所示。

代码清单3-12 过滤KStream

KStream<Long, Purchase> filteredKStream = 
➥   purchaseKStream((key, purchase) ->
➥   purchase.getPrice() > 5.00).selectKey(purchaseDateAsKey);

此代码过滤掉额度小于5美元的购买信息,并选择购买日期转换为长整型作为消息的键。

Predicate接口只定义了一个test()方法,该方法需要两个参数,即消息的键和值,尽管在这里只需要使用值。同样可以使用Java 8的lambda表达式代替KStream API中定义的具体类型。

定义

 

如果你熟悉函数式编程,那么你在使用Predicate接口时会有种宾至如归的感觉。如果术语“谓词”对你来讲是新内容的话,也不要想复杂了,它仅是一个给定的表达式语句(如x < 100)而已。一个对象要么匹配谓词语句,要么不匹配。

此外,为了想使用购买时间戳作为键,可以使用selectKey处理器,它使用3.4节中提到的KeyValueMapper将购买时间提取为长整型值。在下面“生成一个键”部分将会介绍有关选择键的细节。

镜像函数KStreamNot,执行相同的过滤功能,不过执行方式相反,只有与给定谓词不匹配的记录才会在拓扑中被进一步处理。

2.分裂/分支流

现在,需要将购买流分裂成可以写入不同主题的独立流。幸运的是,KStream.branch方法可以完美地实现该需求。KStream.branch方法接受任意数量的Predicate实例,返回一个KStream实例数组。返回的数组大小与方法调用时提供的谓词数量匹配。

在前面的更改中,修改了处理拓扑中一个存在的叶子节点。对于这个分支流的需求,将会在处理节点图中创建一个全新的叶子节点,如图3-13所示。

图3-13 分支处理器将流一分为二:一支流包括来自咖啡馆的购买信息, 另一支流包括来自电子产品商店的购买信息

当原始流的记录流经分支处理器时,每条记录都是按照所提供的谓词顺序进行匹配。分支处理器会将记录分配给第一个匹配的流,不会尝试匹配其他谓词。

如果记录不匹配任何给定的谓词,那么分支处理器会放弃该记录。返回数组中流的顺序与提供给branch()方法的谓词顺序一致。每个部门对应一个独立的主题也许不是唯一的方法,但暂时我们就这样认为。它满足当前需求,并且以后也可以被重新访问。具体实现如代码清单3-13所示。

代码清单3-13 分裂流

Predicate<String, Purchase> isCoffee = 
➥   (key, purchase) ->
➥   purchase.getDepartment().equalsIgnoreCase("coffee");  ←--- 用Java 8的lambda表达式创建谓词

Predicate<String, Purchase> isElectronics = 
➥   (key, purchase) -> 
➥   purchase.getDepartment().equalsIgnoreCase("electronics");

int coffee = 0;
int electronics = 1;   ←--- 标记返回数组的预期索引

KStream<String, Purchase>[] kstreamByDept = 
➥   purchaseKStream.branch(isCoffee, isElectronics);   ←--- 调用branch方法将原始流分为两个流

kstreamByDept[coffee].to( "coffee", 
➥   Produced.with(stringSerde, purchaseSerde));
kstreamByDept[electronics].to("electronics", 
➥   Produced.with(stringSerde, purchaseSerde));   ←--- 将每个流的结果写入相应的主题中

警告

 

代码清单3-13中的示例将记录发送到几个不同主题中。尽管Kafka能够被配置为当首次试图从不存在的主题生产或消费数据时自动创建主题,但最好不要依赖这种机制。如果你依赖于自动创建主题,那么这些主题将使用来自服务器配置属性文件的默认值进行配置,这里的配置属性文件可能是你需要的设置,也可能不是。你应该总是提前考虑好所需要的主题、分区级别、副本因子,在运行Kafka Streams应用程序之前就先创建它们。

在代码清单3-13中,提前定义了谓词,因为传递4个lambda表达式参数有些麻烦。为了使程序可读性实现最大化,对返回数组的索引也作了标记。

这个例子展示了Kafka Stream的强大和灵活性。只通过几行代码就可以将购买交易的原始流分裂成四支流。此外,在重用同一个源处理器的同时,你也开始构建更复杂的处理拓扑。

分裂流与分区流

 

尽管分裂和分区看起来有类似的想法,但它们在Kafka和Kafka Streams中没有关联。通过KStream.branch方法分裂一个流会导致创建一个或多个流,这些流最终可以将记录发送到另一个主题当中。分区是Kafka将一个主题的消息分配到不同服务器的方式。除了配置调优之外,分区也是Kafka实现高吞吐量的主要方法。

到目前为止,你已轻松地满足了3个新需求中的2个。现在是时候来实现最后一个附加需求,即为购买记录生成一个键用于存储。

3.生成一个键

Kafka的消息是键/值对,因此所有流过Kafka Streams应用程序的记录也是键/值对。但是没有要求键不能为空。在实践中,如果不需要特定的键,那么使用空键将减少网络传输的总体数据流量。所有流入ZMart的Kafka Streams应用程序的记录都具有空键。

这一直没问题,直到意识到NoSQL存储解决方案存储的数据是以键/值对格式存储的。在将购买数据写入purchases主题之前需要一种方法为其生成一个键。当然可以用KStream.map方法生成一个键,并返回一个新的键/值对(只有键是新生成的)。但是有一个更简明的KStream. selectKey方法,该方法返回一个新的KStream实例,该实例使用新键(可能是不同类型)和相同值生成记录,如代码清单3-14所示。

代码清单3-14 生成一个新键

KeyValueMapper<String, Purchase, Long> purchaseDateAsKey = 
➥   (key, purchase) -> purchase.getPurchaseDate().getTime();  ←---  KeyValueMapper提取购买日期,并将其转换为长整型值

KStream<Long, Purchase> filteredKStream = 
➥   purchaseKStream.filter((key, purchase) ->
➥   purchase.getPrice() > 5.00).selectKey(purchaseDateAsKey);   ←--- 通过一条语句过滤出购买信息并选择一个键

filteredKStream.print(Printed.<Long, Purchase>
➥   toSysOut().withLabel("purchases"));  ←--- 将结果打印到控制台
filteredKStream.to("purchases", 
➥   Produced.with(Serdes.Long(),purchaseSerde));   ←--- 将记录具体实现到Kafka
主题中

这种对处理器拓扑的更改类似于过滤操作,因为在过滤器和接收器处理器之间增加了一个处理节点,如图3-14所示。

在代码清单3-14中为了创建新键,提取购买日期并将其转化为长整型值。尽管可以传递一个lambda表达式,但这里它被赋值给一个变量以便于程序可读。还要注意,需要修改KStream.to方法中的serde类型,因为已改变键的类型。

这是一个映射到一个新键的简单示例。稍后,在另一个示例中,我们将会看到选择启用键来连接独立流。此外,在这之前的所有示例都是无状态的。但是,对于有状态转换也有几个选项,稍后将会看到。

图3-14 NoSQL数据存储将使用购买日期作为数据的键进行存储,selectKey 处理器恰好在将数据写入Kafka之前将提取购买日期作为键

ZMart的安全部门联系你说,在它的一家店有欺诈嫌疑。有报告显示有一家店的经理正在输入无效的销售折扣码,安全人员不知道发生了什么,现在向你求助。

安全人员不希望将这些信息写入Kafka主题中,你和他们讨论了关于Kafka的安全、访问控制以及如何锁定对某个主题的访问。但是安全人员还是坚持他们的意见,需要将记录写入他们可完全控制的关系数据库。你感觉这是一场你赢不了的战斗,因此你就妥协了,决定按照要求完成这项任务。

循环操作

你需要做的第一件事是创建一个新的KStream,过滤出与单个员工ID相关的结果。尽管有大量的数据流经拓扑,但这个过滤器可以把数据量缩减到很小。

这里将使用带有谓词的KStream,该谓词看起来与特定的员工ID相匹配。这个过滤器完全独立于先前的过滤器,它隶属于源KStream实例。尽管完全有可能使用链过滤器,但在这里不会这样做,你希望该过滤器可以完全访问流中的数据。

接下来,将使用KStream.foreach方法,如图3-15所示。KStream.foreach方法接受一个ForeachAction<K, V>实例,这是另一个终端节点的例子。它是一个简单的处理器,使用提供的ForeachAction实例对所接收的每条记录执行操作。具体实现如代码清单3-15所示。

图3-15 为了将包含一个特定员工的购买信息写入Kafka Streams应用程序之外,首先要增加一个过滤处理器,用来根据员工ID提取购买信息,然后使用foreach操作符将每条记录写入一个外部关系数据库

代码清单3-15  Foreach操作

ForeachAction<String, Purchase> purchaseForeachAction = (key, purchase) ->
➥   SecurityDBService.saveRecord(purchase.getPurchaseDate(),
➥   purchase.getEmployeeId(), purchase.getItemPurchased());
purchaseKStream.filter((key, purchase) -> 
➥   purchase.getEmployeeId()
➥   .equals("source code has 000000"))
➥   .foreach(purchaseForeachAction);

ForeachAction同样使用Java 8的lambda表达式,它存储在一个名为purchaseForeach Action的变量中。这需要另外一行代码,但是这样做与将它们写在一起相比提高了代码清晰度。在下一行代码,另一个KStream实例将过滤的结果发送给上面定义的ForeachAction对象。

注意,KStream.foreach是无状态的,如果每条记录需要状态来执行一些操作,那么可以使用KStream.process方法。KStream.process方法将在下一章将状态添加到Kafka Streams应用程序时进行讲解。

如果后退一步,看看你迄今为止所完成的工作,从编写的代码量角度来看,是相当了不起的。但是不要感到太轻松了,因为ZMart的高层管理人员注意到你的生产能力了,更多的变化和对“purchase-streaming”分析程序的优化即将到来。

在下一章中,我们将开始讨论状态,探讨流式应用程序使用状态所需要的属性,以及为什么需要添加状态。然后向KStream应用程序添加状态,首先通过使用本章所看到的KStream的有状态方法(KStream.mapValues)。在更高级的示例中,将在两个不同的购买流之间执行连接操作,以帮助ZMart提升客户服务。


相关图书

Rust游戏开发实战
Rust游戏开发实战
JUnit实战(第3版)
JUnit实战(第3版)
Kafka实战
Kafka实战
Rust实战
Rust实战
PyQt编程快速上手
PyQt编程快速上手
Elasticsearch数据搜索与分析实战
Elasticsearch数据搜索与分析实战

相关文章

相关课程