spark while_Spark程序启动的几种姿势-程序员宅基地

技术标签: spark while  

2ef472703cc7752d6f40d03ff561ea2a.png
本文主要讲述了Spark启动的几种姿势。

对于spark(streaming)程序,我们通常是用shell脚本进行启动,而脚本的调用通常是由crontab或者调度系统例如azkaban定时启动,当然azkaban还有创建依赖等等功能。

如果我们的程序触发条件是由某个主动行为,而非固定时间点或者是依赖某个任务呢?比如我想调个接口就可以启动spark程序?

姿势一

Azkaban API 调用

Azkaban的相关文档:

API Documentation

把Spark程序放到Azkaban上,然后通过Azkaban的api调用,不论是curl也好,还是在java里也好,都可以直接通过接口执行程序。

举个例子

//获取session.id
curl -k -X POST --data "action=login&username=Spark&password=HELLOPWD" https://127.0.0.1:8443
//curl执行azkaban任务
curl -k --get --data 'session.id=b57f1460-e909-4c0b-9324-a694d19fb151' --data 'ajax=executeFlow' --data 'project=Test' --data 'flow=TestFlow' https://127.0.0.1:8443/executor --data 'flowOverride[day]=2019-01-29'

姿势二

通过java 调用yarn api来实现spark程序的调度

这种方式在网上看到的,没有实际操作过。

放链接:Spark2.3(四十):如何使用java通过yarn api调度spark app,并根据appId监控任务,关闭任务,获取任务日志

姿势三

通过livy调用,启动spark程序

不知道livy是什么的,可以看下这篇文章:Livy:基于Apache Spark的REST服务

livy还是非常好用的,除了每次都需要把spark的jar包都要上传到HDFS。但是调用起来很简单,也可以curl调用,还可以curl查看log

举个例子:

curl -X POST --data '{"file": "'${jar}'", "className": "'${main_class}'","driverMemory":"'${driver_memory}'","executorMemory":"'${executor_memory}'","numExecutors":'${num_executor}',"executorCores":'${executor_cores}',"queue":"'${queue}'","args":["'${org_key}'","'${team_key}'"],"conf":{"spark.master":"yarn","spark.submit.deployMode":"cluster","spark.app.name":"'${appname}'"}}' -H "Content-Type: application/json" 127.0.0.1:8999/batches

姿势四:

有写公司不用Azkaban,然后姿势二和姿势三也都需要安装或者运维配合。那就自己写一个简单的

举个例子:

Scala + Redis + 外部命令

Scala程序一直监听这Redis里的队列,只要队列出现数据,就会消费数据,然后利用外部命令执行shell脚本。

Scala执行外部命令很简单,导入scala.sys.process._包,然后""加!就可以了

   val config: java.util.Map[String, String] = PropUtils.load("redis.properties")

    val host = config.get("host")
    val port = config.get("port").toInt
    val password = config.get("password")
    val db = config.get("db").toInt
    val channel = signal.get("channel")

    val jedis = new Jedis(host,port)
    jedis.auth(password) //密码
jedis.select(db) //选择数据库
     try {
      while (true){
        val msg = jedis.lpop("Test_Q")
        if(msg != null ){
          var jsonobj:JSONObject = null
          try{
            jsonobj = JSON.parseObject(msg)
          }catch {
            case e:Exception => {
              logger.error("[ERROR]:解析信号异常,无法解析json:${msg}  at " + DateFunctions.NowDate())
            }
          }
          //执行命令
          executor(jsonobj)
        }else{
          Thread.sleep(1000L)
        }
      }
    }catch {
      case e:Exception =>
        logger.error("[ERROR]:信号处理程序 循环结束 at " + DateFunctions.NowDate() + "exception:" +e)
    }finally {
      logger.error("[ERROR]:信号处理程序结束 at " + DateFunctions.NowDate())
    }

    jedis.close()
    def executor(jsonobj: JSONObject): Unit = {
        ...//可以按业务处理得到脚本参数
        import scala.sys.process._
        "sh bin/start.sh ${params}" !
        ...
    }

有其他方式欢迎评论补充

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/weixin_39849070/article/details/110213617

智能推荐

MySQL的存储过程_mysql存储过程-程序员宅基地

文章浏览阅读5.1w次,点赞108次,收藏726次。概念MySQL5.0版本开始支持存储过程,存储过程就是一组SQL语句集,功能强大,可以实现一些比较复杂的逻辑功能,类似于JAVA语言中的方法,存储过就是数据库SQL与层层面的代码封装与重用特性1.有输入输出参数,可以声明变量,有if/else/case/while等控制语句,通过编写存储过程,可以实现复杂的逻辑功能2.函数的普通特性:模块化,封装,代码复用3.速度快,只有首次执行需要经过编译和优化步骤,后续被调用可以直接执行,省去以上步骤格式创建存储过程-- 创建存储过_mysql存储过程

vite2+vue3在vite.config下使用env环境变量的方法_import { configenv, userconfigexport, loadenv } fr-程序员宅基地

