首家大数据培训挂牌机构 股票代码:837906 | EN CN
阿里巴巴菜鸟级数据产品经理半年回顾总结篇
干货教程:如何绘制业务流程图(二)
干货教程:如何绘制业务流程图(一)
技术贴:如何在数据库中秘密地查询隐私数据
攻略教程:信息图(infographic)是怎么做出来的?
分析师一定要看!用数据讲故事的五个步骤
技术篇:怎样玩转千万级别的数据?
北漂书生:大数据时代SEO数据如何搜集和分析
干货,从十大问题重新认识并读懂互联网
相似图片搜索、算法、识别的原理解析(下)
相似图片搜索、算法、识别的原理解析(上)
制作信息图时请遵循这10条原则
提高表格可读性的一些技巧,适用于Excel、PPT等数据报表
实用教程:如何让Excel图表更具“商务气质”?
一张数据信息图是这样制作完成的
菜鸟读财报,如何从上市公司财报中挖情报?
北大数据分析老鸟写给学弟们一封信
如何一步一步制作出高品质数据信息图?
总结:海量数据分析处理的十个方法
【实战经验】数据分析师如何了解老板真正想法?
零售业数据分析那些事儿
数据分析时l常用电子表格公式【大全】
用数据来告诉你 上市公司财报的秘密
这12个数据能 帮你搞定淘宝店铺
首席工程师揭秘:LinkedIn大数据后台是如何运作的?(四)
首席工程师揭秘:LinkedIn大数据后台是如何运作的?(三)
首席工程师揭秘:LinkedIn大数据后台是如何运作的?(二)
首席工程师揭秘:LinkedIn大数据后台是如何运作的?(一)
淘宝网店从激活到挽留,4步走玩转数据营销
文案怎样写才有意思、不空洞、打动人?
入门级扫盲贴:数据分析的步骤有哪些?
关系即数据,论社交媒体的关系转换
数据的力量,苹果教你用数据鄙视竞争对手
谁说文科生不能做数据分析?数据分析入行→技能提升→优势
产品运营数据分析——SPSS数据分组案例
如何追踪iPhone和iPad等移动设备的用户行为数据?
阿里巴巴中国站:用户满意度指标权重计算方法
广告中的AdNetwork、AdExchange、DSP、SSP、RTB和DMP是什么?
信息图制作教程:关于数值的表现
为什么大数据会如此轰动?(值得深度的文章)
多图技术贴:深入浅出解析大数据平台架构
面板数据分析中标准误的估计修正——根据Peterson (2009)的归纳
财务官、投资人、CIO看过来:给企业数据定价
推荐系统中常用算法 以及优点缺点对比
探索Weotta搜索引擎背后的大数据技术
如何识别虚假数据?
为什么我们像驯化小狗那样驯化算法
程序员必须知道的10大基础实用算法及其讲解
电子商务:最影响转化率的九大要素
如何迅速成为一名数据分析师?
想从事大数据、海量数据处理相关的工作,如何自学打基础?
如何用亚马逊弹性MapReduce分析大数据?
译文:机器学习算法基础知识
给hadoop新手的一封信:Hadoop入门自学及对就业的帮助
从入门到精通,我是这样学习算法的
小商家,从老客户身上获取的数据才更有意义
13页PPT讲述:大数据下网站数据分析应用
40页PPT详解:京东大数据基础构架与创新应用
67页PPT解密搜索引擎背后的大技术:知识图谱,大数据语义链接的基石
营销洞察力——10个营销度量指标
技术篇:前端数据之美如何展示?
董飞:美国大数据工程师面试攻略【PPT】
easel:如何制作好的信息图——来自专家的顶级技巧
大数据实操:以3D打印机为例,如何知道卖点有没有市场需求?
大数据建模 需要了解的九大形式
用户画像数据建模方法
从规划开始,公司or企业如何入手和实施大数据?
干货:商品信息数据分析和展现系统的设计与开发
高手教你用Excel制作百度迁徙数据地图
50篇干货:淘宝店/电子商务如何玩转数据分析?
精华索引:大数据实际应用案例50篇
验证最小化可行产品 (MVP) 的 15 种方法
干货:数据分析师的完整知识结构
大数据技术Hadoop面试题,看看你能答对多少?答案在后面
用SPSS做数据分析?先弄懂SPSS的基础知识吧
怎样做出优秀的扁平化设计风格PPT? 扁平化PPT设计手册#3
解答│做大数据过程中遇到的13个问题
40页PPT│社交网络发展的新动力:大数据与众包
以Amazon、豆瓣网为例,探索推荐引擎内部的秘密#1
怎样做出优秀的扁平化设计风格PPT?#2
怎样做出优秀的扁平化设计风格PPT?#1
36页PPT│大数据分析关键技术在腾讯的应用服务创新
如何丰满地做SWOT分析?
【35页PPT】TalkingData研发副总阎志涛:移动互联网大数据处理系统架构
27页PPT|以珍爱网为例,如何构建有业务价值的数据分析系统?
国外数据新闻资源分享
21页PPT重磅发布:Mariana——腾讯深度学习平台的进展与应用
从0到100——知乎架构变迁史
PPT解读:百度大数据质量保障方案探索
45页PPT|大数据环境下实现一个O2O通用推荐引擎的实践
从数据看豆瓣兴衰
深度学习系列:解密最接近人脑的智能学习机器——深度学习及并行化实现(四)
重磅推荐:129页PPT讲述移动时代创业黄金法则 via:腾讯企鹅智酷
重磅推荐:大数据工程师飞林沙的年终总结&算法数据的思考
OpenKN——网络大数据时代的知识计算引擎
大数据下城市计算的典型应用
技术贴:大数据告诉你,如何给微信公众号文章取标题?
你的QQ暴露了你的心——QQ大数据及其应用介绍PPT
如何从企业报表看企业的生存能力?
实用的大数据技巧合集
技术帝揭秘:充电宝是如何盗取你的个人隐私的?
重磅!50页PPT揭秘腾讯大数据平台与推荐应用架构
原创教程:饼图之复合饼图与双层饼图(1)
PPT:大数据时代的设计特点——不了解这个你做不了今天的设计
教程贴:如何用方程式写春联?
原创教程:如何用Excel制作简易动态对比图
深度译文:机器学习那些事
教程帖:数学之美——手把手教你用Excel画心(动态图)
董老师走进斯坦福,聊聊硅谷创业公司和大数据的事儿(附课件PPT下载)
【限时】年度钜献,108个大数据文档PDF开放下载
董飞专栏:大数据入门——大数据相关技术、Hadoop生态、LinkedIn内部实战
亿级用户下的新浪微博平台架构
一张图了解磁盘里的数据结构
浅析数据化设计思维在阿里系产品的应用
美团推荐算法实践
一个P2P创业公司有哪些部门,都是做什么的?
一个P2P平台的详细运营框架是怎样的?
机器学习中的算法——决策树模型组合之随机森林与GBDT
神经网络简史
58页PPT看懂互联网趋势,大数据/物联网/云计算/4G都有了
广点通背后的大数据技术秘密——大规模主题模型建模及其在腾讯业务中的应用(附PPT)
微信红包之CBA实践PPT——移动互联网海量访问系统设计
一文读懂机器学习,大数据/自然语言处理/算法全有了……
搜狐新闻客户端的背后大数据技术原理——推荐系统(PPT)
原创教程:用Excel做动态双层饼图
半小时读懂PMP私有广告交易市场
怎样分析样本调研数据(译)
PPT:支付宝背后的大数据技术——DataLab、Higo的实践及应用
大数据技术人员的工具包——开源大数据处理工具list(限时下载)
计算机视觉:随机森林算法在人体识别中的应用
24页PPT:机器学习——支持向量机SVM简介(附下载)
互联网高手教你如何搜集你想要的信息
深度:对地观测大数据处理、挑战与思考
原创教程:用Excel做饼图之复合饼图与双层饼图(2)
移动大数据时代: 无线网络的挑战与机遇(附pdf下载)
Excel使用技巧——25招必学秘技
【年度热门】加上这些 Excel 技能点,秒杀众人(多图)
原创教程:用Excel做纵向折线图
知识图谱——机器大脑中的知识库
何明科专栏:用数据化的方式解析投资条款
DT时代,如何用大数据分析创造商业价值(23页PPT)
MIT牛人梳理脉络详解宏伟现代数据体系
你的老婆是怎么算出来的?揭秘佳缘用户推荐系统
飞林沙:商品推荐算法&推荐解释
PPT:如何成为真正的数据架构师?(附下载)
开源大数据查询分析引擎现状
董飞专栏:打造数据产品必知秘籍
译文:如何做强大又漂亮的信息图
如何使用Amazon Machine Learning构建机器学习预测模型
如何运用数据协助货架管理(内附26张PPT)
SVM算法
主流大数据系统在后台的层次角色及数据流向
PPT:阿里全息大数据构建与应用
人脸识别技术大总结——Face Detection & Alignment
教程:用Excel制作成对条形图
易观智库:大数据下的用户分析及用户画像(18页PPT附下载)
技术向:如何设计企业级大数据分析平台?
电商数据分析基础指标体系
IBM SPSS Modeler 决策树之银行行销预测应用分析
拓扑数据分析与机器学习的相互促进
基于 R 语言和 SPSS 的决策树算法介绍及应用
用php做爬虫 百万级别知乎用户数据爬取与分析
另类新浪微博基本数据采集方法
以10万+阅读的文章为例 教你做微信公众号的运营数据分析
破解数据三大难题:变现?交易?隐私?
微店的大数据平台建设实践与探讨
阿里巴巴PPT:大数据基础建议及产品应用之道
基于社会媒体的预测技术
人工智能简史
技巧:演讲中怎样用数据说话
马云和小贝选谁做老公?写给非数据人的数据世界入门指南
掘金大数据产业链:上游资源+中游技术+下游应用
原创教程:手把手教你用Excel做多层折线图
销售分析:如何从数据指标发现背后的故事
如何一步步从数据产品菜鸟走到骨干数据产品
也来谈谈微博的用户画像
行走在网格之间:微博用户关系模型
如何拍出和明星一样美爆的自拍照?斯坦福大学用卷积神经网络建模告诉你
运营商如何玩转大数据? 浙江移动云计算和大数据实践(PPT附下载)
大数据分析的集中化之路 建设银行大数据应用实践PPT
腾讯防刷负责人:基于用户画像大数据的电商防刷架构
创业提案的逻辑
友盟分享 | 移动大数据平台架构思想以及实践经验
寻路推荐 豆瓣推荐系统实践之路
“小数据”的统计学
重磅!8大策略让你对抗机器学习数据集里的不均衡数据
小团队撬动大数据——当当推荐团队的机器学习实践
微博推荐架构的演进
科普文 手把手教你微信公众号数据分析
信息图制作的六个注意点
【权利的游戏】剧透新玩法:情理之中?意料之外
推荐系统(Recommender System)的技术基础
核心算法 谷歌如何从网络的大海里捞到针
Quora数据科学家和机器学习工程师是如何合作的
阿里巴巴PPT:大数据下的数据安全
数据建模那点事儿
全民拥抱Docker云–Lhotse系统经验分享
实时股票分析系统的架构与算法
架构师必看 京东咚咚架构演进
什么叫对数据敏感?怎样做数据分析?
推荐系统基础知识储备
刘德寰:数据科学的整合与细分 数据科学的七个危险趋势(视频)
实际工作中,如何做简单的数据分析?
分布式前置机器学习在威胁情报中的应用(附PPT下载)
数据科学 怎样进行大数据的入门级学习?
扛住100亿次请求 如何做一个“有把握”的春晚红包系统?(PPT下载)
从 LinkedIn 的数据处理机制学习数据架构
大数据会如何改变管理咨询公司(I)
优秀大数据GitHub项目一览
生硬的数字和数据新闻:这么近,那么远
经典大数据架构案例:酷狗音乐的大数据平台重构(长文)
揭秘中兴大数据在银行领域的系统部署
基于大数据的用户画像构建(理论篇)
【R】支持向量机模型实现
数据图处处有陷阱?五个例子教你辨真伪
如何用R绘制地图
你确定你真的懂用户画像?
数据模型需要多少训练数据?
【接地气】01 数据报表的颜色怎么配
游戏价值和数据分析新思路
【R】异常值检测
快的打车架构实践
豆瓣还是朋友圈:大数据、新方法和日常问
PPT数据图表,怎么做才好看?
大道至简的数据体系构建方法论
数据的误区及自身业务
新浪微博的用户画像是怎样构建的?
面试干货!21个必知数据科学面试题和答案part1(1-11)
易观智库:中国大数据产业生态图谱2016(附下载)
Airbnb的数据基础架构
50PB海量数据排序,谷歌是这么做的
大数据时代工程师如何应对–今日头条走进硅谷技术讲座
D3.js教学记(下)
D3.js教学记(上)
飞林沙:企业级服务公司如何赚钱?只有平台级产品才有大数据的理论
一个母婴电子商务网站的大数据平台及机器学习实践
7大板块 组成数据分析师的完整知识结构
干货:SaaS领域如何分析收入增长?
学术 | 词嵌入的类比特性有实用意义吗?
6个用好大数据的秘诀
一个数据库外行眼中的微信优化 (附专家补充)
大数据调研,如何实现快全准?
数据大师Olivier Grisel给志向高远的数据科学家的指引
数据堂肖永红:数据交易的是使用权或数据的增值,而不是数据本身(PPT附下载)
淘宝商品详情平台化思考与实践
刘译璟:百分点大数据理念和实践(图文+PPT下载)
如何快速搞定一份看起来还不错的演示文档?
【BABY夜谈大数据】决策树
数据驱动设计:数据处理流程、分析方法和实战案例
美图数据总监:Facebook的法宝,我们在产品中怎么用?
树的内核:量化树结构化数据之间的相似性
拿到用户数据之后,LinkedIn怎么赚钱?
GrowingIO张溪梦:增长黑客的核心 企业应该重视产品留存率(附PPT下载)
[译]Airbnb是如何使用数据理解用户旅行体验的?
微博推荐数据服务代理: hyper_proxy的设计和实现
星图数据谷熠:消费领域DaaS 大数据重构未来商业游戏规则(附PPT下载)
鲍忠铁:TalkingData大数据技术与应用实践(PPT下载)
【干货教材】数据分析VS业务分析需求
九枝兰专访:数字营销的核心—企业如何使用数据管理平台(DMP)进行精准营销
我们的应用系统是如何支撑千万级别用户的
R应用空间数据科学
Excel进行高级数据分析(上)
Excel进行高级数据分析(下)
国内各大互联网公司2.0版技术站点收集
网站数据分析思路导图
大数据分析报表设计开发要素
大数据需要的12个工具 推荐
YARN/MRv2 Resource Manager深入剖析—NM管理
YARN/MRv2 Resource Manager深入剖析—RMApp状态机分析
Hadoop 1.0与Hadoop 2.0资源管理方案对比
Hadoop 2.0中单点故障解决方案总结
Hadoop 2.0 (YARN)中的安全机制概述
Hadoop 新特性、改进、优化和Bug分析系列1:YARN-378
Hadoop 新特性、改进、优化和Bug分析系列2:YARN-45
Hadoop 新特性、改进、优化和Bug分析系列3:YARN-392
Hadoop版本选择探讨
探究提高Hadoop稳定性与性能的方法
《Effective C++》读书笔记(第一部分)
Hadoop分布式环境下的数据抽样
Hadoop计算能力调度器算法解析
如何编写Hadoop调度器
数据结构之红黑树
Hadoop pipes设计原理
《C++ Primer plus》学习笔记之”类”
《C++ Primer plus》学习笔记之”类继承”
《C++ Primer plus》学习笔记之”C++中的代码重用”
《C++ Primer plus》学习笔记之”异常”
《C++ Primer plus》学习笔记之”RTTI”
Hadoop pipes编程
Hadoop Streaming高级编程
《C++ Primer plus》学习笔记之”标准模板库”
《C++ Primer plus》学习笔记之”输入输出库”
Linux Shell 命令总结
算法之图搜索算法(一)
awk使用总结
素数判定算法
《C++ Primer plus》学习笔记之“函数探幽”
使用Thrift RPC编写程序
如何在Hadoop上编写MapReduce程序
怎样从10亿查询词找出出现频率最高的10个

