Flink StreamingFileSink 文件到hdfs 文件一直处于inprogress状态无法生成正式文件_hdfs 文件 出现包含inprogress-程序员宅基地

技术标签: flink  hdfs  big data  大数据合集  

一、问题描述:

 任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。

在这里插入图片描述
任务运行一段时间可能会出现如下错误:
在这里插入图片描述

二、解决过程:

  1. 开始是猜想可能是并行度过多,导致产生大量临时文件,文件句柄太多,关闭耗时过久导致文件一分钟内一直无法完成合并?将并行度调整到1,发现问题并没有解决。
  2. 又猜想是因为调整了checkpoint参数,禁用掉checkpoint失败后触发Flink任务重启策略导致checkpoint受影响无法完成临时文件合并?将代码回退,发现问题还没有解决。
  3. 通过观察文件目录,发现临时文件名前缀,跟已合并文件名的前缀一致,猜测可能是文件已存在,临时文件无法完成合并为同名文件?备份并清理掉历史数据,发现文件正常生成, 问题解决!!。

此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。

三、原因深度剖析:

3.1、StreamingFileSink原理简介
  • StreamingFileSink是一个Flink连接器,用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。StreamingFileSink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含文件(part file)的一部分。多余的文件(part file)将根据滚动策略重新创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。本描述来自官网翻译,翻译不准确的地方还请各位多多批评指正!

提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
在这里插入图片描述

 本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
 为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:

1).In-progress :

当前文件正在写入中

2).Pending :

当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

3).Finished :

在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

 Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
在这里插入图片描述

3.2、 回到问题,part-0-561.xxx,文件名中数字来自哪里?为什么每个文件生成之后其编号是累加的?为什么会重启之后数字又会重新开始编号? 文件中数字来自哪里?
  • 文件中数字来自哪里?为什么每个文件生成之后其编号是累加的?
     上述描述桶物理映射就是一个文件夹名称,源码对桶的解释为:桶就是StreamingFileSink输出的目录组织。对于StreamingFileSink中的每个传入元素,通过用户指定的bucketsassigner,决定该元素应写入哪个bucket。每个新生成的文件名都是由前缀,Flink子任务的数量,文件编号,后缀组成。这就是文件中数字的由来。源码显示,由名为partCounter的一个变量+1得到的结果就是文件中的数字。
    在这里插入图片描述在这里插入图片描述

  • 为什么重启之后数字又会重新开始编号?

     由于StreamingFileSink基于checkpoint实现Exactly-Once,那么其必须实现Flink中关于checkpoint的两个接口CheckpointedFunction(快照拍摄功能接口), CheckpointListener (快照拍摄状态监听接口)源码如下:
    在这里插入图片描述
     既然实现了这两个接口,那么就需要关注快照初始化时做了什么操作?快照开始拍摄时做了什么?快照拍摄完成之后做了什么?从源码的下列三个方法作为入口剖析:
    在这里插入图片描述
     快照初始化时,根据子任务的数量创建了若干个桶管理器,并初始化了桶管理器的一些属性(详细属性可以见源码org.apache.flink.streaming.api.functions.sink.filesystem.Buckets),其中有个属性maxPartCounter,用来记录当前正在写的,或者最新写完文件的编号,快照初始化时将其值初值为0并保存在一个ListState中,每个桶对应一个maxPartCounter。通过源码追踪发现该值最终传入bucket并赋值给partCounter,用于默认文件命名。源码跳转路径为:

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelperorg.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→

org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)

 this.partCounter = initialPartCounter(maxPartCounter);

 初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。

在这里插入图片描述在这里插入图片描述
 数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
在这里插入图片描述
 拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填中供下次获取。
在这里插入图片描述 快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。

在这里插入图片描述

  • 为什么启动一段时间后可能会报错?
     报错的直接原因是因为任务手动停止任务时,没有手动保存快照即savepoint,任务可能停止在两个checkpoint时间段之间,下次任务重启之后,任务从最后一个成功的checkpoint点继续执行任务,此时如果继续写文件会发现文件已被合并,就会报 File does not exist 错误信息,原因如下图所示:
     savepoint与checkpoint的工作原理一致,只不过检查点是自动触发的,而savepoint需要命令行触发或者web控制台触发。和checkpoint一样,savepoint也保存到稳定存储当中,用户可以从savepoint重启作业,而不用从头开始。checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场.
    在这里插入图片描述

四、解决方案:

 在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。

五、总结

 问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
 知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/haozhuxuan/article/details/122078203

智能推荐

C语言作业(四)-程序员宅基地

文章浏览阅读70次。【代码】C语言作业(四)

JavaScript语法高亮库highlight.js使用_语法高亮js css文件-程序员宅基地

