我们通过提交jar包,进行MapReduce处理,那么整个运行过程分为五个环节:
1、向client端提交MapReduce job.
2、随后yarn的ResourceManager进行资源的分配.
3、由NodeManager进行加载与监控containers.
4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.
5、通过hdfs进行job配置文件、jar包的各节点分发。
Job 提交过程
job的提交通过 调用submit()方法 创建一个 JobSubmitter 实例,并 调用submitJobInternal() 方法。整个job的运行过程如下:
1、向ResourceManager申请application ID,此ID为该MapReduce的jobId。
2、检查output的路径是否正确,是否已经被创建。
3、计算input的splits。
4、拷贝运行job 需要的jar包、配置文件以及计算input的split 到各个节点。
5、在ResourceManager中调用submitAppliction()方法,执行job
Job 初始化过程
1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。
2、 由applicationMaster决定 如何运行tasks,如果job数据量比较小,applicationMaster便选择 将tasks运行在一个JVM中 。那么如何判别这个job是大是小呢?当一个job的 mappers数量小于10个 , 只有一个reducer或者读取的文件大小要小于一个HDFS block时 ,(可通过修改配置项mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 进行调整)
3、在运行tasks之前,applicationMaster将会 调用setupJob()方法 ,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)
Task 任务分配
1、接下来applicationMaster向ResourceManager请求containers用于执行map与reduce的tasks(step 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)
2、运行tasks的是需要消耗内存与CPU资源的, 默认情况下,map和reduce的task资源分配为1024MB与一个核 ,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)
Task 任务执行
1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个 主函数为YarnChild 的java application运行,但在运行task之前, 首先定位task需要的jar包、配置文件以及加载在缓存中的文件 。
2、YarnChild运行于一个专属的JVM中,所以 任何一个map或reduce任务出现问题,都不会影响整个nodemanager的crash或者hang 。
3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。
Mapreduce数据流
运行进度与状态更新
1、MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及 每个task都有一个包含job(running,successfully completed,failed)的状态 ,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?
2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个map或reduce任务执行时, 子进程会持续每三秒钟与applicationMaster进行交互 。
Job 完成
最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMaster与task containers被清空。
Shuffle与Sort
从map到reduce的过程,被称之为shuffle过程,MapReduce使到reduce的数据一定是经过key的排序的,那么shuffle是如何运作的呢?
当map任务将数据output时, 不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类 。
1、The Map Side
首先map任务的 output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例 ,默认为80% (可通过mapreduce.map.sort.spill.percent配置项修改),便开始写入磁盘。
在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce.在每个partition中 ,后台线程将会在内存中进行Key的排序 ,( 如果代码中有combiner方法,则会在output时就进行sort排序 ,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)
这里 将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销 (配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)
随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置。
2、The Reduce Side
首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是, 需要等到所有map任务结束后,reduce才会对map的结果进行拷贝 ,由于reduce函数有少数几个复制线程,以至于它 可以同时拉取多个map的输出结果。默认的为5个线程 (可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)
这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢?
当所有map的任务结束后, applicationMaster通过心跳机制(heartbeat mechanism),由它知道mapping的输出结果与机器host ,所以 reducer会定时的通过一个线程访问applicationmaster请求map的输出结果 。
Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会除法合并(merged)随之写入磁盘。
这时要注意, 所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们 。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。
这里附带的整理了下Parquet存储结构与SequenceFile存储结构的特点
Parquet
Parquet是面向分析型业务的列式存储格式,由Twitter和Cloudera合作开发,2015年5月从Apache的孵化器里毕业成为Apache顶级项目,那么这里就总结下Parquet数据结构到底是什么样的呢?
一个Parquet文件是 由一个header以及一个或多个block块组成,以一个footer结尾。header中只包含一个4个字节的数字PAR1用来识别整个Parquet文件格式。文件中所有的metadata都存在于footer中 。footer中的metadata包含了格式的版本信息,schema信息、key-value paris以及所有block中的metadata信息。footer中最后两个字段为一个以4个字节长度的footer的metadata,以及同header中包含的一样的PAR1。
读取一个Parquet文件时,需要完全读取Footer的meatadata,Parquet格式文件不需要读取sync markers这样的标记分割查找,因为所有block的边界都存储于footer的metadata中(因为metadata的写入是在所有blocks块写入完成之后的,所以吸入操作包含的所有block的位置信息都是存在于内存直到文件close)
这里注意,不像sequence files以及Avro数据格式文件的header以及sync markers是用来分割blocks。Parquet格式文件不需要sync markers,因此block的边界存储与footer的meatada中。
在Parquet文件中,每一个block都具有一组Row group,她们是由一组Column chunk组成的列数据。继续往下,每一个column chunk中又包含了它具有的pages。每个page就包含了来自于相同列的值.Parquet同时使用更紧凑形式的编码,当写入Parquet文件时,它会自动基于column的类型适配一个合适的编码,比如,一个boolean形式的值将会被用于run-length encoding。
另一方面,Parquet文件对于每个page支持标准的压缩算法比如支持Snappy,gzip以及LZO压缩格式,也支持不压缩。
更多hadoop 内容请访问 https://www.hadoop.so/
相关推荐
Hadoop 用mapreduce实现Wordcount实例,绝对能用
Hadoop介绍,HDFS和MapReduce工作原理
upon the widely used and highly successful Hadoop MapReduce v1. The recipes that will help you analyze large and complex datasets with next generation Hadoop MapReduce will provide you with the skills...
用MapReduce实现TF-IDF,Hadoop版本是2.7.7,参考某教程亲自手写的,可以运行,有问题可以留言
本书对Hadoop Mapreduce进行详细讲解,切合实际应用,能够更深入地学习MapReduce,确实是一本不错的书。
在hadoop平台上,用mapreduce编程实现大数据的词频统计
Hadoop MapReduce Cookbook 高清完整版PDF下载 Hadoop MapReduce Cookbook
Java操作Hadoop Mapreduce基本实践源码.
一、实验目的 (1)熟悉Hadoop开发包 (2)编写MepReduce程序 (3)调试和运行MepReduce程序 (4)完成上课老师演示的内容 二、实验环境 Windows 10 VMware Workstation Pro虚拟机 Hadoop环境 Jdk1.8 二、实验内容 1...
结论: 本章介绍了 Hadoop MapReduce,同时发现它有以下缺点: ...2、有运行效率问题,MapReduce 需要将中间产生的数据保存到硬盘中,因此会有读写数据延迟问题。 3、不支持实时处理,它原始的设计就是以批处理为主。
本文分别对传统MapReduce和YARN的体系结构、工作流程及故障处理等三个方面进行讲解,让你深入理解MapReduce的工作原理
基于Hadoop MapReduce的电商网站商品数据分析.rar
Hadoop MapReduce v2 Cookbook (第二版), Packt Publishing
基于Hadoop MapReduce的短视频主播数据分析项目代码+数据集.rar
Ubuntu系统上Hadoop与MapReduce 运行实例
基于Hadoop MapReduce的电影点评网站数据分析项目代码+数据集.rar
基于Apriori算法的频繁项集Hadoop mapreduce
myeclipse +maven 搭建的hadoop mapreduce 例子项目,运行了单机wordcount
这本书都是实例,很接地气,多加练习和阅读,可稳步上升
深入 剖析 了 Hadoop ... 书中 不仅 详细 介绍 了 MapReduce 各个 组件 的 内部 实现 原理, 而且 结合 源 代码 进行 了 深入 的 剖析, 使 读者 可以 快速 全面 地 掌握 Hadoop MapReduce 设计 原理 和 实现 细节。