Flink StreamingFileSink 文件到hdfs 文件一直处于inprogress状态无法生成正式文件_hdfs 文件 出现包含inprogress-程序员宅基地

技术标签: flink  hdfs  big data  大数据合集  

一、问题描述:

 任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。

在这里插入图片描述
任务运行一段时间可能会出现如下错误:
在这里插入图片描述

二、解决过程:

  1. 开始是猜想可能是并行度过多,导致产生大量临时文件,文件句柄太多,关闭耗时过久导致文件一分钟内一直无法完成合并?将并行度调整到1,发现问题并没有解决。
  2. 又猜想是因为调整了checkpoint参数,禁用掉checkpoint失败后触发Flink任务重启策略导致checkpoint受影响无法完成临时文件合并?将代码回退,发现问题还没有解决。
  3. 通过观察文件目录,发现临时文件名前缀,跟已合并文件名的前缀一致,猜测可能是文件已存在,临时文件无法完成合并为同名文件?备份并清理掉历史数据,发现文件正常生成, 问题解决!!。

此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。

三、原因深度剖析:

3.1、StreamingFileSink原理简介
  • StreamingFileSink是一个Flink连接器,用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。StreamingFileSink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含文件(part file)的一部分。多余的文件(part file)将根据滚动策略重新创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。本描述来自官网翻译,翻译不准确的地方还请各位多多批评指正!

提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
在这里插入图片描述

 本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
 为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:

1).In-progress :

当前文件正在写入中

2).Pending :

当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

3).Finished :

在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

 Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
在这里插入图片描述

3.2、 回到问题,part-0-561.xxx,文件名中数字来自哪里?为什么每个文件生成之后其编号是累加的?为什么会重启之后数字又会重新开始编号? 文件中数字来自哪里?
  • 文件中数字来自哪里?为什么每个文件生成之后其编号是累加的?
     上述描述桶物理映射就是一个文件夹名称,源码对桶的解释为:桶就是StreamingFileSink输出的目录组织。对于StreamingFileSink中的每个传入元素,通过用户指定的bucketsassigner,决定该元素应写入哪个bucket。每个新生成的文件名都是由前缀,Flink子任务的数量,文件编号,后缀组成。这就是文件中数字的由来。源码显示,由名为partCounter的一个变量+1得到的结果就是文件中的数字。
    在这里插入图片描述在这里插入图片描述

  • 为什么重启之后数字又会重新开始编号?

     由于StreamingFileSink基于checkpoint实现Exactly-Once,那么其必须实现Flink中关于checkpoint的两个接口CheckpointedFunction(快照拍摄功能接口), CheckpointListener (快照拍摄状态监听接口)源码如下:
    在这里插入图片描述
     既然实现了这两个接口,那么就需要关注快照初始化时做了什么操作?快照开始拍摄时做了什么?快照拍摄完成之后做了什么?从源码的下列三个方法作为入口剖析:
    在这里插入图片描述
     快照初始化时,根据子任务的数量创建了若干个桶管理器,并初始化了桶管理器的一些属性(详细属性可以见源码org.apache.flink.streaming.api.functions.sink.filesystem.Buckets),其中有个属性maxPartCounter,用来记录当前正在写的,或者最新写完文件的编号,快照初始化时将其值初值为0并保存在一个ListState中,每个桶对应一个maxPartCounter。通过源码追踪发现该值最终传入bucket并赋值给partCounter,用于默认文件命名。源码跳转路径为:

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelperorg.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→

org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)

 this.partCounter = initialPartCounter(maxPartCounter);

 初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。

在这里插入图片描述在这里插入图片描述
 数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
在这里插入图片描述
 拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填中供下次获取。
在这里插入图片描述 快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。

在这里插入图片描述

  • 为什么启动一段时间后可能会报错?
     报错的直接原因是因为任务手动停止任务时,没有手动保存快照即savepoint,任务可能停止在两个checkpoint时间段之间,下次任务重启之后,任务从最后一个成功的checkpoint点继续执行任务,此时如果继续写文件会发现文件已被合并,就会报 File does not exist 错误信息,原因如下图所示:
     savepoint与checkpoint的工作原理一致,只不过检查点是自动触发的,而savepoint需要命令行触发或者web控制台触发。和checkpoint一样,savepoint也保存到稳定存储当中,用户可以从savepoint重启作业,而不用从头开始。checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场.
    在这里插入图片描述

四、解决方案:

 在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。

五、总结

 问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
 知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/haozhuxuan/article/details/122078203

智能推荐

稀疏编码的数学基础与理论分析-程序员宅基地

文章浏览阅读290次,点赞8次,收藏10次。1.背景介绍稀疏编码是一种用于处理稀疏数据的编码技术,其主要应用于信息传输、存储和处理等领域。稀疏数据是指数据中大部分元素为零或近似于零的数据,例如文本、图像、音频、视频等。稀疏编码的核心思想是将稀疏数据表示为非零元素和它们对应的位置信息,从而减少存储空间和计算复杂度。稀疏编码的研究起源于1990年代,随着大数据时代的到来,稀疏编码技术的应用范围和影响力不断扩大。目前,稀疏编码已经成为计算...

EasyGBS国标流媒体服务器GB28181国标方案安装使用文档-程序员宅基地

文章浏览阅读217次。EasyGBS - GB28181 国标方案安装使用文档下载安装包下载,正式使用需商业授权, 功能一致在线演示在线API架构图EasySIPCMSSIP 中心信令服务, 单节点, 自带一个 Redis Server, 随 EasySIPCMS 自启动, 不需要手动运行EasySIPSMSSIP 流媒体服务, 根..._easygbs-windows-2.6.0-23042316使用文档

