启动Spark Shell,在Spark Shell中编写WordCount程序,在IDEA中编写WordCount的Maven程序,spark-submit使用spark的jar来做单词统计_idea使用代码打开sparkshell-程序员宅基地

技术标签: spark  # Spark(大数据分析引擎)  

1.启动Spark Shell

spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。要注意的是要启动Spark-Shell需要先启动Spark-ha集群,Spark集群安装和部署参考:http://blog.csdn.net/tototuzuoquan/article/details/74481570

1.2.1、启动spark shell

启动方式一:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME 
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# pwd
/home/tuzq/software/spark-2.1.1-bin-hadoop2.7
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

通过使用–master指定master的地址,连接的是启动着的那个master

同样,还可以指定执行的内存数和总的核心数

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 2g --total-executor-cores 2

参数说明:
–master spark://hadoop:7077 指定Master的地址
–executor-memory 2g 指定每个worker可用内存为2G
–total-executor-cores 2 指定整个集群使用的cup核数为2个

注意:
如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。

Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可

1.2.2、在spark shell中编写WordCount程序

1.首先启动hdfs
2.向hdfs上传一个文件到hdfs(hdfs://mycluster/wordcount/input/2.txt)
效果图下:
这里写图片描述

如果通过带有协议的方式访问hadoop集群上的文件可以通过下面的方式:

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/
Found 2 items
drwx-wx-wx   - root supergroup          0 2017-07-06 11:11 hdfs://mycluster/tmp
drwxr-xr-x   - root supergroup          0 2017-07-06 11:16 hdfs://mycluster/wordcount
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input
Found 9 items
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/1.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/3.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/4.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/5.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/a.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/aaa.txt
-rw-r--r--   3 root supergroup   27787264 2017-07-06 11:16 hdfs://mycluster/wordcount/input/b.txt
-rw-r--r--   3 root supergroup   26738688 2017-07-06 11:16 hdfs://mycluster/wordcount/input/c.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/input/2.txt
Collecting and analysis base data for big data analysis;Maintenance Hadoop platform 
Development Hadoop framework 
Cooperate with data scientist, verify and implement data models to realize automatic and accurate fraud detection, in order to improve the risk management level of E-commerce/payment platforms 
Analyze information acquired and compare solutions and weight them against the actual needs, provide root cause analysis affecting key business problems 
Play an active role in company's anti-fraud platform strategy 
Support related data analysis work, and provide valuable business reports[root@hadoop2 hadoop-2.8.0]#

3.在spark shell中用scala语言编写spark程序

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mycluster/wordcount/output")

1.使用hdfs命令查看结果

[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/output
Found 3 items
-rw-r--r--   3 root supergroup          0 2017-07-06 11:48 hdfs://mycluster/wordcount/output/_SUCCESS
-rw-r--r--   3 root supergroup        400 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00000
-rw-r--r--   3 root supergroup        346 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00001
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/output/part-00000
(role,1)
(Play,1)
(fraud,1)
(level,1)
(business,2)
(improve,1)
(platforms,1)
(order,1)
(big,1)
(with,1)
(scientist,,1)
(active,1)
(valuable,1)
(data,5)
(information,1)
(Cooperate,1)
(Collecting,1)
(framework,1)
(E-commerce/payment,1)
(acquired,1)
(root,1)
(accurate,1)
(solutions,1)
(analysis;Maintenance,1)
(problems,1)
(them,1)
(Analyze,1)
(models,1)
(analysis,3)
(realize,1)
(actual,1)
(weight,1)
[root@hadoop2 hadoop-2.8.0]#

说明:
sc是SparkContext对象,该对象是提交spark程序的入口
sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)是从hdfs中读取数据
flatMap(_.split(” “))先map在压平
map((_,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://mycluster/wordcount/output”)将结果写入到hdfs中

将wordCound的结果排序,并显示的代码:

scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res2: Array[(String, Int)] = Array((and,6), (data,5), (analysis,3), (business,2), (to,2), (platform,2), (in,2), (provide,2), (the,2), (Hadoop,2), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (company's,1), (needs,,1), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), (Development,1), (role,1), (Play,1), (fraud,1), (level,1), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (m...
scala>

2、idea中创建spark的maven工程

spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。

创建Maven工程:
这里写图片描述
这里写图片描述
这里写图片描述
这里写图片描述
要注意的是,在创建好项目之后,一定要重新制定好Maven仓库所在的位置,不然可能会导致重新下载jar包:
这里写图片描述

创建好maven项目后,点击Enable Auto-Import
这里写图片描述
配置Maven的pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.toto.spark</groupId>
    <artifactId>wordCount</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>1.7</maven.compiler.source>
        <maven.compiler.target>1.7</maven.compiler.target>
        <encoding>UTF-8</encoding>
        <scala.version>2.10.6</scala.version>
        <scala.compat.version>2.10</scala.compat.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.10</artifactId>
            <version>1.5.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.2</version>
        </dependency>
    </dependencies>

    <build>
        <sourceDirectory>src/main/scala</sourceDirectory>
        <testSourceDirectory>src/test/scala</testSourceDirectory>
        <plugins>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.0</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-make:transitive</arg>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.18.1</version>
                <configuration>
                    <useFile>false</useFile>
                    <disableXmlReport>true</disableXmlReport>
                    <includes>
                        <include>**/*Test.*</include>
                        <include>**/*Suite.*</include>
                    </includes>
                </configuration>
            </plugin>

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>cn.toto.spark.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

将src/main/java和src/test/java分别修改成src/main/scala和src/test/scala(或者创建scala的Directory),与pom.xml中的配置保持一致
这里写图片描述
这里写图片描述

或者通过如下方式:
这里写图片描述
这里写图片描述
新建一个scala class,类型为Object
这里写图片描述

编写spark程序代码:

package cn.toto.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/6.
  */
object WordCount {
    

  def main(args: Array[String]): Unit = {
    //创建sparkconf
    val conf=new SparkConf().setAppName("WordCount")
    //创建sparkcontext
    val sc=new SparkContext(conf)
    //读取hdfs中的数据
    val line:RDD[String]=sc.textFile(args(0))
    //切分单词
    val words:RDD[String]=line.flatMap(_.split(" "))
    //将单词计算
    val wordAndOne:RDD[(String,Int)]=words.map((_,1))
    //分组聚合
    val result:RDD[(String,Int)]=wordAndOne.reduceByKey((x,y)=>x+y)
    //排序
    val finalResult:RDD[(String,Int)]=result.sortBy(_._2,false)
    //将数据存到HDFS中
    finalResult.saveAsTextFile(args(1))
    //释放资源
    sc.stop()
  }
}

打包:
这里写图片描述
进入工程的target目录下面,获取jar包
这里写图片描述
或者直接在IDEA的工程目录下找到:
这里写图片描述

将wordCount-1.0-SNAPSHOT.jar上传到/home/tuzq/software/sparkdata下
这里写图片描述

使用spark的jar来做单词统计
要注意的是最后的输出路径要不存在,并且运行下面的程序的时候,最好是把spark-shell给关闭了。否则可能会报错。

bin/spark-submit --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 512m --total-executor-cores 6 --class cn.toto.spark.WordCount /home/tuzq/software/sparkdata/wordCount-1.0-SNAPSHOT.jar hdfs://mycluster/wordcount/input hdfs://mycluster/wordcount/out0001

运行时的状态:
这里写图片描述

查看hdfs上的结果:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -ls hdfs://mycluster/wordcount/out0002
Found 10 items
-rw-r--r--   3 root supergroup          0 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/_SUCCESS
-rw-r--r--   3 root supergroup        191 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00000
-rw-r--r--   3 root supergroup        671 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00001
-rw-r--r--   3 root supergroup        245 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00002
-rw-r--r--   3 root supergroup         31 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00003
-rw-r--r--   3 root supergroup       1096 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00004
-rw-r--r--   3 root supergroup         11 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00005
-rw-r--r--   3 root supergroup        936 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00006
-rw-r--r--   3 root supergroup        588 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00007
-rw-r--r--   3 root supergroup        609 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00008

查看其中的任何一个:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/wordcount/out0002/part-00000
(and,770752)
(is,659375)
(I,505440)
(a,468642)
(to,431857)
(in,421230)
(the,331176)
(of,272080)
(FDS,218862)
(for,213029)
(The,196569)
(true,196567)
(but,196566)
(on,193650)
(without,193649)
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]#
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/toto1297488504/article/details/74510952

智能推荐

.NET 实现实体对象深拷贝(克隆/复制)的几种方法_net 对象复制-程序员宅基地

文章浏览阅读6.7k次。浅拷贝:指对象的字段被拷贝,而字段引用的对象不会被拷贝,拷贝对象和原对象仅仅是引用名称有所不同,但是它们共用一份实体。对任何一个对象的改变,都会影响到另外一个对象。大部分的引用类型,实现的都是浅拷贝,引用类型对象之间的赋值,就是复制一个对象引用地址的副本,而指向的对象实例仍然是同一个。深拷贝:指对象的子段被拷贝,同时字段引用的对象也进行了拷贝。深拷贝创建的是整个源对象的结构,拷贝对象和原对象相..._net 对象复制

StarRocks x Paimon 构建极速实时湖仓分析架构实践

当前 StarRocks x Paimon 的能力主要包括:支持各类存储系统,包括 HDFS 以及对象存储 S3/OSS/OSS-HDFS支持 HMS 以及阿里云 DLF 元数据管理系统支持 Paimon 的 Primary Key 和 Append Only 表类型查询支持 Paimon 系统表的查询,常见例如 Read Optimized 表,snapshots 表等支持 Paimon 表和其他类型数据湖格式的关联查询支持 Paimon 表和 StarRocks 内表的关联查询。

Java设计模式 _创建型模式_原型模式(Cloneable)

1、原型模式(Prototype Pattern)是用于创建重复的对象,同时又能保证性能比较好。一般对付出较大代价获取到的实体对象进行克隆操作,可以提升性能。(2)、复写clone方法(当前对象本身可以不复写,如果当前对象被继承,需要clone子类,则必须要复写)(1)、需要克隆的实体类实现Cloneable接口。可以看出,完整的复制了属性,且并不是同一个对象。

GPT与GAN结合生成图像——VQGAN原理解析

这篇文章,我们讲VQ_GAN,这是一个将特征向量离散化的模型,其效果相当不错,搭配Transformer(GPT)或者CLIP使用,达到的效果在当时可谓是令人拍案叫绝![GPT与GAN结合生成图像——VQGAN原理解析-哔哩哔哩]效果演示:图像生成其他任务。

希尔(谢尔)排序/缩小增量排序(Python实现)_希尔排序dk=1-程序员宅基地

文章浏览阅读280次。排序-希尔排序文章目录排序-希尔排序时间线前言定义一图流逻辑逻辑思考时间复杂度实现Python思考时间线 2020年7月4日——完成初稿 2020年7月4日——增加时间性能测试前言从学习算法到这篇笔记完成之前,一直都以为希尔排序是多么难懂以及高大上,但其实只要弄懂了就简单了。以前觉得只要掌握一些常见的即可,但后来才发现,其它的排序算法也有其独特的魅力——不是说这个算法可以实现了什么完成了什么,其魅力在于实现的过程,与其它算法的区别。定义一图流希尔排序是直接插入排序算法的改进,所_希尔排序dk=1

今天开始学Pattern Recognition and Machine Learning (PRML)书,章节1.2,Probability Theory 概率论(上)...-程序员宅基地

文章浏览阅读146次。原创书写,转载请注明出处http://www.cnblogs.com/xbinworld/archive/2013/04/25/3041505.html今天开始学Pattern Recognition and Machine Learning (PRML)书,章节1.2,Probability Theory (上)这一节是浓缩了整本书关于概率论的精华,突出一个不确定性(unc..._荆炳义 高等概率论讲义 pdf

随便推点

euclidea4攻略_Euclidea几何构建9.4通关攻略-程序员宅基地

文章浏览阅读1k次。Euclidea几何构建全关卡通关攻略大全玩腻味了哪些大开发商的游戏,小编给大家推荐一款挺有意思的小游戏Euclidea。Euclidea几何构建是一款具有理科生浪漫的游戏,这是一款数学几何风格的休闲益智类手游,玩法就是尺规做图,根据每个关卡给出的道具和要求,以在最短的步骤内解出答案。想说这游戏的趣味就是难度不是一点二点的,有兴趣的大家可以试试。euclidea几何构建全关卡通关攻略Euclide..._几何构建攻略

【算法】前缀和

前缀和,实际上是一种非常简单的动态规划,通过预处理前缀和数组,以空间换时间,提高运行效率。

php7.4在foreach中对使用数据使用无法??[]判读,无法使用引用传递&

代码如下图:这样子在foreach中是无法修改class_history的。

pdf转换成byte放入mysql_如何将生成的pdf文件保存到java中的mysql数据...-程序员宅基地

文章浏览阅读357次。我有一个使用itext库生成pdf文件的java类.现在根据我的需要,我必须将生成的pdf文件保存到mysql数据库表中,但是我不知道该怎么做.我的担心是:…1.我可以在pdf表的mysql列中提供什么数据类型以保存pdf文件.2.哪个查询将生成的pdf文件插入数据库..目前,我正在生成pdf文件,并将其存储到本地磁盘的硬编码文件路径中.这是我在java中的pdf生成代码…OutputStream..._java:mysql数据库据转换pdf格式并打印机输出java:mysql数据库据转换pdf格式并打印

LateX beamer 下的报错unknown CJK family \CJKsfdefault is being ignored_latex cjkfamily报错-程序员宅基地

文章浏览阅读1.8k次。报错信息unknown CJK family \CJKsfdefault is being ignored解决方法在文档中添加\setCJKsansfont{Heiti SC}注意这里我是Mac系统,选用的字体是『Heiti SC』,其它系统具体的字体名称可能不一样_latex cjkfamily报错

中科世为 Z6S Linux HMI 屏幕模组上手记录 | 01 - 环境搭建_linux hmi开源项目-程序员宅基地

文章浏览阅读3.5k次,点赞2次,收藏9次。1. 中科世为Z6S串口屏中科世为官网最近到手一块中科世为的串口屏,开搞!Z6S串口屏中运行的是 FlyThings OS 嵌入式物联网界面系统,FlyThings OS是中科世为基于Linux为操作系统的核心并加入了GUI,硬件层,媒体层,网络层等为系统框架层。同时提供了基于Windows桌面上运行的FlyThings IDE为开发者提供了一个更加便捷的方式完成界面编辑,代码编译,下载调试的功能。FlyThings OS系统的组成如下:内核基于开源的Liunx3.4的内核版本针对_linux hmi开源项目

推荐文章

热门文章

相关标签