技术标签: # 大数据原理与应用 mapreduce hadoop 分布式 大数据
过去很长一段时间,CPU的性能都遵循“摩尔定律”:【当价格不变时,集成电路上可容纳的元器件的数目,约每隔18个月便会增加一倍,性能也将提升一倍】。从2005年开始摩尔定律逐渐失效,需要处理的数据量快速增加,人们开始借助于分布式并行编程来提高程序性能。
分布式并行程序运行在大规模计算机集群上,可以并行执行大规模数据处理任务,从而获得海量的计算能力。同时通过向集群中增加新的计算节点,就能很容易地实现集群计算能力的扩充。
谷歌公司最先提出了分布式并行编程模型 MapReduce,Hadoop MapReduce是它的开源实现 。谷歌的 MapReduce运行在分布式文件系统 GFS上,Hadoop MapReduce运行在分布式文件系统 HDFS上。相对而言,Hadoop MapReduce要比谷歌 MapReduce的使用门槛低很多,程序员即使没有任何分布式编程开发经验,也可以很轻松地开发出分布式程序部署到计算机集群上。
集群的架构 | 容错性 | 硬件价格及扩展性 | 编程和学习难度 | 适用场景 | |
---|---|---|---|---|---|
传统并行编程框架 | 通常采用共享式架构(共享内存、共享存储),底层通常采用统一的存储区域网络SAN | 容错性差,其中一个硬件发生故障容易导致整个集群不可工作 | 通常采用刀片服务器,高速网络以及共享存储区域网络 SAN,价格高,扩展性差 | 编程难度大,需要解决做什么和怎么做的问题,编程原理和多线程编程逻辑类似,需要借助互斥量、信号量、锁等机制,实现不同任务之间的同步和通信 | 适用于实时、细粒度计算,尤其适用于计算密集型的应用 |
MapReduce | 采用典型的非共享式架构 | 容错性好,在整个集群中每个节点都有自己的内存和存储,任何一个节点出现问题不会影响其他节点正常运行,同时系统中设计了冗余和容错机制 | 整个集群可以随意增加或减少相关的计算节点,普通PC机就可以实现,价格低廉,扩展性好 | 编程简单,只需要告诉系统要解决什么问题,系统自动实现分布式部署,屏蔽分布式同步、通信、负载均衡、失败恢复等底层细节 | 一般适用于非实时的批处理及数据密集型应用 |
MapReduce将复杂的、运行于大规模集群上的并行计算过程高度地抽象到了两个函数:Map和Reduce。
MapReduce采用“分而治之”策略,一个存储在分布式文件系统中的大规模数据集,会被切分成许多独立的分片(split),这些分片可以被多个 Map任务并行处理。MapReduce框架会为每个 Map任务输入一个数据子集,Map任务生成的结果会继续作为 Reduce任务的输入,最终由 Reduce任务输出最后结果,并写入分布式文件系统。
这里要特别强调一下,适合用 MapReduce来处理的数据集需要满足一个前提条件:待处理的数据集可以分解成许多个小的数据集,而且每一个小数据集都可以完全并行地进行处理。
MapReduce设计的一个理念就是“计算向数据靠拢”,而不是“数据向计算靠拢”,因为,移动数据需要大量的网络传输开销,在大规模数据环境下开销更为惊人。所以,移动计算要比移动数据更加经济。
MapReduce框架采用了Master/Slave架构,包括一个 Master和若干个Slave。Master上运行JobTracker, Slave上运行TaskTracker。
Map函数和 Reduce函数都是以<key, value>作为输入,按一定的映射规则转换成另一个或一批<key, value>进行输出。
函数 | 输入 | 输出 | 说明 |
---|---|---|---|
Map | <k1,v1> | List(<k2,v2>) | 将小数据集(split)进一步解析成一批<key,value>对,输入 Map函数中进行处理。每一个输入的<k1,v1>会输出一批<k2,v2>,<k2,v2>是计算的中间结果 |
Reduce | <k2,List(v2)> | <k3,v3> | 输入的中间结果<k2,List(v2)>中的 List(v2)表示是一批属于同一个 k2的 value |
MapReduce体系结构主要由四个部分组成,分别是: Client、JobTracker、TaskTracker以及 Task。
Client:
JobTracker:
TaskTracker:
Task:
MapReduce的核心思想是“分而治之”。一个大的 MapReduce作业,首先会被拆分成许多个 Map任务在多台机器上并行处理,每个 Map任务通常运行在数据存储的节点上。当 Map任务结束后,会生成以<key,value>形式表示的中间结果,这些中间结果会被分发到多个 Reduce任务在多台机器上并行执行,具有相同 key的<key,value>会被发送到同一个 Reduce任务那里。Reduce任务会被中间结果进行汇总计算得到最后结果,并输出到分布式文件系统中。
MapReduce算法的执行过程:
Shuffle:是指对 Map输出的结果进行分区、排序、合并、归并等处理并交给 Reduce的过程,分为 Map端的操作和 Reduce端的操作。
Map端的 Shuffle过程 | Reduce端的 Shuffle过程 |
---|---|
|
|
WordCount程序任务
WordCount | 说明 |
---|---|
输入 | 一个包含大量单词的文本文件 |
输出 | 文件中每个单词及其出现次数(频数),并按照单词 字母顺序排序,每个单词和其频数占一行,单词和频 数之间有间隔 |
一个WordCount执行过程的实例
Map过程示意图 | 用户没有定义Combiner时的Reduce过程示意图 | 用户有定义Combiner时的Reduce过程示意图 |
---|---|---|
MapReduce可以很好地应用于各种计算问题:
在 MapReduce环境下执行两个关系的连接操作的方法如下:
假设关系 R(A,B),S(B,C)都存储在一个文件中,为了连接这些关系,必须把来自每个关系的各个元组都和一个键关联,这个键就是属性 B的值。可以使用 Map过程把来自 R的每个元组<a,b>转换成一个键值对<b,<R,a>>,其中的键就是 b,值就是<R,a>。注意,这里把关系 R包含在值中,这样做可以使得我们在 Reduce阶段只把那些来自 R的元组和来自 S的元组进行匹配。
类似地,使用 Map过程把来自 S的每个元组<b,c>转换成一个键值对<b,<S,c>>,键是 b,值是<S,c>。Reduce进程的任务就是,把来自关系 R和 S的具有共同属性 B值的元组进行合并。这样,所有具有特定 B值的元组必须被发送到同一个 Reduce进程。
任务要求:用 MapReduce实现对输入文件中的单词做词频统计
实践一共分为四步:
1.编写 Map处理逻辑
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
2.编写 Reduce处理逻辑
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
3.编写 Main函数
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
编译打包代码请参考另一篇博客 简单的MapReduce实践
完整代码:
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
public class WordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
public static class IntSumReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
文章浏览阅读371次。给大家推荐一个廖雪峰老师讲解git的网站:https://www.liaoxuefeng.com/wiki/0013739516305929606dd18361248578c67b8067c8c017b000一、SVN与git的区别 SVN是“集成式”管理方式,所有的“版本控制器”都在中央服务器上,每个开发人员的的计算机都要连接到中央服务器上才能进行合作开发。开发人员一般只能在公司才能进行开发(因..._snv git
文章浏览阅读4.4k次,点赞3次,收藏4次。1 简介RabbitMQ是一个开源的免费的消息队列系统,一端往消息队列中不断写入消息,而另一端则可以读取或者订阅队列中的消息。它是用Erlang编写的,并实现了高级消息队列协议(AMQP)。2 安装部署2.1 安装EPEL存储库和Erlang安装RabbitMQ是用Erlang语言编写的,在本教程中我们将安装最新版本的Erlang到服务器中。 Erlang在默认的YUM存储库中不可用,因此...
文章浏览阅读3.2k次,点赞9次,收藏20次。最近实验室做的一些工作需要使用lattepanda(类似增强的树莓派)并且使用其无线网卡的ad-hoc模式,并且实现多台lattepanda之间实现网络通信。_测试ad-hoc无线网络的连通性
文章浏览阅读261次。oracle 建议关闭透明大页[root@ht01 ~]# cat /etc/grub.conf# grub.conf generated by anaconda## Note that you do not have to rerun grub after making changes to this file# NOTICE: You have a /boot partition. Thi..._ubuntu grub 关闭透明大页
文章浏览阅读1.3k次。deepin v20默认是装不了vmware15的,因为vm15 依赖 gcc6 而deepin v20 默认是gcc8,版本太高了,所以要先安装gcc6,但是默认的源里没有gcc6及以下版本,所以要从换源开始。第一步 换阿里云的源1:快捷键“Ctrl + Alt + T”打开命令输入框,输入sudo cp /etc/apt/sources.list /etc/apt/sources.list.bak备份当前配置回车后 需要你输入你的开机密码再输入sudo dedit /..._deepin安装vmware15
文章浏览阅读3.7k次,点赞2次,收藏10次。在上一篇的Html+Css+Js 3D旋转爱心 中间可插图片的代码基础上稍做修改获得两个大小不一3D旋转立方体,留作记忆,代码如下:<!DOCTYPE html><html xmlns="http://www.w3.org/1999/xhtml"><head> <meta http-equiv="Content-Type" conten..._cao96
文章浏览阅读1.2w次,点赞9次,收藏78次。Unity中,使“鼠标点击”穿透UI,触发物体事件_unity ui穿透
文章浏览阅读1k次。在一百年前,行列式不是线性代数的中心,但是数学的方向一直在变换!毕竟,仅仅一个数就能告诉我们许多矩阵的信息。对行列式的一种理解是:它对A−1,A−1bA^{-1},A^{-1}b的每一项给出了明确的方式,这个公式不会随着我们的计算方法而改变。事实上,我们可以将行列式看成n×nn\times n矩阵最有效的替代公式,这个公式说明了A−1A^{-1}如何依赖AA的n2n^2个元素,以及这些元素变化时这个_将visa换成avis需要多少次对换,是奇数吗
文章浏览阅读647次。部署FTP服务FTP(文本传输协议)是INTERNET上仍常用的最老的网络协议之一,它为系统提供了通过网络与远程服务器进行传输的简单方法。FTP服务器包的名称为vsftpd,它代表very secure file transferprotocol damon 服务器名称也叫做vsftpd.默认配置文件让匿名用户(anoymous用户)只能呢个下在位于chroot 目录中的内容。/var/f..._liunx系统ftp服务器架构设计
文章浏览阅读1k次。API以wx.开头,如未特殊约定,一般都以接受一个object作为参数。 其中wx.on开头的API是监听某个事件发生的API接口,接收一个callback函数。当事件出发时,会调用callback函数。API主要用于逻辑层的开发,实现原生应用具有的一些功能。列如利用网络API获取丰富的内容、通过媒体API实现多样化信息交流,等待。wx.request用于发一个HTTPS请求。一个微信小..._wx.request的最大超时时长是多少? 想设置三分钟
文章浏览阅读1.3k次。原理分析:这是由于从node.js 14版及以上版本中,require作为COMMONJS的一个命令已不再直接支持使用,所以我们需要导入createRequire命令才可以解决方案在你要require的代码前引入如下代码即可:import { createRequire } from 'module';const require = createRequire(import.meta.url);..._node_modules/crypto-js/crypto-js.js' is not defined, require args is '../no
文章浏览阅读2.9k次。转载自:数组统计分析给定数组A,大小为n,数组元素为1到n的数字,不过有的数字出现了多次,有的数字没有出现。请给出算法和程序,统计哪些数字没有出现,哪些数字出现了多少次。能够在O(n)的时间复杂度,O(1)的空间复杂度要求下完成么?分析这个题目,是有一定技巧的。技巧是需要慢慢积累,待经验多了之后,可以灵感或者直觉,就产生了技巧。如果不知道技巧,那该怎么办呢?