【Web】记录巅峰极客2023 BabyURL题目复现——Jackson原生链_原生jackson 反序列化链子-程序员宅基地

文章浏览阅读1.2k次,点赞27次,收藏7次。2023巅峰极客 BabyURL之前AliyunCTF Bypassit I这题考查了这样一条链子:其实就是Jackson的原生反序列化利用今天复现的这题也是大同小异,一起来整一下。_原生jackson 反序列化链子

一文搞懂SpringCloud,详解干货,做好笔记_spring cloud-程序员宅基地

文章浏览阅读734次,点赞9次,收藏7次。微服务架构简单的说就是将单体应用进一步拆分,拆分成更小的服务,每个服务都是一个可以独立运行的项目。这么多小服务,如何管理他们?(服务治理 注册中心[服务注册 发现 剔除])这么多小服务,他们之间如何通讯?这么多小服务,客户端怎么访问他们?(网关)这么多小服务,一旦出现问题了,应该如何自处理?(容错)这么多小服务,一旦出现问题了,应该如何排错?(链路追踪)对于上面的问题,是任何一个微服务设计者都不能绕过去的,因此大部分的微服务产品都针对每一个问题提供了相应的组件来解决它们。_spring cloud

Js实现图片点击切换与轮播-程序员宅基地

文章浏览阅读5.9k次,点赞6次,收藏20次。Js实现图片点击切换与轮播图片点击切换<!DOCTYPE html><html> <head> <meta charset="UTF-8"> <title></title> <script type="text/ja..._点击图片进行轮播图切换

tensorflow-gpu版本安装教程(过程详细)_tensorflow gpu版本安装-程序员宅基地

文章浏览阅读10w+次,点赞245次,收藏1.5k次。在开始安装前,如果你的电脑装过tensorflow,请先把他们卸载干净,包括依赖的包(tensorflow-estimator、tensorboard、tensorflow、keras-applications、keras-preprocessing),不然后续安装了tensorflow-gpu可能会出现找不到cuda的问题。cuda、cudnn。..._tensorflow gpu版本安装

随便推点

物联网时代 权限滥用漏洞的攻击及防御-程序员宅基地

文章浏览阅读243次。0x00 简介权限滥用漏洞一般归类于逻辑问题,是指服务端功能开放过多或权限限制不严格,导致攻击者可以通过直接或间接调用的方式达到攻击效果。随着物联网时代的到来,这种漏洞已经屡见不鲜,各种漏洞组合利用也是千奇百怪、五花八门,这里总结漏洞是为了更好地应对和预防,如有不妥之处还请业内人士多多指教。0x01 背景2014年4月,在比特币飞涨的时代某网站曾经..._使用物联网漏洞的使用者

Visual Odometry and Depth Calculation--Epipolar Geometry--Direct Method--PnP_normalized plane coordinates-程序员宅基地

文章浏览阅读786次。A. Epipolar geometry and triangulationThe epipolar geometry mainly adopts the feature point method, such as SIFT, SURF and ORB, etc. to obtain the feature points corresponding to two frames of images. As shown in Figure 1, let the first image be ​ and th_normalized plane coordinates

开放信息抽取(OIE)系统(三)-- 第二代开放信息抽取系统(人工规则, rule-based, 先抽取关系)_语义角色增强的关系抽取-程序员宅基地

文章浏览阅读708次,点赞2次,收藏3次。开放信息抽取(OIE)系统(三)-- 第二代开放信息抽取系统(人工规则, rule-based, 先关系再实体)一.第二代开放信息抽取系统背景​ 第一代开放信息抽取系统(Open Information Extraction, OIE, learning-based, 自学习, 先抽取实体)通常抽取大量冗余信息,为了消除这些冗余信息,诞生了第二代开放信息抽取系统。二.第二代开放信息抽取系统历史第二代开放信息抽取系统着眼于解决第一代系统的三大问题: 大量非信息性提取(即省略关键信息的提取)、_语义角色增强的关系抽取

10个顶尖响应式HTML5网页_html欢迎页面-程序员宅基地

文章浏览阅读1.1w次,点赞6次,收藏51次。快速完成网页设计,10个顶尖响应式HTML5网页模板助你一臂之力为了寻找一个优质的网页模板,网页设计师和开发者往往可能会花上大半天的时间。不过幸运的是,现在的网页设计师和开发人员已经开始共享HTML5,Bootstrap和CSS3中的免费网页模板资源。鉴于网站模板的灵活性和强大的功能,现在广大设计师和开发者对html5网站的实际需求日益增长。为了造福大众,Mockplus的小伙伴整理了2018年最..._html欢迎页面

计算机二级 考试科目,2018全国计算机等级考试调整,一、二级都增加了考试科目...-程序员宅基地

文章浏览阅读282次。原标题:2018全国计算机等级考试调整,一、二级都增加了考试科目全国计算机等级考试将于9月15-17日举行。在备考的最后冲刺阶段,小编为大家整理了今年新公布的全国计算机等级考试调整方案,希望对备考的小伙伴有所帮助,快随小编往下看吧!从2018年3月开始,全国计算机等级考试实施2018版考试大纲,并按新体系开考各个考试级别。具体调整内容如下:一、考试级别及科目1.一级新增“网络安全素质教育”科目(代..._计算机二级增报科目什么意思

conan简单使用_apt install conan-程序员宅基地

文章浏览阅读240次。conan简单使用。_apt install conan