Flink StreamingFileSink 文件到hdfs 文件一直处于inprogress状态无法生成正式文件_hdfs 文件 出现包含inprogress-程序员宅基地

技术标签: flink  hdfs  big data  大数据合集  

一、问题描述:

 任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。

在这里插入图片描述
任务运行一段时间可能会出现如下错误:
在这里插入图片描述

二、解决过程:

  1. 开始是猜想可能是并行度过多,导致产生大量临时文件,文件句柄太多,关闭耗时过久导致文件一分钟内一直无法完成合并?将并行度调整到1,发现问题并没有解决。
  2. 又猜想是因为调整了checkpoint参数,禁用掉checkpoint失败后触发Flink任务重启策略导致checkpoint受影响无法完成临时文件合并?将代码回退,发现问题还没有解决。
  3. 通过观察文件目录,发现临时文件名前缀,跟已合并文件名的前缀一致,猜测可能是文件已存在,临时文件无法完成合并为同名文件?备份并清理掉历史数据,发现文件正常生成, 问题解决!!。

此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。

三、原因深度剖析:

3.1、StreamingFileSink原理简介
  • StreamingFileSink是一个Flink连接器,用来将分区文件写入到支持 Flink FileSystem 接口的文件系统中。StreamingFileSink 会将数据写入到桶中。由于输入流可能是无界的,因此每个桶中的数据被划分为多个有限大小的文件。如何分桶是可以配置的,默认使用基于时间的分桶策略,这种策略每个小时创建一个新的桶,桶中包含的文件将记录所有该小时内从流中接收到的数据。桶目录中的实际输出数据会被划分为多个部分文件(part file),每一个接收桶数据的 Sink Subtask ,至少包含文件(part file)的一部分。多余的文件(part file)将根据滚动策略重新创建,滚动策略是可以配置的。默认的策略是根据文件大小和超时时间来滚动文件。超时时间指打开文件的最长持续时间,以及文件关闭前的最长非活动时间。本描述来自官网翻译,翻译不准确的地方还请各位多多批评指正!

提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
在这里插入图片描述

 本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
 为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:

1).In-progress :

当前文件正在写入中

2).Pending :

当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态

3).Finished :

在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。

 Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
在这里插入图片描述

3.2、 回到问题,part-0-561.xxx,文件名中数字来自哪里?为什么每个文件生成之后其编号是累加的?为什么会重启之后数字又会重新开始编号? 文件中数字来自哪里?
  • 文件中数字来自哪里?为什么每个文件生成之后其编号是累加的?
     上述描述桶物理映射就是一个文件夹名称,源码对桶的解释为:桶就是StreamingFileSink输出的目录组织。对于StreamingFileSink中的每个传入元素,通过用户指定的bucketsassigner,决定该元素应写入哪个bucket。每个新生成的文件名都是由前缀,Flink子任务的数量,文件编号,后缀组成。这就是文件中数字的由来。源码显示,由名为partCounter的一个变量+1得到的结果就是文件中的数字。
    在这里插入图片描述在这里插入图片描述

  • 为什么重启之后数字又会重新开始编号?

     由于StreamingFileSink基于checkpoint实现Exactly-Once,那么其必须实现Flink中关于checkpoint的两个接口CheckpointedFunction(快照拍摄功能接口), CheckpointListener (快照拍摄状态监听接口)源码如下:
    在这里插入图片描述
     既然实现了这两个接口,那么就需要关注快照初始化时做了什么操作?快照开始拍摄时做了什么?快照拍摄完成之后做了什么?从源码的下列三个方法作为入口剖析:
    在这里插入图片描述
     快照初始化时,根据子任务的数量创建了若干个桶管理器,并初始化了桶管理器的一些属性(详细属性可以见源码org.apache.flink.streaming.api.functions.sink.filesystem.Buckets),其中有个属性maxPartCounter,用来记录当前正在写的,或者最新写完文件的编号,快照初始化时将其值初值为0并保存在一个ListState中,每个桶对应一个maxPartCounter。通过源码追踪发现该值最终传入bucket并赋值给partCounter,用于默认文件命名。源码跳转路径为:

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelperorg.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→

org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→

org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→

org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)

 this.partCounter = initialPartCounter(maxPartCounter);

 初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。

