一种基于Flink的实时整库入湖方法与流程
未命名
10-18
阅读:99
评论:0
一种基于flink的实时整库入湖方法
技术领域
1.本技术涉及数据处理技术领域,尤其涉及一种基于flink的实时整库入湖方法。
背景技术:
2.apache flink是当前最流行的大数据实时计算框架,flink是一个面向数据流处理和批量数据处理的可分布式的开源计算框架,它基于同一个flink流式执行模型,能够支持流处理和批处理两种应用类型。并且它有很多 connector子项目,支持大部分主流数据库(比如msyql、oracle)、分布式数据流平台(比如kafka)。
3.数据湖是一个集中式存储库,允许用户以任意规模存储所有结构化和非结构化数据。用户可以按原样存储数据(无需先对数据进行结构化处理),并运行不同类型的分析,从控制面板和可视化到大数据处理、实时分析和机器学习,以指导做出更好的决策。
4.apache iceberg是一种先进的数据湖格式化存储技术。其中的iceberg表是一种开放的表格格式,专为巨大的pb(petabyte,数据存储级别的一种)级表格而设计。表格格式的功能是确定用户如何管理、组织和跟踪构成表格的所有文件。用户可以将其视为物理数据文件(用 parquet 或 orc 等编写)以及它们如何构建以形成表格之间的抽象层。
5.cdc(change data capture,数据捕获)是一种能从源数据库捕获到数据和数据结构的技术。
6.现有技术中,通过以上开源技术,来将mysql数据库的表,基于flink框架,通过cdc技术采集到iceberg数据湖中,但是mysql数据库的表的结构在一开始就是确定好的,当其写入iceberg 数据湖时表的结构也是固定的。但是实际上一个系统里,数据库里的表的结构经常会发生变化,数据本身也会发生增删改,但是这些变化无法同步到iceberg 数据湖中。
技术实现要素:
7.为至少在一定程度上克服相关技术中在将数据库中的表和数据采集到数据湖中后,数据湖中无法同步表结构变化和数据变化的问题,本技术提供一种基于flink的实时整库入湖方法。
8.本技术的方案如下:一种基于flink的实时整库入湖方法,包括:步骤s1,通过source 算子对接支持cdc的数据库,获取数据库的实时变更数据;所述实时变更数据包括:数据变更dml(data manipulation language,数据操控语言)语句和表结构变更ddl(data definition language,数据定义语言)语句;步骤s2,对所述实时变更数据按照表名进行keyby,将表名不同的实时变更数据分配到不同的streamddlhandler算子中处理表结构变更ddl语句;所述streamddlhandler算子为处理ddl语句的算子;步骤s3,通过streamddlhandler算子向writer实例发送 writer commit标记,使
writer实例对所述表结构变更ddl语句对应的当前表数据进行commit,并实时缓存表中更新的数据,直至满足预设条件,执行所述表结构变更ddl语句;步骤s4,通过streamddlhandler算子将所述数据变更dml语句和发送给dynamicstreamwriter算子;所述dynamicstreamwriter算子为写数据到分布式文件系统的算子;步骤s5,通过所述dynamicstreamwriter算子将所述数据变更dml语句对应的表数据写入分布式文件系统,根据每条表数据的表名在缓存map中获取或创建 writer实例,在每次checkpoint中将writer实例中的表数据进行complete,并将manifest发送到 committer 中;步骤s6,通过所述dynamicstreamwriter算子将manifest发送到dynamicfilescommitter算子;所述dynamicfilescommitter算子为提交元数据的算子;步骤s7,通过所述dynamicfilescommitter算子对manifest进行缓存,开始新一次的checkpoint,将本次的缓存的所有表的manifest都存入 checkpointsstate;完成本次checkpoint后,将本次checkpoint的每张的 manifest都提交到iceberg表;其中,所述source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为flink中的算子。
9.优选地,所述方法还包括:基于flink中的source 算子对接具有cdc功能的外部系统的kafka,获取外部系统的kafka中的实时变更数据。
10.优选地,所述方法还包括:通过streamddlhandler判断是否存在新建或删除表结构变更ddl语句、表结构变更ddl语句所属的表是否收到数据变更dml语句;若是,则执行表结构变更ddl语句;若否,则执行步骤s3;通过streamddlhandler判断是否存在重复的表结构变更ddl语句;若是,则丢弃重复的表结构变更ddl语句;若否,则执行步骤s3。
11.优选地,所述预设条件为:监控分布式文件系统中上是否由writer实例生成了预设数量的success标记;或,超过了预先配置的缓存时长;所述缓存时长根据checkpoint事件的间隔时长设定。
12.优选地,执行所述表结构变更ddl语句后,所述方法还包括:通知汇总提交结果的算子处理缓存。
13.优选地,所述方法还包括:将所述数据变更dml语句对单表并行度取模以得到hashkey,通过hashkey对所述数据变更dml语句进行hash,将hash后的数据变更dml语句执行步骤s4。
14.优选地,所述source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为多个,各算子并行处理任务;所述方法还包括:建立多个任务槽,将各dynamicstreamwriter算子置于不同的任务槽处理不同的表数据。
15.优选地,所述方法还包括:
对iceberg表的表名进行hash;根据iceberg表的表名hash执行步骤s6。
16.优选地,所述方法还包括:通过所述dynamicfilescommitter算子判断是否初次接收到iceberg表的表名;若是,则初始化缓存map,在当前作业为快照重启时,获取iceberg表的表属性中的最大提交ckpid;若iceberg表的表属性中的最大提交ckpid小于checkpointsstate中保存的最大ckpid,则将checkpointsstate中未提交的ckpid对应的manifest 都提交到iceberg表。
17.优选地,所述方法还包括:若步骤s7执行失败,将失败表发送给单并行度的checkcommitsink 算子;通过所述checkcommitsink算子打印日志,抛出异常,结束作业,等待重启。
18.本技术提供的技术方案可以包括以下有益效果:本技术中的基于flink的实时整库入湖方法,包括:通过flink中的source 算子对接支持cdc的数据库,获取数据库的实时变更数据;实时变更数据包括:数据变更dml语句和表结构变更ddl语句;对实时变更数据按照表名进行keyby,将表名不同的实时变更数据分配到不同的streamddlhandler算子中处理表结构变更ddl语句;streamddlhandler算子为处理ddl语句的算子;通过streamddlhandler算子向writer实例发送 writer commit标记,使writer实例对表结构变更ddl语句对应的当前表数据进行commit,并实时缓存表中更新的数据,直至满足预设条件,执行表结构变更ddl语句;通过streamddlhandler算子将数据变更dml语句和发送给dynamicstreamwriter算子;dynamicstreamwriter算子为写数据到分布式文件系统的算子;通过dynamicstreamwriter算子将数据变更dml语句对应的表数据写入分布式文件系统,根据每条表数据的表名在缓存map中获取或创建 writer实例,在每次checkpoint中将writer实例中的表数据进行complete,并将manifest发送到 committer 中;通过dynamicstreamwriter算子将manifest发送到dynamicfilescommitter算子;dynamicfilescommitter算子为提交元数据的算子;通过dynamicfilescommitter算子对manifest进行缓存,开始新一次的checkpoint,将本次的缓存的所有表的manifest都存入 checkpointsstate;完成本次checkpoint后,将本次checkpoint的每张的 manifest都提交到iceberg表。本技术中的source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为flink中的算子,本技术中的技术方案,在保证flink作业性能的基础上,实现来源数据库整库的数据,实时同步到iceberg数据湖中。
19.应当理解的是,以上的一般描述和后文的细节描述仅是示例性和解释性的,并不能限制本技术。
附图说明
20.此处的附图被并入说明书中并构成本说明书的一部分,示出了符合本技术的实施例,并与说明书一起用于解释本技术的原理。
21.图1是本技术一个实施例提供的一种基于flink的实时整库入湖方法的流程示意图。
具体实施方式
22.这里将详细地对示例性实施例进行说明,其示例表示在附图中。下面的描述涉及附图时,除非另有表示,不同附图中的相同数字表示相同或相似的要素。以下示例性实施例中所描述的实施方式并不代表与本技术相一致的所有实施方式。相反,它们仅是与如所附权利要求书中所详述的、本技术的一些方面相一致的装置和方法的例子。
23.图1是本技术一个实施例提供的一种基于flink的实时整库入湖方法的流程示意图,参照图1,一种基于flink的实时整库入湖方法,包括:步骤s1,通过source算子对接支持cdc的数据库,获取数据库的实时变更数据;实时变更数据包括:数据变更dml语句和表结构变更ddl语句;需要说明的是,flink中具有多个算子,本实施例中应用的source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为flink中的常规算子。本实施例中,source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为多个,各算子并行处理任务。
24.source算子用于对接支持cdc的数据库,获取数据库的实时变更数据;也对接具有cdc功能的外部系统的kafka,获取外部系统的kafka中的实时变更数据。
25.数据变更dml语句属于表里面的数据层面语言,对表里数据的操作,如写入数据,变更数据。
26.表结构变更ddl语句属于表的结构层面的语言,对表结构的操作,如增加字段,修改字段,修改表。
27.步骤s2,对实时变更数据按照表名进行keyby,将表名不同的实时变更数据分配到不同的streamddlhandler算子中处理表结构变更ddl语句;streamddlhandler算子为处理ddl语句的算子;需要说明的是,对实时变更数据按照表名进行keyby,即对表名进行hash,以将实时变更数据按照表名分配到不同的streamddlhandler中进行处理。
28.在具体实践中,不同的表也能在同一个streamddlhandler中处理,通过复用提升资源利用率。
29.步骤s3,通过streamddlhandler算子向writer实例发送writer commit标记,使writer实例对表结构变更ddl语句对应的当前表数据进行commit,并实时缓存表中更新的数据,直至满足预设条件,执行表结构变更ddl语句;需要说明的是,在步骤s3执行前,先通过streamddlhandler判断是否存在新建或删除表结构变更ddl语句、表结构变更ddl语句所属的表是否收到数据变更dml语句;若是,则执行表结构变更ddl语句;若否,则执行步骤s3;通过streamddlhandler判断是否存在重复的表结构变更ddl语句;若是,则丢弃重复的表结构变更ddl语句;若否,则执行步骤s3。
30.需要说明的是,上述逻辑判断过程是为了只执行一次表结构变更ddl语句,防止表重复创建或者重复删除。
31.需要说明的是,步骤s3中的writer实例为预设的相关writer实例。步骤s3中向全部相关的writer实例发送 writer commit标记,使writer实例对表结构变更ddl语句对应的当前表数据进行commit。
32.需要说明的是,本步骤中先执行表结构变更ddl语句是为了在写数据前先进行表结构的维护,例如列级别的删除列,新增列,修改列;表级别的新建表,删除表,修改表操作。
33.需要说明的是,writer实例对表结构变更ddl语句对应的当前表数据进行commit是指将没有写到文件系统的文件写完,然后将元数据同步到元数据中心。
34.需要说明的是,步骤s3中的预设条件为:监控分布式文件系统中上是否由writer实例生成了预设数量的success标记;或,超过了预先配置的缓存时长;缓存时长根据checkpoint事件的间隔时长设定。
35.需要说明的是,本实施例中,在使writer实例对表结构变更ddl语句对应的当前表数据进行commit时,还监控分布式文件系统中上由writer实例生成的success标记数量,同时实时缓存表中更新的数据。
36.在由writer实例生成了足够数量的success标记时,或者超过了预先配置的缓存时长时,执行表结构变更ddl语句。
37.需要说明的是,缓存时长根据checkpoint事件的间隔时长设定。优选地,缓存时长为checkpoint事件的间隔时长的三倍。
38.需要说明的是,checkpoint事件激活以后会触发数据库写进程将数据缓冲中的脏数据块写出到数据文件中。
39.需要说明的是,执行表结构变更ddl语句后,方法还包括:通知汇总提交结果的算子处理缓存。
40.步骤s4,通过streamddlhandler算子将数据变更dml语句和发送给dynamicstreamwriter算子;dynamicstreamwriter算子为写数据到分布式文件系统的算子;需要说明的是,本实施例中将数据变更dml语句对单表并行度取模以得到hashkey,通过hashkey对数据变更dml语句进行hash,将hash后的数据变更dml语句执行步骤s4。
41.需要说明的是,本实施例中将数据变更dml语句,通过主键或其他字段对单表并行度取模以得到hashkey。主键单条数据的唯一标识。对单表并行度取模将单表是指根据并行度每次提交写不同数量的文件。
42.需要说明的是,对单表并行度取模后会产生hashkey,通过主键或其他字段对单表并行度取模以得到hashkey是为了将单表通过不同的算子写入,以加快写入的速度。
43.步骤s5,通过dynamicstreamwriter算子将数据变更dml语句对应的表数据写入分布式文件系统,根据每条表数据的表名在缓存map中获取或创建writer实例,在每次checkpoint中将writer实例中的表数据进行complete,并将manifest发送到 committer中;需要说明的是,本实施例中的complete指的是将每个checkpoint周期内缓存的数据全部写到外部存储系统上,并返回写的统计信息。包括:写了多少个文件,具体的每个文件路径,大小已经每个文件的类型等。这些信息用于更改iceberg表的元数据时使用。
44.需要说明的是,committer是指数据提交者,也即下文中的dynamicfilescommitter算子。
45.需要说明的是,本实施例中建立多个任务槽,将各dynamicstreamwriter算子置于
不同的任务槽处理不同的表数据,以提升处理速度。
46.步骤s6,通过dynamicstreamwriter算子将manifest发送到dynamicfilescommitter算子;dynamicfilescommitter算子为提交元数据的算子;需要说明的是,方法还包括:对iceberg表的表名进行hash;根据iceberg表的表名hash执行步骤s6。
47.需要说明的是iceberg表即iceberg数据湖中的表。
48.需要说明的是,dynamicstreamwriter算子通过iceberg表名hash将manifest发送到dynamicfilescommitter算子,一个表只能由一个dynamicfilescommitter算子进行处理,否则会出现锁抢占问题。
49.需要说明的是,本实施例中的manifest是通过将本次刷新数据到底层存储后得到的文件元数据。
50.步骤s7,通过dynamicfilescommitter算子对manifest进行缓存,开始新一次的checkpoint,将本次的缓存的所有表的manifest都存入checkpointsstate;完成本次checkpoint后,将本次checkpoint的每张的manifest都提交到iceberg表;需要说明的是,dynamicfilescommitter在接收到上游数据(iceberg表名hash和上游dynamicstreamwriter算子的处理结果)后,对manifest进行缓存。
51.需要说明的是,开启新一次的checkpoint快照时,可以用线程池写入每个表的 manifest文件,以提升表的数量过多时的写入性能。
52.需要说明的是,将本次的缓存的所有表的manifest都存入checkpointsstate前,还需要先清空checkpointsstate。
53.当所有算子完成本次checkpoint后,用线程池将本次checkpoint的每张的manifest都提交到iceberg表。
54.需要说明的是,本实施例中还存在提交失败后的容错机制,包括:通过dynamicfilescommitter算子判断是否初次接收到iceberg表的表名;若是,则初始化缓存map,在当前作业为快照重启时,获取iceberg表的表属性中的最大提交ckpid;若iceberg表的表属性中的最大提交ckpid小于checkpointsstate中保存的最大ckpid,则将checkpointsstate中未提交的ckpid对应的manifest 都提交到iceberg表。
55.需要说明的是,本步骤中的快照可以是checkpoint或savepoint。
56.需要说明的是,ckpid是指flink系统的周期性checkpoint时用来对实时流动的数据做状态保存的id,可用于在作业异常时自动从状态中进行恢复运行,而不是从初始状态运行。每次的checkpoint都有一个唯一的id进行标识,并且是单调递增的,流图中各个算子收到checkpoint信息时,都能从信息中获取本次checkpoint的id,便于各个算子进行逻辑的判断。
57.需要说明的是,方法还包括:若步骤s7执行失败,将失败表发送给单并行度的checkcommitsink 算子;通过checkcommitsink算子打印日志,抛出异常,结束作业,等待重启。
58.需要说明的是,若步骤s7执行成功,则本次checkpoint入湖流程成功结束。
59.可以理解的是,上述各实施例中相同或相似部分可以相互参考,在一些实施例中
未详细说明的内容可以参见其他实施例中相同或相似的内容。
60.需要说明的是,在本技术的描述中,术语“第一”、“第二”等仅用于描述目的,而不能理解为指示或暗示相对重要性。此外,在本技术的描述中,除非另有说明,“多个”的含义是指至少两个。
61.流程图中或在此以其他方式描述的任何过程或方法描述可以被理解为,表示包括一个或更多个用于实现特定逻辑功能或过程的步骤的可执行指令的代码的模块、片段或部分,并且本技术的优选实施方式的范围包括另外的实现,其中可以不按所示出或讨论的顺序,包括根据所涉及的功能按基本同时的方式或按相反的顺序,来执行功能,这应被本技术的实施例所属技术领域的技术人员所理解。
62.应当理解,本技术的各部分可以用硬件、软件、固件或它们的组合来实现。在上述实施方式中,多个步骤或方法可以用存储在存储器中且由合适的指令执行系统执行的软件或固件来实现。例如,如果用硬件来实现,和在另一实施方式中一样,可用本领域公知的下列技术中的任一项或他们的组合来实现:具有用于对数据信号实现逻辑功能的逻辑门电路的离散逻辑电路,具有合适的组合逻辑门电路的专用集成电路,可编程门阵列(pga),现场可编程门阵列(fpga)等。
63.本技术领域的普通技术人员可以理解实现上述实施例方法携带的全部或部分步骤是可以通过程序来指令相关的硬件完成,所述的程序可以存储于一种计算机可读存储介质中,该程序在执行时,包括方法实施例的步骤之一或其组合。
64.此外,在本技术各个实施例中的各功能单元可以集成在一个处理模块中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个模块中。上述集成的模块既可以采用硬件的形式实现,也可以采用软件功能模块的形式实现。所述集成的模块如果以软件功能模块的形式实现并作为独立的产品销售或使用时,也可以存储在一个计算机可读取存储介质中。
65.上述提到的存储介质可以是只读存储器,磁盘或光盘等。
66.在本说明书的描述中,参考术语“一个实施例”、“一些实施例”、“示例”、“具体示例”、或“一些示例”等的描述意指结合该实施例或示例描述的具体特征、结构、材料或者特点包含于本技术的至少一个实施例或示例中。在本说明书中,对上述术语的示意性表述不一定指的是相同的实施例或示例。而且,描述的具体特征、结构、材料或者特点可以在任何的一个或多个实施例或示例中以合适的方式结合。
67.尽管上面已经示出和描述了本技术的实施例,可以理解的是,上述实施例是示例性的,不能理解为对本技术的限制,本领域的普通技术人员在本技术的范围内可以对上述实施例进行变化、修改、替换和变型。
技术特征:
1.一种基于flink的实时整库入湖方法,其特征在于,包括:步骤s1,通过source算子对接支持cdc的数据库,获取数据库的实时变更数据;所述实时变更数据包括:数据变更dml语句和表结构变更ddl语句;步骤s2,对所述实时变更数据按照表名进行keyby,将表名不同的实时变更数据分配到不同的streamddlhandler算子中处理表结构变更ddl语句;所述streamddlhandler算子为处理ddl语句的算子;步骤s3,通过streamddlhandler算子向writer实例发送 writer commit标记,使writer实例对所述表结构变更ddl语句对应的当前表数据进行commit,并实时缓存表中更新的数据,直至满足预设条件,执行所述表结构变更ddl语句;步骤s4,通过streamddlhandler算子将所述数据变更dml语句和发送给dynamicstreamwriter算子;所述dynamicstreamwriter算子为写数据到分布式文件系统的算子;步骤s5,通过所述dynamicstreamwriter算子将所述数据变更dml语句对应的表数据写入分布式文件系统,根据每条表数据的表名在缓存map中获取或创建 writer实例,在每次checkpoint中将writer实例中的表数据进行complete,并将manifest发送到 committer 中;步骤s6,通过所述dynamicstreamwriter算子将manifest发送到dynamicfilescommitter算子;所述dynamicfilescommitter算子为提交元数据的算子;步骤s7,通过所述dynamicfilescommitter算子对manifest进行缓存,开始新一次的checkpoint,将本次的缓存的所有表的manifest都存入 checkpointsstate;完成本次checkpoint后,将本次checkpoint的每张的 manifest都提交到iceberg表;其中,所述source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为flink中的算子。2.根据权利要求1所述的方法,其特征在于,所述方法还包括:基于flink中的source 算子对接具有cdc功能的外部系统的kafka,获取外部系统的kafka中的实时变更数据。3.根据权利要求1所述的方法,其特征在于,所述方法还包括:通过streamddlhandler判断是否存在新建或删除表结构变更ddl语句、表结构变更ddl语句所属的表是否收到数据变更dml语句;若是,则执行表结构变更ddl语句;若否,则执行步骤s3;通过streamddlhandler判断是否存在重复的表结构变更ddl语句;若是,则丢弃重复的表结构变更ddl语句;若否,则执行步骤s3。4.根据权利要求1所述的方法,其特征在于,所述预设条件为:监控分布式文件系统中上是否由writer实例生成了预设数量的success标记;或,超过了预先配置的缓存时长;所述缓存时长根据checkpoint事件的间隔时长设定。5.根据权利要求1所述的方法,其特征在于,执行所述表结构变更ddl语句后,所述方法还包括:通知汇总提交结果的算子处理缓存。6.根据权利要求1所述的方法,其特征在于,所述方法还包括:
将所述数据变更dml语句对单表并行度取模以得到hashkey,通过hashkey对所述数据变更dml语句进行hash,将hash后的数据变更dml语句执行步骤s4。7.根据权利要求1所述的方法,其特征在于,所述source 算子、streamddlhandler算子、dynamicstreamwriter算子、dynamicfilescommitter算子均为多个,各算子并行处理任务;所述方法还包括:建立多个任务槽,将各dynamicstreamwriter算子置于不同的任务槽处理不同的表数据。8.根据权利要求1所述的方法,其特征在于,所述方法还包括:对iceberg表的表名进行hash;根据iceberg表的表名hash执行步骤s6。9.根据权利要求8所述的方法,其特征在于,所述方法还包括:通过所述dynamicfilescommitter算子判断是否初次接收到iceberg表的表名;若是,则初始化缓存map,在当前作业为快照重启时,获取iceberg表的表属性中的最大提交ckpid;若iceberg表的表属性中的最大提交ckpid小于checkpointsstate中保存的最大ckpid,则将checkpointsstate中未提交的ckpid对应的manifest 都提交到iceberg表。10.根据权利要求1所述的方法,其特征在于,所述方法还包括:若步骤s7执行失败,将失败表发送给单并行度的checkcommitsink 算子;通过所述checkcommitsink算子打印日志,抛出异常,结束作业,等待重启。
技术总结
本申请涉及数据处理技术领域,尤其涉及一种基于Flink的实时整库入湖方法,通过Flink中的多种算子进行配合,对接支持CDC的数据库,获取数据库的实时变更数据,以分布式文件系统为中转,通过checkpoint,将实时变更数据中的数据变更DML语句和表结构变更DDL语句同步到Iceberg表中。本申请中的技术方案,在保证Flink作业性能的基础上,实现来源数据库整库的数据,实时同步到Iceberg数据湖中。实时同步到Iceberg数据湖中。实时同步到Iceberg数据湖中。
技术研发人员:张赵中 唐金鑫 吴小前
受保护的技术使用者:北京滴普科技有限公司
技术研发日:2023.09.05
技术公布日:2023/10/15
版权声明
本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
飞行汽车 https://www.autovtol.com/
