hadoop之MapReduce详解【转】-程序员宅基地

技术标签: Shuffle  Mapreduce优化  hadoop  MapReduce  

  转自:https://blog.csdn.net/weixin_44591209/article/details/88049264

MapReduce源于Google一篇论文,它充分借鉴了“分而治之”的思想,将一个数据处理过程拆分为主要的Map(映射)与Reduce(归约)两步。简单地说,MapReduce就是"任务的分解与结果的汇总"。

       MapReduce (MR) 是一个基于磁盘运算的框架,贼慢,慢的主要原因:1)MR是进程级别的,一个MR任务会创建多个进程(map task和reduce task都是进程),进程的创建和销毁等过程需要耗很多的时间。 2)磁盘I/O问题,  MapReduce作业通常都是数据密集型作业,大量的中间结果需要写到磁盘上并通过网络进行传输,这耗去了大量的时间。 

       注:mapreduce 1.x架构有两个进程:JobTracker :负责资源管理和作业调度。TaskTrachker:任务的执行者。运行 map task 和 reduce task。在2.x的时候由yarn取代他们的工作了。

     

MapReduce工作流程

input.txt—>InputFormat—>Map阶段—>shuffle阶段(横跨Mapper和Reducer,在Mapper输出数据之前和Reducer接收数据之后都有进行)—>Reduce阶段 —>OutputFormat —>HDFS:output.txt

InputFormat接口:将我们的输入数据进行分片(split),输入分片存储的并非数据本身,而是一个分片长度和一个记录数据的位置的数组。输入分片的大小一般和hdfs的blocksize相同(128M),可以改,但最好不要。

Map阶段: Map会读取输入分片数据,一个输入分片(input split)针对一个map任务,进行map逻辑处理(用户自定义)

Reduce阶段:对已排序输出中的每个键调用reduce函数。此阶段的输出直接写到输出文件系统,一般为hdfs。

 

MapReduce Shffle详解(2.x)

 

          为了确保每个reducer的的输入都是按键排序的,系统执行排序的过程,即将map task的输出通过一定规则传给reduce task,这个过程成为shuffle。

          Shuffle阶段一部分是在map task 中进行的, 这里成为Map shuffle , 还有一部分是在reduce task 中进行的, 这里称为Reduce shffle。

Map Shuffle 阶段

          Map在做输出时候会在内存里开启一个环形缓冲区,默认大小是100M(参数:mapreduce.task.io.sort.mb),Map中的outputCollect会把输出的所有kv对收集起来,存到这个环形缓冲区中。

          环形缓冲区:本质上是一个首尾相连的数组,这个数组会被一分为二,一边用来写索引,一边用来写数据。一旦这个环形缓冲区中的内容达到阈值(默认是0.8,参数:mapreduce.map.sort.spill.percent),一个后台线程就会把内容溢写(spill)到磁盘上,在这过程中,map输出并不会停止往缓冲区写入数据(反向写,到达阈值后,再反向,以此类推),但如果在此期间缓冲区被写满,map会被阻塞直到写磁盘过程完成。溢写过程按照轮询方式将缓冲区的内容写到mapred.local.dir指定的作业特定子目录中的目录中,map任务结束后删除。

           先介绍两个概念:

           Combiner: 本地的reducer,运行combiner使得map输出结果更紧凑,可以减少写到磁盘的数据和传递给reducer的数据。可通过编程自定义(没有定义默认没有)。适用场景:求和、次数等 (做 ‘’+‘’ 法的场景) 【如平均数等场景不适合用】。

           Partitioner:分区,按照一定规则,把数据分成不同的区,Partitioner决定map task输出的数据交由哪个reduce task处理,一个partition对应一个reduce task。 分区规则可通过编程自定义,默认是按照key的hashcode进行分区。

           在数据溢写到磁盘之前,线程首先根据partitioner将数据划成相应的分区,然后在每个分区中按键进行区内排序。如果设置了combiner,它就在排序后的输出上运行。所以每次溢写到磁盘上的数据应该是 分区且区内有序的。

            每次溢写会生成一个溢写文件(spill file),因此在map任务写完其最后一个输出记录之后,会有多个溢写文件。在Map 任务完成前,所有的spill file将会进行归并排序为一个分区且有序的文件。这是一个多路归并过程,最大归并路数由默认是10(参数:mapreduce.task.io.sort.factor)。如果有定义combiner,且至少存在3个(参数:mapreduce.map.combine.minspills )溢出文件时,则combiner就会在输出文件写到磁盘之前再次运行。

            在将压缩map输出写到磁盘的过程中对它进行压缩加快写磁盘的速度、更加节约时间、减少传给reducer的数据量。将mapreduce.output.fileoutputformat.compress设置为true(默认为false),就可以启用这个功能。使用的压缩库由参数mapreduce.output.fileoutputformat.compress.codec指定。

            当spill 文件归并完毕后,Map 将删除所有的临时spill 文件,通知appmaster, map task已经完成。

            