在这里插入图片描述在这里插入图片描述
 数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
在这里插入图片描述
 拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填中供下次获取。
在这里插入图片描述 快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。

在这里插入图片描述

  • 为什么启动一段时间后可能会报错?
     报错的直接原因是因为任务手动停止任务时,没有手动保存快照即savepoint,任务可能停止在两个checkpoint时间段之间,下次任务重启之后,任务从最后一个成功的checkpoint点继续执行任务,此时如果继续写文件会发现文件已被合并,就会报 File does not exist 错误信息,原因如下图所示:
     savepoint与checkpoint的工作原理一致,只不过检查点是自动触发的,而savepoint需要命令行触发或者web控制台触发。和checkpoint一样,savepoint也保存到稳定存储当中,用户可以从savepoint重启作业,而不用从头开始。checkpoint的侧重点是“容错”,即Flink作业意外失败并重启之后,能够直接从早先打下的checkpoint恢复运行,且不影响作业逻辑的准确性。而savepoint的侧重点是“维护”,即Flink作业需要在人工干预下手动重启、升级、迁移或A/B测试时,先将状态整体写入可靠存储,维护完毕之后再从savepoint恢复现场.
    在这里插入图片描述

四、解决方案:

 在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。

五、总结

 问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
 知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/haozhuxuan/article/details/122078203

智能推荐

最强Android入门开发指南,帮你打通Android的任督二脉_android 开发指南-程序员宅基地

文章浏览阅读687次。Android 新手想要入门,很容易会遇到各类困难和学习瓶颈。没有一个好学的学习方向,学习规划,学习教程,这都是新手会面临的问题。 很多人会在百度上搜索,查阅相关资料。但是网上搜索的很多资料,都是断片式的学习,缺乏完整性和系统性。那么新手应该从何学起?这样学习呢?这里给大家一份最强Android入门指南:_android 开发指南

PHP程序运行流程:词法分析(Lexing,Tokenizing,Scanning)_phpscanning-程序员宅基地

文章浏览阅读1k次。在不开启 Opcache 的情况下,PHP解释器在解释PHP脚本的时候,首先会经过词法分析(Lexing),而词法分析的具体实现就是将PHP代码转换成 Tokens,此过程成为 Lexing / Tokenizing / Scanning 。那么 Tokens 是啥样的呢,Lex就是一个词法分析的依据表。 Zend/zend_language_scanner.c会根据Zend/zend_language_scanner.l (Lex文件),来输入的 PHP代码进行词法分析,从而得到一个一个的“词”,PHP_phpscanning

编程语言数值型和字符型数据的概念_数值 字符-程序员宅基地

文章浏览阅读1.7k次。在编程语言中区分变量的数据类型;最简单的是数值型和字符型;以SQL为例;新建一个表如下图;name列是字符型,age列是数值型;保存表名为pp;录入如下图的数据;看这里name列输入的‘123’、'789',这些是字符型的数据;age输入的内容是数值型;显示结果如下;因为age列是数值型,输入的 009 自动变为了 9;写查询语句时字符型数据按语法规则是用引号括起来;如果如下图写也可以运行出结果;是因为sqlserver本身具有一定的智能识别功能;写比较长的SQL语句_数值 字符

Caffe2 Tutorials[0](转)-程序员宅基地

文章浏览阅读558次。Caffe2 Tutorials[0](转)https://github.com/wizardforcel/data-science-notebook/blob/master/dl/more/caffe2-tut.md本系列教程包括9个小节,对应Caffe2官网的前9个教程,第10个教程讲的是在安卓下用SqueezeNet进行物体检测,此处不再翻译。另外由于栏主不关注RNN和LS..._writer.add_scalar [enforce fail at pybind_state.cc:221] ws->hasblob(name). c

