Hadoop实战手册

978-7-115-33795-5
作者: 【美】Jonathan R. Owens Jon Lentz Brian Femiano
译者: 傅杰赵磊卢学裕
编辑: 杨海玲
分类: Hadoop

图书目录:

详情

这是一本Hadoop实用手册,主要针对实际问题给出相应的解决方案。本书特色是以实践结合理论分析,手把手教读者如何操作,并且对每个操作都做详细的解释,对一些重要的知识点也做了必要的拓展。全书共三个部分,第一部分为基础篇,第二部分为数据分析高级篇,第三部分为系统管理篇。

图书摘要

Hadoop实战手册
[美]Jonathan R.Owens Jon Lentz Brian Femiano 著

傅杰 赵磊 卢学裕 译

人民邮电出版社

北京

内容提要

这是一本 Hadoop 实用手册,主要针对实际问题给出相应的解决方案。本书特色是以实践结合理论分析,手把手教读者如何操作,并且对每个操作都做详细的解释,对一些重要的知识点也做了必要的拓展。全书共包括3个部分,第一部分为基础篇,主要介绍Hadoop数据导入导出、HDFS 的概述、Pig 与 Hive 的使用、ETL 和简单的数据处理,还介绍了MapReduce的调试方式;第二部分为数据分析高级篇,主要介绍高级聚合、大数据分析等技巧;第三部分为系统管理篇,主要介绍 Hadoop 的部署的各种模式、添加新节点、退役节点、快速恢复、MapReduce调优等。

本书适合各个层次的Hadoop技术人员阅读。通过阅读本书,Hadoop初学者可以使用Hadoop 来进行数据处理,Hadoop 工程师或者数据挖掘工程师可以解决复杂的业务分析, Hadoop系统管理员可以更好地进行日常运维。本书也可作为一本Hadoop技术手册,针对要解决的相关问题,在工作中随时查阅。

译者序

随着Hadoop技术在互联网公司的广泛应用,普及程度越来越高,自学Hadoop的Java程序员也越来越多。大多数人(包括译者本人)自学Hadoop的都是从“部署Hadoop环境+运行WordCount例子”开始,而且大多数自学者也都终止在WordCount。因没有具体的应用场景而感到学习没有方向,没有成就感。

本书特色是以实践结合理论分析,手把手教读者如何操作,并且对每个操作都做详细的解释,对一些重要的知识点也做了必要的拓展。此外,书中的教学源代码都可以在官网上下载到。本书整体可分成3部分,第一部分为基础篇包含第1章、第2章、第3章、第4章、第8章内容,主要介绍Hadoop数据导入导出、HDFS的概述、Pig与Hive的使用、ETL和简单的数据处理,还介绍了MapReduce的调试方式;第二部分为数据分析高级篇包含第5章、第6章、第7章、第10章内容,主要介绍高级聚合、大数据分析等技巧;第三部分为系统管理篇,包含第 9 章,主要介绍 Hadoop 的部署的各种模式、添加新节点、退役节点、快速恢复、MapReduce调优等。

如果你是Hadoop初学者,建议你先阅读第一部分内容,完成这部分内容的学习以后,你基本上可以使用Hadoop来进行数据处理。

如果你已经是Hadoop工程师或者数据挖掘工程师,可以系统地学习第二部分内容,当然也可以根据需要进行查阅学习。完成这部分内容的学习,有助于解决一些复杂的业务分析。

如果你是 Hadoop 系统管理员,建议你阅读第三部分内容,当然你也可以阅读第一部分的内容,这样更有助于进行日常运维。

本书也可作为一本手册,在教学、工作中随时查阅,解决相关问题。

译者简介

傅杰 硕士,毕业于清华大学高性能所,现就职于优酷土豆集团,任数据平台架构师,负责集团大数据基础平台建设,支撑其他团队的存储与计算需求,包含Hadoop基础平台、日志采集系统、实时计算平台、消息系统、天机镜系统等。个人专注于大数据基础平台架构及安全研究,积累了丰富的平台运营经验,擅长Hadoop平台性能调优、JVM调优及诊断各种MapReduce 作业,还担任China Hadoop Submit 2013 大会专家委员、优酷土豆大数据系列课程策划&讲师、EasyHadoop社区讲师。

赵磊 硕士,毕业于中国科学技术大学,现就职于优酷土豆集团,任数据挖掘算法工程师,负责集团个性化推荐和无线消息推送系统的搭建和相关算法的研究。个人专注于基于大数据的推荐算法的研究与应用,积累了丰富的大数据分析与数据挖掘的实践经验,对分布式计算和海量数据处理有深刻的认识。

卢学裕 硕士,毕业于武汉大学,曾供职腾讯公司即通部门,现就职于优酷土豆集团,担任大数据技术负责人,负责优酷土豆集团大数据系统平台、大数据分析、数据挖掘和推荐系统。有丰富的Hadoop平台使用及优化经验,尤其擅长MapReduce的性能优化。基于Hadoop生态系统构建了优酷土豆的推荐系统,BI分析平台。

前言

本书能帮助开发者更方便地使用 Hadoop,从而熟练地解决问题。读者会更加熟悉Hadoop相关的各种工具从而进行最佳的实践。

本书指导读者使用各种工具解决各种问题。这些工具包括:Apache Hive、Pig、MapReduce、Mahout、Giraph、HDFS、Accumulo、Redis以及Ganglia。

本书提供了深入的解释以及代码实例。每章的内容包含一组问题集的描述,并对面临的技术挑战提出了解决方案,最后完整地解决了这些问题。每节将单一问题分解成不同的步骤,这样更容易按照步骤执行相关操作。本书覆盖的内容包括:关于HDFS的导入、导出数据,使用Giraph进行图分析,使用Hive、Pig以及MapReduce进行批量数据分析,使用Mahout 进行机器学习方法,调试并修改MapReduce 作业的错误,使用Apache Accumulo对结构数据进行列存储与检索。

本书的示例中涉及的Hadoop技术同样也可以应用于读者自己所面对的问题。

本书涵盖哪些内容

第1章“Hadoop分布式文件系统——导入和导出数据”,包含了从一些流行的数据库导入导出数据的方法,包括MySQL、MongoDB、Greenplum以及MSSQL Server。此外,还包括一些辅助工具,例如Pig、Flume以及Sqoop。

第2章“HDFS”,介绍从HDFS读入或写出数据,介绍了如何使用不同的序列化库,包含 Avro、Thrift 以及 Protocol Buffers。同样包含如何设置数据块大小、备份数以及是否需要进行LZO压缩。

第3 章“抽取和转换数据”,包含对不同数据源类型进行基本的Hadoop ETL 操作。不同的工具包括Hive、Pig以及MapRedcue JAVA API,用于批量处理数据,输出一份或多份转换数据。

第4章“使用Hive、Pig以及MapReduce处理常见的任务”,关注于如何利用这些工具提供的函数快速解决不同种类的问题。其中包括字符串连接,外部表的映射,简单表的连接,自定义函数以及基于集群进行分发操作。

第5章“高级连接操作”,介绍了MapReduce、Hive以及Pig中复杂而有效的连接技术。章节的内容包括Pig中的归并连接、复制连接以及倾斜连接。同样也包含Hive的map端连接以及全外连接的内容。同时本章也包括对于外部数据存储,如何使用Redis进行数据连接。

第6章“大数据分析”,介绍了如何使用Hadoop解答关于你的不同数据的各种查询。一些关于 Hive 的例子将展示在不同的分析中如何正确地实现并重复使用用户自定义函数(UDF)。此外其中有关于Pig的两小节,介绍了对于Audioscrobbler数据集的各类分析。关于MapReduce Java API 的一小节介绍了Combiner 的使用。

第7 章“高级大数据分析”,展示了使用Apache Giraph 以及Mahout 处理不同类型的图分析和机器学习问题。

第8章“调试”,帮助你定位并测试MapReduce作业。这些例子使用MRUnit以及更利于测试的本地模式。此外还强调了使用counter以及更新任务状态的重要性,这样做有助于监控MapReduce作业。

第9章“系统管理”,主要关注如何性能调优Hadoop中不同的配置项。下面的内容包含在内:基本的初始化,XML配置项调整,定位坏数据节点,处理NameNode故障,使用Ganglia进行性能监控。