Reduce Shuffle 阶段          

       Reducer是通过HTTP的方式得到输出文件的分区。使用netty进行数据传输,默认情况下netty的工作线程数是处理器数的2倍。一个reduce task 对应一个分区。

       在reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问appmaster 关于map的输出。appmaster是知道map的输出和host之间的关系。在reduce端获取所有的map输出之前,Reduce端的线程会周期性的询问master 关于map的输出。Reduce并不会在获取到map输出之后就立即删除hosts,因为reduce有可能运行失败。相反,是等待appmaster的删除消息来决定删除host。

       当map任务的完成数占总map任务的0.05(参数:mapreduce.job.reduce.slowstart.completedmaps),reduce任务就开始复制它的输出,复制阶段把Map输出复制到Reducer的内存或磁盘。复制线程的数量由mapreduce.reduce.shuffle.parallelcopies参数来决定,默认是 5。

       如果map输出相当小,会被复制到reduce任务JVM的内存(缓冲区大小由mapreduce.reduce.shuffle.input.buffer.percent属性控制,指定用于此用途的堆空间的百分比,默认为0.7),如果缓冲区空间不足,map输出会被复制到磁盘。一旦内存缓冲区达到阈值(参数:mapreduce.reduce.shuffle.merge.percent,默认0.66)或达到map的输出阈值(参数:mapreduce.reduce.merge.inmem.threshold,默认1000)则合并后溢写到磁盘中。如果指定combiner,则在合并期间运行它已降低写入磁盘的数据量。随着磁盘上副本的增多,后台线程会将它们合并为更大的,排序好的文件。注:为了合并,压缩的map输出都必须在内存中解压缩。

        复制完所有的map输出后,reduce任务进入归并排序阶段,这个阶段将合并map的输出,维持其顺序排序。这是循环进行的。目标是合并最小数据量的文件以便最后一趟刚好满足合并系数(参数:mapreduce.task.io.sort.factor,默认10)。

        因此,如果有40个文件(包括磁盘和内存),不会在四趟中每趟合并10个文件而得到4个文件,再将4个文件合并到reduce。而是第一趟只合并4个文件,随后的三塘合并10个文件。最后一趟中,4个已经合并的文件和剩余的6个文件合计十个文件直接合并到reduce。

       这并没有改变合并的次数,它只是一个优化措施,尽量减少写到磁盘的数据量。因为最后一趟总是直接合并到reduce,没有磁盘往返。

       至此,Shuffle阶段结束。

Shuffle总结

    1)map task收集map()方法输出的kv对,放到内存环形缓冲区中

    2)从内存环形缓冲区不断将文件经过分区、排序、combine(可选)溢写(spill)到本地磁盘

    3)多个溢出文件会归并排序成大的spill file

    4)reduce task根据自己的分区号,去各个map task机器上取相应的结果分区数据

    5)reduce task会取到同一个分区的来自不同maptask的结果文件,reduce task会将这些文件再进行归并排序

    6)合并成大文件后,shuffle过程结束

 

MapReduce 调优

输入阶段:

    处理小文件问题:

Map阶段:

    1)减少溢写(spill)次数。

    2)减少合并(merge)次数。

    3)不影响业务逻辑前提下,设置combine。

    4)启用压缩。

Reduce阶段:

    1)合理设置map和reduce数。

    2)合理设置map、reduce共存。

    3)规避使用reduce:因为reduce在用于连接数据集的时候将会产生大量的网络消耗。

    4)合理设置reduce端的buffer:默认情况下,数据达到一个阈值的时候,buffer中的数据就会写入磁盘,然后reduce会从磁盘中获得所有的数据。也就是说,buffer和reduce是没有直接关联的,中间多个一个写磁盘->读磁盘的过程,既然有这个弊端,那么就可以通过参数来配置,使得buffer中的一部分数据可以直接输送到reduce,从而减少IO开销。

配置参数

参数说明

mapreduce.task.io.sort.mb

shuffle的环形缓冲区大小,默认100M。如果能估算map输出大小,就可以合理设置该值来尽可能减少溢出写的次数,这对调优很有帮助。

