技术标签: 使用java提交spark任务 Spark任务提交 spark 大数据
首先做了一些调研,先梳理了下原生spark-submit的提交流程(Spark on Yarn)
我们平时使用spark-submit脚本或者直接使用SparkSubmit类进行Spark任务的提交时,流程通常是以下这样的:
上面主要是Spark on Yarn的任务提交流程介绍,重点在于构建yarn任务的上下文环境,之后将这些信息提交到Yarn RM,进行任务的调度和执行。从yarn的接口文档和Spark源码中可以看出,这些上下文环境主要有3类:
其中环境变量和启动命令比较简单,无非就是一些字符串的拼接。但是依赖的资源准备就比较复杂,这个工作是由SparkSubmit来做的,主要是将客户端机器上的资源上传到hdfs的某个临时目录,然后告诉yarn这些资源的hdfs路径,这样yarn就可以拉取到这些资源文件。
这些资源文件主要有:
可以看出,原生的SparkSubmit提交方式强依赖于客户端所在机器环境,需要确保机器上有安装Spark,同时配置SPARK_HOME、HADOOP_HOME等信息。
我们的目标是开发一个开箱即用的用于spark任务提交的Java sdk,如果底层还是使用SparkSubmit来提交任务,那么不可避免的就需要业务方的机器上安装了spark安装包,同时相关环境变量都配置好了(SPARK_HOME、HADOOP_HOME等)。因此,我们只能抛弃SparkSubmit这套流程,自己进行客户端参数的提取、校验、封装,然后自行构建yarn任务需要的上下文信息。大概流程如下:
和SparkSubmit不同的是,对于任务所需的启动依赖资源,我们直接从HDFS获取(因此需要提前将相关资源放置到HDFS上),而非客户端所在的机器。这样就可以避免sdk和机器环境的强绑定,只要保证客户端的机器可以访问obs即可。
该demo需要引入一些yarn的依赖:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.8.3</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-client</artifactId>
<version>2.8.3</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-api</artifactId>
<version>2.8.3</version>
</dependency>
demo如下:
public static void main(String[] args) throws IOException, YarnException {
Configuration configuration = new Configuration();
configuration.set("yarn.resourcemanager.ha.enabled", "true");
configuration.set("yarn.resourcemanager.ha.rm-ids", "rm1,rm2");
configuration.set("yarn.resourcemanager.hostname.rm1", "10.16.19.237");
configuration.set("yarn.resourcemanager.hostname.rm2", "10.16.19.13");
//初始化yarnClient
YarnClient yarnClient = YarnClient.createYarnClient();
yarnClient.init(configuration);
yarnClient.start();
YarnClientApplication application = yarnClient.createApplication();
System.out.println(application.getNewApplicationResponse().getApplicationId());
//构建Spark作业的上下文,也就是ApplicationMaster的上下文信息
ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
//设置AppMaster的环境变量
Map<String, String> env = Maps.newHashMap();
//设置CLASSPATH,这里的{
{PWD}}是任务工作目录的占位符,后面yarn会自动替换成工作目录。<CPS>是多个条目之间的间隔符
env.put("CLASSPATH", "{
{PWD}}<CPS>{
{PWD}}/__spark_conf__<CPS>{
{PWD}}/__spark_libs__/*<CPS>$HADOOP_CONF_DIR<CPS>" +
"$HADOOP_COMMON_HOME/share/hadoop/common/*<CPS>$HADOOP_COMMON_HOME/share/hadoop/common/lib/*<CPS>" +
"$HADOOP_HDFS_HOME/share/hadoop/hdfs/*<CPS>$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*<CPS>" +
"$HADOOP_YARN_HOME/share/hadoop/yarn/*<CPS>$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*<CPS>" +
"$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/*<CPS>$HADOOP_MAPRED_HOME/share/hadoop/mapreduce/lib/*");
env.put("SPARK_YARN_STAGING_DIR", "hdfs://test45cluster/tmp/yjbtest/sparkJobTest");
env.put("SPARK_USER", "root");
amContainer.setEnvironment(env);
//准备相关资源,这里我们可以把这些资源提前放到hdfs上,然后把对应的路径告诉yarn即可
//其中如果资源类型是ARCHIVE,yarn会帮我们解压到工作目录。
//比如我们指定了一个Key是__spark_libs__的资源,对应hdfs上的/tmp/yjbtest/spark_lib.zip,那么yarn在启动任务时会将这个zip文件拉下来解压成__spark_libs__目录
//所以我们在上面的CLASSPATH可以看到 {
{
PWD}}/__spark_libs__/* 这个条目,就是这么来的
Map<String, LocalResource> localResourceMap = Maps.newHashMap();
{
LocalResource resource = Records.newRecord(LocalResource.class);
URL uri = URL.newInstance("hdfs", "myCluster", -1, "/tmp/yjbtest/spark_lib.zip");
resource.setType(LocalResourceType.ARCHIVE);
resource.setVisibility(LocalResourceVisibility.PRIVATE);
resource.setResource(uri);
//文件的时间戳和大小一定要和实际的文件信息对上,不然会报错
resource.setTimestamp(1617261217328L);
resource.setSize(194878966);
localResourceMap.put("__spark_libs__", resource);
}
{
LocalResource resource = Records.newRecord(LocalResource.class);
URL uri = URL.newInstance("hdfs", "myCluster", -1, "/tmp/yjbtest/spark-test-1.0-SNAPSHOT.jar");
resource.setType(LocalResourceType.FILE);
resource.setVisibility(LocalResourceVisibility.PRIVATE);
resource.setResource(uri);
resource.setTimestamp(1617347524409L);
resource.setSize(39168);
localResourceMap.put("__app__.jar", resource);
}
{
//在spark-submit中,spark.* 开头的配置会被写入到 __spark_conf__/__spark_conf__.properties 配置下
//spark.hadoop.* 开头的配置会被写入到 __spark_hadoop_conf__.xml 配置下
//
LocalResource resource = Records.newRecord(LocalResource.class);
URL uri = URL.newInstance("hdfs", "myCluster", -1, "/tmp/yjbtest/__spark_conf4__.zip");
resource.setType(LocalResourceType.ARCHIVE);
resource.setVisibility(LocalResourceVisibility.PRIVATE);
resource.setResource(uri);
resource.setTimestamp(1617692664596L);
resource.setSize(1186);
localResourceMap.put("__spark_conf__", resource);
}
{
//在spark-submit中,spark.* 开头的配置会被写入到 __spark_conf__/__spark_conf__.properties 配置下
//spark.hadoop.* 开头的配置会被写入到 __spark_hadoop_conf__.xml 配置下
LocalResource resource = Records.newRecord(LocalResource.class);
URL uri = URL.newInstance("hdfs", "myCluster", -1, "/tmp/yjbtest/log4j.properties");
resource.setType(LocalResourceType.FILE);
resource.setVisibility(LocalResourceVisibility.PRIVATE);
resource.setResource(uri);
resource.setTimestamp(1617676015956L);
resource.setSize(3265);
localResourceMap.put("log4j.properties", resource);
}
{
//在spark-submit中,spark.* 开头的配置会被写入到 __spark_conf__/__spark_conf__.properties 配置下
//spark.hadoop.* 开头的配置会被写入到 __spark_hadoop_conf__.xml 配置下
LocalResource resource = Records.newRecord(LocalResource.class);
URL uri = URL.newInstance("hdfs", "myCluster", -1, "/tmp/yjbtest/__spark_dist_cache__.properties");
resource.setType(LocalResourceType.FILE);
resource.setVisibility(LocalResourceVisibility.PRIVATE);
resource.setResource(uri);
resource.setTimestamp(1617678784802L);
resource.setSize(586);
localResourceMap.put("__spark_dist_cache__.properties", resource);
}
amContainer.setLocalResources(localResourceMap);
//ApplicaiontMaster的启动命令
List<String> commandList = Lists.newArrayList();
commandList.add("{
{JAVA_HOME}}/bin/java -server -Xmx1024m -Djava.io.tmpdir={
{PWD}}/tmp " +
"-Dspark.yarn.app.container.log.dir=<LOG_DIR> org.apache.spark.deploy.yarn.ApplicationMaster " +
"--class 'com.yangjb.SqlTest' --jar file:/tmp/abc.jar " +
"--properties-file {
{PWD}}/__spark_conf__/__spark_conf__.properties " +
"--dist-cache-conf {
{PWD}}/__spark_dist_cache__.properties 1> <LOG_DIR>/stdout 2> <LOG_DIR>/stderr");
amContainer.setCommands(commandList);
ApplicationSubmissionContext appContext = application.getApplicationSubmissionContext();
appContext.setApplicationName("yjbtest-20210402");
appContext.setQueue("default");
appContext.setAMContainerSpec(amContainer);
appContext.setApplicationType("SPARK");
appContext.setMaxAppAttempts(1);
//告诉yarn需要分配给我们多少内存和cpu
Resource capability = Records.newRecord(Resource.class);
capability.setMemorySize(2048);
capability.setVirtualCores(2);
appContext.setResource(capability);
//提交spark作业
yarnClient.submitApplication(appContext);
while (true) {
ApplicationReport applicationReport = yarnClient.getApplicationReport(application.getNewApplicationResponse().getApplicationId());
YarnApplicationState yarnApplicationState = applicationReport.getYarnApplicationState();
System.out.println("State:" + yarnApplicationState);
if (yarnApplicationState == YarnApplicationState.FAILED) {
String diagnostics = applicationReport.getDiagnostics();
System.out.println("错误信息:" + diagnostics);
break;
} else if (yarnApplicationState == YarnApplicationState.KILLED
|| yarnApplicationState == YarnApplicationState.FINISHED) {
break;
}
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
因为我们sdk的代码量比较多,目前也还不够成熟,暂时不考虑开源全部代码。
上面的demo比较简单,上下文信息都是固定写死的。不过读者读懂demo大概也就能基于这个思路开发一个用于提交spark任务的java sdk了。
另外,需要注意的是,SparkSubmit本身做了大量的参数解析,转换,封装的工作。我们抛弃SparkSubmit自己实现这些参数的解析封装,就意味很容易漏掉很多细节,因此开发完sdk需要做好详尽的测试,避免一些场景没考虑到,导致用户的任务出现的问题。
文章浏览阅读5.4k次。PL/SQL initialization error could not initialize我在电脑上安装了Oracle11 64位的,使用CMD可以完美连接数据库并进行SQL操作,之后用PL/SQL登录却遇到如图问题。 出现这一问题的原因:安装完后Oracle的 oci.dll 是64位的,而32位应用程序 PLSQL Developer 无法加载。 解决方案:在网上查找解决..._initialization error could not load "d:\instantclient-basic-nt-12.2.0.1.0\in
文章浏览阅读515次。来源丨https://zhuanlan.zhihu.com/p/104019160PyTorch最好的资料是官方文档。本文是PyTorch常用代码段,在参考资料[1](张皓:PyTorch Cookbook)的基础上做了一些修补,方便使用时查阅1. 基本配置导入包和版本查询import torchimport torch.nn as nnimport torchvisionprint(torch.__version__)print(torch.version.cuda)print(torc_pytorch wrn
文章浏览阅读5.3k次,点赞2次,收藏20次。代码#include "stm32f10x.h"typedef unsigned char u8;typedef unsigned short int u16;void delay_ms(u16 ms){ u16 j; while(ms--) { for(j=0;j<1000;j++); }}#define GPIOA_ODR (GPI..._stm32 查看gpio 波形
文章浏览阅读55次。在客户端限制表单重复提交有两种方法: 第一种:在javascript脚本中设置一个标志变量,来区分表单是否已经提交。如果已经提交,则弹出对话框告诉用户“重复提交”。 第二种:在单击提交按钮以后将提交按钮设置为disabled状态,这样用户就无法再提交按钮,客户端也就无法重复提交。 采用第一种方法:1.新建一个ClientTest1.jsp文件,代码如..._jsp限制表单重复提交
文章浏览阅读269次。【方向】 2017-08-24 16:31:56 浏览8502 评论2 ..._with open(cifar10_folder + test_dataset[0], 'rb') as f0:
文章浏览阅读1k次。Ue4蓝图和c++如何相互调用(宏)官方文档官方文档常用BlueprintCallable 此函数可在蓝图或关卡蓝图图表中执行。BlueprintImplementableEvent 此函数可在蓝图或关卡蓝图图表中实现。BlueprintNativeEvent 此函数旨在被蓝图覆盖掉,但是也具有默认原生实现。用于声明名称与主函数相同的附加函数,但是末尾添加了Implementation,是写入代码的位置。如果未找到任何蓝图覆盖,该自动生成的代码将调用 Implementation 方法。_虚幻4c++调用蓝图函数
文章浏览阅读4.2k次。用v-html:{{message}}_后端返回带标签的字段前端怎么正确显示
文章浏览阅读1.2k次。因此,main 函数的作用是初始化应用程序,并启动消息循环,而 TApplication 组件负责处理消息循环并转发消息。除了 TApplication 和 TWinControl 组件外,VCL 还提供了一些其他的组件来处理特定类型的消息,如 TTimer 组件用于处理定时器消息,TMessage 组件用于处理自定义消息等等。在 Delphi 应用程序中,TApplication 组件负责处理应用程序的消息循环,但它并不直接处理应用程序的 main 函数入口。_delphi软件框架
文章浏览阅读8.3k次,点赞4次,收藏6次。一、问题描述和解决在用R读取数据的时候,常见的一般是.txt或.cvs结尾的文件。突然遇见一个.rda结尾的文件一下子不知道如何读取。经过查资料和自己尝试,终于找到了读取的方法。这里需要使用load函数,使用load(“文件名.rda”)即可将数据读取,但需要注意的是,有一点特殊的地方就是,读取出来,直接用变量名输出出来并不是文件里的数据,而是文件里保存的数据的名字。要想使用文件里的数据,直接..._r语言读取rda文件
文章浏览阅读889次。1:在控制器验证登录成功时存储session与cookie的值 /// <summary> /// 验证登录 存储Session或Cookie /// </summary> /// <returns></returns> public async Task<IA..._.netcore实现session与cookie以及授权过滤器的步骤
文章浏览阅读1.5w次,点赞20次,收藏237次。这是学长亲手整理的,软件工程毕设选题系列第三篇,都是经过学长精心审核的题目,适合作为毕设,难度不高,工作量达标,对毕设有任何疑问都可以问学长哦!相对容易工作量达标题目新颖学长限时开放免费开题指导,对开题有任何不明白的,对某项技术或算法不理解的,不知道怎么下手毕设的,都可以问学长,学长会根据你的情况提供帮助,希望能帮助到你。_软件工程毕业设计
文章浏览阅读1.4k次。两行代码搞定import os# 获取当前文件的目录cur_path = os.path.abspath(os.path.dirname(__file__))# 获取根目录root_path = cur_path[:cur_path.find('pythonProject2')]+'pythonProject2' print(root_path)实际路径:C:\Users\86..._python 当前工程根目录