第10 章“使用Apache Accumulo 进行持久化”,展示了使用NoSQL 数据存储Apache Accumulo带来的很多特性和功能。这些章节利用了这些独特的特性,包括iterator、combiner、扫描授权以及约束,同时也给出了对于有效建立地理空间行健值以及使用MapReduce执行批量分析的例子。

阅读需要的准备

读者需要访问一个伪分布式(单台机器)或完全分布式(多台机器)的集群用于执行本书中的例子。章节中使用的各种工具需要在集群上安装并正确地配置。此外,本书提供的代码使用不同的语言编写,因此读者能访问的机器最好安装了合适的开发工具。

读者范围

本书使用简要的代码作为例子,展示了可以使用 Hadoop 解决的各类现实中的问题。本书的目的是使不同水平的开发者都能方便地使用Hadoop及其工具。Hadoop的初学者能通过本书加速学习进度并了解现实中Hadoop应用的例子。对于有更多经验的Hadoop开发者,通过本书,会对许多工具以及技术有新的认识或有一个更清晰的框架,这些东西之前可能你只听说过但并没有真正理解其中的价值。

书中的约定

在你阅读本书时,你会发现书中有各种样式的文本,这些不同样式的文本是用来区分不同类型的信息的。下面是一些不同样式文本的实例,以及相应的说明。

文本中的代码如下所示:“所有的 Hadoop 文件系统 shell 命令行都使用统一的形式hadoop fs –COMMAND。”

代码块如下所示:

weblogs = load '/data/weblogs/weblog_entries.txt' as

(md5:chararray,

url:chararray,

date:chararray,

time:chararray,

ip:chararray);

md5_grp = group weblogs by md5 parallel 4;

store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';

如果我们希望让代码块中的一特殊部分引起你的注意,相关行或条目会以黑体印刷:

weblogs = load '/data/weblogs/weblog_entries.txt' as

(md5:chararray,

url:chararray,

date:chararray,

time:chararray,

ip:chararray);

md5_grp = group weblogs by md5 parallel 4;

store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';

命令行输入或输出都是以如下样式书写的:

hadoop distcp –m 10 hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

新的术语以及重要文字会以加粗的字体出现。你在屏幕上(如菜单或者对话框中)见到的文字都会是像后面这样出现在正文中:“为了构建JAR 文件,下载Jython Java 安装程序,并执行该程序,从安装菜单中选择Standalone选项。”

方框中出现的为警告或重要的注解。

提示或小技巧出现在这里。

读者反馈

我们总是欢迎来自读者的反馈。请告诉我们你觉得这本书怎么样,你喜欢哪些内容,不喜欢哪些内容。读者的反馈对于帮助我们写出那些对读者真正有用的内容至关重要。

如果是给我们反馈一些普通的信息,你可以给feedback@packtpub.com这个邮箱发一封邮件即可,记得在你邮件的标题中提及相应的书名。

如果你是某一方面的专家并且对于写作或者撰稿有兴趣的话,你可以访问www.packtpub.com/authors,读读我们的作者指南。

客户支持

现在你成为了 Packt 出版社的一名尊敬的用户,为使你的购买物超所值,我们为你准备了一系列的东西。

下载本书中的示例代码

你可以登录账号下载到所有从http://www.packtpub.com购买的Packt的书的示例代码文件。如果你从别的地方购买了此书,可以访问 http://www.packtpub.com/support登记信息,文件将通过邮件直接发给你。

勘误

尽管我们已经非常小心谨慎,以确保内容的准确性,错误还是不可避免。如果你在书中发现了错误(这种错误可能是文字或者代码方面的),若你能向我们报告这些错误,我们将感激不尽。这样做,可以使其他读者免受这些错误带来的困扰,帮助改善该书的下一个版本。如果你发现了什么错误,请访问http://www.packtpub.com/support,选择书名,点击“提交错误”链接,然后输入你发现错误的详细内容,通知我们。一旦你指出的错误得到确认,你提交的内容就会被采纳,并加入一个已经存在的勘误列表中。你也可以访问这个链接http://www.packtpub.com/support,选择书名,查看相应书本已有的勘误表。

关于盗版

所有媒体的版权材料在互联网上被盗版是一个日趋严重的问题。在Packt,我们对于版权与许可证的保护工作是十分看重的。如果你发现任何非法复制我们作品的现象,无论以任何形式,只要在互联网上,请提供给我们对应的网络地址或网站名称,我们会立即对其行为进行纠正。

请通过copyright@packtpub.com联系我们,并附上可疑盗版材料的链接。

我们感谢你对保护作者权益的帮助,你的协助同时也保障了我们带给你更多有价值内容的能力。

如果你有疑问

如果你对书的某些方面有疑问,你可以通过questions@packtpub.com联系我们,我们会尽最大的努力为你解答。

作者简介

Jonathan R. Owens:软件工程师,拥有Java 和C++技术背景,最近主要从事Hadoop及相关分布式处理技术工作。

目前就职于 comScore 公司,为核心数据处理团队成员。comScore 是一家知名的从事数字测量与分析的公司,公司使用 Hadoop 及其他定制的分布式系统对数据进行聚合、分析和管理,每天处理超过400亿单的交易。

感谢我的父母James 和Patricia Owens,感谢他们对我的支持以及从小给我介绍科技知识。

Jon Lentz:comScore 核心数据处理团队软件工程师。comScore 是一家知名的在线受众测评与分析的公司。他更倾向于使用Pig脚本来解决问题。在加入comScore之前,他主要开发优化供应链和分配固定收益证券的软件。

我的女儿Emma出生在本书写作的过程中。感谢她深夜的陪伴!

Brian Femiano:本科毕业于计算机科学专业,并且从事相关专业软件开发工作6年,最近两年主要利用 Hadoop 构建高级分析与大数据存储。他拥有商业领域的相关经验,以及丰富的政府合作经验。他目前就职于弗吉尼亚的Potomac Fusion 公司,这家公司主要从事可扩展算法的开发,并致力于学习并改进政府领域中最先进和最复杂的数据集。他通过教授课程和会议培训在公司内部普及Hadoop和云计算相关的技术。

我要感谢合著者的耐心,以及他们为你们可以在书上看到的代码做出的努力。同时也要感谢Potomac Fusion 的许多同事,他们驾驭最前沿技术的才能和激情,以及促进知识传播的精神一直鼓舞着我。

审阅者简介

Edward J. Cody:作家、演讲师,大数据仓库、Oracle商业智能和Hyperion EPM实现的专家,是The Business Analyst's Guide to Oracle Hyperion Interactive Reporting 11的作者,以及The Oracle Hyperion Interactive Reporting 11 Expert Guide的合著者。在过去的职业生涯中,他给商业公司和联邦政府都做过咨询,目前主要从事大型EPM、BI和大数据仓库的实现研究。

本书的作者做了很好的工作,同时我要感谢Packt出版社让我有机会参与本书的编辑出版工作。

Daniel Jue:Sotera Defense Solutions公司的高级软件工程师,Apache 软件基金会成员之一,他曾在和平与战乱地区为ACSIM、DARPA和许多联邦机构工作,致力于揭示蕴藏在“大数据”背后的动力学与异常现象。Daniel 拥有马里兰大学帕克分校计算机专业的学士学位,并同时专注于物理学与天文学的研究,他目前的兴趣是将分布式人工智能技术应用到自适应异构的云计算中。

感谢我漂亮的妻子 Wendy,以及我的双胞胎儿子 Christopher 和Jonathan,在我审阅本书的过程中,他们给予了我关爱与耐心。非常感激Brian Femiano、Bruce Miller和Jonathan Larson,他们让我接触到了许多伟大的想法、观点,使我深受启发。

Bruce Miller:Sotera Defense Solutions公司高级软件工程师,目前受雇于美国国防部高级研究计划署(DARPA),并专注于大数据的软件开发10余年。工作之余喜欢用Haskell和Lisp语言进行编程,并将其应用于解决一些实际的问题。

第1章 Hadoop分布式文件系统——导入和导出数据

本章我们将介绍:

使用Hadoop shell 命令导入和导出数据到HDFS

使用distcp 实现集群间数据复制

使用Sqoop 从MySQL 数据库导入数据到HDFS

使用Sqoop 从HDFS 导出数据到MySQL

配置Sqoop 以支持SQL Server

从HDFS 导出数据到MongoDB

从MongoDB 导入数据到HDFS

使用Pig 从HDFS 导出数据到MongoDB

在Greenplum外部表中使用HDFS

利用Flume 加载数据到HDFS 中

 

1.1 介绍

在一个经典的数据架构中,Hadoop是处理复杂数据流的核心。数据往往是从许多分散的系统中收集而来,并导入 Hadoop 分布式文件系统(HDFS)中,然后通过 MapReduce或者其他基于MapReduce封装的语言(如Hive、Pig和Cascading等)进行处理,最后将这些已经过滤、转换和聚合过的结果导出到一个或多个外部系统中。

举个比较具体的例子,一个大型网站可能会做一些关于网站点击率的基础数据分析。从多个服务器中采集页面访问日志,并将其推送到HDFS中。启动一个MapReduce作业,并将这些数据作为 MapReduce 的输入,接下来数据将被解析、汇总以及与 IP 地址进行关联计算,最终得出URL、页面访问量和每个cookie的地理位置数据。生成的相关结果可以导入关系型数据库中。即席查询(Ad-hoc query) [1]此时就可以构建在这些数据上了。分析师可以快速地生成各种报表数据,例如,当前的独立用户数、用户访问最多的页面、按地区对用户进行拆分及其他的数据汇总。

本章的重点将关注 HDFS数据的导入与导出,主要内容包含与本地文件系统、关系数据库、NoSQL数据库、分布式数据库以及其他Hadoop集群之间数据的互相导入和导出。

 

1.2 使用Hadoop shell命令导入和导出数据到HDFS

HDFS提供了许多shell命令来实现访问文件系统的功能,这些命令都是构建在HDFS FileSystem API 之上的。Hadoop 自带的shell 脚本是通过命令行来执行所有操作的。这个脚本的名称叫做hadoop,通常安装在$HADOOP_BIN目录下,其中$HADOOP_BIN是Hadoopbin文件完整的安装目录,同时有必要将$HADOOP_BIN配置到$PATH环境变量中,这样所有的命令都可以通过hadoop fs -command这样的形式来执行。

如果需要获取文件系统的所有命令,可以运行hadoop命令传递不带参数的选项fs。

hadoop fs

这些按照功能进行命名的命令的名称与Unix shell 命令非常相似。使用help选项可以获得某个具体命令的详细说明。

hadoop fs –help ls

这些shell命令和其简要的参数描述可在官方在线文档http://hadoop.apache.org/docs/r1.0.4/file_system_shell.html中进行查阅。

在这一节中,我们将使用Hadoop shell命令将数据导入HDFS 中,以及将数据从HDFS中导出。这些命令更多地用于加载数据,下载处理过的数据,管理文件系统,以及预览相关数据。掌握这些命令是高效使用HDFS的前提。

准备工作

你需要在 http://www.packtpub.com/support 这个网站上下载数据集 weblog_entries.txt。

操作步骤

完成以下步骤,实现将weblog_entries.txt文件从本地文件系统复制到HDFS上的一个指定文件夹下。

1.在HDFS中创建一个新文件夹,用于保存weblog_entries.txt文件:

hadoop fs -mkdir /data/weblogs

2.将weblog_entries.txt文件从本地文件系统复制到HDFS刚创建的新文件夹下:

hadoop fs -copyFromLocal weblog_entries.txt /data/weblogs

3.列出HDFS上weblog_entries.txt文件的信息:

hadoop fs –ls /data/weblogs/weblog_entries.txt

在Hadoop处理的一些结果数据可能会直接被外部系统使用,可能需要其他系统做更进一步的处理,或者MapReduce处理框架根本就不符合该场景,任何类似的情况下都需要从HDFS上导出数据。下载数据最简单的办法就是使用Hadoop shell。

4.将HDFS上的weblog_entries.txt文件复制到本地系统的当前文件夹下:

hadoop fs -copyToLocal /data/weblogs/weblog_entries.txt ./weblog_entries.txt

复制HDFS的文件到本地文件系统时,需要保证本地文件系统的空间可用以及网络连接的速度。HDFS中的文件大小在几个TB到几十个TB是很常见的。在1Gbit网络环境下,从HDFS 中导出10 TB数据到本地文件系统,最好的情况下也要消耗23个小时,当然这还要保证本地文件系统的空间是可用的。

下载本书的示例代码

你可以从 http://www.packtpub.com下载你买过的任何书的示例代码,如果你买过本书还可以访问http://www.packtpub.com/support,并进行注册来让文件直接发送到你的邮箱

工作原理

Hadoop shell非常轻量地封装在HDFS FileSystem API 之上。在执行hadoop命令时,如果传进去的参数是fs,实际上执行的是org.apache.hadoop.fs.FsShell这个类。在0.20.2版本中FsShell实例化了一个org.apache.hadoop.fs.FileSystem对象,并且将命令行参数与类方法映射起来。比如,执行 hadoop fs –mkdir /data/weblogs 相当于调用FileSystem.mkdirs(new Path("/data/weblogs"))。同样,运行hadoop fs –copyFromLocal weblog_entries.txt /data/weblogs 相当于在调用 FileSystem.copyFromLocal(newPath ("weblog_entries.txt"), new Path("/data/weblogs"))。HDFS 数据复制到本地系统的实现方式也是一样,等同于调用 FileSystem.copyToLocal(newPath("/data/weblogs/ weblog_entries.txt"), new Path("./weblog_entries.txt"))。更多关于文件系统的接口信息描述可以见官方文档 http://hadoop.apache.org/docs/r1.0.4/api/org/ apache/ hadoop/fs/FileSystem.html。

mkdir可以通过hadoop fs -mkdir PATH1 PATH2的形式来执行。例如,hadoop fs –mkdir /data/weblogs/12012012 /data/ weblogs/12022012将会在HDFS 系统上分别创建两个文件夹/data/weblogs/12012012 和/data/weblogs/12022012。如果文件夹创建成功则返回0,否则返回-1。

hadoop fs –mkdir /data/weblogs/12012012 /data/weblogs/12022012

hadoop fs –ls /data/weblogs