Hadoop pipes编程

于2017-03-26由小牛君创建

分享到:



1. Hadoop pipes编程介绍

Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++和Java的RecordReader, Mapper, Partitioner,Rducer和RecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:Hadoop Pipes设计原理

本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。

2. Hadoop pipes编程初体验

Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.cc,wordcount-part.cc和wordcount-nopipe.cc。下面简要介绍一下这三个程序。

(1) wordcount-simple.cc:Mapper和Reducer组件采用C++语言编写,RecordReader, Partitioner和RecordWriter采用Java语言编写,其中,RecordReader 为LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),Partitioner为PipesPartitioner,RecordWriter为LineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”)

(2) wordcount-part.cc:Mapper,Partitioner和Reducer组件采用C++语言编写,其他采用Java编写

(3)wordcount-nopipe.cc:RecordReader,Mapper,Rducer和RecordWriter采用C++编写

接下来简单介绍一下wordcount-simple.cc的编译和运行方法。

在Hadoop的安装目录下,执行下面命令:

 ant -Dcompile.c++=yes examples 

则wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:

 bin/hadoop  -put  build/c++-examples/Linux-amd64-64/bin/wordcount-simple  /user/XXX/ bin/ 

上传一份数据到HDFS的/user/XXX /pipes_test_data目录下:

 bin/hadoop  -put  data.txt  /user/XXX /pipes_test_data 

