Hadoop之MapReduce编程_hadoop搭建完后mapreduce怎么写-程序员宅基地

技术标签: mapreduce  Hadoop  hadoop  Intellij打包jar  大数据  

MapReduce编程基础

0. MR与Java的数据类型对比

MR Java
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
map MapWritable
array ArrayWritable
//hadoop数据类型所在java包
import org.apache.hadoop.io.IntWritable
import org.apache.hadoop.io.Text
...

1. MapReduce代码的组成

  • Mapper
  • Reducer
  • Driver
    MR代码需要由上述三个部分组成,每个部分单独成一个.java文件。其中,Mapper文件处理的是map阶段的逻辑;reducer文件处理的是reduce阶段的逻辑;Driver文件内含整个MR的main方法,用于设置参数、调度map和reduce等。
1.1 Mapper
  1. 用户自定义的Mapper需要继承自己的父类
  2. Mapper的输入、输出数据均是key-value形式的,且数据类型可以自定义
  3. Mapper的业务逻辑写在map()方法中
  4. map()方法处理过程中,一次读取一行数据,同一行数据只会被一个map()调用一次
1.2 Reducer
  1. 用户自定义的Reducer需要继承自己的父类
  2. Reducer的输入对应的是Mapper的输出
  3. Reducer的业务逻辑写在reduce()方法中
  4. reduce()方法处理过程中,每次读取Mapper()的一个输出key-value,同一个key-value只会被一个reduce()调用一次
1.3 Driver
  1. 设置MapReduce运行时的各种参数
  2. 指定运行时所需jar包路径
  3. 指定Mapper输出数据类型
  4. 指定最终结果输出数据类型
  5. 指定输入数据所在目录
  6. 指定输出数据写入目录
  7. 将代码提交到yarn集群执行
    dirver相当于连接hadoop集群的客户端,用于设置MapReduce运行时的各种参数,并将编写好的代码提交到yarn上执行。

2. WordCount代码 - - 以Intellij IDEA为例

//处理的数据格式
hadoop,spark,java
kudu,hadoop,hbase
zookeeper,flink,sparkstreaming
hive,flink
hadoop
2.0 环境配置
  • 添加依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>hadoop-demo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>ali-maven</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
        </repository>
    </repositories>

	<!--
		hadoop依赖的版本需要与集群中的hadoop版本一致
	-->
    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>3.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>3.2.1</version>
        </dependency>
    </dependencies>
</project>
  • 日志设置
    在项目的resources文件夹下创建log.properties文件,文件内容为如下
log4j.rootLogger=INFO, stdout, D

# Console Appender
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern= %d{
    hh:mm:ss,SSS} [%t] %-5p %c %x - %m%n

# Custom tweaks
log4j.logger.com.codahale.metrics=WARN
log4j.logger.com.ryantenney=WARN
log4j.logger.com.zaxxer=WARN
log4j.logger.org.apache=WARN
log4j.logger.org.hibernate=WARN
log4j.logger.org.hibernate.engine.internal=WARN
log4j.logger.org.hibernate.validator=WARN
log4j.logger.org.springframework=WARN
log4j.logger.org.springframework.web=WARN
log4j.logger.org.springframework.security=WARN

