pandownload服务器(pandownload客户端)「pandownload服务器连接失败」

作者先容

  杜亦舒,创业中,技能合资人,喜好研究分享技能。个人订阅号:性能与架构。

  本文旨在帮您快速相识MapReduce的工作机制和开辟方法,办理以下几个题目:

MapReduce根本原理是什么?

MapReduce的实行过程是怎么样的?

MapReduce的核心流程细节

怎样举行MapReduce程序开辟?(通过7个实例渐渐把握)

  文章中提供了程序实例中涉及到的测试数据文件,可以直接下载利用。

  关于实践环境,假如您不喜好本身搭建Hadoop环境,可以下载利用本教程提供的环境,实践部分内容中会先容具体利用方法。

  通过学习并实践完成后,可以对MapReduce工作原理有比力清楚的认识,并把握MapReduce的编程思绪。

  大纲:

  一、MapReduce根本原理

  二、MapReduce入门示例-WordCount单词统计

  三、MapReduce实行过程分析

实例1-自界说对象序列化

实例2-自界说分区

实例3-盘算出每组订单中金额最大的记录

实例4-归并多个小文件

实例5-分组输出到多个文件

  四、MapReduce核心流程梳理

实例6-join操纵

实例7-盘算出用户间的共同好友

  五、下载方式

  一、MapReduce根本原理

  MapReduce是一种编程模子,用于大规模数据集的分布式运算。

  1、MapReduce普通表明

  图书馆要盘点图书数量,有10个书架,管理员为了加快统计速率,找来了10个同砚,每个同砚负责统计一个书架的图书数量。

  张同砚统计书架1

  王同砚统计书架2

  刘同砚统计书架3

  ……

  过了一会儿,10个同砚连续到管理员这报告本身的统计数字,管理员把各个数字加起来,就得到了图书总数。

  这个过程就可以明白为MapReduce的工作过程。

  2、MapReduce中有两个核心操纵

  (1)map

  管理员分配哪个同砚统计哪个书架,每个同砚都举行雷同的“统计”操纵,这个过程就是map。

  (2)reduce

  每个同砚的结果举行汇总,这个过程是reduce。

  3、MapReduce工作过程拆解

  下面通过一个景点案例(单词统计)看MapReduce是怎样工作的。

  有一个文本文件,被分成了4份,分别放到了4台服务器中存储

  Text1:theweatherisgood

  Text2:todayisgood

  Text3:goodweatherisgood

  Text4:todayhasgoodweather

  如今要统计出每个单词的出现次数。

  

  处理惩罚过程

  (1)拆分单词

map节点1

  输入:“theweatherisgood”

  输出:(the,1),(weather,1),(is,1),(good,1)

map节点2

  输入:“todayisgood”

  输出:(today,1),(is,1),(good,1)

map节点3

  输入:“goodweatherisgood”

  输出:(good,1),(weather,1),(is,1),(good,1)

map节点4

  输入:“todayhasgoodweather”

  输出:(today,1),(has,1),(good,1),(weather,1)

  (2)排序

map节点1

map节点2

map节点3

map节点4

  (3)归并

map节点1

map节点2

map节点3

map节点4

  (4)汇总统计

  每个map节点都完成以后,就要进入reduce阶段了。

  比方利用了3个reduce节点,必要对上面4个map节点的结果举行重新组合,比如按照26个字母分成3段,分配给3个reduce节点。

  Reduce节点举行统计,盘算出最闭幕果。

  

  这就是最根本的MapReduce处理惩罚流程。

  4、MapReduce编程思绪

  相识了MapReduce的工作过程,我们思考一下用代码实现时必要做哪些工作?

在4个服务器中启动4个map任务

每个map任务读取目标文件,每读一行就拆分一下单词,并记下来次单词出现了一次

目标文件的每一行都处理惩罚完成后,必要把单词举行排序

在3个服务器上启动reduce任务