直接使用下面命令提交作业:


bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes编程方法

先从最基础的两个组件Mapper和Reducer说起。

(1) Mapper编写方法

用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:


class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用户必须实现map函数,它的参数是MapContext,该类的声明如下:


class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

而TaskContext类地声明如下:


class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用户可以从context参数中获取当前的key,value,progress和inputsplit等数据信息,此外,还可以调用emit将结果回传给Java代码。

Mapper的构造函数带有一个HadoopPipes::TaskContext参数,用户可以通过它注册一些全局counter,对于程序调试和跟踪作业进度非常有用:

如果你想注册全局counter,在构造函数添加一些类似的代码:


WordCountMap(HadoopPipes::TaskContext& context) {

inputWords1 = context.getCounter(“group”, ”counter1”);

inputWords2 = context.getCounter(“group”, ”counter2”);

}

当需要增加counter值时,可以这样:


context.incrementCounter(inputWords1, 1);

context.incrementCounter(inputWords2, 1);

其中getCounter的两个参数分别为组名和组内计数器名,一个组中可以存在多个counter。

用户自定义的counter会在程序结束时,输出到屏幕上,当然,用户可以用通过web界面看到。

(2) Reducer编写方法

Reducer组件的编写方法跟Mapper组件类似,它需要继承虚基类public HadoopPipes::Reducer。