# log file
log4j.appender.D = org.apache.log4j.DailyRollingFileAppender
log4j.appender.D.File = ..//log.log
log4j.appender.D.Append = true
log4j.appender.D.Threshold = DEBUG 
log4j.appender.D.layout = org.apache.log4j.PatternLayout
log4j.appender.D.layout.ConversionPattern = %-d{
    yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
2.1 Mapper部分 - - WordCountMapper.java
package com.csd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable>{
    
/*
   继承Mapper父类,这里的四个参数<LongWritable,Text,Text,IntWritable>分别是:
   1.LongWritable 是读取的文件的偏移量(暂时照着写就行,还没接触更改此参数的意义)
   2.Text   map阶段需要从hdfs读取的数据的数据类型,因为现在处理的是字符串,所以指定为Text
   3.Text map阶段输出的key-value中的key的数据类型,本次map的输出格式为("hadoop",1)、("spark",1)....,因此指定key类型为Text
   4.IntWritable map阶段输出的key-value中的value的数据类型,本次map的输出的value为1,因此指定value类型为IntWritable
   */

// 下面两个变量为map输出的key和value,为了避免重复实例化,因此将其提到公共变量区域
//修改hadoop数据类型统一用其set()的方法 write_key.set("hadoop")
   private static Text write_key = new Text();
   private static IntWritable write_value = new IntWritable();

   //重写map方法,此处写的是map的处理逻辑
   //LongWritable offset, Text value即为Mapper<>的前两个变量
   @Override
   protected void map(LongWritable offset, Text value, Context context)
           throws IOException, InterruptedException {
    
        
        //将hadoop的Text转换为java的String,value是输入文件的一行数据,值如value=hadoop,spark,java
       String line = value.toString();
       //切分每一行数据
       String[] words = line.split(",");
       //将一行数据的多个单词拆分成多个(key,value)
       for(String word:words){
    
           write_key.set(word);
           write_value.set(1);
           //将(key,value)写入文件,供之后的reducer读取
           context.write(write_key,write_value);
       }
   }
}
2.2 Reducer部分 - - WordCountReducer.java
package com.csd;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    
   private static IntWritable tmp_val = new IntWritable();
/*
   继承自Reducer,Reducer<Text,IntWritable,Text,IntWritable>的变量分别是:
   1.reduce端输入,即map端输出的key-value对中的key的数据类型
   2.reduce端输入,map端输出的key-value对中的value的数据类型
   3.reduce端输出的key-value中key的数据类型
   4.reduce端输出的key-value中value的数据类型
*/
   //重写reduce方法,此处编写reduce处理逻辑
   //Text key, Iterable<IntWritable> values 分别是:
   //map端输出的key的数据类型
   //Iterable<IntWritable>中Iterable内部的数据类型是map端输出的value的数据类型。reducer接收到的数据不是原封不动的map端的输出,而是经过加工的,一是key值会按照字典序排序;二是相同key的value值会被放到同一个迭代器Iterable中,即reducer接收到的数据其实类是与("hadoop",[1,1,1,1])、(“spark",[1,1,1])...
   @Override
   protected void reduce(Text key, Iterable<IntWritable> values, Context context)
           throws IOException, InterruptedException {
    
       int sum = 0;
       //遍历迭代器,计数每个key出现的次数
       for(IntWritable val:values){
    
           sum += val.get();
       }
       tmp_val.set(sum);
       //将计数结果写入带最终的输出文件
       context.write(key,tmp_val);
   }
}
2.2 Driver部分 - - WordCountDriver.java

执行MR程序有多种方式

  • 方法一:在开发环境下,直接通过IDEA执行(与hadoop集群环境无任何关系)
  • 方法二:在开发环境下,直接通过IDEA执行,但输入输出使用的是hdfs(与hadoop集群环境有部分关系)
  • 方法三:将java代码打包成jar,在集群环境下使用hadoop jar xxx,将MR放在yarn上执行
//方法一
package com.csd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;
import java.io.IOException;

public class WordCountDriver{
    
   //路径为本地路径
   private static String input_path = "/home/dong/Desktop/a/word_count.txt";
   private static String output_path = "/home/dong/Desktop/a/b";
   public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
       //BasicConfigurator用于日志输出
   	BasicConfigurator.configure();
       Configuration conf = new Configuration();
       Job job = Job.getInstance(conf);
       job.setJarByClass(WordCountDriver.class);
       job.setMapperClass(WordCountMapper.class);
       job.setReducerClass(WordCountReducer.class);

       job.setMapOutputKeyClass(Text.class);
       job.setMapOutputValueClass(IntWritable.class);
       job.setOutputKeyClass(Text.class);
       job.setOutputValueClass(IntWritable.class);

       FileInputFormat.setInputPaths(job,
               new Path(input_path));
       FileOutputFormat.setOutputPath(job,new Path(output_path));
       boolean result = job.waitForCompletion(true);
       System.exit(result?0:1);
   }
}
//方法二
package com.csd;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.BasicConfigurator;

import java.io.IOException;

public class WordCountDriver{
    
	//路径为hdfs路径
    private static String HDFS_PATH = "hdfs://dong:9000";
    private static String input_path = "hdfs:///data/test";//"/home/dong/Desktop/a/word_count.txt";
    private static String output_path = "hdfs:///data/result";//"/home/dong/Desktop/a/b";
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
    
    
        BasicConfigurator.configure();
        Configuration conf = new Configuration();
        //设置为hdfs模式
        conf.set("fs.defaultFS", HDFS_PATH);

