用于流式数据做历史增量聚合的方法及相关设备与流程
未命名
08-27
阅读:127
评论:0
1.本公开涉及大数据处理技术领域,尤其涉及一种用于流式数据做历史增量聚合的方法及相关设备。
背景技术:
2.flink作为一个国内关注热度极高的大数据流处理引擎,具有高吞吐和低延迟的核心特性,每秒可处理数百万个事件,做到毫秒级延迟。虽然在大数据实时处理领域flink具有其显著的先天优势,但是如果涉及需要对数据流中的数据做历史增量聚合的场景,就会暴露其局限性。
3.现有技术中,flink通过checkpoint可以存储计算状态,但是使用和维护都很不方便,而且checkpoint的实现需要高度依赖hadoop hdfs文件系统。
4.需要说明的是,在上述背景技术部分公开的信息仅用于加强对本公开的背景的理解,因此可以包括不构成对本领域普通技术人员已知的现有技术的信息。
技术实现要素:
5.本公开提供一种用于流式数据做历史增量聚合的方法及相关设备,至少在一定程度上克服由于相关技术中对数据流中数据做历史增量聚合的使用和维护都不方便的问题。
6.本公开的其他特性和优点将通过下面的详细描述变得显然,或部分地通过本公开的实践而习得。
7.根据本公开的一个方面,提供了一种用于流式数据做历史增量聚合的方法,包括:根据分组条件,确定原始数据流的键值与键值数据流;将所述键值数据流进行开窗处理,确定初步聚合数据流;将所述初步聚合数据流与远程字典服务redis数据库建立连接;根据所述键值在redis数据库中查询;若redis数据库中存在所述键值对应的历史数据,则取出所述历史数据与初步聚合数据流进行累加之后存入redis数据库。
8.在一些实施例中,所述方法还包括:若redis数据库中不存在所述键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。
9.在一些实施例中,所述根据分组条件,确定原始数据流的键值与键值数据流包括:根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。
10.在一些实施例中,所述将所述键值数据流进行开窗处理,确定初步聚合数据流包括:将所述键值数据流进行开窗处理;将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。
11.在一些实施例中,所述将所述初步聚合数据流与远程字典服务redis数据库建立连接包括:将所述初步聚合数据流通过flink richmapfunction与redis数据库建立连接。
12.在一些实施例中,在所述根据分组条件,确定原始数据流的键值与键值数据之前,所述方法还包括:获取原始数据,所述原始数据包括读入flink中数据流datastream。
13.在一些实施例中,redis基本类型数据包括redis set类型数据。
14.根据本公开的另一个方面,还提供了一种用于流式数据做历史增量聚合的装置,包括:键值信息确定模块,用于根据分组条件,确定原始数据流的键值与键值数据流;初步聚合数据流确定模块,用于将所述键值数据流进行开窗处理,确定初步聚合数据流;数据库建立连接模块,用于将所述初步聚合数据流与远程字典服务redis数据库建立连接;查询模块,用于根据所述键值在redis数据库中查询;累加存储模块,用于若redis数据库中存在所述键值对应的历史数据,则取出所述历史数据与初步聚合数据流进行累加之后存入redis数据库。
15.根据本公开的另一个方面,还提供了一种电子设备,该电子设备包括:处理器;以及存储器,用于存储所述处理器的可执行指令;其中,所述处理器配置为经由执行所述可执行指令来执行上述任意一项所述的用于流式数据做历史增量聚合的方法。
16.根据本公开的另一个方面,还提供了一种计算机可读存储介质,其上存储有计算机程序,所述计算机程序被处理器执行时实现上述任意一项所述的用于流式数据做历史增量聚合的方法。
17.根据本公开的另一个方面,还提供了一种计算机程序产品,包括计算机程序,所述计算机程序被处理器执行时实现上述任意一项的用于流式数据做历史增量聚合的方法。
18.本公开的实施例中提供的一种用于流式数据做历史增量聚合的方法及相关设备,根据分组条件,确定原始数据流的键值与键值数据流;将键值数据流进行开窗处理,确定初步聚合数据流;将初步聚合数据流与远程字典服务redis数据库建立连接;根据键值在redis数据库中查询;若redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入redis数据库。本公开实施例中,由于通过将flink数据流中每条数据历史聚合结果存redis的方式,能够高效而准确地实现对流式数据做历史增量聚合统计,从而有助于对数据流中数据做历史增量聚合的使用和维护。
19.应当理解的是,以上的一般描述和后文的细节描述仅是示例性和解释性的,并不能限制本公开。
附图说明
20.此处的附图被并入说明书中并构成本说明书的一部分,示出了符合本公开的实施例,并与说明书一起用于解释本公开的原理。显而易见地,下面描述中的附图仅仅是本公开的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
21.图1示出本公开实施例中一种用于流式数据做历史增量聚合的方法流程图;
22.图2示出本公开实施例中一种用于流式数据做历史增量聚合的方法一具体实例的流程图;
23.图3示出本公开实施例中一种用于流式数据做历史增量聚合的方法又一具体实例的流程图;
24.图4示出本公开实施例中一种用于流式数据做历史增量聚合的方法再一具体实例的流程图;
25.图5示出本公开实施例中一种用于流式数据做历史增量聚合的方法另外一具体实
例的流程图;
26.图6示出本公开实施例中一种flink数据流保存数据历史状态的方式示意图;
27.图7示出本公开实施例中一种用于流式数据做历史增量聚合的装置示意图;
28.图8示出本公开实施例中一种用于流式数据做历史增量聚合的计算机设备的结构框图;
29.图9示出本公开实施例中一种用于流式数据做历史增量聚合的计算机可读存储介质的示意图。
具体实施方式
30.现在将参考附图更全面地描述示例实施方式。然而,示例实施方式能够以多种形式实施,且不应被理解为限于在此阐述的范例;相反,提供这些实施方式使得本公开将更加全面和完整,并将示例实施方式的构思全面地传达给本领域的技术人员。所描述的特征、结构或特性可以以任何合适的方式结合在一个或更多实施方式中。
31.此外,附图仅为本公开的示意性图解,并非一定是按比例绘制。图中相同的附图标记表示相同或类似的部分,因而将省略对它们的重复描述。附图中所示的一些方框图是功能实体,不一定必须与物理或逻辑上独立的实体相对应。可以采用软件形式来实现这些功能实体,或在一个或多个硬件模块或集成电路中实现这些功能实体,或在不同网络和/或处理器装置和/或微控制器装置中实现这些功能实体。
32.下面结合附图,对本公开实施例的具体实施方式进行详细说明。
33.图1示出本公开实施例中一种用于流式数据做历史增量聚合的方法流程图,如图1所示,本公开实施例中提供的用于流式数据做历史增量聚合的方法包括如下步骤:
34.s102,根据分组条件,确定原始数据流的键值与键值数据流。
35.需要说明的是,上述分组条件可以是以分组字段为key。上述键值可以是key。
36.例如,将flink中原始数据流datastream(相当于上述原始数据流)处理成以分组字段为key(键值)的keyedstream(相当于上述键值数据流)。
37.s104,将键值数据流进行开窗处理,确定初步聚合数据流。
38.需要说明的是,上述开窗处理可以是以给定的空间范围(窗口)进行空间数据的提取,包括提取窗口内的空间实体及其属性。
39.例如,利用flink窗口函数对keyedstream进行开窗,并通过flink归约函数进行分组统计,将同一批数据流中多条相同的数据根据需求聚合成一条,从而完成同批次数据分组聚合,形成一个新的数据流datastreaml(相当于上述初步聚合数据流)。
40.s106,将初步聚合数据流与远程字典服务redis数据库建立连接。
41.需要说明的是,上述redis(remote dictionary server),即远程字典服务,是一个开源的使用ansi c语言编写、支持网络、可基于内存亦可持久化的日志型、key-value数据库,并提供多种语言的api(application programming interface,应用程序接口)。
42.例如,将上面处理好的datastream1通过flink richmapfunction与redis数据库建立连接。
43.本公开使用redis作为中间数据库,查询速度快,支持持久化,支持多数据结构存储,而且读写速度极高,基本可以做到数据的实时更新。后期对数据的维护也简单。
44.s108,根据键值在redis数据库中查询。
45.例如,通过上面指定的key进行查询。
46.s110,若redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入redis数据库。
47.例如,如果redis中存在对应的数据,则取出并与当前批次数据处理结果进行累加之后存入redis。
48.本公开可以充分发挥flink高吞吐和低延迟的核心特性,又可以借助redis极高的读写性能缓存每一次flink数据流的聚合结果,从而可以高效而准确地实现对流式数据做历史增量聚合统计的需求,弥补了flink作为流式处理框架在做历史数据增量聚合统计方面的短板。通过将flink数据流中每条数据历史聚合结果存redis的方式,能够高效而准确地实现对流式数据做历史增量聚合统计,从而有助于对数据流中数据做历史增量聚合的使用和维护。
49.在本公开的一个实施例中,如图2所示,本公开实施例中提供的用于流式数据做历史增量聚合的方法可以通过如下步骤来确定直接存入redis中的数据,能够快速统计存储无需进行增量的数据:
50.s202,若redis数据库中不存在键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。
51.例如,如果不存在,则直接将当前批次处理的结果以(key,value)的形式存入redis set类型数据中。
52.在本公开的一个实施例中,如图3所示,本公开实施例中提供的用于流式数据做历史增量聚合的方法可以通过如下步骤来确定原始数据流的键值与键值数据流包括,能够快速统计区分:
53.s302,根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。
54.例如,将flink原始数据流根据分组条件处理成依分组字段为key的键值数据流。
55.首先,将读入flink datastream中的数据进行处理,根据分组的条件,从原始数据中提取对应字段进行拼接处理作为key,将原始数据流处理成keyedstream。
56.在一个具体的实例中,例如在当前项目中需要根据pool_id和host_id定位一台主机,然后通过分析本主机运行产生的日志实时统计出每个主机历史产生告警日志的次数,那么就可以将pool_id和host_id进行拼接作为key。
57.在本公开的一个实施例中,如图4所示,本公开实施例中提供的用于流式数据做历史增量聚合的方法可以通过如下步骤来确定初步聚合数据流,能够充分发挥flink高吞吐和低延迟的核心特性:
58.s402,将键值数据流进行开窗处理;
59.s404,将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。
60.例如,对处理好的keyedstream开窗做当前批次聚合。
61.在一个具体的实例中,对处理好的keyedstream进行开窗,并将每个窗口的数据先做一次reduce聚合处理,完成当前批次数据的聚合和去重,形成一个经过初步聚合的新的数据流datastream1。
62.在本公开的一个实例中,将初步聚合数据流与远程字典服务redis数据库建立连接包括:将初步聚合数据流通过flink richmapfunction与redis数据库建立连接。
63.例如,将datastream1通过flink richmapfunction与redis数据库建立连接。
64.在本公开的一个实例中,在根据分组条件,确定原始数据流的键值与键值数据之前,方法还包括:获取原始数据,原始数据包括读入flink中数据流datastream。
65.例如,首先将读入flink datastream中的数据。
66.在本公开的一个实例中,redis基本类型数据包括redis set类型数据。
67.例如,将当前批次处理的结果以(key,value)的形式存入redis set类型数据中。
68.本公开将flink中原始数据流datastream处理成以分组字段为key的keyedstream,然后利用flink窗口函数对keyedstream进行开窗,并通过flink归约函数进行分组统计,将同一批数据流中多条相同的数据根据需求聚合成一条,从而完成同批次数据分组聚合,形成一个新的数据流datastream1。
69.本公开通过将flink数据流中每条数据历史聚合结果存redis的方式,代替存flink checkpoint。
70.本公开提出一种flink与redis数据库配合使用对flink数据流中数据做历史增量聚合统计的方法。
71.本公开即可以充分发挥flink高吞吐和低延迟的核心特性,又可以借助redis极高的读写性能缓存每一次flink数据流的聚合结果,从而可以高效而准确地实现对流式数据做历史增量聚合统计的需求,弥补了flink作为流式处理框架在做历史数据增量聚合统计方面的短板。
72.图5示出本公开实施例中一种用于流式数据做历史增量聚合的方法流程图,如图5所示,本公开实施例中提供的用于流式数据做历史增量聚合的方法包括如下步骤:
73.s502:将flink原始数据流根据分组条件处理成依分组字段为key的键值数据流。
74.例如,首先,将读入flink datastream中的数据进行处理,根据分组的条件,从原始数据中提取对应字段进行拼接处理作为key,将原始数据流处理成keyedstream。
75.比如在当前项目中,需要根据pool_id和host_id定位一台主机,然后通过分析本主机运行产生的日志,实时统计出每个主机历史产生告警日志的次数,最后,将pool_id和host_id进行拼接作为key。
76.s504:对s502处理好的keyedstream开窗做当前批次聚合。
77.例如,对s502处理好的keyedstream进行开窗,并将每个窗口的数据先做一次reduce聚合处理,完成当前批次数据的聚合和去重,形成一个经过初步聚合的新的数据流datastream1。
78.s506:对s504处理好的数据流datastream1连redis进行处理。
79.例如,将上述处理好的datastream1通过flink richmapfunction与redis数据库建立连接,然后通过上述指定的key进行查询:
80.如果redis中存在对应的数据,则取出与与当前批次数据处理结果进行累加之后存入redis;
81.如果不存在,则直接将当前批次处理的结果以(key,value)的形式存入redis set类型数据中。
82.本公开使用redis作为中间数据库,查询速度快,支持持久化,支持多数据结构存储,而且读写速度极高,基本可以做到数据的实时更新,后期对数据的维护也简单。
83.本公开需要存flink checkpoint的数据完全可以通过存redis代替,而且通过利用redis的优势,可以做到存取实时性更高,维护也更加方便。
84.本公开无需额外搭建hdfs(hadoop分布式文件系统,hadoop distributed file system)系统,在简化系统复杂度的同时,也更加有利于数据共享使用。
85.本公开简化flink程序处理流程,最大程度发挥flink在数据处理方面的优势,从而提高flink代码处理性能。
86.通过本公开提出的方法完美解决了安全能力管理平台项目中需要在flink数据流中对数据做历史增量聚合统计的需求。
87.本公开适用于采用flink框架做数据处理,并需要在处理过程中对数据做分组历史增量统计的场景。
88.本公开提出的通过flink与redis数据库配合使用对flink数据流中数据做历史增量聚合统计的方法,打破了一直以来的通过flink checkpoint存储历史状态的思想常规。
89.图6示出本公开实施例中一种flink数据流保存数据历史状态的方式示意图,如图6所示,本公开实施例中提供的flink数据流保存数据历史状态的方式包括如下步骤:
90.s601:查询。
91.例如,通过map对redis进行查询,其中,所述map可以是map接口提供的集合又称为查找表。
92.s602:如果存在,返回历史数据结果。
93.例如,如果存在,map返回历史数据结果到redis。
94.s603:将当前批次数据加上历史存入redis。
95.例如,redis将当前批次数据加上历史进行保存。
96.s604:如果不存在直接将当前批次结果存入redis。
97.基于同一发明构思,本公开实施例中还提供了一种用于流式数据做历史增量聚合的装置,如下面的实施例所述。由于该装置实施例解决问题的原理与上述方法实施例相似,因此该装置实施例的实施可以参见上述方法实施例的实施,重复之处不再赘述。
98.图7示出本公开实施例中一种用于流式数据做历史增量聚合的装置示意图,如图7所示,该装置包括:键值信息确定模块71,初步聚合数据流确定模块72,数据库建立连接模块73,查询模块74,累加存储模块75,存储模块76和原始数据获取模块77。
99.键值信息确定模块71,用于根据分组条件,确定原始数据流的键值与键值数据流;
100.初步聚合数据流确定模块72,用于将键值数据流进行开窗处理,确定初步聚合数据流;
101.数据库建立连接模块73,用于将初步聚合数据流与远程字典服务redis数据库建立连接;
102.查询模块74,用于根据键值在redis数据库中查询;
103.累加存储模块75,用于若redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入redis数据库。
104.在本公开的一个实施例中,上述用于流式数据做历史增量聚合的装置还包括存储
模块76,用于:
105.若redis数据库中不存在键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。
106.在本公开的一个实施例中,上述键值信息确定模块71还用于:
107.根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。
108.在本公开的一个实施例中,上述初步聚合数据流确定模块72还用于:
109.将键值数据流进行开窗处理;
110.将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。
111.在本公开的一个实施例中,上述数据库建立连接模块73还用于:
112.将初步聚合数据流通过flink richmapfunction与redis数据库建立连接。
113.在本公开的一个实施例中,上述用于流式数据做历史增量聚合的装置还包括原始数据获取模块77,用于:
114.获取原始数据,原始数据包括读入flink中数据流datastream。
115.在本公开的一个实施例中,redis基本类型数据包括redis set类型数据。
116.此处需要说明的是,上述键值信息确定模块71,初步聚合数据流确定模块72,数据库建立连接模块73,查询模块74和累加存储模块75对应于方法实施例中的s102~s110,上述模块与对应的步骤所实现的示例和应用场景相同,但不限于上述方法实施例所公开的内容。需要说明的是,上述模块作为装置的一部分可以在诸如一组计算机可执行指令的计算机系统中执行。
117.所属技术领域的技术人员能够理解,本公开的各个方面可以实现为系统、方法或程序产品。因此,本公开的各个方面可以具体实现为以下形式,即:完全的硬件实施方式、完全的软件实施方式(包括固件、微代码等),或硬件和软件方面结合的实施方式,这里可以统称为“电路”、“模块”或“系统”。
118.下面参照图8来描述根据本公开的这种实施方式的电子设备800。图8显示的电子设备800仅仅是一个示例,不应对本公开实施例的功能和使用范围带来任何限制。
119.如图8所示,电子设备800以通用计算设备的形式表现。电子设备800的组件可以包括但不限于:上述至少一个处理单元810、上述至少一个存储单元820、连接不同系统组件(包括存储单元820和处理单元810)的总线830。
120.其中,所述存储单元存储有程序代码,所述程序代码可以被所述处理单元810执行,使得所述处理单元810执行本说明书上述“示例性方法”部分中描述的根据本公开各种示例性实施方式的步骤。
121.例如,所述处理单元810可以执行上述方法实施例的如下步骤:
122.根据分组条件,确定原始数据流的键值与键值数据流;
123.将键值数据流进行开窗处理,确定初步聚合数据流;
124.将初步聚合数据流与远程字典服务redis数据库建立连接;
125.根据键值在redis数据库中查询;
126.若redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入redis数据库。
127.例如,所述处理单元810可以执行上述方法实施例的如下步骤:
128.若redis数据库中不存在键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。
129.例如,所述处理单元810可以执行上述方法实施例的如下步骤:
130.根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。
131.例如,所述处理单元810可以执行上述方法实施例的如下步骤:
132.将键值数据流进行开窗处理;
133.将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。
134.例如,所述处理单元810可以执行上述方法实施例的如下步骤:
135.将初步聚合数据流通过flink richmapfunction与redis数据库建立连接。
136.例如,所述处理单元810可以执行上述方法实施例的如下步骤:获取原始数据,原始数据包括读入flink中数据流datastream。
137.例如,所述处理单元810可以执行上述方法实施例的如下步骤:redis基本类型数据包括redis set类型数据。
138.存储单元820可以包括易失性存储单元形式的可读介质,例如随机存取存储单元(ram)8201和/或高速缓存存储单元8202,还可以进一步包括只读存储单元(rom)8203。
139.存储单元820还可以包括具有一组(至少一个)程序模块8205的程序/实用工具8204,这样的程序模块8205包括但不限于:操作系统、一个或者多个应用程序、其它程序模块以及程序数据,这些示例中的每一个或某种组合中可能包括网络环境的实现。
140.总线830可以为表示几类总线结构中的一种或多种,包括存储单元总线或者存储单元控制器、外围总线、图形加速端口、处理单元或者使用多种总线结构中的任意总线结构的局域总线。
141.电子设备800也可以与一个或多个外部设备840(例如键盘、指向设备、蓝牙设备等)通信,还可与一个或者多个使得用户能与该电子设备800交互的设备通信,和/或与使得该电子设备800能与一个或多个其它计算设备进行通信的任何设备(例如路由器、调制解调器等等)通信。这种通信可以通过输入/输出(i/o)接口850进行。并且,电子设备800还可以通过网络适配器860与一个或者多个网络(例如局域网(lan),广域网(wan)和/或公共网络,例如因特网)通信。如图所示,网络适配器860通过总线830与电子设备800的其它模块通信。应当明白,尽管图中未示出,可以结合电子设备800使用其它硬件和/或软件模块,包括但不限于:微代码、设备驱动器、冗余处理单元、外部磁盘驱动阵列、raid系统、磁带驱动器以及数据备份存储系统等。
142.通过以上的实施方式的描述,本领域的技术人员易于理解,这里描述的示例实施方式可以通过软件实现,也可以通过软件结合必要的硬件的方式来实现。因此,根据本公开实施方式的技术方案可以以软件产品的形式体现出来,该软件产品可以存储在一个非易失性存储介质(可以是cd-rom,u盘,移动硬盘等)中或网络上,包括若干指令以使得一台计算设备(可以是个人计算机、服务器、终端装置、或者网络设备等)执行根据本公开实施方式的方法。
143.特别地,根据本公开的实施例,上文参考流程图描述的过程可以被实现为计算机
程序产品,该计算机程序产品包括:计算机程序,所述计算机程序被处理器执行时实现上述用于流式数据做历史增量聚合的方法。
144.在本公开的示例性实施例中,还提供了一种计算机可读存储介质,该计算机可读存储介质可以是可读信号介质或者可读存储介质。图9示出本公开实施例中一种计算机可读存储介质的示意图,如图9所示,该计算机可读存储介质900上存储有能够实现本公开上述方法的程序产品。在一些可能的实施方式中,本公开的各个方面还可以实现为一种程序产品的形式,其包括程序代码,当所述程序产品在终端设备上运行时,所述程序代码用于使所述终端设备执行本说明书上述“示例性方法”部分中描述的根据本公开各种示例性实施方式的步骤。
145.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
146.根据分组条件,确定原始数据流的键值与键值数据流;
147.将键值数据流进行开窗处理,确定初步聚合数据流;
148.将初步聚合数据流与远程字典服务redis数据库建立连接;
149.根据键值在redis数据库中查询;
150.若redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入redis数据库。
151.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
152.若redis数据库中不存在键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。
153.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
154.根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。
155.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
156.将键值数据流进行开窗处理;
157.将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。
158.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
159.将初步聚合数据流通过flink richmapfunction与redis数据库建立连接。
160.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
161.获取原始数据,原始数据包括读入flink中数据流datastream。
162.在一个实施例中,本公开实施例中的程序产品被处理器执行时实现如下步骤的方法:
163.redis基本类型数据包括redis set类型数据。
164.本公开中的计算机可读存储介质的更具体的例子可以包括但不限于:具有一个或
多个导线的电连接、便携式计算机磁盘、硬盘、随机访问存储器(ram)、只读存储器(rom)、可擦式可编程只读存储器(eprom或闪存)、光纤、便携式紧凑磁盘只读存储器(cd-rom)、光存储器件、磁存储器件、或者上述的任意合适的组合。
165.在本公开中,计算机可读存储介质可以包括在基带中或者作为载波一部分传播的数据信号,其中承载了可读程序代码。这种传播的数据信号可以采用多种形式,包括但不限于电磁信号、光信号或上述的任意合适的组合。可读信号介质还可以是可读存储介质以外的任何可读介质,该可读介质可以发送、传播或者传输用于由指令执行系统、装置或者器件使用或者与其结合使用的程序。
166.可选地,计算机可读存储介质上包含的程序代码可以用任何适当的介质传输,包括但不限于无线、有线、光缆、rf等等,或者上述的任意合适的组合。
167.在具体实施时,可以以一种或多种程序设计语言的任意组合来编写用于执行本公开操作的程序代码,所述程序设计语言包括面向对象的程序设计语言—诸如java、c++等,还包括常规的过程式程序设计语言—诸如“c”语言或类似的程序设计语言。程序代码可以完全地在用户计算设备上执行、部分地在用户设备上执行、作为一个独立的软件包执行、部分在用户计算设备上部分在远程计算设备上执行、或者完全在远程计算设备或服务器上执行。在涉及远程计算设备的情形中,远程计算设备可以通过任意种类的网络,包括局域网(lan)或广域网(wan),连接到用户计算设备,或者,可以连接到外部计算设备(例如利用因特网服务提供商来通过因特网连接)。
168.应当注意,尽管在上文详细描述中提及了用于动作执行的设备的若干模块或者单元,但是这种划分并非强制性的。实际上,根据本公开的实施方式,上文描述的两个或更多模块或者单元的特征和功能可以在一个模块或者单元中具体化。反之,上文描述的一个模块或者单元的特征和功能可以进一步划分为由多个模块或者单元来具体化。
169.此外,尽管在附图中以特定顺序描述了本公开中方法的各个步骤,但是,这并非要求或者暗示必须按照该特定顺序来执行这些步骤,或是必须执行全部所示的步骤才能实现期望的结果。附加的或备选的,可以省略某些步骤,将多个步骤合并为一个步骤执行,以及/或者将一个步骤分解为多个步骤执行等。
170.通过以上实施方式的描述,本领域的技术人员易于理解,这里描述的示例实施方式可以通过软件实现,也可以通过软件结合必要的硬件的方式来实现。因此,根据本公开实施方式的技术方案可以以软件产品的形式体现出来,该软件产品可以存储在一个非易失性存储介质(可以是cd-rom,u盘,移动硬盘等)中或网络上,包括若干指令以使得一台计算设备(可以是个人计算机、服务器、移动终端、或者网络设备等)执行根据本公开实施方式的方法。
171.本领域技术人员在考虑说明书及实践这里公开的发明后,将容易想到本公开的其它实施方案。本公开旨在涵盖本公开的任何变型、用途或者适应性变化,这些变型、用途或者适应性变化遵循本公开的一般性原理并包括本公开未公开的本技术领域中的公知常识或惯用技术手段。说明书和实施例仅被视为示例性的,本公开的真正范围和精神由所附的权利要求指出。
技术特征:
1.一种用于流式数据做历史增量聚合的方法,其特征在于,包括:根据分组条件,确定原始数据流的键值与键值数据流;将所述键值数据流进行开窗处理,确定初步聚合数据流;将所述初步聚合数据流与远程字典服务redis数据库建立连接;根据所述键值在redis数据库中查询;若redis数据库中存在所述键值对应的历史数据,则取出所述历史数据与初步聚合数据流进行累加之后存入redis数据库。2.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,所述方法还包括:若redis数据库中不存在所述键值对应的数据,则将初步聚合数据流以键值对的形式存入redis基本类型数据中。3.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,所述根据分组条件,确定原始数据流的键值与键值数据流包括:根据分组的条件,从原始数据流中提取对应字段进行拼接处理作为键值,将原始数据流处理成键值数据流。4.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,所述将所述键值数据流进行开窗处理,确定初步聚合数据流包括:将所述键值数据流进行开窗处理;将每个窗口的数据进行reduce聚合操作,确定初步聚合数据流。5.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,所述将所述初步聚合数据流与远程字典服务redis数据库建立连接包括:将所述初步聚合数据流通过flink richmapfunction与redis数据库建立连接。6.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,在所述根据分组条件,确定原始数据流的键值与键值数据之前,所述方法还包括:获取原始数据,所述原始数据包括读入flink中数据流datastream。7.根据权利要求1所述的用于流式数据做历史增量聚合的方法,其特征在于,redis基本类型数据包括redis set类型数据。8.一种用于流式数据做历史增量聚合的装置,其特征在于,包括:键值信息确定模块,用于根据分组条件,确定原始数据流的键值与键值数据流;初步聚合数据流确定模块,用于将所述键值数据流进行开窗处理,确定初步聚合数据流;数据库建立连接模块,用于将所述初步聚合数据流与远程字典服务redis数据库建立连接;查询模块,用于根据所述键值在redis数据库中查询;累加存储模块,用于若redis数据库中存在所述键值对应的历史数据,则取出所述历史数据与初步聚合数据流进行累加之后存入redis数据库。9.一种电子设备,其特征在于,包括:处理器;以及存储器,用于存储所述处理器的可执行指令;
其中,所述处理器配置为经由执行所述可执行指令来执行权利要求1~7中任意一项所述的用于流式数据做历史增量聚合的方法。10.一种计算机可读存储介质,其上存储有计算机程序,其特征在于,所述计算机程序被处理器执行时实现权利要求1~7中任意一项所述的用于流式数据做历史增量聚合的方法。
技术总结
本公开提供了一种用于流式数据做历史增量聚合的方法及相关设备,涉及大数据处理技术领域。该方法包括,根据分组条件,确定原始数据流的键值与键值数据流;将键值数据流进行开窗处理,确定初步聚合数据流;将初步聚合数据流与远程字典服务Redis数据库建立连接;根据键值在Redis数据库中查询;若Redis数据库中存在键值对应的历史数据,则取出历史数据与初步聚合数据流进行累加之后存入Redis数据库。本公开能够高效而准确地实现对流式数据做历史增量聚合统计。量聚合统计。量聚合统计。
技术研发人员:王兵 严劲 余珍聪 康建伟 黄泽华
受保护的技术使用者:中国电信股份有限公司
技术研发日:2023.04.28
技术公布日:2023/8/24
版权声明
本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
飞行汽车 https://www.autovtol.com/