java学习笔记day09 final、多态、抽象类、接口_} } class a { public void show() { show2(); } publ-程序员宅基地

文章浏览阅读155次。java学习笔记day09思维导图final 、 多态 、 抽象类 、 接口 (都很重要)一、final二、多态多态中的成员访问特点 【P237】多态的好处 【P239]多态的弊端向上转型、向下转型 【P241】形象案例:孔子装爹多态的问题理解: class 孔子爹 { public int age = 40; public void teach() { System.out.println("讲解JavaSE"); } _} } class a { public void show() { show2(); } public void show2() { s

Qt5通信 QByteArray中文字符 出现乱码 解决方法_qbytearray中文乱码-程序员宅基地

文章浏览阅读2.4k次,点赞3次,收藏9次。在写qt网口通信的过程中,遇到中文就乱码。解决方法如下:1.接收端处理中文乱码代码如下 QByteArray-> QString 中文乱码解决: #include <QTextCodec>QByteArray data= tcpSocket->readAll(); QTextCodec *tc = QTextCodec::codecForName("GBK"); QString str = tc->toUnicode(data);//str如果是中文则是中文字符_qbytearray中文乱码

随便推点

kettle 提交数据量_kettle——入门操作(表输出)详细-程序员宅基地

文章浏览阅读820次。表输出控件如下1)步骤名称,2)数据库连接,前面有过部分解释3)目标模式,数据库中的概念,引用:https://www.cnblogs.com/csniper/p/5509620.html(感谢)4)目标表:数据库中的表,这里有两种方式:(1) 应用数据库中已经存在的表,浏览表选中对应表即可,下图有部分sql功能。ddl可以执行ddl语句。(2) 创建新的表,填写表的名字,点击下面的sql就可以执..._kettle 步骤 提交

Sublime 多行编辑快捷键_submlite 同时操作多行 macos-程序员宅基地

文章浏览阅读4.4k次,点赞2次,收藏2次。鼠标选中多行,按下 widows 下 Ctrl Shift L( Mac下 Command Shift L)即可同时编辑这些行;鼠标选中文本,反复按widows 下CTRL D(Mac下 Command D)即可继续向下同时选中下一个相同的文本进行同时编辑;鼠标选中文本,按下Alt F3(Win)或Ctrl Command G(Mac)即可一次性选择全部的相同文本进行同时编辑;..._submlite 同时操作多行 macos

如何双启动Linux和Windows-程序员宅基地

文章浏览阅读252次。尽管Linux是具有广泛硬件和软件支持的出色操作系统,但现实是有时您必须使用Windows,这可能是由于关键应用程序无法在Linux下运行。 幸运的是,双重引导Windows和Linux非常简单-本文将向您展示如何使用Windows 10和Ubuntu 18.04进行设置。 在开始之前,请确保已备份计算机。 尽管双启动设置过程不是很复杂,但是仍然可能发生事故。 因此,请花点时间备份您的重要..._windows linux双启动

【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter_flink 常用的分类和计算-程序员宅基地

文章浏览阅读1.6w次,点赞25次,收藏20次。本文主要介绍Flink 的3种常用的operator(map、flatmap和filter)及以具体可运行示例进行说明.将集合中的每个元素变成一个或多个元素,并返回扁平化之后的结果。按照指定的条件对集合中的元素进行过滤,过滤出返回true/符合条件的元素。本文主要介绍Flink 的3种常用的operator及以具体可运行示例进行说明。这是最简单的转换之一,其中输入是一个数据流,输出的也是一个数据流。下文中所有示例都是用该maven依赖,除非有特殊说明的情况。中了解更新系统的内容。中了解更新系统的内容。_flink 常用的分类和计算

(转)30 IMP-00019: row rejected due to ORACLE error 12899-程序员宅基地

文章浏览阅读590次。IMP-00019: row rejected due to ORACLE error 12899IMP-00003: ORACLE error 12899 encounteredORA-12899: value too large for column "CRM"."BK_ECS_ORDER_INFO_00413"."POSTSCRIPT" (actual: 895, maximum..._row rejected due to oracle

降低Nginx代理服务器的磁盘IO使用率,提高转发性能_nginx tcp转发 硬盘io-程序员宅基地

文章浏览阅读918次。目前很多Web的项目在部署的时候会采用Nginx做为前端的反向代理服务器,后端会部署很多业务处理服务器,通常情况下Nginx代理服务器部署的还是比较少,而且其以高效性能著称,几万的并发连接处理速度都不在话下。然而去年的时候,我们的线上系统也采用类似的部署结构,同时由于我们的业务需求,Nginx的部署环境在虚拟机上面,复用了其他虚拟机的整体磁盘,在高IO消耗的场景中,我们发现Nginx的磁盘_nginx tcp转发 硬盘io