        Job job = Job.getInstance(conf);
        job.setJarByClass(WordCountDriver.class);
        job.setMapperClass(WordCountMapper.class);
        job.setReducerClass(WordCountReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.setInputPaths(job,
                new Path(input_path));
        FileOutputFormat.setOutputPath(job,new Path(output_path));
        boolean result = job.waitForCompletion(true);
        System.exit(result?0:1);
    }
}

方法三:将方法二的代码打包成jar包
点击右侧Maven --> Lifecycle --> 点击package即可完成自动打包
成功后会有信息,显示build success并给出打包完的jar路径
在这里插入图片描述

//jar包所在路径
[INFO] Building jar: /home/dong/code/java/hadoop/target/hadoop-demo-1.0-SNAPSHOT.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

执行方式
将上述打包好的jar拷贝至集群,并通过shell方式提交到yarn上运行

//xxxx.jar即为打包好的jar路径   
//com.csd.WordCountDriver 是指定main方法所在的文件
hadoop jar xxxx.jar com.csd.WordCountDriver
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/donger__chen/article/details/111060313

智能推荐

L2TP/IPSec一键安装脚本-程序员宅基地

文章浏览阅读101次。本脚本适用环境:系统支持:CentOS6+,Debian7+,Ubuntu12+内存要求:≥128M更新日期:2017 年 05 月 28 日关于本脚本:名词解释如下L2TP(Layer 2 Tunneling Protocol)IPSec(Internet Protocol Security)IKEv2 (Internet Key Exchange v2)能实现 IPsec 的目前总体上有 ..._l2tp/ipsec一键安装脚本

一周总结 09.11.13-程序员宅基地

文章浏览阅读32次。用.NET环境开发机房收费系统这是一个中期任务,应用这大半年来学到的知识,用.NET做东西,说来容易,做来难呀。设计模式,一遍不够,两边太浅,一遍一遍慢慢体会;uml建模,多年开发经验的软件开发人员都不很熟练的操作rose的相关建模工具来进行软件建模,这说明什么呢?功成名就不是一蹴而就的,别急,别急,水到渠成,船到桥头自然直。总结这周,除了上课,就是来机房学习,还有...

转贴:win2008改造成准VISTA-程序员宅基地

文章浏览阅读123次。安装WINDOWS SERVER 2008前请确认你已经做好了以下准备:1.你的硬件必须满足下列要求处理器:最小: 1GHz 建议: 2GHz 最佳: 3GHz 或者更快速的内存: 最小: 512MB RAM建议: 1GB RAM最佳: 2GB RAM (完整安装) 或者 1GB RAM (Server Core 安装) 或者最大 (32位系统 ): 4GB (标准版) 或..._net framework 3.0支持开vista玻璃效果吗

【正一专栏】巴萨四大皆空怎么办_巴萨14年四大皆空-程序员宅基地

文章浏览阅读2.5k次。巴萨四大皆空怎么办每年的四月对于双线作战的球队来说都是决定命运的时刻,巴萨神奇逆转大巴黎后不得不在欧冠面临着尤文图斯的挑战,而在国内联赛又要紧追领头羊皇马。一个月要踢9场比赛,基本上都是一周双赛。巴萨上周刚刚在联赛中输球,错失了反超皇马的机会,而今天凌晨的欧冠比赛,巴萨0:3惨败给尤文图斯,欧冠晋级之路前景黯淡、又只能期待奇迹。没了联赛和欧冠,巴萨就会四大皆空,恩里克只能黯然走_巴萨14年四大皆空

苹果系统使用linux内核,iOS操作系统是不是基于Linux呢?-程序员宅基地

文章浏览阅读5.2k次。iOS实际上是Darwin的ARM变体,源自BSD,类UNIX内核,以及Apple自己的Mach内核扩展系统。这与是完全不同的,Linux是一个单片内核,这意味着所有驱动程序代码和I / O工具包都是核心内核的一部分。Apple是一个混合内核。有些人住在内核中,有些是内核扩展(通常是.kext文件)。相比之下,Windows是一个微内核,意味着内核中的内容很少,而且几乎所有东西都是外部驱动程序。L..._苹果的ios系统是linux内核吗?

WEEX框架(一)框架简介和快速上手体验-程序员宅基地