mapreduce.map.sort.spill.percent

环形缓冲区溢出的阈值,默认0.8。

mapreduce.task.io.sort.factor

归并因子,默认为10。 一般调高,增大merge的文件数目,减少merge的次数,从而缩短mr处理时间。将此值增加到100是很常见的。

mapreduce.map.combine.minspills 默认为3。运行combiner所需的最少溢出写文件数(如果已指定combiner)

mapreduce.output.fileoutputformat.compress

map输出是否压缩,默认为false。如果map输出的数据量非常大,那么在写入磁盘时压缩数据往往是个很好的主意,因为这样会让写磁盘的速度更快,节约磁盘空间,并且减少传给reducer的数据量。

mapreduce.output.fileoutputformat.compress.codec

用于map输出的压缩编解码器,默认为org.apache.Hadoop.io.compress.DefaultCodec。推荐设置为LZO压缩。
mapreduce.job.reduce.slowstart.completedmaps 调用reduce之前,map必须完成的最少比例,默认为0.05。
mapreduce.reduce.shuffle.parallelcopies reducer在复制阶段复制线程的数量,默认为5。
mapreduce.reduce.shuffle.input.buffer.percent 在shuffle的复制阶段,分配给map输出的缓冲区占JVM堆空间的百分比,默认为0.7。
mapreduce.reduce.shuffle.merge.percent reduce输入缓冲区溢写的阈值。默认是0.66。

mapreduce.reduce.merge.inmem.threshold

reduce输入缓存区的文件数阈值。默认是1000。0或者小于0意味着此参数不生效。

mapreduce.map.maxattempts

每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.reduce.maxattempts

每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

mapreduce.task.timeout

Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

MapReduce 常用命令

[hadoop@hadoop002 bin]$ mapred --help

Usage: mapred [--config confdir] COMMAND

where COMMAND is one of:

pipes run a Pipes job

job manipulate MapReduce jobs

queue get information regarding JobQueues

classpath prints the class path needed for running

mapreduce subcommands

historyserver run job history servers as a standalone daemon

distcp <srcurl> <desturl> copy file or directories recursively

archive -archiveName NAME -p <parent path> <src>* <dest> create a hadoop archive

archive-logs combine aggregated logs into hadoop archives

hsadmin job history server admin interface

mapred job -list  : 查看当前运行的job

mapred job -kill jobId  : 杀掉某个job

mapred job -kill-task  taskid : 杀掉某个task

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/qq_32641659/article/details/88249049

智能推荐

深度学习与智能故障诊断学习笔记(三)——RNN与LSTM推导详解_深度学习 故障诊断-程序员宅基地

文章浏览阅读3.2k次,点赞7次,收藏38次。1.RNN1.1网络结构标准神经网络的输入输出在不同例子中可能有不同的长度,在学习中并不共享从不同位置上学到的特征。因为标准神经网络的训练集是稳定的,即所有的特征域表达的内容是同一性质的,一旦交换位置,就需要重新学习。故障诊断和健康管理属于带有时间序列的任务场景,在进行学习时参数量巨大,标准神经网络无法体现出时序上的前因后果,所以引入循环神经网络。如图所示为RNN循环神经网络的单元。其中为当前输入,为前一个状态,b为偏置项,tanh为激活函数,用于学习非线性部分。当前输入和前一个状态分别乘以_深度学习 故障诊断

026求总数问题_计算题目总数的代码-程序员宅基地

文章浏览阅读771次。026求总数问题1.题目2.代码3.个人见解4.输出结果截图1.题目集邮爱好者把所有的邮票存放在三个集邮册中,在A册内存放全部的十分之二,在B册内存放不知是全部的七分之几,在C册内存放303张邮票,问这位集邮爱好者集邮总数是多少?以及每册中各有多少邮票?2.代码#include<stdio.h>int main(void){ int a, b, c, x, su..._计算题目总数的代码

webGL创建旋转动画三角形_webgl 三角形 动画-程序员宅基地

文章浏览阅读237次。test.html<!doctype html><html lang="en"> <head> <meta charset="utf-8"> <title>WebGL Demo</title> <link rel="stylesheet" href="./webgl.css" type="text/css"> <script src="https://cdnjs.cloudfla_webgl 三角形 动画

python查询数据库语句大全_sql:查询语句大全-程序员宅基地