每个reduce获取一部分map的处理惩罚结果

reduce任务举行汇总统计,输出终极的结果数据

  但不消担心,MapReduce是一个非常良好的编程模子,已经把绝大多数的工作做完了,我们只必要关心2个部分:

map处理惩罚逻辑——对传进来的一行数据如那边理惩罚?输出什么信息?

reduce处理惩罚逻辑——对传进来的map处理惩罚结果如那边理惩罚?输出什么信息?

  编写好这两个核心业务逻辑之后,只必要几行简单的代码把map和reduce装配成一个job,然后提交给Hadoop集群就可以了。

  至于别的的复杂细节,比方怎样启动map任务和reduce任务、怎样读取文件、如对map结果排序、怎样把map结果数据分配给reduce、reduce怎样把最闭幕果生存到文件等等,MapReduce框架都帮我们做好了,而且还支持很多自界说扩展设置,比方怎样读文件、怎样构造map大概reduce的输出结果等等,背面的示例中会有先容。

  二、MapReduce入门示例:WordCount单词统计

  WordCount黑白常好的入门示例,相称于helloword,下面就开辟一个WordCount的MapReduce程序,体验实际开辟方式。

  1、安装Hadoop实践环境

  您可以选择本身搭建环境,也可以利用打包好的Hadoop环境(版本2.7.3)。

  这个Hadoop环境实际上是一个虚机镜像,以是必要安装virtualbox假造机、vagrant镜像管理工具,和我的Hadoop镜像,然后用这个镜像启动虚机就可以了,下面是具体操纵步调:

  (1)安装virtualbox

  下载地点:https://www.virtualbox.org/wiki/Downloads

  (2)安装vagrant

  由于官网下载较慢,我上传到了云盘

  Windows版

  链接:https://pan.baidu.com/s/1pKKQGHl

  暗码:eykr

  Mac版

  链接:https://pan.baidu.com/s/1slts9yt

  暗码:aig4

  安装完成后,在下令行终端下就可以利用vagrant下令。

  (3)下载Hadoop镜像

  链接:https://pan.baidu.com/s/1bpaisnd

  暗码:pn6c

  (4)启动