文章浏览阅读8.6k次。框架简介Weex,是能够完美兼顾性能与动态性,让移动开发者通过简捷的前端语法写出Native级别的性能体验的框架,并支持iOS、安卓、Web等多端部署,由阿里巴巴研发和维护。对于移动开发者来说,Weex主要解决了频繁发版和多端研发两大痛点,同时解决了前端语言性能差和显示效果受限的问题。开发者只需要在自己的APP中嵌入Weex的SDK,就可以通过撰写HTML/CSS/JavaScript来开发Native级别的Weex界面。Weex界面的生成码其实就是一段很小的JS,可以像发布网页一样轻松部署在服务端,_weex框架

随便推点

PytorchTotrial5_ModernCNN_torch trial-程序员宅基地

文章浏览阅读94次。深度卷积神经网络(AlexNet)LeNet: 在大的真实数据集上的表现并不尽如⼈意。1.神经网络计算复杂。2.还没有⼤量深⼊研究参数初始化和⾮凸优化算法等诸多领域。机器学习的特征提取:手工定义的特征提取函数神经网络的特征提取:通过学习得到数据的多级表征,并逐级表⽰越来越抽象的概念或模式。神经网络发展的限制:数据、硬件AlexNet首次证明了学习到的特征可以超越⼿⼯设计的特征,从..._torch trial

NP管理器和MT哪个强_MT管理器相信大家都很熟悉的[勉强],功能可以说非常强大,开发逆向的时候经常需要用到。...-程序员宅基地

文章浏览阅读8.1k次。MT管理器相信大家都很熟悉的[勉强],功能可以说非常强大,开发逆向的时候经常需要用到。但是有些功能是需要购买VIP才能体验的。像我们这种穷人只能看着流泪[转圈哭]还在眼馋MT的功能?快来试试今天冷眼为大家推荐的APP——NP管理器这款NP管理器可以说是借鉴于MT管理器,UI界面和MT差不多,但是UX真的不尽人意![弱][阴险]MT的启动速度大约是3-4s 这个软件的启动速度是10s左右[..._np管理器和mt哪个强

unity学习笔记_unity .autodestruct-程序员宅基地

文章浏览阅读1k次,点赞24次,收藏16次。Min Vertex Distance(最小顶点距离)︰定义拖尾效果中两个顶点之间的最小距离如果物体移动的距离小于这个值,不会创建新的顶点。作用:定义线段的对齐方式,可以是世界空间('View")或本地空间('Transformz )。作用:定义线段的纹理模式,可以是拉伸('Tile')、重复( 'stretch ` )Autodestruct(自动销毁)︰如果启用,当拖尾的时间到达设定的时间后,将自动销毁。作用:定义线段的拐角处的顶点数量。作用:设置线段颜色的渐变,可以通过渐变来实现线段颜色的平滑过渡。_unity .autodestruct

网络算法——基于堆的Prim算法和基于并查集的Kruskal算法_prim算法用并查集吗-程序员宅基地

文章浏览阅读386次。类名:Heapself.heap = list() 记录堆中的值 对应边的权重self.node = list() 记录堆中权重对应的起始节点self.neighbor = list() 记录堆中节点的邻接节点类方法:类方法作用definit_(self):初始化类defstr_(self):返回类的信息def将指定节点向上调整def将指定节点向下调整def拆入新节点def获取堆的最小值def更改指定节点的值类名:Union_Find。_prim算法用并查集吗

RuntimeError: Expected all tensors to be on the same device, but found at least two devices, cuda:1-程序员宅基地

文章浏览阅读4.5k次,点赞3次,收藏8次。一、了解nn.DataParallelhttps://zhuanlan.zhihu.com/p/102697821二、报错的几种原因2.1 cuda:0 and cpu!简单的将没有转移到gpu的参数转移即可。例如,xx.to("cuda")2.2 cuda:1 and cuda:0!可能存在有一些参数,不能使用nn.DataParallel自动分配到多个gpu。检查是否有自定义的tensor,注意:不能是Variable,必须是Parameter。..._on the same device, but found at least two devices, cuda:1

数组的两种传递方式_数组传递-程序员宅基地

文章浏览阅读1.3w次,点赞7次,收藏45次。 数组传递:将数组作为参数传递给函数,分值传递和地址传递。其中,值传递的效率较低,不建议使用。两种传递方式都会改变main函数中数组的值,如下代码中a[3]的结果都为6。注意区分数组的值传递和函数值传递的区别。//数组的两种传递方式#include&lt;iostream&gt;using namespace std;//值传递void fun1(int a[5]){ ..._数组传递

推荐文章

热门文章

相关标签