技术标签: flink hdfs big data 大数据合集
一、问题描述:
任务逻辑是通过实时读取Kafka数据,一分钟计算一次数据,并利用Flink StreamingFileSink将数据落地到HDFS文件中。为了应对大促剧增的数据量,对当前运行稳定的集群进行了扩容处理,任务重启后发现写入的hdfs文件一直处于inprogress状态无法滚动生成正式文件。
任务运行一段时间可能会出现如下错误:
二、解决过程:
此解决方法为问题发生之后临时急救方案,主要适用于当前任务不依赖历史数据,数据可以清理的任务。剖析其深度原因之后可从根本上避免此类问题。详细请继续阅读下列原因深度剖析。
三、原因深度剖析:
提示:使用 StreamingFileSink 时需要启用 Checkpoint ,每次做 Checkpoint 时写入完成后,桶中临时文件转成正式文件。如果 Checkpoint 被禁用,部分文件(part file)将永远处于 ‘in-progress’ 或 ‘pending’ 状态,下游系统无法安全地读取。
本图为Flink 官网(官网地址:https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/connectors/streamfile_sink.html)对于StreamingFileSink的图示,可以很形象的描述其落地原理。
为了在下游系统中使用 StreamingFileSink 的输出,我们需要了解输出文件的命名规则和生命周期。由上图可知,文件(part file)可以处于以下三种状态之一:
1).In-progress :
当前文件正在写入中
2).Pending :
当处于 In-progress 状态的文件关闭(closed)了,就变为 Pending 状态
3).Finished :
在成功的 Checkpoint 后,Pending 状态将变为 Finished 状态,处于 Finished 状态的文件不会再被修改,可以被下游系统安全地读取。
Flink目前对于Hdfs-Sink 有两种实现方式,即BucketingSink以及StreamingFileSink。StreamingFileSink是在BucketingSink之后推出的。主要区别在于StreamingFileSink可以用于故障恢复,保证exactly-once,但是要求hadoop版本必须在2.7以上,因为用到了hdfs的truncate方法。BucketingSink相对用法比较简单,并且没有版本要求。StreamingFileSink的exactly-once主要基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的,这也是为什么官网提示使用时一定要打开checkpoint开关的原因。上述描述的桶物理上对应一个文件夹、subtask表示Flink同一任务的不同子任务,换言之,就是不同并行度。数据流中读到一个元素,根据项目的BucketAssigner可以计算出该元素属于哪个分区,通过状态管理器可以获取到该分区下目前最大的正在写的文件编号是多少?然后写到对应的文件中。官网图比较抽象,根据源码及对官网描述的理解,本人画了一张更加详细的StreamingFileSink示意图如下:
文件中数字来自哪里?为什么每个文件生成之后其编号是累加的?
上述描述桶物理映射就是一个文件夹名称,源码对桶的解释为:桶就是StreamingFileSink输出的目录组织。对于StreamingFileSink中的每个传入元素,通过用户指定的bucketsassigner,决定该元素应写入哪个bucket。每个新生成的文件名都是由前缀,Flink子任务的数量,文件编号,后缀组成。这就是文件中数字的由来。源码显示,由名为partCounter的一个变量+1得到的结果就是文件中的数字。
为什么重启之后数字又会重新开始编号?
由于StreamingFileSink基于checkpoint实现Exactly-Once,那么其必须实现Flink中关于checkpoint的两个接口CheckpointedFunction(快照拍摄功能接口), CheckpointListener (快照拍摄状态监听接口)源码如下:
既然实现了这两个接口,那么就需要关注快照初始化时做了什么操作?快照开始拍摄时做了什么?快照拍摄完成之后做了什么?从源码的下列三个方法作为入口剖析:
快照初始化时,根据子任务的数量创建了若干个桶管理器,并初始化了桶管理器的一些属性(详细属性可以见源码org.apache.flink.streaming.api.functions.sink.filesystem.Buckets),其中有个属性maxPartCounter,用来记录当前正在写的,或者最新写完文件的编号,快照初始化时将其值初值为0并保存在一个ListState中,每个桶对应一个maxPartCounter。通过源码追踪发现该值最终传入bucket并赋值给partCounter,用于默认文件命名。源码跳转路径为:
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink#initializeState→
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSinkHelper#StreamingFileSinkHelper→
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeState→
org.apache.flink.streaming.api.functions.sink.filesystem.Buckets#initializeActiveBuckets→
org.apache.flink.streaming.api.functions.sink.filesystem.BucketFactory#restoreBucket→
org.apache.flink.streaming.api.functions.sink.filesystem.DefaultBucketFactoryImpl#restoreBucket→
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#restore→
org.apache.flink.streaming.api.functions.sink.filesystem.Bucket#Bucket(int, BucketID, org.apache.flink.core.fs.Path, long, org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy<IN,BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.FileLifeCycleListener<BucketID>, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig)
this.partCounter = initialPartCounter(maxPartCounter);
初始化做了什么?分析源码发现,初始化时,创建了一个桶并初始化了桶的一些属性,其中桶编号的获取用到属性maxPartCounter,见名知意,其作用是用来记录当前part的最大编号,其值初始化为0。而桶编号就是每个临时文件名的前缀。
数据流中每读到一个元素,通过用户设置的分桶策略找到该元素对应的桶,并将其写入文件,并且跟上次的partcount对比,获取当前的最大值并将最大值进行更新。
拍摄快照时,先将maxPartCounter的状态清空,然后仅记录当前Checkpoint编号的每个桶的maxPartCounter值,当前checkpoint成功,那么每个桶的最新文件编号及被记录在当前的装填中供下次获取。
快照拍摄完成之后,会将临时文件合并为Finished状态的文件,其中bucketWriter就跟文件生成相关,其文件名就是涉及到上述描述的文件编号。
四、解决方案:
在维护任务,手动停止任务时,一定要保存快照。扩容及代码维护之后,要指定快照重启任务就可以从根本上避免该问题的产生。
五、总结
问题的根本原因:在手动停止任务时,StreamingFileSink依赖Checkpoint状态来记录当前checkpoint id对应最新生成文件的编号,下一个checkpoint id有新数据读取到时,会根据上一次状态记录的文件最大编号的值累加得到新元素对应文件的文件名,在停止任务时,没有保存快照,导致最后一次chekcpoint成功生成的文件编号没有被记录而丢失,下次任务重启时不指定快照重启,快照会重新进行初始化,文件名中编号又被初始化为0,临时文件在合并为Finished状态时,发现同一目录下已存在同样的文件,而无法进行覆盖导致文件一直处于正在写入状态。所以,当把本目录下历史数据清除掉之后,所有写入的文件重新从0开始编号,能正常完成文件的写入。
教训:在后续Flink任务中,如果涉及到有状态记录,chekcpoint等操作,在停止任务时一定不能暴力停止,一定要保存快照,平滑执行停止操作,让其状态能安全保存。否则,可能有些累计求值的数据会永久丢失,需要重置Kafka offset才能恢复。
知其然知其所以然:本文章分析方法同样适用于Flink Kafka sink,Kafka sink保持Exactly-Once原理也是基于两阶段事务提交方式实现的,大家有兴趣可以利用同样的分析方法去阅读Flink FlinkKafkaProducer源码,甚至后续有其他sink操作,需要具备容错机制,也可以参考此处Flink源码去实现。
文章浏览阅读70次。【代码】C语言作业(四)
文章浏览阅读9.9k次。highlight.js是一款基于JavaScript的语法高亮库,目前支持125种编程语言,有63种可供选择的样式,而且能够做到语言自动识别,和目前主流的JS框架都能兼容,可以混合使用。这款高亮库可以用在博客系统中,其使用方法及其简单,几乎不需要任何学习成本,下面介绍highlight.js的使用。1.获取highlight.js库,用户可以从官网获取:地址:https://highlightjs_语法高亮js css文件
文章浏览阅读5.1k次。strftimestrftime是C语言标准库中用来格式化输出时间的的函数。下面是strftime的用法各参数意义代码使用示例#include<stdio.h>#include<time.h>#define print(s1, s2,s3) \ printf("%-20s%-30s%s\n",s1, s2,s3);int main(){ time_t rawtime; struct tm* timeinfo; char timE[80]; /
文章浏览阅读147次。传送门 01分数规划板题啊。 发现就是一个最优比率环。 这个直接二分+spfa判负环就行了。 代码:#include<iostream>#include<cstdio>#include<cstring>#include<algorithm>#include<cmath>#define N 1005#define...
文章浏览阅读3.1k次,点赞2次,收藏14次。1)date_format函数(根据格式整理日期) 作用:把一个字符串日期格式化为指定的格式。select date_format('2017-01-01','yyyy-MM-dd HH:mm:ss'); --日期字符串必须满足yyyy-MM-dd格式 结果:2017-01-01 00:00:002)date_add、date_sub函数(加减日期) 作用:把一个字符串日期格式加一天、减一天。select date_add('2019-01-01',1); ..._hive sql 日期函数
文章浏览阅读2.1k次。使用百度语音合成过程时,一直error : notfint libgnustl_shared.so在项目工程gradle文件中添加如下代码段:sourceSets { main { jniLibs.srcDirs = ['libs'] } }..._旧版的百度语言合成报错
文章浏览阅读267次,点赞5次,收藏9次。探索Camera2Demo:一款深入理解Android Camera2 API的开源示例项目项目地址:https://gitcode.com/wangshengyang1996/Camera2Demo项目简介Camera2Demo 是一个由wangshengyang1996开发并维护的Android应用示例,旨在帮助开发者更好地理解和使用Android的Camera2 API。该项目通过提供..._android camera2 demo
文章浏览阅读121次。2019独角兽企业重金招聘Python工程师标准>>> ...
文章浏览阅读10w+次。在做微信如:(退款、企业红包、企业付款)提现这些操作的时候,微信返回(具体哪个字段我忘记了)的信息是:证书过期,那么就需要重新获取证书,证书的获取前提条件:1:你需要有微信商户平台的商户号(类似电话号码的数字),和商户名称(比如公司名称)2:按照官方说明文档进行相应操作即可(按照这个做就行了)http://kf.qq.com/faq/161222NneAJf161222U7fARv.h..._微信 apiv3证书 过期
文章浏览阅读2.2k次。雷达探测是电磁威慑的重要组成。国外发达国家正从体系、平台、频段、架构、硬件、处理等方面开发新一代雷达技术。针对高超声速目标、弹道导弹、无人集群目标、隐身飞机等新型极高速、极隐身、极庞大目标,将以网络为基础,整合各类探测手段,实现对目标的全球预警、全程连续跟踪、全维协同精确打击,形成全域全时全维的体系化探测感知能力。对深空目标、临空目标、隐身目标、集群目标的探测研究新型威胁目标是驱动雷达探测技术前进的原动力之一,而新时期的新型威胁目标呈现“高、低、快、慢、小、隐、群”等特征。_探地雷达成像matlab
文章浏览阅读3.1k次,点赞2次,收藏12次。计算机取证 volatility_kali安装volatility
文章浏览阅读2.1k次。地址:https://blog.csdn.net/fareast_mzh/article/details/81464031_html禁止浏览器缓存图片