文章浏览阅读1.6k次。一、mysql查看数据库:SHOW DATABASES;创建数据库:CREATE DATABASE db_name;使用数据库:USE db_name;删除数据库:DROP DATABASE db_name;创建表:CREATE TABLE table_name(id TINYINT UNSIGNED NOT NULL AUTO_INCREMENT, --id值,无符号、非空、递增——唯一性,可做..._python数据库查询语句

10分钟学会python写游戏脚本!Python其实很简单_手游刷初始号的脚本怎么写-程序员宅基地

文章浏览阅读3.5k次,点赞3次,收藏51次。最近在玩儿公主连结,之前也玩儿过阴阳师这样的游戏,这样的游戏都会有个初始号这样的东西,或者说是可以肝的东西。当然,作为一名程序员,肝这种东西完全可以用写代码的方式帮我们自动完成。游戏脚本其实并不高深,最简单的体验方法就是下载一个Airtest了,直接截几个图片,写几层代码,就可以按照自己的逻辑玩儿游戏了。当然,本篇文章不是要讲Airtest这个怎么用,而是用原始的python+opencv来实现上面的操作。_手游刷初始号的脚本怎么写

CAD绿色版教程-程序员宅基地

文章浏览阅读10次。CAD绿色版下载地址百度网盘CAD完全卸载

随便推点

共存Python2.7和python3.4_v2库支持的python版本为2.7和3.4及以上版-程序员宅基地

文章浏览阅读415次。多版本Python共存[支持使用pip安装包] 有时特殊需要会要用到高版本的Python, 但是系统自带的版本又是很多其他工具依赖的, 不能随意更新。 所以就会考虑安装另一个版本的python环境, 然后需要用到这个版本时就在脚本头部指明 #/usr/bin/env python2.7 这样。## 下载要安装的 Python版本源码包(我用的是 2.7 版本)wget ‘http:_v2库支持的python版本为2.7和3.4及以上版

Linux 系统下 Python 的安装 详细步骤_python linus如何安装-程序员宅基地

文章浏览阅读1.4k次。Linux 系统下 Python 的安装一.安装环境准备centos7系统Python 安装包:官网地址:https://www.python.org/downloads/(速度不是一般的慢)镜像网址:http://npm.taobao.org/mirrors/python/ (速度较快)二.安装步骤1.将安装包从本地上传到远程服务器(可选用SSH工具)2.解压缩 (文件名可直接输入首字母按tab键)解压完成如图所示3.进入文件夹,输入 ./configure --prefix_python linus如何安装

代码点与代码单元和Unicode相关的UTF_代码的ut ui-程序员宅基地

文章浏览阅读1k次。java字符串由char序列组成,char数据类型是一个采用UTF-16编码表示Unicode代码点的代码单元,大多数的常用Unicode字符使用一个代码单元就可以表示,而辅助字符需要一对代码单元来表示,length方法返回的是采用UTF-16编码表示的给定字符串所需要的代码单元的数量,要想得到真实的长度即代码点的数量可以调用xxx.codePointCount(0,xxx.length())方法_代码的ut ui

JDK1.6在LINUX下的安装配置-程序员宅基地

文章浏览阅读63次。JDK1.6在LINUX下的安装是如何进行的呢,让我们开始我们的演示:Ubuntu Linux下jdk的安装与配置1.JDK1.6安装准备从sun公司网站www.sun.com下载linux版本的jdk,我下载的版本是JDK 6 Update 7,地址http://java.sun.com/javase/downloads/index.jspjdk-6u7-linux-i586..._jdk1.6在linux下是不是解开就能用

幅频响应 matlab画法,MATLAB环境下频率响应曲线的绘制方法.pdf-程序员宅基地

文章浏览阅读2.6k次。MATLAB环境下频率响应曲线的绘制方法黄 伟, 聂 东 , 陈英俊(广东肇庆学院电子信息工程系,肇庆526061)频域分析提供了一种简单有效的途径。关t词:MATLAB;Bode图;Nyquist图;Nichoh图前言 表示方法即为系统的频域分析方法。在频域分析M艄露AB作为一种高性能软件和编程语言,Nich..._肇庆学院 陈英俊

凸二次规划问题 库函数_c语言有二次规划函数吗-程序员宅基地

文章浏览阅读959次。#encoding: utf-8'''python 解决 凸二次规划问题 首先转换成标准型 1/2 * X^T * P *X + q ^ T * X限制条件 G * X <= h A * X = bx1 = (3,3), x2 = (4, 3) x3 = (1,1), 其中 x1 x2是正例, 求最大间隔分离 超平面'''import numpyf..._c语言有二次规划函数吗

推荐文章

热门文章

相关标签