文章浏览阅读9.9k次。highlight.js是一款基于JavaScript的语法高亮库,目前支持125种编程语言,有63种可供选择的样式,而且能够做到语言自动识别,和目前主流的JS框架都能兼容,可以混合使用。这款高亮库可以用在博客系统中,其使用方法及其简单,几乎不需要任何学习成本,下面介绍highlight.js的使用。1.获取highlight.js库,用户可以从官网获取:地址:https://highlightjs_语法高亮js css文件

【笔记】strftime的使用方法-程序员宅基地

文章浏览阅读5.1k次。strftimestrftime是C语言标准库中用来格式化输出时间的的函数。下面是strftime的用法各参数意义代码使用示例#include<stdio.h>#include<time.h>#define print(s1, s2,s3) \ printf("%-20s%-30s%s\n",s1, s2,s3);int main(){ time_t rawtime; struct tm* timeinfo; char timE[80]; /

2018.09.12 poj3621Sightseeing Cows(01分数规划+spfa判环)-程序员宅基地

文章浏览阅读147次。传送门 01分数规划板题啊。 发现就是一个最优比率环。 这个直接二分+spfa判负环就行了。 代码:#include&lt;iostream&gt;#include&lt;cstdio&gt;#include&lt;cstring&gt;#include&lt;algorithm&gt;#include&lt;cmath&gt;#define N 1005#define...

hive sql的常用日期处理函数总结_hive sql 日期函数-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏14次。1)date_format函数(根据格式整理日期)  作用:把一个字符串日期格式化为指定的格式。select date_format('2017-01-01','yyyy-MM-dd HH:mm:ss'); --日期字符串必须满足yyyy-MM-dd格式   结果:2017-01-01 00:00:002)date_add、date_sub函数(加减日期)  作用:把一个字符串日期格式加一天、减一天。select date_add('2019-01-01',1); ..._hive sql 日期函数

Android Studio使用百度语音合成是TTS时报错: ****.so文件找不到的有关问题_旧版的百度语言合成报错-程序员宅基地

文章浏览阅读2.1k次。使用百度语音合成过程时,一直error : notfint libgnustl_shared.so在项目工程gradle文件中添加如下代码段:sourceSets { main { jniLibs.srcDirs = ['libs'] } }..._旧版的百度语言合成报错

随便推点

探索Camera2Demo:一款深入理解Android Camera2 API的开源示例项目-程序员宅基地

文章浏览阅读267次,点赞5次,收藏9次。探索Camera2Demo:一款深入理解Android Camera2 API的开源示例项目项目地址:https://gitcode.com/wangshengyang1996/Camera2Demo项目简介Camera2Demo 是一个由wangshengyang1996开发并维护的Android应用示例,旨在帮助开发者更好地理解和使用Android的Camera2 API。该项目通过提供..._android camera2 demo

iOS10 适配、Xcode8配置总结①-程序员宅基地

文章浏览阅读121次。2019独角兽企业重金招聘Python工程师标准>>> ...

微信API证书过期,获取API证书_微信 apiv3证书 过期-程序员宅基地

文章浏览阅读10w+次。在做微信如:(退款、企业红包、企业付款)提现这些操作的时候,微信返回(具体哪个字段我忘记了)的信息是:证书过期,那么就需要重新获取证书,证书的获取前提条件:1:你需要有微信商户平台的商户号(类似电话号码的数字),和商户名称(比如公司名称)2:按照官方说明文档进行相应操作即可(按照这个做就行了)http://kf.qq.com/faq/161222NneAJf161222U7fARv.h..._微信 apiv3证书 过期

雷达探测项目仿真代码(Matlab代码实现)_探地雷达成像matlab-程序员宅基地

文章浏览阅读2.2k次。雷达探测是电磁威慑的重要组成。国外发达国家正从体系、平台、频段、架构、硬件、处理等方面开发新一代雷达技术。针对高超声速目标、弹道导弹、无人集群目标、隐身飞机等新型极高速、极隐身、极庞大目标,将以网络为基础,整合各类探测手段,实现对目标的全球预警、全程连续跟踪、全维协同精确打击,形成全域全时全维的体系化探测感知能力。对深空目标、临空目标、隐身目标、集群目标的探测研究新型威胁目标是驱动雷达探测技术前进的原动力之一,而新时期的新型威胁目标呈现“高、低、快、慢、小、隐、群”等特征。_探地雷达成像matlab

kali 安装取证工具volatility_kali安装volatility-程序员宅基地

文章浏览阅读3.1k次,点赞2次,收藏12次。计算机取证 volatility_kali安装volatility

html禁止图片缓存(刷新网站)_html禁止浏览器缓存图片-程序员宅基地

文章浏览阅读2.1k次。地址:https://blog.csdn.net/fareast_mzh/article/details/81464031_html禁止浏览器缓存图片

推荐文章

热门文章

相关标签