技术标签: MapReduce
MapReduce的官网文档地址:https://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/MapReduceTutorial.html
Hadoop MapReduce 是一个软件框架,用于轻松编写应用程序,以可靠、容错的方式在大型商用硬件集群(数千个节点)上并行处理大量数据(多TB数据集)。
一个MapReduce job 通常将输入的数据集拆分成独立的快。Map任务以完全并行的方式处理这些块。框架对map的输出进行排序,进而作为输入提供给reduce任务。通常来说,job的输入和输出都保存在一个文件系统中。框架负责调度任务,监控任务并重新执行失败了的任务。
通常来说,计算节点和存储节点是相同的,也就是说,MapReduce框架和HDFS运行在相同的节点集上。这样的配置能够保证框架在已经存在数据的节点上有效的调度任务,进而在不同集群间获得一个非常高的总带宽。
MapReduce框架由一个单一的主Resourcemanager,每个集群节点上的一个从Nodemanager以及每个应用上一个MRAppMaster组成。
应用至少会指定输入/输出位置以及通过实现合适的接口和抽象类来提供map和reduce功能。这些,以及其他job参数,组成job配置(configuration)。
然后,Hadoop job客户端提交job(jar/可执行的文件等等)以及配置ResourceManger。ResoureManger负责给从节点分发软件/配置,调度和监督任务,反馈状态和诊断信息给job客户端。
MapReduce框架完全以<键,值>形式操作,也就是说,框架将输入给job的数据视为<键,值>对,并且产生一个<键,值>对集作为job的输出。
键和值类必须通过框架序列化,因此需要实现Writable接口。除此之外,key类必须实现WritableComparable接口以辅助框架的排序。
一个MapReducejob的输入输出类型如下所示:
(输入)<k1,v1> -> map -> <k2,v2> -> combine -> <k2,v2> -> reduce -> <k3,v3>(输出)
通过一个MapReduce应用程序示例来了解它们的工作原理。
WordCount 是一个简单的应用程序,它计算给定输入集中每个单词的出现次数。
这适用于本地独立、伪分布式或完全分布式 Hadoop 安装。
源代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable>{
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
假设环境变量的设置如下:
export JAVA_HOME=/usr/java/default
export PATH= J A V A H O M E / b i n : {JAVA_HOME}/bin: JAVAHOME/bin:{PATH}
export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
编译WordCount.java并创建一个Jar:
$ bin/hadoop com.sun.tools.javac.Main WordCount.java
$ jar cf wc.jar WordCount*.class
假如:
/user/joe/wordcount/input
- HDFS的输入目录/user/joe/wordcount/output
- HDFS的输出目录示例文本文件作为输入:
$ bin/hadoop fs -ls /user/joe/wordcount/input/
/user/joe/wordcount/input/file01
/user/joe/wordcount/input/file02$ bin/hadoop fs -cat /user/joe/wordcount/input/file01
Hello World Bye World$ bin/hadoop fs -cat /user/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop
运行应用程序:
$ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output
输出:
$ bin/hadoop fs -cat /user/joe/wordcount/output/part-r-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2
应用程序可以通过使用选项 -files来在该任务当前工作目录下指定多个以逗号分开的路径列表。-libjars选项允许应用程序将jars添加到map和reduce的类路径中。选项-archives允许传递以逗号分开的备份作为参数。
运行带有-libjars, -files和-archives的wordcount例子:
bin/hadoop jar hadoop-mapreduce-examples-.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
其中,myarchive.zip 会解压放在“myarchive.zip”所在的路径下。
用户可以使用 # 给通过 -files 和 -archives 传递的文件和备份指定一个不同的名称。
如:
bin/hadoop jar hadoop-mapreduce-examples-.jar wordcount -files dir1/dict.txt#dict1,dir2/dict.txt#dict2 -archives mytar.tgz#tgzdir input output
其中,文件 dir1/dict.txt 和 dir2/dict.txt 可以分别被使用了#的dict1 和 dict2 的任务访问。备份文件mytar.tgz将被放置并取消归档到名为“tgzdir”的目录中。
应用程序可以通过使用选项 -Dmapreduce.map.dev、-DmapReduce.reduce.env 和 -Dyarn.app.mapreduce.am.env在命令行上分别指定映射器、reducer和应用程序主任务的环境变量。
例如如下为映射器和reducer设置环境变量 FOO_VAR=bar 和 LIST_VAR=a,b,c。
bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -Dmapreduce.map.env.FOO_VAR=bar -Dmapreduce.map.env.LIST_VAR=a,b,c -Dmapreduce.reduce.env.FOO_VAR=bar -Dmapreduce.reduce.env.LIST_VAR=a,b,c input output
WordCount应用是非常简单的。
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
Mapper通过map方法实现,在指定的textInputFormat的辅助下,一次处理一行。然后,通过StringTokenizer将行按空格拆分成字符,且输出类似<,1>的键值对。
对于给定的样例,第一个map的输出:
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>
第二个map输出:
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>
我们已经了解到了为一个给定job而衍生的很多map,在教程的后面部分将会介绍如何以一种细粒度的方式控制它们。
job.setCombinerClass(IntSumReducer.class);
WordCount 同样指定了一个合并器。因此,在基于key值排序后,每一个map的输出都会传递给本地的合并器以实现本地聚合。
第一个map的输出:
< Bye, 1>
< Hello, 1>
< World, 2>
第二个map输出:
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
Reducer通过reduce方法实现,目的是对值进行求和,也就是计算每个键出现的次数(比如,在这个例子中就是单词)
因此,job的输出为:
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>
main 方法指定job的不同方面,比如输入/输出路径(通过命令行传递),键/值类型,输入/输出格式等等。
文章浏览阅读6.3k次。 2005-09-05我的一个关于文件的程序 - [C语言]#includevoid main(){char ch;FILE* fp;if((fp=fopen("test.txt","r"))==NULL){printf("error");exit(1);}fseek(fp,0L,2);while((fseek(fp,-1L,1))!=-1){ch=fgetc(fp);pu_fseek(fp,0l,2)
文章浏览阅读674次。SQL查询前10条的方法为:select top X * from table_name--查询前X条记录,可以改成需要的数字,比如前10条。select top X * from table_name order by colum_name desc--按colum_name属性降序排序查询前X条记录,“order by” 后紧跟要排序的属性列名,其中desc表示降序,asc表示升序(默认也..._oracle怎么用语句设置查询结果数量
文章浏览阅读58次。讨论成员:罗凯旋、罗林杰、吴伟锋、黎文衷讨论完善APP,调试功能。转载于:https://www.cnblogs.com/383237360q/p/5011594.html
文章浏览阅读5.4k次。首先看你 favicon.ico 图标文件引入路径是否正确然后 看ico文件能否正常打开,这两个没问题的话,在地址栏直接输入你的域名 http://xxx.com/favicon.ico 注意 此刻可能还是 之前的ico图标 不要着急 刷新一下 试试 完美解决 清除程序缓存_win 软件开发 ico图标多久更新
文章浏览阅读2.1k次。Oracle归档日志删除我们都都知道在controlfile中记录着每一个archivelog的相关信息,当然们在OS下把这些物理文件delete掉后,在我们的controlfile中仍然记录着这些archivelog的信息,在oracle的OEM管理器中有可视化的日志展现出,当我们手工清除 archive目录下的文件后,这些记录并没有被我们从controlfile中清除掉,也就是or_rman 说明与资料档案库中在任何归档日志都不匹配
文章浏览阅读706次。命令提示符:[ root@localhost桌面] #[用户名@主机名 当前所在位置] #(超级用户) KaTeX parse error: Expected 'EOF', got '#' at position 25: …用户: #̲ su 用户名 //切… su密码:[ root@cml桌面] #临时提升为root权限:# sudo 命令..._命令提示符文件开头
文章浏览阅读152次。0x01 基本项目结构使用Android Studio创建的Android项目会划分成三个层级:project : settings.gradle定义了构建应用时包含了哪些模块;build.gradle定义了适用于项目中所有模块的构建配置module : 可以是一个app类型的module,对应生成apk应用;也可以是一个lib类型的module,对应生成aar包. 每个module中包含的bui..._android多个应用 gradle 怎么打包指定的应用
文章浏览阅读599次,点赞12次,收藏11次。前言:通常我们排序都需要创建一个函数实现排序,但当我们排完整型数组时,想要排字符串呢?那需要重新创建一个函数,完善它的功能,进而实现排字符串,这样非常繁琐,但是有一个函数可以帮我们实现传什么,排什么;qsort的传参:(1️⃣,2️⃣,3️⃣,4️⃣) (首元素地址,排序的元素个数,每个元素的大小,指向比较两个元素的函数的指针)1️⃣2️⃣3️⃣4️⃣的传参方法,下面介绍:…整型数组:......_qsort反向排序
文章浏览阅读355次。MVC绕过登陆界面验证时HttpContext.Current.User.Identity.Name取值为空问题解决方法_mvc 不验证登陆
文章浏览阅读7.6k次,点赞2次,收藏8次。1.分层领域模型规约: • DO( Data Object):与数据库表结构一一对应,通过DAO层向上传输数据源对象。 • DTO( Data Transfer Object):数据传输对象,Service或Manager向外传输的对象。 • BO( Business Object):业务对象。 由Service层输出的封装业务逻辑的对象。 • AO( Ap..._dto命名规范
文章浏览阅读91次。A reversible prime in any number system is a prime whose "reverse" in that number system is also a prime. For example in the decimal system 73 is a reversible prime because its reverse 37 is also a pr..._pat甲级1015
文章浏览阅读1.5k次。ABAP接口之Http发送json报文abap 调用http 发送 json 测试函数SE11创建结构:zsmlscpnoticeSE37创建函数:zqb_test_http_fuc1FUNCTIONzqb_test_http_fuc1.*"----------------------------------------------------------------..._abap http 转换为json输出