文章浏览阅读3.3k次,点赞2次,收藏3次。There's a chicken-egg problem here: Vite expects to resolve .env files from project root, but project root can be made different by the config file.So if we resolve .env before resolving the config file, we can only resolve it from CWD, which would then b_import { configenv, userconfigexport, loadenv } from "vite

golangci-lint在vscode的使用,以及配置的一些探索-程序员宅基地

文章浏览阅读4.1k次,点赞2次,收藏8次。Go 语言自带套件为我们提供了静态代码分析工具 vet,它能用于检查 go 项目中可以通过编译但仍可能存在错误的代码。在维基百科是如下定义 lint 的:但是如果想要更多,就可以用golangci-lint。golangci-lint 是一个 linter 的集成框架。它集成了非常多的 linter,包括了上文提到的vet,合理使用它可以帮助我们更全面地分析与检查 Go 代码VScode配置golangci-lint VScode使用golangci-lint golangci-lint检测nil..._golangci-lint

ERROR: tensorrt-5.1.5.0-cp36-none-linux_x86_64.whl is not a supported wheel on this platform.-程序员宅基地

文章浏览阅读3k次。最近在安装tensorrt遇到的bug,安装是根据下面博主写的,(但是缺少tensorrt的下载地址,这里补上https://developer.nvidia.com/nvidia-tensorrt-5x-download)https://blog.csdn.net/zong596568821xp/article/details/86077553根据改博主的安装步骤,一直报错ERR..._error: tensorrt-5.1.5.0-cp36-none-linux_x86_64.whl is not a supported wheel

【原】执行nosetests 报错:pkg_resources.DistributionNotFound: nose==0.10.4-程序员宅基地

文章浏览阅读179次。我的系统是centos6.3,安装好nose1.3.4之后,执行nosetests,提示异常如下:1 # nosetests2 Traceback (most recent call last):3 File "/usr/bin/nosetests", line 5, in <module>4 from pkg_resources import load..._pkg_resources.distributionnotfound: the 'nose==0.10.4' distributio

思科关闭日志_思科交换机日志管理-程序员宅基地

文章浏览阅读5.3k次。Cisco配置发送日志到日志服务器logging 133.3.3.2logging onlogging trap 7 //指定日志消息的级别 (0:紧急(Emergencies) 1:告警(Alerts) 2:严重的(Critical) 3:错误(Errors) 4:警告(Warnings) 5:通知(Notifications) 6:信息(Informational) 7:调试(Debuggi..._思科交换机清空日志

随便推点

POS终端交易流程-程序员宅基地

文章浏览阅读297次。POS终端的操作流程,应符合以下基本要求:  a)提示交易功能选择:    功能键和数字键同时可用。  b)对于需验证操作员或管理员密码的交易,提示输入并进行验证。  c) 按照交易要求,提示刷卡或手输卡号: 如不刷卡,可按指定功能键,进入手工输入卡号提示。  d)按照交易要求,选择提示输入有关数据: 包括:交易金..._发卡行脚本上送

docker搭建lnmp 部署wordpress_docker 部署wordpress-程序员宅基地

文章浏览阅读485次。1、获取数据库、Nginx和PHP集成环境镜像镜像 docker pull mariadb docker pull richarvey/nginx-php-fpm2、 创建本地挂载数据卷并下载wordpress模板 mkdir -p /web_site/webroot && cd /web_site/webroot/ wget https://cn.wordpress.org/wordpress-5.0.3-zh_CN.tar.gz tar&nb_docker 部署wordpress

迁移学习-域适应损失函数MMD-代码实现及验证_mmd损失函数-程序员宅基地

文章浏览阅读2.2w次,点赞37次,收藏249次。迁移学习-域适应-损失函数MMD-python代码实现_mmd损失函数

element DateTimePicker赋值不成功问题_vue el-time-picker赋值失败-程序员宅基地

文章浏览阅读1.5k次。在赋值时使用this.$nextTick(() => { this.ruleForm.datetime = [new Date(res.data.beginTime),new Date(res.data.endTime)]})即可_vue el-time-picker赋值失败

个人编程心得--持续更新版_编程个人感言简短-程序员宅基地

文章浏览阅读975次。在此处写下一些关于编程的感悟,也许是灵光一现的产物,也许是多次编程的心得,防止忘记。_编程个人感言简短

如何查看服务器系统时间,服务器时间相关(硬件时间,系统时间,时区时间)...-程序员宅基地

文章浏览阅读1.1w次。硬件时间,系统时间,时区#hwclock--hctosys把硬件时间同步到系统时间查看系统时区命令:date -R第一种方法,将时区信息文件拷贝至/etc/localtime下。具体操作方式为cp /usr/share/zoneinfo/Asia/Shanghai /etc/localtime -R 将上海时区拷贝至/etc下。第二种方法,执行命令tzselect,首先我们选择一个洲,本篇我们要..._查看服务器时间

推荐文章

热门文章

相关标签