技术标签: spark # Spark(大数据分析引擎)
spark-shell是Spark自带的交互式Shell程序,方便用户进行交互式编程,用户可以在该命令行下用scala编写spark程序。要注意的是要启动Spark-Shell需要先启动Spark-ha集群,Spark集群安装和部署参考:http://blog.csdn.net/tototuzuoquan/article/details/74481570
启动方式一:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# pwd
/home/tuzq/software/spark-2.1.1-bin-hadoop2.7
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077
通过使用–master指定master的地址,连接的是启动着的那个master
同样,还可以指定执行的内存数和总的核心数
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 2g --total-executor-cores 2
参数说明:
–master spark://hadoop:7077 指定Master的地址
–executor-memory 2g 指定每个worker可用内存为2G
–total-executor-cores 2 指定整个集群使用的cup核数为2个
注意:
如果启动spark shell时没有指定master地址,但是也可以正常启动spark shell和执行spark shell中的程序,其实是启动了spark的local模式,该模式仅在本机启动一个进程,没有与集群建立联系。
Spark Shell中已经默认将SparkContext类初始化为对象sc。用户代码如果需要用到,则直接应用sc即可
1.首先启动hdfs
2.向hdfs上传一个文件到hdfs(hdfs://mycluster/wordcount/input/2.txt)
效果图下:
如果通过带有协议的方式访问hadoop集群上的文件可以通过下面的方式:
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/
Found 2 items
drwx-wx-wx - root supergroup 0 2017-07-06 11:11 hdfs://mycluster/tmp
drwxr-xr-x - root supergroup 0 2017-07-06 11:16 hdfs://mycluster/wordcount
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input
Found 9 items
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/1.txt
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/3.txt
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/4.txt
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/5.txt
-rw-r--r-- 3 root supergroup 27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/a.txt
-rw-r--r-- 3 root supergroup 27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/aaa.txt
-rw-r--r-- 3 root supergroup 27787264 2017-07-06 11:16 hdfs://mycluster/wordcount/input/b.txt
-rw-r--r-- 3 root supergroup 26738688 2017-07-06 11:16 hdfs://mycluster/wordcount/input/c.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input/2.txt
-rw-r--r-- 3 root supergroup 604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/input/2.txt
Collecting and analysis base data for big data analysis;Maintenance Hadoop platform
Development Hadoop framework
Cooperate with data scientist, verify and implement data models to realize automatic and accurate fraud detection, in order to improve the risk management level of E-commerce/payment platforms
Analyze information acquired and compare solutions and weight them against the actual needs, provide root cause analysis affecting key business problems
Play an active role in company's anti-fraud platform strategy
Support related data analysis work, and provide valuable business reports[root@hadoop2 hadoop-2.8.0]#
3.在spark shell中用scala语言编写spark程序
scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mycluster/wordcount/output")
1.使用hdfs命令查看结果
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/output
Found 3 items
-rw-r--r-- 3 root supergroup 0 2017-07-06 11:48 hdfs://mycluster/wordcount/output/_SUCCESS
-rw-r--r-- 3 root supergroup 400 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00000
-rw-r--r-- 3 root supergroup 346 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00001
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/output/part-00000
(role,1)
(Play,1)
(fraud,1)
(level,1)
(business,2)
(improve,1)
(platforms,1)
(order,1)
(big,1)
(with,1)
(scientist,,1)
(active,1)
(valuable,1)
(data,5)
(information,1)
(Cooperate,1)
(Collecting,1)
(framework,1)
(E-commerce/payment,1)
(acquired,1)
(root,1)
(accurate,1)
(solutions,1)
(analysis;Maintenance,1)
(problems,1)
(them,1)
(Analyze,1)
(models,1)
(analysis,3)
(realize,1)
(actual,1)
(weight,1)
[root@hadoop2 hadoop-2.8.0]#
说明:
sc是SparkContext对象,该对象是提交spark程序的入口
sc.textFile(“hdfs://mycluster/wordcount/input/2.txt”)是从hdfs中读取数据
flatMap(_.split(” “))先map在压平
map((_,1))将单词和1构成元组
reduceByKey(+)按照key进行reduce,并将value累加
saveAsTextFile(“hdfs://mycluster/wordcount/output”)将结果写入到hdfs中
将wordCound的结果排序,并显示的代码:
scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res2: Array[(String, Int)] = Array((and,6), (data,5), (analysis,3), (business,2), (to,2), (platform,2), (in,2), (provide,2), (the,2), (Hadoop,2), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (company's,1), (needs,,1), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), (Development,1), (role,1), (Play,1), (fraud,1), (level,1), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (m...
scala>
spark shell仅在测试和验证我们的程序时使用的较多,在生产环境中,通常会在IDE中编制程序,然后打成jar包,然后提交到集群,最常用的是创建一个Maven项目,利用Maven来管理jar包的依赖。
创建Maven工程:
要注意的是,在创建好项目之后,一定要重新制定好Maven仓库所在的位置,不然可能会导致重新下载jar包:
创建好maven项目后,点击Enable Auto-Import
配置Maven的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.toto.spark</groupId>
<artifactId>wordCount</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.7</maven.compiler.source>
<maven.compiler.target>1.7</maven.compiler.target>
<encoding>UTF-8</encoding>
<scala.version>2.10.6</scala.version>
<scala.compat.version>2.10</scala.compat.version>
</properties>
<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.5.2</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.2</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
<configuration>
<args>
<arg>-make:transitive</arg>
<arg>-dependencyfile</arg>
<arg>${project.build.directory}/.scala_dependencies</arg>
</args>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>cn.toto.spark.WordCount</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
将src/main/java和src/test/java分别修改成src/main/scala和src/test/scala(或者创建scala的Directory),与pom.xml中的配置保持一致
或者通过如下方式:
新建一个scala class,类型为Object
编写spark程序代码:
package cn.toto.spark
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* Created by toto on 2017/7/6.
*/
object WordCount {
def main(args: Array[String]): Unit = {
//创建sparkconf
val conf=new SparkConf().setAppName("WordCount")
//创建sparkcontext
val sc=new SparkContext(conf)
//读取hdfs中的数据
val line:RDD[String]=sc.textFile(args(0))
//切分单词
val words:RDD[String]=line.flatMap(_.split(" "))
//将单词计算
val wordAndOne:RDD[(String,Int)]=words.map((_,1))
//分组聚合
val result:RDD[(String,Int)]=wordAndOne.reduceByKey((x,y)=>x+y)
//排序
val finalResult:RDD[(String,Int)]=result.sortBy(_._2,false)
//将数据存到HDFS中
finalResult.saveAsTextFile(args(1))
//释放资源
sc.stop()
}
}
打包:
进入工程的target目录下面,获取jar包
或者直接在IDEA的工程目录下找到:
将wordCount-1.0-SNAPSHOT.jar上传到/home/tuzq/software/sparkdata下
使用spark的jar来做单词统计
要注意的是最后的输出路径要不存在,并且运行下面的程序的时候,最好是把spark-shell给关闭了。否则可能会报错。
bin/spark-submit --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 512m --total-executor-cores 6 --class cn.toto.spark.WordCount /home/tuzq/software/sparkdata/wordCount-1.0-SNAPSHOT.jar hdfs://mycluster/wordcount/input hdfs://mycluster/wordcount/out0001
运行时的状态:
查看hdfs上的结果:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -ls hdfs://mycluster/wordcount/out0002
Found 10 items
-rw-r--r-- 3 root supergroup 0 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/_SUCCESS
-rw-r--r-- 3 root supergroup 191 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00000
-rw-r--r-- 3 root supergroup 671 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00001
-rw-r--r-- 3 root supergroup 245 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00002
-rw-r--r-- 3 root supergroup 31 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00003
-rw-r--r-- 3 root supergroup 1096 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00004
-rw-r--r-- 3 root supergroup 11 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00005
-rw-r--r-- 3 root supergroup 936 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00006
-rw-r--r-- 3 root supergroup 588 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00007
-rw-r--r-- 3 root supergroup 609 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00008
查看其中的任何一个:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/wordcount/out0002/part-00000
(and,770752)
(is,659375)
(I,505440)
(a,468642)
(to,431857)
(in,421230)
(the,331176)
(of,272080)
(FDS,218862)
(for,213029)
(The,196569)
(true,196567)
(but,196566)
(on,193650)
(without,193649)
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]#
文章浏览阅读6.7k次。浅拷贝:指对象的字段被拷贝,而字段引用的对象不会被拷贝,拷贝对象和原对象仅仅是引用名称有所不同,但是它们共用一份实体。对任何一个对象的改变,都会影响到另外一个对象。大部分的引用类型,实现的都是浅拷贝,引用类型对象之间的赋值,就是复制一个对象引用地址的副本,而指向的对象实例仍然是同一个。深拷贝:指对象的子段被拷贝,同时字段引用的对象也进行了拷贝。深拷贝创建的是整个源对象的结构,拷贝对象和原对象相..._net 对象复制
当前 StarRocks x Paimon 的能力主要包括:支持各类存储系统,包括 HDFS 以及对象存储 S3/OSS/OSS-HDFS支持 HMS 以及阿里云 DLF 元数据管理系统支持 Paimon 的 Primary Key 和 Append Only 表类型查询支持 Paimon 系统表的查询,常见例如 Read Optimized 表,snapshots 表等支持 Paimon 表和其他类型数据湖格式的关联查询支持 Paimon 表和 StarRocks 内表的关联查询。
1、原型模式(Prototype Pattern)是用于创建重复的对象,同时又能保证性能比较好。一般对付出较大代价获取到的实体对象进行克隆操作,可以提升性能。(2)、复写clone方法(当前对象本身可以不复写,如果当前对象被继承,需要clone子类,则必须要复写)(1)、需要克隆的实体类实现Cloneable接口。可以看出,完整的复制了属性,且并不是同一个对象。
这篇文章,我们讲VQ_GAN,这是一个将特征向量离散化的模型,其效果相当不错,搭配Transformer(GPT)或者CLIP使用,达到的效果在当时可谓是令人拍案叫绝![GPT与GAN结合生成图像——VQGAN原理解析-哔哩哔哩]效果演示:图像生成其他任务。
文章浏览阅读280次。排序-希尔排序文章目录排序-希尔排序时间线前言定义一图流逻辑逻辑思考时间复杂度实现Python思考时间线 2020年7月4日——完成初稿 2020年7月4日——增加时间性能测试前言从学习算法到这篇笔记完成之前,一直都以为希尔排序是多么难懂以及高大上,但其实只要弄懂了就简单了。以前觉得只要掌握一些常见的即可,但后来才发现,其它的排序算法也有其独特的魅力——不是说这个算法可以实现了什么完成了什么,其魅力在于实现的过程,与其它算法的区别。定义一图流希尔排序是直接插入排序算法的改进,所_希尔排序dk=1
文章浏览阅读146次。原创书写,转载请注明出处http://www.cnblogs.com/xbinworld/archive/2013/04/25/3041505.html今天开始学Pattern Recognition and Machine Learning (PRML)书,章节1.2,Probability Theory (上)这一节是浓缩了整本书关于概率论的精华,突出一个不确定性(unc..._荆炳义 高等概率论讲义 pdf
文章浏览阅读1k次。Euclidea几何构建全关卡通关攻略大全玩腻味了哪些大开发商的游戏,小编给大家推荐一款挺有意思的小游戏Euclidea。Euclidea几何构建是一款具有理科生浪漫的游戏,这是一款数学几何风格的休闲益智类手游,玩法就是尺规做图,根据每个关卡给出的道具和要求,以在最短的步骤内解出答案。想说这游戏的趣味就是难度不是一点二点的,有兴趣的大家可以试试。euclidea几何构建全关卡通关攻略Euclide..._几何构建攻略
前缀和,实际上是一种非常简单的动态规划,通过预处理前缀和数组,以空间换时间,提高运行效率。
代码如下图:这样子在foreach中是无法修改class_history的。
文章浏览阅读357次。我有一个使用itext库生成pdf文件的java类.现在根据我的需要,我必须将生成的pdf文件保存到mysql数据库表中,但是我不知道该怎么做.我的担心是:…1.我可以在pdf表的mysql列中提供什么数据类型以保存pdf文件.2.哪个查询将生成的pdf文件插入数据库..目前,我正在生成pdf文件,并将其存储到本地磁盘的硬编码文件路径中.这是我在java中的pdf生成代码…OutputStream..._java:mysql数据库据转换pdf格式并打印机输出java:mysql数据库据转换pdf格式并打印
文章浏览阅读1.8k次。报错信息unknown CJK family \CJKsfdefault is being ignored解决方法在文档中添加\setCJKsansfont{Heiti SC}注意这里我是Mac系统,选用的字体是『Heiti SC』,其它系统具体的字体名称可能不一样_latex cjkfamily报错
文章浏览阅读3.5k次,点赞2次,收藏9次。1. 中科世为Z6S串口屏中科世为官网最近到手一块中科世为的串口屏,开搞!Z6S串口屏中运行的是 FlyThings OS 嵌入式物联网界面系统,FlyThings OS是中科世为基于Linux为操作系统的核心并加入了GUI,硬件层,媒体层,网络层等为系统框架层。同时提供了基于Windows桌面上运行的FlyThings IDE为开发者提供了一个更加便捷的方式完成界面编辑,代码编译,下载调试的功能。FlyThings OS系统的组成如下:内核基于开源的Liunx3.4的内核版本针对_linux hmi开源项目