与Mapper组件唯一不同的地方时,map函数的参数类型为HadoopPipes::ReduceContext,它包含一个nextValue()方法,这允许用于遍历当前key对应的value列表,依次进行处理。

接下来介绍RecordReader, Partitioner和RecordWriter的编写方法:

(3) RecordReader编写方法

用户自定义的RecordReader类需要继承虚基类HadoopPipes::RecordReader,它的声明如下:


class RecordReader: public Closable {

public:

virtual bool next(std::string& key, std::string& value) = 0;

virtual float getProgress() = 0;

};

用户需要实现next和 getProgress两个方法。

用户自定义的RecordReader的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getInputSplit()的方法,用户可以获取经过序列化的InpuSplit对象,Java端采用不同的InputFormat可导致InputSplit对象格式不同,但对于大多数InpuSplit对象,它们可以提供至少三个信息:当前要处理的InputSplit所在的文件名,所在文件中的偏移量,它的长度。用户获取这三个信息后,可使用libhdfs库读取文件,以实现next方法。

下面介绍一下反序列化InputSplit对象的方法:

【1】 如果Java端采用的InputFormat为WordCountInpuFormat,可以这样:


class XXXReader: public HadoopPipes::RecordReader {

public:

XXXReader (HadoopPipes::MapContext& context) {

std::string filename;

HadoopUtils::StringInStream stream(context.getInputSplit());

HadoopUtils::deserializeString(filename, stream);

……

}; 

【2】 如果Java端采用的InputFormat为TextInpuFormat,可以这样:


class XXXReader: public HadoopPipes::RecordReader {

public:

XXXReader (HadoopPipes::MapContext& context) {

std::string filename;

HadoopUtils::StringInStream stream(context.getInputSplit());

readString(filename, stream);

int start = (int)readLong(stream);

int len = (int)readLong(stream);

……

private:

void readString(std::string& t, HadoopUtils::StringInStream& stream)

{

int len = readShort(stream);

if (len > 0) {

// resize the string to the right length

t.resize(len);

// read into the string in 64k chunks

const int bufSize = 65536;

int offset = 0;

char buf[bufSize];

while (len > 0) {

int chunkLength = len > bufSize ? bufSize : len;

stream.read(buf, chunkLength);

t.replace(offset, chunkLength, buf, chunkLength);

offset += chunkLength;

len -= chunkLength;

}

} else {

t.clear();

}

}

long readLong(HadoopUtils::StringInStream& stream) {

long n;

char b;

stream.read(&b, 1);

n = (long)(b & 0xff) << 56 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 48 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 40 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 32 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 24 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 16 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) << 8 ;

stream.read(&b, 1);

n |= (long)(b & 0xff) ;

return n;

}

}; 

(4) Partitioner编写方法

用户自定义的Partitioner类需要继承虚基类HadoopPipes:: Partitioner,它的声明如下:


class Partitioner {

public:

virtual int partition(const std::string& key, int numOfReduces) = 0;

virtual ~Partitioner() {}

};

用户需要实现partition方法和 析构函数。

对于partition方法,框架会自动为它传入两个参数,分别为key值和reduce task的个数numOfReduces,用户只需返回一个0~ numOfReduces-1的值即可。

(5) RecordWriter编写方法

用户自定义的RecordWriter类需要继承虚基类HadoopPipes:: RecordWriter,它的声明如下:


class RecordWriter: public Closable {

public:

virtual void emit(const std::string& key,

const std::string& value) = 0;

};

用户自定的RecordWriter的构造函数可携带类型为HadoopPipes::MapContext的参数,通过该参数的getJobConf()可获取一个HadoopPipes::JobConf的对象,用户可从该对象中获取该reduce task的各种参数,如:该reduce task的编号(这对于确定输出文件名有用),reduce task的输出目录等。


class MyWriter: public HadoopPipes::RecordWriter {

public:

MyWriter(HadoopPipes::ReduceContext& context) {

const HadoopPipes::JobConf* job = context.getJobConf();

int part = job->getInt("mapred.task.partition");

std::string outDir = job->get("mapred.work.output.dir");

……

}

}

用户需实现emit方法,将数据写入某个文件。

4. Hadoop pipes编程示例

网上有很多人怀疑Hadoop pipes自带的程序wordcount-nopipe.cc不能运行,各个论坛都有讨论,在此介绍该程序的设计原理和运行方法。

该运行需要具备以下前提:

(1) 采用的InputFormat为WordCountInputFormat,它位于src/test/下的org.apache.hadoop.mapred.pipes中

(2) 输入目录和输出目录需位于各个datanode的本地磁盘上,格式为:file:///home/xxx/pipes_test (注意,hdfs中的各种接口同时支持本地路径和HDFS路径,如果是HDFS上的路径,需要使用hdfs://host:9000/user/xxx,表示/user/xxx为namenode 为host的hdfs上的路径,而本地路径,需使用file:///home/xxx/pipes_test,表示/home/xxx/pipes_test为本地路径。例如,bin/hadoop fs –ls file:///home/xxx/pipes_test表示列出本地磁盘上/home/xxx/pipes_tes下的文件)

待确定好各个datanode的本地磁盘上有输入数据/home/xxx/pipes_test/data.txt后,用户首先上传可执行文件到HDFS中:

 bin/hadoop  -put  build/c++-examples/Linux-amd64-64/bin/wordcount-nopipe  /user/XXX/bin/ 

然后使用下面命令提交该作业:


bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-D mapred.input.format.class=org.apache.hadoop.mapred.pipes.WordCountInputFormat \

-libjars hadoop-0.20.2-test.jar \

-input file:///home/xxx/pipes_test/data.txt \

-output file:///home/xxx/pipes_output \

-program /user/XXX/bin/wordcount-nopipe

5. Hadoop pipes高级编程

如果用户需要在mapreduce作业中加载词典或者传递参数,可这样做:

(1) 提交作业时,用-files选项,将词典(需要传递参数可以放到一个配置文件中)上传给各个datanode,如:


bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=false \

-D hadoop.pipes.java.recordwriter=false \

-D mapred.job.name=wordcount \

-files dic.txt \

….

(2) 在Mapper或者Reducer的构造函数中,将字典文件以本地文件的形式打开,并把内容保存到一个map或者set中,然后再map()或者reduce()函数中使用即可,如:


WordCountMap(HadoopPipes::TaskContext& context) {

file = fopen(“dic.txt”, "r"); //C库函数

…….

}

为了提高系能,RecordReader和RecordWriter最好采用Java代码实现(或者重用Hadoop中自带的),这是因为Hadoop自带的C++库libhdfs采用JNI实现,底层还是要调用Java相关接口,效率很低,此外,如果要处理的文件为二进制文件或者其他非文本文件,libhdfs可能不好处理。

6. 总结

Hadoop pipes使C++程序员编写MapReduce作业变得可能,它简单好用,提供了用户所需的大部分功能。

1. Hadoop pipes编程介绍

Hadoop pipes允许C++程序员编写mapreduce程序,它允许用户混用C++JavaRecordReaderMapperPartitionerRducerRecordWriter等五个组件。关于Hadoop pipes的设计思想,可参见我这篇文章:

本文介绍了Hadoop pipes编程的基本方法,并给出了若干编程示例,最后介绍了Hadoop pipes高级编程方法,包括怎样在MapReduce中加载词典,怎么传递参数,怎样提高效率等。

2. Hadoop pipes编程初体验

Hadoop-0.20.2源代码中自带了三个pipes编程示例,它们位于目录src/examples/pipes/impl中,分别为wordcount-simple.ccwordcount-part.ccwordcount-nopipe.cc。下面简要介绍一下这三个程序。

1 wordcount-simple.ccMapperReducer组件采用C++语言编写,RecordReader, PartitionerRecordWriter采用Java语言编写,其中,RecordReader LineRecordReader(位于InputTextInputFormat中,按行读取数据,行所在的偏移量为key,行中的字符串为value),PartitionerPipesPartitionerRecordWriterLineRecordWriter(位于InputTextOutputFormat中,输出格式为”key\tvalue\n”

2 wordcount-part.ccMapperPartitionerReducer组件采用C++语言编写,其他采用Java编写

3wordcount-nopipe.ccRecordReaderMapperRducerRecordWriter采用C++编写

接下来简单介绍一下wordcount-simple.cc的编译和运行方法。

Hadoop的安装目录下,执行下面命令:

ant -Dcompile.c++=yes examples

wordcount-simple.cc生成的可执行文件wordcount-simple被保存到了目录build/c++-examples/Linux-amd64-64/bin/中,然后将该可执行文件上传到HDFS的某一个目录下,如/user/XXX/ bin下:

bin/hadoop -put build/c++-examples/Linux-amd64-64/bin/wordcount-simple /user/XXX/ bin/

上传一份数据到HDFS/user/XXX /pipes_test_data目录下:

bin/hadoop -put data.txt /user/XXX /pipes_test_data

直接使用下面命令提交作业:

bin/hadoop pipes \

-D hadoop.pipes.java.recordreader=true \

-D hadoop.pipes.java.recordwriter=true \

-D mapred.job.name= wordcount \

-input /user/XXX /pipes_test_data \

-output /user/XXX /pipes_test_output \

-program /user/XXX/ bin/wordcount-simple

3. Hadoop pipes编程方法

先从最基础的两个组件MapperReducer说起。

(1) Mapper编写方法

用户若要实现Mapper组件,需继承HadoopPipes::Mapper虚基类,它的定义如下:

class Mapper: public Closable {

public:

virtual void map(MapContext& context) = 0;

};

用户必须实现map函数,它的参数是MapContext,该类的声明如下:

class MapContext: public TaskContext {

public:

virtual const std::string& getInputSplit() = 0;

virtual const std::string& getInputKeyClass() = 0;

virtual const std::string& getInputValueClass() = 0;

};

TaskContext类地声明如下:

class TaskContext {

public:

class Counter {

……

public:

Counter(int counterId) : id(counterId) {}

Counter(const Counter& counter) : id(counter.id) {}

……

};

virtual const JobConf* getJobConf() = 0;

virtual const std::string& getInputKey() = 0;

virtual const std::string& getInputValue() = 0;

virtual void emit(const std::string& key, const std::string& value) = 0;

virtual void progress() = 0;

…….

};

用户可以从context参数中获取当前的key