pandownload服务器(pandownload 客户端) pandownload服务器(pandownload 客户端)「pandownload服务器连接失败」 行业资讯

  加载Hadoop镜像

  vagrantboxadd{自界说镜像名称}{镜像地点路径}

  比方您想定名为Hadoop,镜像下载后的路径为d:hadoop.box,加载下令就是如许:

  vagrantboxaddhadoopd:hadoop.box

  创建工作目次,比方d:hdfstest。

  进入此目次,初始化

  cdd:hdfstest

  vagrantinithadoop

  启动虚机

  vagrantup

  启动完成后,就可以利用SSH客户端登录虚机了

  IP127.0.0.1

  端口2222

  用户名root

  暗码vagrant

  在Hadoop服务器中启动HDFS和Yarn,之后就可以运行MapReduce程序了

  start-dfs.sh

  start-yarn.sh

  2、创建项目

  注:流程是在本机开辟,然后打包,上传到Hadoop服务器上运行。

  新建项目目次wordcount,此中新建文件pom.xml,内容:

  然后创建源码目次src/main/java

  如今的目次布局

  

  3、代码

  mapper程序:src/main/java/WordcountMapper.java

  内容:

  这里界说了一个mapper类,此中有一个map方法。MapReduce框架每读到一行数据,就会调用一次这个map方法。

  map的处理惩罚流程就是吸取一个keyvalue对儿,然后举行业务逻辑处理惩罚,末了输出一个keyvalue对儿。

  MapperLongWritable,Text,Text,IntWritable

  此中的4个范例分别是:输入key范例、输入value范例、输出key范例、输出value范例。

  MapReduce框架读到一行数据侯以keyvalue情势传进来,key默认环境下是mr矿机所读到一行文本的起始偏移量(Long范例),value默认环境下是mr框架所读到的一行的数据内容(String范例)。

  输出也是keyvalue情势的,是用户自界说逻辑处理惩罚完成后界说的key,用户本身决定用什么作为key,value是用户自界说逻辑处理惩罚完成后的value,内容和范例也是用户本身决定。

  此例中,输出key就是word(字符串范例),输出value就是单词数量(整型)。

  这里的数据范例和我们常用的不一样,由于MapReduce程序的输出数据必要在差别呆板间传输,以是必须是可序列化的,比方Long范例,Hadoop中界说了本身的可序列化范例LongWritable,String对应的是Text,int对应的是IntWritable。

  reduce程序:src/main/java/WordCountReducer.java

  

  这里界说了一个Reducer类和一个reduce方法。

  当传给reduce方法时,就变为:

  ReducerText,IntWritable,Text,IntWritable

  4个范例分别指:输入key的范例、输入value的范例、输出key的范例、输出value的范例。

  必要留意,reduce方法吸取的是:一个字符串范例的key、一个可迭代的数据集。由于reduce任务读取到map任务处理惩罚结果是如许的:

  (good,1)(good,1)(good,1)(good,1)

  当传给reduce方法时,就变为:

  key:good

  value:(1,1,1,1)

  以是,reduce方法吸取到的是同一个key的一组value。

  主程序:src/main/java/WordCountMapReduce.java

  

  这个main方法就是用来组装一个job并提交实行

  4、编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局:

  

  5、运行

  先把target中的jar上传到Hadoop服务器,然后在Hadoop服务器的HDFS中预备测试文件(把Hadoop地点目次下的txt文件都上传到HDFS)

  cd$HADOOP_HOME

  hdfsdfs-mkdir-p/wordcount/input

  hdfsdfs-put*.txt/wordcount/input

  实行wordcountjar

  hadoopjarmapreduce-wordcount-0.0.1-SNAPSHOT.jarWordCountMapR

  educe/wordcount/input/wordcount/output

  实行完成后验证

  hdfsdfs-cat/wordcount/output/*

  可以看到单词数量统计结果。

  三、MapReduce实行过程分析

  下面看一下从job提交到实行完成这个过程是怎样。

  (1)客户端提交任务

  Client提交任务时会先到HDFS中查察目标文件的巨细,相识要获取的数据的规模,然后形成任务分配的规划,比方:

  a.txt0-128M交给一个task,128-256M交给一个task,b.txt0-128M交给一个task,128-256M交给一个task...,形成规划文件job.split。

  然后把规划文件job.split、jar、设置文件xml提交给yarn(Hadoop集群资源管理器,负责为任务分配符合的服务器资源)

  

  (2)启动appmaster

  注:appmaster是本次job的主管,负责maptask和reducetask的启动、监控、和谐管理工作。

  yarn找一个符合的服务器来启动appmaster,并把job.split、jar、xml交给它。

  

  (3)启动maptask

  Appmaster启动后,根据固化文件job.split中的分片信息启动maptask,一个分片对应一个maptask。

  分配maptask时,会只管让maptask在目标数据地点的datanode上实行。

  

  (4)实行maptask

  Maptask会一行行地读目标文件,交给我们写的map程序,读一行就调一次map方法,map调用context.write把处理惩罚结果写出去,生存到本机的一个结果文件,这个文件中的内容是分区且有序的。

  分区的作用就是界说哪些key在一组,一个分区对应一个reducer。

  

  (5)启动reducetask

  Maptask都运行完成后,appmaster再启动reducetask,maptask的结果中有几个分区就启动几个reducetask。

  

  (6)实行reducetask

  reducetask去读取maptask的结果文件中本身对应的谁人分区数据,比方reducetask_01去读第一个分区中的数据。

  reducetask把读到的数据按key构造好,传给reduce方法举行处理惩罚,处理惩罚结果写到指定的输出路径。

  

  四、实例1:自界说对象序列化

  1、需求与实现思绪

  (1)需求

  必要统计手机用户流量日记,日记内容实例:

  

  要把同一个用户的上行流量、下行流量举行累加,并盘算出综合。

  比方上面的13897230503有两条记录,就要对这两条记录举行累加,盘算总和,得到:

  13897230503,500,1600,2100

  (2)实现思绪

map

  吸取日记的一行数据,key为行的偏移量,value为此行数据。

  输出时,应以手机号为key,value应为一个团体,包罗:上行流量、下行流量、总流量。

  手机号是字符串范例Text,而这个团体不能用根本数据范例表现,必要我们自界说一个bean对象,而且要实现可序列化。

  key:13897230503

  value:upFlow:100,dFlow:300,sumFlow:400

reduce

  吸取一个手机号标识的key,及这个手机号对应的bean对象聚集。

  比方:

  key:

  13897230503

  value:

  upFlow:400,dFlow:1300,sumFlow:1700,

  upFlow:100,dFlow:300,sumFlow:400

  迭代bean对象聚集,累加各项,形成一个新的bean对象,比方:

  upFlow:400+100,dFlow:1300+300,sumFlow:1700+400

  末了输出:

  key:13897230503

  value:upFlow:500,dFlow:1600,sumFlow:2100

  2、代码实践

  (1)创建项目

  新建项目目次serializebean,此中新建文件pom.xml,内容:

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  自界说bean:src/main/java/FlowBean

  

  MapReduce程序:src/main/java/FlowCount

  

  

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局:

  

  (4)运行

  先把target中的jar上传到Hadoop服务器,然后下载测试数据文件:

  链接:https://pan.baidu.com/s/1skTABlr

  暗码:tjwy

  上传到HDFS

  hdfsdfs-mkdir-p/flowcount/input

  hdfsdfs-putflowdata.log/flowcount/input

  运行

  hadoopjarmapreduce-serializebean-0.0.1-SNAPSHOT.jarFlowCount

  /flowcount/input/flowcount/output2

  查抄

  hdfsdfs-cat/flowcount/output/*

  五、实例2:自界说分区

  1、需求与实现思绪

  (1)需求

  还是以上个例子的手机用户流量日记为例:

  

  在上个例子的统计必要底子上添加一个新需求:按省份统计,差别省份的手机号放到差别的文件里。

  比方137表现属于河北,138属于河南,那么在结果输出时,他们分别在差别的文件中。

  (2)实现思绪

  map和reduce的处理惩罚思绪与上例雷同,这里必要多做2步:

自界说一个分区器Partitioner

  根据手机号判定属于哪个分区。有几个分区就有几个reducetask,每个reducetask输出一个文件,那么,差别分区中的数据就写入了差别的结果文件中。

  

在main程序中指定利用我们自界说的Partitioner即可

  2、代码实践

  (1)创建项目

  新建项目目次custom_partion,此中新建文件pom.xml,内容:

  

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  自界说bean:src/main/java/FlowBean.java

  自界说分区器:src/main/java/ProvincePartitioner.java

  这段代码是本示例的重点,此中界说了一个hashmap,假设其是一个数据库,界说了手机号和分区的关系。

  getPartition取得手机号的前缀,到数据库中获取区号,假如没在数据库中,就指定其为“别的分区”(用4代表)

  MapReduce程序:src/main/java/FlowCount.java

  main程序中指定了利用自界说的分区器

  job.setPartitionerClass(ProvincePartitioner.class);

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器

  运行

  hadoopjarmapreduce-custompartion-0.0.1-SNAPSHOT.jarFlowCount

  /flowcount/input/flowcount/output-part

  查抄

  hdfsdfs-ls/flowcount/output-part

  六、实例3:盘算出每组订单中金额最大的记录

  1、需求与实现思绪

  (1)需求

  有如下订单数据:

  

  必要求出每一个订单中成交金额最大的一笔买卖业务。

  (2)实现思绪

  先先容一个概念GroupingComparator组比力器,通过WordCount来明白它的作用。

  WordCount中map处理惩罚完成后的结果数据是如许的:

  good,1

  good,1

  good,1

  is,1

  is,1

  Reducer会把这些数据都读进来,然后举行分组,把key雷同的放在一组,形成如许的情势:

  good,[1,1,1]

  is,[1,1]

  然后对每一组数据调用一次reduce(key,Iterable,...)方法。

  此中分组的操纵就必要用到GroupingComparator,对key举行比力,雷同的放在一组。

  注:上例中的Partitioner是属于mapDuang的,GroupingComparator是属于reduce端的。

  下面看团体实现思绪。

  1)界说一个订单bean

  属性包罗:订单号、金额

  {itemid,amount}

  要实现可序列化,与比力方法compareTo,比力规则:订单号差别的,按照订单比如力,雷同的,按照金额比力。

  2)界说一个Partitioner

  根据订单号的hashcode分区,可以包管订单号雷同的在同一个分区,以便reduce中吸取到同一个订单的全部记录。

  同分区的数据是序的,这就用到了bean中的比力方法,可以让订单号雷同的记录按照金额从大到小排序。

  在map方法中输出数据时,key就是bean,value为null。

  map的结果数据情势比方:

  

  3)界说一个GroupingComparator

  由于map的结果数据中key是bean,不是平凡数据范例,以是必要利用自界说的比力器来分组,就利用bean中的订单号来比力。

  比方读取到分区1的数据:

  {Order_0000001222.8},null,

  {Order_000000125.8},null,

  {Order_0000003222.8},null

  举行比力,前两条数据的订单号雷同,放入一组,默认是以第一条记录的key作为这组记录的key。

  分组后的情势如下:

  {Order_0000001222.8},[null,null],

  {Order_0000003222.8},[null]

  在reduce方法中收到的每组记录的key就是我们终极想要的结果,以是直接输出到文件就可以了。

  

  2、代码实践

  (1)创建项目

  新建项目目次groupcomparator,此中新建文件pom.xml,内容:

  

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  **自界说bean:**src/main/java/OrderBean.java

  

  

  自界说分区器:src/main/java/ItemIdPartitioner.java

  

  自界说比力器:src/main/java/MyGroupingComparator.java

  

  MapReduce程序:src/main/java/GroupSort.java

  

  

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器

  下载测试数据文件

  链接:https://pan.baidu.com/s/1pKKlvh5

  暗码:43xa

  上传到HDFS

  hdfsdfs-putorders.txt/

  运行

  hadoopjarmapreduce-groupcomparator-0.0.1-SNAPSHOT.jarGroupSo

  rt/orders.txt/outputOrders

  查抄

  hdfsdfs-ls/outputOrders

  hdfsdfs-cat/outputOrders/*

  七、实例4:归并多个小文件

  1、需求与实现思绪

  (1)需求

  要盘算的目标文件中有大量的小文件,会造因素配任务和资源的开销比实际的盘算开销还打,这就产生了服从斲丧。

  必要先把一些小文件归并成一个大文件。

  (2)实现思绪

  文件的读取由map负责,在前面的表示图中可以看到一个inputformat用来读取文件,然后以keyvalue情势转达给map方法。

  我们要自界说文件的读取过程,就必要相识其细节流程:

  

  以是我们必要自界说一个inputformat和RecordReader。

  Inputformat利用我们本身的RecordReader,RecordReader负责实现一次读取一个完备文件封装为keyvalue。

  map吸取到文件内容,然后以文件名为key,以文件内容为value,向外输出的格式要留意,要利用SequenceFileOutPutFormat(用来输出对象)。

  由于reduce收到的keyvalue都是对象,不是平凡的文本,reduce默认的输特别式是TextOutputFormat,利用它的话,终极输出的内容就是对象ID,以是要利用SequenceFileOutPutFormat举行输出。

  2、代码实践

  (1)创建项目inputformat,此中新建文件pom.xml,内容:

  

  然后创建源码目次src/main/java

  如今项目目次文件布局

  

  (2)代码

  自界说inputform:src/main/java/MyInputFormat.java

  

  createRecordReader方法中创建一个自界说的reader

  自界说reader:src/main/java/MyRecordReader.java

  此中有3个核心方法:nextKeyValue、getCurrentKey、getCurrentValue。

  nextKeyValue负责天生要转达给map方法的key和value。getCurrentKey、getCurrentValue是实际获取key和value的。以是RecordReader的核心机制就是:通过nextKeyValue天生keyvalue,然后通过getCurrentKey和getCurrentValue来返回上面构造好的keyvalue。这里的nextKeyValue负责把整个文件内容作为value。

  MapReduce程序:src/main/java/ManyToOne.java

  main程序中指定利用我们自界说的MyInputFormat,输出利用SequenceFileOutputFormat。

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器。

  预备测试文件,把Hadoop目次中的设置文件上传到HDFS

  hdfsdfs-mkdir/files

  hdfsdfs-put$HADOOP_HOME/etc/hadoop/*.xml/files

  运行

  hadoopjarmapreduce-inputformat-0.0.1-SNAPSHOT.jarManyToOne/

  files/onefile

  查抄

  hdfsdfs-ls/onefile

  八、实例5:分组输出到多个文件

  1、需求与实现思绪

  (1)需求

  

  必要把雷同订单id的记录放在一个文件中,并以订单id定名。

  (2)实现思绪

  这个需求可以直接利用MultipleOutputs这个类来实现。

  默认环境下,每个reducer写入一个文件,文件名由分区号定名,比方'part-r-00000',而MultipleOutputs可以用key作为文件名,比方‘Order_0000001-r-00000’。

  以是,思绪就是map中处理惩罚每条记录,以‘订单id’为key,reduce中利用MultipleOutputs举行输出,会主动以key为文件名,文件内容就是雷同key的全部记录。

  比方‘Order_0000001-r-00000’的内容就是:

  Order_0000001,Pdt_05,25.8

  Order_0000001,Pdt_01,222.8

  2、代码实践

  (1)创建项目

  新建项目目次multioutput,此中新建文件pom.xml,内容:

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  MapReduce程序:src/main/java/MultipleOutputTest.java

  

  

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器

  然后运行

  hadoopjarmapreduce-multipleOutput-0.0.1-SNAPSHOT.jarMultiple

  OutputTest/orders.txt/output-multi

  查抄

  hdfsdfs-ls/output-multi

  九、MapReduce核心流程梳理

  我们已经相识了MapReduce的大概流程:

  (1)maptask从目标文件中读取数据

  (2)mapper的map方法处理惩罚每一条数据,输出到文件中

  (3)reducer读取map的结果文件,举行分组,把每一组交给reduce方法举行处理惩罚,末了输出到指定路径。

  

  这是最根本的流程,有助于快速明白MapReduce的工作方式。

  通过上面的几个示例,我们要经打仗了一些更深入的细节,比方mapper的inputform中尚有RecordReader、reducer中尚有GroupingComparator。

  下面就看一下更加深入的处理惩罚流程。

  1、Maptask中的处理惩罚流程

  (1)读文件流程

  

  目标文件会被按照规划文件举行切分,inputformat调用RecordReader读取文件切片,RecordReader会天生keyvalue对儿,转达给Mapper的mao方法。

  (2)写入结果文件的流程

  从Mapper的map方法调用context.write之后,到形成结果数据文件这个过程是比力复杂的。

  

  context.write不是直接写入文件,而是把数据交给OutputCollector,OutputCollector把数据写入‘环形缓冲区’。‘环形缓冲区’中的数据会举行排序。

  由于缓冲区的巨细是有限定的,以是每当快满时(到达80%)就要把此中的数据写出去,这个过程叫做数据溢出。

  溢出到一个文件中,溢出过程会对这批数据举行分组、比力操纵,然后吸入文件,以是溢出文件中的数据是分好区的,而且是有序的。每次溢出都会产生一个溢出数据文件,以是会有多个。

  当map处理惩罚完全数据后,就会对各个溢出数据文件举行归并,每个文件中雷同区的数据放在一起,并再次排序,末了得到一个团体的结果文件,此中是分区且有序的。

  如许就完成了map过程,读数据过程和写结果文件的过程连合起来如下图:

  

  2、Reducetask的处理惩罚流程

  

  reducetask去读每个maptask产生的结果文件中本身所负责的分区数据,读到本身本地。对多个数据文件举行归并排序,然后通过GroupingComparator举行分组,把雷同key的数据放到一组。对每组数据调一次reduce方法,处理惩罚完成后写入目标路径文件。

  3、团体流程

  把map和reduce的过程连合起来:

  

  十、实例6:join操纵

  1、需求与实现思绪

  (1)需求

  有2个数据文件:订单数据、商品信息。

  订单数据表order

  

  商品信息表product

  

  必要用MapReduce程序来实现下面这个SQL查询运算:

  selecto.idorder_id,o.date,o.amount,p.idp_id,p.pname,p.c

  ategory_id,p.price

  fromt_orderojoint_productpono.pid=p.id

  (2)实现思绪

  SQL的实行结果是如许的:

  

  实际上就是给每条订单记录增补上商品表中的信息。

  实现思绪:

  1)界说bean

pandownload服务器(pandownload 客户端) pandownload服务器(pandownload 客户端)「pandownload服务器连接失败」 行业资讯

  把SQL实行结果中的各列封装成一个bean对象,实现序列化。

  bean中还要有一个别的的属性flag,用来标识此对象的数据是订单还是商品。

  2)map处理惩罚

  map会处理惩罚两个文件中的数据,根据文件名可以知道当前这条数据是订单还是商品。

  对每条数据创建一个bean对象,设置对应的属性,并标识flag(0代表order,1代表product)

  以join的关联项“productid”为key,bean为value举行输出。

  3)reduce处理惩罚

  reduce方法吸取到pid雷同的一组bean对象。

  遍历bean对象聚集,假如bean是订单数据,就放入一个新的订单聚集中,假如是商品数据,就生存到一个商品bean中。然后遍历谁人新的订单聚集,利用商品bean的数据对每个订单bean举行信息补全。

  如许就得到了完备的订单及其商品信息。

  2、代码实践

  (1)创建项目

  新建项目目次jointest,此中新建文件pom.xml,内容:

  

  

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  **封装bean:**src/main/java/InfoBean.java

  

  

  MapReduce程序:src/main/java/JoinMR.java

  

  

  

  

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器

  下载产物和订单的测试数据文件

  链接:https://pan.baidu.com/s/1pLRnm47

  暗码:cg7x

  链接:https://pan.baidu.com/s/1pLrvsfT

  暗码:j2zb

  上传到HDFS

  hdfsdfs-mkdir-p/jointest/input

  hdfsdfs-putorder.txt/jointest/input

  hdfsdfs-putproduct.txt/jointest/input

  运行

  hadoopjarjoinmr.jarcom.dys.mapreducetest.join.JoinMR/jointe

  st/input/jointest/output

  查抄

  hdfsdfs-cat/jointest/output/*

  十一、实例7:盘算出用户间的共同好友

  1、需求与实现思绪

  (1)需求

  下面是用户的好友关系列表,每一行代表一个用户和他的好友列表。

  

  必要求出哪些人两两之间有共同好友,及他俩的共同好友都有谁。

  比方从前2天记录中可以看出,C、E是A、B的共同好友,终极的情势如下:

  

  (2)实现思绪

  之前的示例中都是一个MapReduce盘算出来的,这里我们利用2个MapReduce来实现。

  1)第1个MapReduce

map

  找出每个用户都是谁的好友,比方:

  读一行A:B,C,D,F,E,O(A的好友有这些,反过来拆开,这些人中的每一个都是A的好友)

  输出B,AC,AD,AF,AE,AO,A

  再读一行B:A,C,E,K

  输出A,BC,BE,BK,B

  ……

reduce

  key雷同的会分到一组,比方:

  C,AC,BC,EC,FC,G......

  Key:C

  value:[A,B,E,F,G]

  意义是:C是这些用户的好友。

  遍历value就可以得到:

  AB有共同好友C

  AE有共同好友C

  ...

  BE有共同好友C

  BF有共同好友C

  输出:

  A-B,C

  A-E,C

  A-F,C

  A-G,C

  B-E,C

  B-F,C

  .....

  2)第2个MapReduce

  对上一步的输出结果举行盘算。

map

  读出上一步的结果数据,构造成keyvalue直接输出

  比方:

  读入一行A-B,C

  直接输出A-B,C

reduce

  读入数据,key雷同的在一组

  A-B,CA-B,FA-B,G......

  输出:

  A-BC,F,G,.....

  如许就得出了两个用户间的共同好友列表

  2、代码实践

  (1)创建项目

  新建项目目次jointest,此中新建文件pom.xml,内容:

  

  

  然后创建源码目次src/main/java

  如今项目目次的文件布局

  

  (2)代码

  第一步的MapReduce程序:src/main/java/StepFirst.java

  

  

  第二步的MapReduce程序:src/main/java/StepSecond.java

  

  

  (3)编译打包

  在pom.xml地点目次下实行打包下令:

  mvnpackage

  实行完成后,会主动天生target目次,此中有打包好的jar文件。

  如今项目文件布局

  

  (4)运行

  先把target中的jar上传到Hadoop服务器

  下载测试数据文件

  链接:https://pan.baidu.com/s/1o8fmfbG

  暗码:kbut

  上传到HDFS

  hdfsdfs-mkdir-p/friends/input

  hdfsdfs-putfriendsdata.txt/friends/input

  运行第一步

  hadoopjarmapreduce-friends-0.0.1-SNAPSHOT.jarStepFirst/frie

  nds/input/friendsdata.txt/friends/output01

  运行第二步

  hadoopjarmapreduce-friends-0.0.1-SNAPSHOT.jarStepSecond/fri

  ends/output01/part-r-00000/friends/output02

  查察结果

  hdfsdfs-ls/friends/output02hdfsdfs-cat/friends/output02/*

  十二、小结

  MapReduce的底子内容先容完了,盼望可以资助您快速认识MapReduce的工作原理和开辟方法。如有品评与发起(比方内容有误、不敷的地方、改进发起等),欢迎留言讨论。

  提示:如需下载本文,点击文末【阅读原文】或登录云盘https://pan.baidu.com/s/1bpxSCZt举行下载。

  相干专题:

这是一篇最普通易懂的HadoopHDFS实践攻略!

MaxScale:实现MySQL读写分离与负载均衡的中心件利器

  精选专题(官网:dbaplus.cn)

◆近期热文◆

  干货!谈主动化运维平台的地基怎样打牢前聚美优品运维负责人:CMDB的那些事儿解锁MySQL备份规复的4种精确姿势DBA要赋闲了?看ML怎样主动优化数据库从HPE净亏损超6亿$看企业家精力

◆MVP专栏◆

杨志洪丨杨建荣丨邹德裕丨韩锋丨欧阳辰

网易丨腾讯云丨百度丨朱祥磊丨卢钧轶

◆近期活动◆

DAMS中国数据资产管理峰会上海站

峰会官网:www.dams.org.cn

客户评论

我要评论