copyFromLocal可以通过hadoop fs –copyFromLocal LOCAL_FILE_PATH URI的形式来执行,如果URI的地址(指的是HDFS://filesystemName:9000这个串)没有明确给出,则默认会读取core-site.xml中的fs.default.name这个属性。上传成功返回0,否则返回-1。

copyToLocal 命令可以通过 hadoop fs –copyToLocal [-ignorecrc] [-crc]URILOCAL_FILE_PATH 的形式来执行。如果 URI 的地址没有明确的给出,则默认会读取core-site.xml中的fs.default.name这个属性。copyToLocal会执行CRC(Cyclic Redundancy Check)校验保证已复制的数据的正确性,一个失败的副本复制可以通过参数–ignorecrc来强制执行,还可以通过-crc参数在复制文件的同时也复制crc校验文件。

更多参考

put命令与copyFromLocal类似,put更通用一些,可以复制多个文件到HDFS中,也能接受标准输入流。

任何使用copyToLocal的地方都可以用get替换,两者的实现一模一样。

在使用MapReduce处理大数据时,其输出结果可能是一个或者多个文件。最终输出结果的文件个数是由 mapred.reduce.tasks 的值决定的。我们可以在 Jobconf 类中通过setNumReduceTasks()方法来设置这个属性,改变提交作业的reduce个数,每个reduce将对应输出一个文件。该参数是客户端参数,非集群参数,针对不同的作业应该设置不同的值。其默认值为1,意味着所有Map(映射函数,以下都用Map表示)的输出结果都将复制到1 个reducer 上进行处理。除非Map 输出的结果数据小于1 GB,否则默认的配置将不合适。reduce 个数的设置更像是一门艺术而不是科学。在官方的文档中对其设置推荐的两个公式如下:

0.95×NUMBER_OF_NODES×mapred.tasktracker.reduce.tasks.maximum

或者

1.75×NUMBER_OF_NODES×mapred.tasktracker.reduce.tasks.maximum

假设你的集群有10 个节点来运行Task Tracker,每个节点最多可以启动5个reduce 槽位(通过设置 tasktracker.reduce.tasks.maximum 这个值来决定每个节点所能启动的最大reduce槽位数)对应的这个公式应该是0.95×10×4=47.5。因为reduce个数的设置必须是整数,所以需要进行四舍五入。

JobConf 文档中给出了使用这两个公式的理由,具体见 http://hadoop. apache.org/docs/current/api/org/apache/hadoop/mapred/JobConf.html#setNumReduceTasks(int)。

0.95可以保证在map结束后可以立即启动所有的reduce进行map结果的复制,只需要一波就可以完成作业。1.75使得运行比较快的reducer能够再执行第二波reduce,保证两波reduce就能完成作业,使作业整体的负载均衡保持得比较好。

reduce输出的数据存储在HDFS中,可以通过文件夹的名称来引用。若文件夹作为一个作业的输入,那么该文件夹下的所有文件都会被处理。上文介绍的get和copyToLocal只能对文件进行复制,无法对整个文件夹进行复制 [2]。当然Hadoop提供了getmerge命令,可以将文件系统中的多个文件合并成一个单独的文件下载到本地文件系统。

通过以下Pig脚本来演示下getmerge命令的功能:

weblogs = load '/data/weblogs/weblog_entries.txt' as

(md5:chararray,

url:chararray,

date:chararray,

time:chararray,

ip:chararray);

md5_grp = group weblogs by md5 parallel 4;

store md5_grp into '/data/weblogs/weblogs_md5_groups.bcp';

Pig脚本可以通过下面的命令行来执行:

pig –f weblogs_md5_group.pig

该脚本逐行读取HDFS上的weblog_entries.txt文件,并且按照md5的值进行分组。parallel是Pig脚本用来设置reduce个数的方法。由于启动了4个reduce任务,所以会在输出的目录/data/weblogs/weblogs_md5_groups.bcp中生成4个文件。

注意,weblogs_md5_groups.bcp实际上是一个文件夹,显示该文件夹的列表信息可以看到:

在/data/weblogs/weblogs_md5_groups.bcp中包含4个文件,即part-r-00000、part-r-00001、part-r-00002和part-r-00003。

getmerge命令可以用来将4个文件合并成一个文件,并且复制到本地的文件系统中,具体命令如下:

hadoop fs -getmerge /data/weblogs/weblogs_md5_groups.bcp weblogs_md5_groups.bcp

操作完我们可以看到本地文件列表如下:

延伸阅读

关于HDFS数据的读写,我们将在第2章中重点介绍如何直接利用文件系统的API进行读写。

通过以下的两个链接可以对比出文件系统shell 命令与Java API 的不同:

http://hadoop.apache.org/docs/r1.0.4/file_system_shell.html

http://hadoop.apache.org/docs/r1.0.4/api/org/apache/hadoop/fs/FileSystem.html

 

1.3 使用distcp实现集群间数据复制

Hadoop 分布式复制(distcp)是 Hadoop 集群间复制大量数据的高效工具。distcp是通过启动 MapReduce 实现数据复制的。使用 MapReduce 的好处包含可并行性、高容错性、作业恢复、日志记录、进度汇报等。Hadoop 分布式复制(distcp)对在开发集群环境、研究集群环境和生产集群环境之间进行数据复制十分有用。

准备工作

首先必须保证复制源和复制目的地能够互相访问。

最好关闭复制源集群map任务的推测机制,可以在配置文件 mapred-site.xml中将mapred.map.tasks.speculative.execution的值设置为false来实现,这样就可以避免在map任务失败的时候产生任何不可知的行为。

源集群和目的集群的RPC协议必须是一致。这意味着两个集群之间安装的Hadoop版本必须一致 [3]

操作步骤

完成以下几个步骤实现集群间的文件夹复制。

1.将集群A的weblogs文件夹复制到集群B上:

hadoop distcp hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

2.将集群A的weblogs文件夹复制到集群B并覆盖已存在文件:

hadoop distcp –overwrite hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

3.同步集群A和集群B之间的weblogs文件夹:

hadoop distcp –update hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/weblogs

工作原理

在源集群,文件夹中的内容将被复制为一个临时的大文件。将会启动一个只有 map (map-only [4])的MapReduce作业来实现两个集群间的数据复制。默认情况下,每个map就将会分配到一个256 MB的数据块文件。举个例子,如果weblogs 文件夹总大小为10 GB,默认将会启动40 个map,每个map 会复制大约256 MB 的数据。distcp复制也可以通过参数手动设置启动的map数量。

hadoop distcp –m 10 hdfs://namenodeA/data/weblogs hdfs://namenodeB/data/ weblogs

在上面这个例子中,将会启动10个map进行数据复制。如果weblogs文件夹的总大小是10 GB,那么每个map 会复制大约1 GB 的数据。

更多参考

如果要在运行的 Hadoop 版本不一样的两个集群之间进行数据复制,一般建议在复制源集群使用HftpFileSystem [5]。HftpFileSystem是一个只读的文件系统。相应的distcp命令只能在目标服务器上运行:

hadoop distcp hftp://namenodeA:port/data/weblogs hdfs://namenodeB/data/ weblogs

在上面这条命令中,port的值要与配置文件 hdfs-site.xml中 dfs.http.address属性的端口值一致。

 

1.4 使用Sqoop从MySQL数据库导入数据到HDFS

Sqoop是Apache基金下的一个项目,是庞大Hadoop生态圈中的一部分。在很多方面Sqoop和distcp很相似(见1.3节)。这两个工具都是构建在MapReduce之上的,利用了MapReduce的并行性和容错性。与集群间的数据复制不同,Sqoop设计通过JDBC驱动连接实现Hadoop集群与关系数据库之间的数据复制。

它的功能非常广泛,本节将以网络日志条目为例展示如何使用Sqoop从MySQL数据库导入数据到HDFS。

准备工作

本例子使用Sqoop V1.3.0 版本。

如果你使用的是 CDH3 版本,Sqoop 默认是已经安装了。如果不是 CDH3,你可以通过https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation找到发行版的说明。

在本节假设你已经启动了一个MySQL实例,并且能够访问Hadoop集群 [6]。mysql.user该表配置了你运行 Sqoop 的那台机器上被允许连接的用户。访问 http://dev.mysql.com/doc/refman/5.5/en/installing.html获取更多关于MySQL安装与配置的相关信息。

将 MySQL JDBC 驱动包复制到$SQOOP_HOME/libs [7]目录下。该驱动包可以从http://dev.mysql.com/downloads/connector/j/下载。

操作步骤

完成以下步骤实现将MySQL表数据导出到HDFS中。

1.在MySQL实例中创建一个新数据库:

CREATE DATABASE logs;

2.创建并载入表weblogs:

USE logs;

CREATE TABLE weblogs (

md5   VARCHAR(32),

url   VARCHAR(64),

request_date DATE,

request_time TIME,

ip    VARCHAR(15)

);

LOAD DATA INFILE '/path/weblog_entries.txt' INTO TABLE weblogs

FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\r\n';

3.查询weblogs表的行数:

mysql> select count(*) from weblogs;

输出结果将会是:

+----------+

| count(*) |

+----------+

| 3000 |

+----------+

1 row in set (0.01 sec)

4.将MySQL数据导出到HDFS:

sqoop import -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs--username hdp_usr --password test1 --table weblogs --target-dir /data/weblogs/import

输出结果将会是:

INFO orm.CompilationManager: Writing jar file:

/tmp/sqoop-jon/compile/f57ad8b208643698f3d01954eedb2e4d/weblogs. jar

WARN manager.MySQLManager: It looks like you are importing from mysql.

WARN manager.MySQLManager: This transfer can be faster! Use the --direct

WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.

...

INFO mapred.JobClient: Map input records=3000

INFO mapred.JobClient: Spilled Records=0

INFO mapred.JobClient: Total committed heap usage (bytes)=85000192

INFO mapred.JobClient: Map output records=3000

INFO mapred.JobClient: SPLIT_RAW_BYTES=87

INFO mapreduce.ImportJobBase: Transferred 245.2451 KB in 13.7619 seconds

(17.8206 KB/sec)

INFO mapreduce.ImportJobBase: Retrieved 3000 records.

工作原理

Sqoop 连接数据库的 JDBC 驱动在--connect 语句中定义,并从$SQOOP_HOME/libs目录中加载相应的包,其中$SQOOP_HOME 为 Sqoop 安装的绝对路径。--username 和--password选项用于验证用户访问 MySQL 实例的权限。mysql.user 表必须包含 Hadoop集群每个节点的主机域名以及相应的用户名,否则Sqoop将会抛出异常,表明相应的主机不允许被连接到MySQL服务器。

mysql> USE mysql;

mysql> select host, user from user;

显示输出如下:

+------------+-----------+

| user  | host  |

+------------+-----------+

| hdp_usr | hdp01 |

| hdp_usr | hdp02 |

| hdp_usr | hdp03 |

| hdp_usr | hdp04 |

| root  | 127.0.0.1 |

| root  | ::1  |

| root  | localhost |

+------------+-----------+

7 rows in set (1.04 sec)

在这个例子中,我们使用hdp_usr用户连接到MySQL服务器。我们的集群拥有4台机器,即hdp01、hdp02、hdp03和hdp04。

--table变量告诉Sqoop哪个表需要被导入。在我们的例子中,是要导入weblogs这个表到HDFS。--target-dir变量决定了导出的表数据将被存储在HDFS的哪个目录下:

hadoop fs -ls /data/weblogs/import

输出结果为:

-rw-r--r-- 1 hdp_usr hdp_grp 0  2012-06-08  23:47 /data/

weblogs/import/_SUCCESS

drwxr-xr-x- - hdp_usr hdp_grp 0  2012-06-08  23:47 /data/

weblogs/import/_logs

-rw-r--r-- 1 hdp_usr hdp_grp 251131 2012-06-08 23:47 /data/

weblogs/import/part-m-00000

默认情况下,导入的数据将按主键进行分割。如果导入的表并不包含主键,必须指定-m或者--split-by参数决定导入的数据如何分割。在前面的例子中,使用了-m参数。-m参数决定了将会启动多少个mapper来执行数据导入。因为将-m设置为1,所以就启动了一个mapper用于导入数据。每个mapper将产生一个独立的文件。

这行命令背后隐藏了相当复杂的逻辑。Sqoop 利用数据库中存储的元数据生成每一列的DBWritable类,这些类使用了DBInputFormat。DBInputFormat是Hadoop用来格式化读取数据库任意查询的结果。在前面的例子中,启动了一个使用 DBInputFormat 索引weblogs 表内容的 MapReduce 作业。整个 weblogs 表被扫描并存储在 HDFS 的路径/data/weblogs/import下。

更多参考

使用 Sqoop 导入数据还有很多有用的参数可以配置。Sqoop 可以分别通过参数--as-avrodatafile和--as-sequencefile将数据导入为Avro文件和序列化的文件。通过-z或者--compress参数可以在导入的过程中对数据进行压缩。默认的压缩方式为GZIP压缩,可以通过--compression-codec <CODEC>参数使用Hadoop 支持的任何压缩编码。可以查看第2章的使用LZO压缩数据那一节的介绍。另一个有用的参数是--direct,该参数指示 Sqoop 直接使用数据库支持的本地导入导出工具。在前面的例子中,如果--direct被添加为参数,Sqoop 将使用 mysqldump 工具更快地导出 weblogs 表的数据。--direct参数非常重要以至于我们在运行前面的日志会打印出如下的日志信息:

WARN manager.MySQLManager: It looks like you are importing from mysql.

WARN manager.MySQLManager: This transfer can be faster! Use the --direct

WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.

延伸阅读

使用Sqoop 从HDFS 导出数据到MySQL(1.5 节)。

 

1.5 使用Sqoop从HDFS导出数据到MySQL

Sqoop是Apache基金会下的一个项目,是庞大Hadoop生态圈中的一部分。在很多方面Sqoop和distcp很相似(见1.3节)。这两个工具都是构建在MapReduce之上的,利用了MapReduce的并行性和容错性。与集群间的数据复制不同,Sqoop设计通过JDBC驱动连接实现Hadoop集群与关系数据库之间的数据复制。

它的功能非常广泛,本节将以网络日志条目为例展示如何使用Sqoop从HDFS导入数据到MySQL数据库。

准备工作

本例使用Sqoop V1.3.0 版本。

如果你使用的是CDH3版本,Sqoop默认是已经安装了。如果不是CDH3,你可以通过 https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation找到发行版的说明。

在本节假设你已经启动了一个MySQL实例,并且能够访问Hadoop集群。mysql.user表配置了你运行 Sqoop 的那台机器上被允许连接的用户。访问 http://dev.mysql.com/doc/refman/5.5/en/installing.html获取更多关于MySQL安装与配置的相关信息。

将 MySQL JDBC 驱动包复制到$SQOOP_HOME/libs 目录下。该驱动包可以从http://dev.mysql.com/downloads/connector/j/下载。

按照1.1节介绍的导入weblog_entires.txt文件到HDFS的方式操作。

操作步骤

完成以下步骤实现将HDFS数据导出到MySQL表中。

1.在MySQL实例中创建一个新数据库:

CREATE DATABASE logs;

2.创建表weblogs_from_hdfs:

USE logs;

CREATE TABLE weblogs_from_hdfs (

md5   VARCHAR(32),

url   VARCHAR(64),

request_date DATE,

request_time TIME,

ip    VARCHAR(15)

);

3.从HDFS导出weblog_entries.txt文件到MySQL:

sqoop export -m 1 --connect jdbc:mysql://<HOST>:<PORT>/logs --username hdp_usr

--password test1 --table weblogs_from_hdfs --export-dir /data/weblogs/05102012

--input-fields-terminated-by '\t' --mysql-delmiters

输出结果如下:

INFO mapreduce.ExportJobBase: Beginning export of weblogs_from_

hdfs

input.FileInputFormat: Total input paths to process : 1

input.FileInputFormat: Total input paths to process : 1

mapred.JobClient: Running job: job_201206222224_9010

INFO mapred.JobClient: Map-Reduce Framework

INFO mapred.JobClient: Map input records=3000

INFO mapred.JobClient: Spilled Records=0

INFO mapred.JobClient: Total committed heap usage

(bytes)=85000192

INFO mapred.JobClient: Map output records=3000

INFO mapred.JobClient: SPLIT_RAW_BYTES=133

INFO mapreduce.ExportJobBase: Transferred 248.3086 KB in 12.2398

seconds (20.287 KB/sec)

INFO mapreduce.ExportJobBase: Exported 3000 records.

工作原理

Sqoop连接数据库的JDBC驱动可使用-connect参数声明,并从$SQOOP_HOME/libs目录中加载相应的包。其中$SQOOP_HOME 为 Sqoop 安装的绝对路径。--username 和--password选项用于验证用户访问MySQL实例的权限。mysql.user表必须包含Hadoop集群每个节点的主机域名以及相应的用户名,否则Sqoop将会抛出异常,表明相应的主机不允许被连接到MySQL服务器。

mysql> USE mysql;

mysql> select host, user from user;

+---------------+-----------+

| user   | host  |

+---------------+-----------+

| hdp_usr  | hdp01 |

| hdp_usr  | hdp02 |

| hdp_usr  | hdp03 |

| hdp_usr  | hdp04 |

| root   | 127.0.0.1 |

| root   | ::1  |

| root   | localhost |

+---------------+-----------+

7 rows in set (1.04 sec)

在这个例子中,我们使用hdp_usr用户连接到MySQL服务器。我们的集群拥有4台机器,即hdp01、hdp02、hdp03和hdp04。

--table参数决定了HDFS导出的数据将存储在哪个MySQL表中。这个表必须在执行Sqoop export语句之前创建好。Sqoop 通过表的元数据信息、列数量以及列类型来校验HDFS 需要导出目录中的数据并生成相应的插入语句。举个例子,导出作业可以被想象为逐行读取HDFS的weblogs_entries.txt文件并产生以下输出:

INSERT INTO weblogs_from_hdfs

VALUES('aabba15edcd0c8042a14bf216c5', '/jcwbtvnkkujo.html', '2012-05- 10', '21:25:44', '148.113.13.214');

INSERT INTO weblogs_from_hdfs

VALUES('e7d3f242f111c1b522137481d8508ab7', '/ckyhatbpxu.html', '2012- 05-10', '21:11:20', '4.175.198.160');

INSERT INTO weblogs_from_hdfs

VALUES('b8bd62a5c4ede37b9e77893e043fc1', '/rr.html', '2012-05-10', '21:32:08','24.146.153.181');

...

Sqoop export默认情况下是创建新增语句。如果--update-key参数被设置了,则将是创建更新语句。如果前面的例子使用了参数--update-key md5 那么生成的 Sql 代码将运行如下:

UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_ date='2012-05-10'request_time='21:25:44'

ip='148.113.13.214'WHERE md5='aabba15edcd0c8042a14bf216c5'

UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_ date='2012-05- 10' request_time='21:11:20' ip='4.175.198.160' WHERE md5='e7d3f242f111c1b522137481d8508ab7'

UPDATE weblogs_from_hdfs SET url='/jcwbtvnkkujo.html', request_ date='2012- 05-10'request_time='21:32:08' ip='24.146.153.181' WHERE md5='b8bd62a5c4ede37b9e77893e043fc1'

如果--update-key设置的值并没找到,可以设置--update-mode为allowinsert允许新增这行数据。

-m 参数决定将配置几个 mapper 来读取 HDFS 上文件块。每个 mapper 各自建立与MySQL服务器的连接。每个语句将会插入100条记录。当完成100条语句也就是插入10000条记录,将会提交当前事务。一个失败的map任务,很可能导致数据的不一致,从而出现插入冲突数据或者插入重复数据。这种情况可以通过使用参数--staging-table来解决。这会促使任务将数据插入一个临时表,等待一个事务完成再将数据从临时表复制到--table 参数配置的表中。临时表结构必须与最终表一致。临时表必须是一个空表否则需要配置参数--clear-staging-table。

延伸阅读

使用Sqoop 从MySQL 数据库导入HDFS(1.4 节)。

 

1.6 配置Sqoop以支持SQL Server

本节将展示如何配置Sqoop和SQL Server数据库进行连接。这样可以允许数据从SQLServer数据库有效地导入HDFS中。

准备工作

本例子使用了Sqoop V1.3.0 版本。

如果你使用的是 CDH3 版本,Sqoop 默认是已经安装了。如果不是 CDH3,你可以通过https://ccp.cloudera.com/display/CDHDOC/Sqoop+Installation找到发行版的说明。

在本节假设你已经启动了一个Microsoft SQL Server实例,并且能够与Hadoop 集群正常连接。

操作步骤

完成以下步骤配置Sqoop 连接Microsoft SQL Server。

1.从 http://download.microsoft.com/download/D/6/A/D6A241AC-433E-4CD2- A1CE- 50177E8428F0/1033/sqljdbc_3.0.1301.101_enu.tar.gz 下载 Microsoft SQL Server JDBC驱动3.0。该下载包包含了SQL Server JDBC 驱动(sqljdbc4.jar)。Sqoop连接关系数据库是通过JDBC驱动的。

2.解压缩TAR文件:

gzip -d sqljdbc_3.0.1301.101_enu.tar.gz

tar -xvf sqljdbc_3.0.1301.101_enu.tar

这将创建一个新的文件夹 sqljdbc_3.0。

3.复制sqljdbc4.jar到$SQOOP_HOME/lib目录下:

cp sqljdbc_3.0/enu/sqljdbc4.jar $SQOOP_HOME/lib

Sqoop 可以访问sqljdbc4.jar文件,并且可以通过该驱动包访问SQL Server实例。

4.为Hadoop 下载微软SQL Server连接器:

http://download.microsoft.com/download/B/E/5/BE5EC4FD-9EDA-

4C3F-8B36-1C8AC4CE2CEF/sqoop-sqlserver-1.0.tar.gz.

5.解压缩TAR文件:

gzip -d sqoop-sqlserver-1.0.tar.gz

tar -xvf sqoop-sqlserver-1.0.tar

这将创建一个新的文件夹sqoop-sqlserver-1.0。

6.设置 MSSQL_CONNECTOR_HOME环境变量:

export MSSQL_CONNECTOR_HOME=/path/to/sqoop-sqlserver-1.0

7.运行安装脚本:

./install.sh

8.导入导出数据可以查看1.4 节和1.5 节。这两节的内容对SQL Server同样适用,只是需要把--connect参数修改为——connect jdbc:sqlserver://<HOST>:<PORT>。

工作原理

Sqoop和数据库之间通过JDBC连接。只要将sqljdbc4.jar添加到$SQOOP_HOME/lib目录下,Sqoop 就可以使用--connect jdbc:sqlserver://<HOST>:<PORT>连接 SQL Server数据库实例。为了使SQL Server 与Sqoop 有充分的兼容性,修改了一些必要的配置,这些配置可以通过运行install.sh脚本来实现更新。

 

1.7 从HDFS导出数据到MongoDB

本节将使用MongoOutputFormat类加载HDFS中的数据并收集到MongoDB中。

准备工作

使用Mongo Hadoop 适配器最简单的方法是从GitHub 上克隆 Mongo-Hadoop 工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。

本节假定你使用的Hadoop版本是CDH3。

Git客户端官方的下载地址是:http://git-scm.com/downloads。

在Windows操作系统上可以通过http://windows.github.com/访问GitHub。

在Mac操作系统上可以通过http://mac.github.com/访问GitHub。

可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。

Mongo Java的驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。

操作步骤

完成下面步骤实现将HDFS数据复制到MongoDB。

1.通过下面的命令实现克隆mongo-hadoop工程:

git clone https://github.com/mongodb/mongo-hadoop.git

2.切换到稳定发布的1.0分支版本:

git checkout release-1.0

3.必须保持mongo-hadoop与Hadoop的版本一致。使用文本编辑器打开mongo-hadoop克隆目录下的build.sbt文件,将下面这行:

hadoopRelease in ThisBuild := "default"

修改为:

hadoopRelease in ThisBuild := "cdh3"

4.编译mongo-hadoop:

./sbt package.

这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。

5.从 https://github.com/mongodb/mongo-java-driver/downloads下载 MongoDB2.8.0版本的Java驱动包。

6.复制mongo-hadoop和MongoDB Java 驱动包到Hadoop 集群每个节点的$HADOOP_HOME/lib:

cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib

7.编写MapReduce读取HDFS上weblog_entries.txt文件并通过MongoOutputFormat类将数据写入MongoDB中:

import java.io.*;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import org.apache.hadoop.mapreduce.*;

import org.bson.*;

import org.bson.types.ObjectId;

import com.mongodb.hadoop.*;

import com.mongodb.hadoop.util.*;

public class ExportToMongoDBFromHDFS {

private static final Log log =

LogFactory.getLog(ExportToMongoDBFromHDFS.class);

public static class ReadWeblogs extends Mapper<LongWritable, Text,

ObjectId, BSONObject>{

public void map(Text key, Text value, Context context) throws

IOException, InterruptedException{

System.out.println("Key: " + key);

System.out.println("Value: " + value);

String[] fields = value.toString().split("\t");

String md5 = fields[0];

String url = fields[1];

String date = fields[2];

String time = fields[3];

String ip = fields[4];

BSONObject b = new BasicBSONObject();

b.put("md5", md5);

b.put("url", url);

b.put("date", date);

b.put("time", time);

b.put("ip", ip);

context.write( new ObjectId(), b);

}

}

public static void main(String[] args) throws Exception{

final Configuration conf = new Configuration();

MongoConfigUtil.setOutputURI(conf,

"mongodb://<HOST>:<PORT>/test. weblogs");

System.out.println("Configuration: " + conf);

final Job job = new Job(conf, "Export to Mongo");

Path in = new Path("/data/weblogs/weblog_entries.txt");

FileInputFormat.setInputPaths(job, in);

job.setJarByClass(ExportToMongoDBFromHDFS.class);

job.setMapperClass(ReadWeblogs.class);

job.setOutputKeyClass(ObjectId.class);

job.setOutputValueClass(BSONObject.class);

job.setInputFormatClass(TextInputFormat.class);

job.setOutputFormatClass(MongoOutputFormat.class);

job.setNumReduceTasks(0);

System.exit(job.waitForCompletion(true) ? 0 : 1 );

}

}

8.导出为一个可运行的JAR文件,并运行该作业:

hadoop jar ExportToMongoDBFromHDFS.jar

9.在Mongo shell 上验证weblogs 已经导入MongoDB:

db.weblogs.find();

工作原理

Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。

 

1.8 从MongoDB导入数据到HDFS

本节将使用MongoInputFormat类加载MongoDB中的数据导入HDFS中。

准备工作

使用Mongo Hadoop 适配器最简单的方法是从GitHub 上克隆Mongo-Hadoop 工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。

本节假定你使用的Hadoop版本是CDH3。

Git客户端官方的下载地址是:http://git-scm.com/downloads。

在Windows操作系统上可以通过http://windows.github.com/访问GitHub。

在Mac操作系统上可以通过http://mac.github.com/访问GitHub。

可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。

Mongo Java驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。

操作步骤

完成下面步骤实现将MongoDB中的数据复制到HDFS中。

1.通过下面的命令实现克隆mongo-hadoop工程:

git clone https://github.com/mongodb/mongo-hadoop.git

2.切换到稳定发布的1.0分支版本:

git checkout release-1.0

3.必须保持 mongo-hadoop 与 Hadoop 的版本一致。使用文本编辑器打开 mongo-hadoop克隆目录下的build.sbt文件,修改下面这行:

hadoopRelease in ThisBuild := "default"

修改为:

hadoopRelease in ThisBuild := "cdh3"

4.编译mongo-hadoop:

./sbt package.

这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。

5.从https://github.com/mongodb/mongo-java-driver/downloads下载MongoDB2.8.0版本的Java驱动包。

6.复制mongo-hadoop和MongoDB Java 驱动包到Hadoop 集群每个节点的$HADOOP_HOME/lib:

cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib

7.编写MapReduce读取MongoDB数据库中的数据并写入HDFS中:

import java.io.*;

import org.apache.commons.logging.*;

import org.apache.hadoop.conf.*;

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.*;

import org.apache.hadoop.mapreduce.lib.output.*;

import org.apache.hadoop.mapreduce.*;

import org.bson.*;

import com.mongodb.hadoop.*;

import com.mongodb.hadoop.util.*;

public class ImportWeblogsFromMongo {

private static final Log log = LogFactory.

getLog(ImportWeblogsFrom Mongo.class);

public static class ReadWeblogsFromMongo extends Mapper<Object,

BSONObject, Text, Text>{

public void map(Object key, BSONObject value, Context context) throws

IOException, InterruptedException{

System.out.println("Key: " + key);

System.out.println("Value: " + value);

String md5 = value.get("md5").toString();

String url = value.get("url").toString();

String date = value.get("date").toString();

String time = value.get("time").toString();

String ip = value.get("ip").toString();

String output = "\t" + url + "\t" + date + "\t" +

time + "\t" + ip;

context.write( new Text(md5), new Text(output));

}

}

public static void main(String[] args) throws Exception{

final Configuration conf = new Configuration();

MongoConfigUtil.setInputURI(conf, "mongodb://<HOST>:<PORT>/test.weblogs");

MongoConfigUtil.setCreateInputSplits(conf, false);

System.out.println("Configuration: " + conf);

final Job job = new Job(conf, "Mongo Import");

Path out = new Path("/data/weblogs/mongo_import");

FileOutputFormat.setOutputPath(job, out);

job.setJarByClass(ImportWeblogsFromMongo.class);

job.setMapperClass(ReadWeblogsFromMongo.class);

job.setOutputKeyClass(Text.class);

job.setOutputValueClass(Text.class);

job.setInputFormatClass(MongoInputFormat.class);

job.setOutputFormatClass(TextOutputFormat.class);

job.setNumReduceTasks(0);

System.exit(job.waitForCompletion(true) ? 0 : 1 );

}

}

这个只有map 的作业用到了Mongo Hadoop 适配器提供的几个类。从HDFS 读入的数据会被转换成一个BSONObject对象。该类描述的是一个二进制的JSON值。MongoDB使用这些BSONObject对象来有效地序列化、传输和存储数据。

Mongo Hadoop 适配器还提供了一个方便的工具类 MongoConfigUtil,使得可以把MongoDB当成是一个文件系统来访问。

8.导出为一个可运行的JAR文件,并运行该作业:

hadoop jar ImportWeblogsFromMongo.jar

9.验证weblogs数据是否已经导入HDFS中:

hadoop fs -ls /data/weblogs/mongo_import

工作原理

Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现,包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。

 

1.9 使用Pig从HDFS导出数据到MongoDB

MongoDB 是一种 NoSQL 数据库,用于存储和检索海量数据。MongoDB 通常用于存储面向用户的数据,这些数据必须经过清洗、格式化之后才可以被使用。Apache Pig 从某种程度上讲就是用来处理这种任务的。Mongostorage类使得使用Pig 可以非常方便地批量处理 HDFS 上的数据,再直接将这些数据导入 MongoDB 中。本节将使用 Mongostorage类将HDFS上的数据导出到MongoDB数据库中。

准备工作

使用Mongo Hadoop 适配器最简单的方法是从GitHub 上克隆Mongo-Hadoop工程,并且将该工程编译到一个特定的Hadoop版本。克隆该工程需要安装一个Git客户端。

本节假定你使用的Hadoop版本是CDH3。

Git客户端官方的下载地址是:http://git-scm.com/downloads。

在Windows操作系统上可以通过http://windows.github.com/访问GitHub。

在Mac操作系统上可以通过http://mac.github.com/访问GitHub。

可以通过https://github.com/mongodb/mongo-hadoop获取到Mongo Hadoop适配器。该工程需要编译在特定的Hadoop版本上。编译完的JAR文件需要复制到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。

Mongo Java的驱动包也需要安装到Hadoop集群每个节点的$HADOOP_HOME/lib目录下。该驱动包可从https://github.com/mongodb/mongo-java-driver/downloads下载。

操作步骤

完成下面的步骤,将数据从HDFS复制到MongoDB。

1.通过下面的命令实现克隆mongo-hadoop工程:

git clone https://github.com/mongodb/mongo-hadoop.git

2.切换到稳定发布的1.0分支版本:

git checkout release-1.0

3 .必须保持 mongo-hadoop 与 Hadoop 的版本一致。使用文本编辑器打开mongo-hadoop克隆目录下的build.sbt文件,修改下面这行:

hadoopRelease in ThisBuild := "default"

修改为:

hadoopRelease in ThisBuild := "cdh3"

4.编译mongo-hadoop:

./sbt package.

这将会在core/target文件夹下生成一个名为mongo-hadoop-core_cdh3u3-1.0.0.jar的JAR文件。

5.从https://github.com/mongodb/mongo-java-driver/downloads下载MongoDB2.8.0版本的Java驱动包。

6.复制 mongo-hadoop-core、mongo-hadoop-pig 和 MongoDB Java 驱动包到Hadoop集群每个节点的$HADOOP_HOME/lib:

cp mongo-hadoop-core_cdh3u3-1.0.0.jar mongo-2.8.0.jar $HADOOP_HOME/lib

7.创建一个Pig脚本读取HDFS上的weblogs数据并将其存储到MongoDB数据库:

register /path/to/mongo-hadoop/mongo-2.8.0.jar

register /path/to/mongo-hadoop/core/target/mongo-hadoop-core- 1.0.0.jar

register /path/to/mongo-hadoop/pig/target/mongo-hadoop-pig- 1.0.0.jar

define MongoStorage com.mongodb.hadoop.pig.MongoStorage();

weblogs = load '/data/weblogs/weblog_entries.txt' as

(md5:chararray, url:chararry, date:chararray, time:chararray,

ip:chararray);

store weblogs into 'mongodb://<HOST>:<PORT>/test.weblogs_from_pig' using

MongoStorage();

工作原理

Mongo Hadoop适配器提供了一种新的兼容Hadoop的文件系统实现,包括MongoInputFormat和MongoOutputFormat。这些抽象实现使得访问MongoDB和访问任何兼容Hadoop的文件系统一样。MongoStorage将Pig类型转化为MongoDB可以直接访问的BasicDBObjectBuilder类型。

 

1.10 在Greenplum外部表中使用HDFS

Greenplum是一个并行数据库,数据的存储与查询基于一个或多个PostgreSQL实例。它补充了Hadoop,提供对大数据的实时或准实时访问,它还支持使用HDFS文件作为外部表。外部表是一个处理Greenplum集群之外数据很好的解决方案。由于外部表访问首先要消耗网络带宽,所以与Greenplum集群内的数据相比,它应该存储那些访问相对不频繁的数据。本节将介绍如何创建只读的外部表和可读写的外部表。

准备工作

在本节假定你使用的Hadoop版本为CDH3。

运行一个Greenplum实例,确认它能够访问Hadoop集群。相关信息可以查看http://www.greenplum.com/products/greenplum-database。

按照下面的方式配置Greenplum。

gp_hadoop_target_version 设置为cdh3u2。

gp_hadoop_home 设置为$HADOOP_HOME 的完整路径。

在Greenplum集群的每个节点上都需要安装1.6以上(包含1.6)版本的Java。

操作步骤

创建HDFS上weblogs文件的一个外部表:

CREATE EXTERNAL TABLE weblogs(

md5   text,

url   text,

request_date date,

request_time time,

ip   inet

)

LOCATION ('gphdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/

weblog_ entries.txt')

FORMAT 'TEXT' (DELIMITER '\t');

工作原理

Greenplum本地支持并行地将 HDFS 上的数据加载到数据库中。当一个查询需要访问表weblog_entries.txt时,该文件会被加载为Greenplum的一个临时表,然后执行对该临时表的查询。等到查询结束以后再将该表丢弃。

更多参考

Greenplum也支持对外部表的写操作。这需要在创建表的时候指定写关键字:

CREATE WRITABLE EXTERNAL TABLE weblogs(

md5   text,

url   text,

request_date date,

request_time time,

ip   inet

)

LOCATION ('gphdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/

weblog_ entries.txt')

FORMAT 'TEXT' (DELIMITER '\t');

更多的信息可以查看Greenplum管理员手册,见http://media.gpadmin.me/wp-content/uploads/2011/05/GP-4100-AdminGuide.pdf。

 

1.11 利用Flume加载数据到HDFS中

Apache Flume 是Hadoop 社区的一个项目,由多个相关项目组成,用于从不同的数据源可靠有效地加载数据流到HDFS中。Flume最常见的一个场景是加载多个数据源的网站日志数据。本节将介绍如何使用Flume加载数据到HDFS中。

准备工作

在本节中假定你已经安装和配置好Flume。

Flume可以从Apache网页(http://incubator.apache.org/flume/)下载。

如果你使用的是CDH3,那么默认已经安装了Flume 0.9.4+25.43 的版本。

操作步骤

完成下面的步骤,实现将weblogs数据导入HDFS。

1.使用dump命令测试Flume是否配置正确:

flume dump 'text("/path/to/weblog_entries.txt")'

2.通过Flume shell 执行一个配置:

flume shell -c<MASTER_HOST>:<MASTER_PORT> -e 'exec config text("/path/to/ weblog_entries.txt") | collectorSink("hdfs://<NAMENODE_HOST>:<NAMENODE_PORT>/data/weblogs/flume")'

工作原理

Flume包含Sources [8]和Sinks [9]两个抽象概念,并通过管状的数据流将它们合并在一起。在这个例子中,数据来源是text方式,将文件路径作为参数,并将该文件中的内容发送到配置的数据输出端。dump命令使用控制台为数据输出端。按照这样的配置,weblog_entries.txt文件的内容以text的方式被读取,同时被写到控制台。

在步骤 2 中,使用了 Flume shell 配置并执行一个作业。-c 参数告诉了 Flume Master节点的连接地址。Flume将会执行–e参数后面的命令。如前所述,text是一种读取所传文件中所有内容的数据源。collectorSink是一个数据去向,可以传给本地文件系统路径或者HDFS文件系统路径。在前面这个例子中,我们传递的是一个HDFS的路径。这个命令执行的效果会将weblog_entries.txt导入HDFS中。

更多参考

Flume提供了几个预定义的Source和Sink。下面是一些基本的数据源。

null:不读取任何数据。

stdin:读入一个标准的输入流。

rpcSource:读取Thrift 或者Avro RPC。

text:读取一个文件中的内容。

tail:读取一个文件,并保持文件打开状态用于持续读取追加到文件中的数据下面是一些基本的Sink。

null:将数据丢弃,不进行写操作。

collectorSink:写到一个本地文件系统或者HDFS 上。

console:写到控制台。

formatDfs:写到HDFS 并带一定的格式,如序列化文件、Avro、Thrift 等。

rpcSink:写给Thrift 或者Avro RPC。

注 释

[1]. 即席查询是用户根据自己的需求,灵活地选择查询条件,系统能够根据用户的选择生成相应的统计报表。

[2]. 原文是针对Hadoop 0.20.0 的版本,对目前来说该版本已经很老,Hadoop 1.0 以上的版本put 已经可以对文件夹进行复制。

[3]. Hadoop 0.20.2 以上已经支持不同版本间的distcp复制了。——译者注

[4]. “只有map”表示一个作业只启动map 阶段没有启动reduce阶段。

[5]. Hadoop ftp 文件系统。

[6]. 保证MySQL与Hadoop 集群中的每个节点间网络是相通的。

[7]. $SQOOP_HOME为SQOOP 的安装目录。

[8]. 数据来源。

[9]. 数据去向。

图书在版编目(CIP)数据

Hadoop实战手册/(美)欧文斯(Owens,J.R.),(美)伦茨(Lentz,J.),(美)费米亚诺(Femiano,B.)著;傅杰,赵磊,卢学裕译.--北京:人民邮电出版社,2014.2

书名原文:Hadoop real-world solutions cookbook

ISBN 978-7-115-33795-5

Ⅰ.①H… Ⅱ.①欧…②伦…③费…④傅…⑤赵…⑥卢… Ⅲ.①数据处理软件—技术手册 Ⅳ.①TP274-62

中国版本图书馆CIP数据核字(2013)第277092号

版权声明

Copyright ©2013 Packt Publishing. First published in the English language under the title Hadoop Real-World Solutions Book.

All Rights Reserved.

本书由英国Packt Publishing公司授权人民邮电出版社出版。未经出版者书面许可,对本书的任何部分不得以任何方式或任何手段复制和传播。

版权所有,侵权必究。

◆著 [美]Jonathan R.Owens Jon Lentz Brian Femiano

译 傅杰 赵磊 卢学裕

责任编辑 杨海玲

责任印制 程彦红 杨林杰

◆人民邮电出版社出版发行  北京市丰台区成寿寺路11号

邮编 100164  电子邮件 315@ptpress.com.cn

网址 http://www.ptpress.com.cn

北京鑫正大印刷有限公司印刷

◆开本:800×1000 1/16

印张:16.25

字数:318千字  2014年2月第1版

印数:1-3500册  2014年2月北京第1次印刷

著作权合同登记号 图字:01-2013-4468号

定价:59.00元

读者服务热线:(010)81055410 印装质量热线:(010)81055316

反盗版热线:(010)81055315

相关图书

Hadoop虚拟化
Hadoop虚拟化
大数据开发者权威教程:NoSQL、Hadoop组件及大数据实施
大数据开发者权威教程:NoSQL、Hadoop组件及大数据实施
Hadoop海量数据处理:技术详解与项目实战(第2版)
Hadoop海量数据处理:技术详解与项目实战(第2版)
Hadoop海量数据处理:技术详解与项目实战
Hadoop海量数据处理:技术详解与项目实战
Hadoop MapReduce实战手册
Hadoop MapReduce实战手册
Hadoop MapReduce性能优化
Hadoop MapReduce性能优化

相关文章

相关课程