算子热更新方法及装置与流程
未命名
07-19
阅读:119
评论:0
1.本发明涉及大数据技术领域,尤其涉及算子热更新方法及装置。
背景技术:
2.本部分旨在为权利要求书中陈述的本发明实施例提供背景或上下文。此处的描述不因为包括在本部分中就承认是现有技术。
3.在数据挖掘过程中,需要对海量数据进行清洗、分析、加工,且每个过程对资源的要求也不同,为了实现资源动态扩缩容,满足不同数据量的使用场景,业内的趋势是结合容器编排引擎spark on kubernetes来实现。
4.基于spark on kubernetes进行数据挖掘要求用户具备技术基础,门槛高。因此业内比较常见的做法是基于spark提供封装好的算子,用户基于多种算子就能快速地探索出一套数据加工的方案。当需要支持新的计算逻辑时,就会针对性的开发出一个独立的算子来支撑该逻辑。因此更新算子是一个相对频繁的操作,现有的技术方案下新增一个定制化算子,一般需要经过如下步骤:
5.1、开发算子。
6.2、添加算子所需的依赖包。
7.3、更新应用镜像。
8.4、增加算子及依赖包所需的参数。
9.5、重新打包镜像。
10.6、结束正在进行的计算任务并停止生产环境的spark集群。
11.7、更新镜像。
12.8.重新启动spark集群。
13.因此每次增加一种新算子,都需要停服升级,在更新期间服务处于不可用状态,这给使用上带来诸多不便,效率低下。
技术实现要素:
14.本发明实施例提供一种算子热更新方法,用以实现算子热更新,该方法包括:
15.基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;
16.按照预设的频率扫描算子仓库;
17.若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;
18.轮巡spark服务的任务进度;
19.在检测到spark服务的当前任务执行完成时,重新启动spark服务。
20.本发明实施例还提供一种算子热更新装置,用以实现算子热更新,该装置包括:
21.扫描模块,用于基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;
22.热更新模块,用于若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务。
23.本发明实施例还提供一种计算机设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述算子热更新方法。
24.本发明实施例还提供一种计算机可读存储介质,所述计算机可读存储介质存储有计算机程序,所述计算机程序被处理器执行时实现上述算子热更新方法。
25.本发明实施例还提供一种计算机程序产品,所述计算机程序产品包括计算机程序,所述计算机程序被处理器执行时实现上述算子热更新方法。
26.本发明实施例中,基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务,与现有技术相比,通过动态扫描的方式检测到算子变更,不需要将算子插件对应的依赖包重新打包到spark集群的镜像中,解耦了算子更新与镜像更新。检测当前正在执行的任务,不需要中断执行中的任务,当任务执行完成后重启spark服务,实现了算子热更新,整个升级过程对用户无感知,提升了用户体验。
附图说明
27.为了更清楚地说明本发明实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。在附图中:
28.图1为本发明提供的算子热更新方法的流程示意图;
29.图2为本发明提供的算子热更新方法的流程示意图;
30.图3为本发明提供的算子热更新方法的流程示意图;
31.图4为本发明提供的算子热更新方法的流程示意图;
32.图5为本发明提供的算子热更新装置的结构示意图。
具体实施方式
33.为使本发明实施例的目的、技术方案和优点更加清楚明白,下面结合附图对本发明实施例做进一步详细说明。在此,本发明的示意性实施例及其说明用于解释本发明,但并不作为对本发明的限定。
34.图1为本发明实施例提供的一种算子热更新方法所对应的流程示意图,如图1所示,该方法包括:
35.步骤101,基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库。
36.本发明实施例将spark使用到的算子及算子的依赖包存放至算子仓库中。
37.需要说明的是,算子定义了数据加工的内容,主要用于数据清洗、特征衍生等场
景。比如连接算子、列拆分算子等。spark为基于内存计算的分布式计算引擎,普遍用于大数据计算。
38.步骤102,按照预设的频率扫描算子仓库。
39.举例来说,每5秒定时扫描算子仓库,比对每个文件的last modified属性,即标记此文件在服务器端最后被修改的时间是否是最近5秒内更新的。
40.步骤103,若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列。
41.本发明实施例中,比对上一次扫描结果与本次扫描结果,是否存在新增或删除的文件。若存在,则发送变更事件到监听服务。
42.在一种可能的实施方式中,采用钩子函数将spark服务接收到的新任务放至等待队列。
43.采用hook函数,在spark服务接收到新任务时,将新任务放至等待队列。使得spark服务在当前的任务执行完成时不继续处理新任务。
44.步骤104,轮巡spark服务的任务进度。
45.步骤105,在检测到spark服务的当前任务执行完成时,重新启动spark服务。
46.本发明实施例中,通过spark接口查询当前任务的执行情况。
47.举例来说,每隔10秒轮巡spark集群的任务情况,当检测到当前的所有任务都已执行完成,则通过spark接口重新启动spark服务。
48.本发明实施例中,实时检测spark服务当前的任务执行情况,并提供等待队列以缓存用户提供的计算任务。在现有计算任务执行完毕后,重启spark服务并将队列中的缓存任务提交到新集群。
49.在一种可能的实施方式中,重新启动spark服务时,将被改文件的名称拼接成启动参数。
50.在重新启动spark服务之后,将等待队列中的任务提交到重新启动后的spark服务。
51.本发明实施例在启动完成后,将等待队列中的任务提交到新启动的spark服务。
52.上述方案,通过动态扫描的方式检测到算子变更,不需要将算子插件对应的依赖包重新打包到spark集群的镜像中,解耦了算子更新与镜像更新。检测当前正在执行的任务,不需要中断执行中的任务,当任务执行完成后重启spark服务,实现了算子热更新,整个升级过程对用户无感知,提升了用户体验。
53.本发明实施例在步骤101中,基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库,步骤流程如图2所示,具体如下:
54.步骤201,生成算子插件。
55.需要说明的是,算子插件包含spark服务涉及的算子的注册接口、解释接口、数据加载接口及计算接口。
56.步骤202,基于hadoop分布式文件系统根据算子插件建立算子仓库。
57.本发明实施例中,注册接口实现了注册算子到spark算子管理模块的功能;解释接口实现算子说明文档,包括参数、功能及使用示例等;数据加载接口实现统一的数据加载功能;计算接口实现该算子具体的计算逻辑。
58.在生成算子插件后,通过后台算子管理界面将算子插件涉及到的jar包及依赖包上传到算子仓库特定目录。
59.上述方案,生成算子插件,为解耦算子更新与镜像更新提供基础。
60.本发明实施例在按照预设的频率扫描算子仓库之后,步骤流程如图3所示,具体如下:
61.步骤301,确定算子仓库中每个算子的属性信息及算子数量。
62.步骤302,根据每个算子的属性信息确定各个算子在预设时间段内是否被修改。
63.步骤303,与上一次扫描后的算子数量进行对比,确定是否有新增或删除的算子。
64.本发明实施例中,确定算子仓库中每个算子的属性信息及算子数量,根据每个算子的属性信息确定各个算子在预设时间段内是否被修改,实现对变更算子的精准校验。
65.上述方案,通过动态扫描的方式检测到算子变更,不需要将算子插件对应的依赖包重新打包到spark集群的镜像中,解耦了算子更新与镜像更新。
66.图4为本发明实施例在轮巡spark服务的任务进度之前,步骤流程如图4所示,该方法包括:
67.步骤401,按照预设的频率扫描算子仓库。
68.步骤402,在算子仓库中的文件在预设时间段内被修改或文件数量改变时,根据新增文件的名称生成启动参数。
69.步骤403,将spark服务接收到的新任务放至等待队列。
70.上述方案,根据新增文件的名称生成启动参数,解耦了算子更新与镜像更新。
71.本发明实施例中还提供了一种算子热更新装置,如下面的实施例所述。该装置如图5所示,所述装置包括:
72.扫描模块501,用于基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;
73.热更新模块502,用于若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务。
74.本发明实施例中,所述热更新模块502还用于:
75.在按照预设的频率扫描算子仓库之后,确定算子仓库中每个算子的属性信息及算子数量;
76.根据每个算子的属性信息确定各个算子在预设时间段内是否被修改;
77.与上一次扫描后的算子数量进行对比,确定是否有新增或删除的算子。
78.本发明实施例中,所述扫描模块501具体用于:
79.生成算子插件;所述算子插件包含spark服务涉及的算子的注册接口、解释接口、数据加载接口及计算接口;
80.基于hadoop分布式文件系统根据算子插件建立算子仓库。
81.本发明实施例中,所述热更新模块502具体用于:
82.采用钩子函数将spark服务接收到的新任务放至等待队列。
83.本发明实施例中,所述热更新模块502还用于:
84.在重新启动spark服务之后,将等待队列中的任务提交到重新启动后的spark服
务。
85.由于该装置解决问题的原理与算子热更新方法相似,因此该装置的实施可以参见算子热更新方法的实施,重复之处不再赘述。
86.本发明实施例还提供一种计算机设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述算子热更新方法。
87.本发明实施例还提供一种计算机可读存储介质,所述计算机可读存储介质存储有计算机程序,所述计算机程序被处理器执行时实现上述算子热更新方法。
88.本发明实施例还提供一种计算机程序产品,所述计算机程序产品包括计算机程序,所述计算机程序被处理器执行时实现上述算子热更新方法。
89.本发明实施例中,基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务,与现有技术相比,通过动态扫描的方式检测到算子变更,不需要将算子插件对应的依赖包重新打包到spark集群的镜像中,解耦了算子更新与镜像更新。检测当前正在执行的任务,不需要中断执行中的任务,当任务执行完成后重启spark服务,实现了算子热更新,整个升级过程对用户无感知,提升了用户体验。
90.本领域内的技术人员应明白,本发明的实施例可提供为方法、系统、或计算机程序产品。因此,本发明可采用完全硬件实施例、完全软件实施例、或结合软件和硬件方面的实施例的形式。而且,本发明可采用在一个或多个其中包含有计算机可用程序代码的计算机可用存储介质(包括但不限于磁盘存储器、cd-rom、光学存储器等)上实施的计算机程序产品的形式。
91.本发明是参照根据本发明实施例的方法、设备(系统)、和计算机程序产品的流程图和/或方框图来描述的。应理解可由计算机程序指令实现流程图和/或方框图中的每一流程和/或方框、以及流程图和/或方框图中的流程和/或方框的结合。可提供这些计算机程序指令到通用计算机、专用计算机、嵌入式处理机或其他可编程数据处理设备的处理器以产生一个机器,使得通过计算机或其他可编程数据处理设备的处理器执行的指令产生用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的装置。
92.这些计算机程序指令也可存储在能引导计算机或其他可编程数据处理设备以特定方式工作的计算机可读存储器中,使得存储在该计算机可读存储器中的指令产生包括指令装置的制造品,该指令装置实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能。
93.这些计算机程序指令也可装载到计算机或其他可编程数据处理设备上,使得在计算机或其他可编程设备上执行一系列操作步骤以产生计算机实现的处理,从而在计算机或其他可编程设备上执行的指令提供用于实现在流程图一个流程或多个流程和/或方框图一个方框或多个方框中指定的功能的步骤。
94.以上所述的具体实施例,对本发明的目的、技术方案和有益效果进行了进一步详细说明,所应理解的是,以上所述仅为本发明的具体实施例而已,并不用于限定本发明的保
护范围,凡在本发明的精神和原则之内,所做的任何修改、等同替换、改进等,均应包含在本发明的保护范围之内。
技术特征:
1.一种算子热更新方法,其特征在于,包括:基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务。2.如权利要求1所述的算子热更新方法,其特征在于,在所述按照预设的频率扫描算子仓库之后,还包括:确定算子仓库中每个算子的属性信息及算子数量;根据每个算子的属性信息确定各个算子在预设时间段内是否被修改;与上一次扫描后的算子数量进行对比,确定是否有新增或删除的算子。3.如权利要求1所述的算子热更新方法,其特征在于,所述基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库,包括:生成算子插件;所述算子插件包含spark服务涉及的算子的注册接口、解释接口、数据加载接口及计算接口;基于hadoop分布式文件系统根据算子插件建立算子仓库。4.如权利要求1所述的算子热更新方法,其特征在于,所述将spark服务接收到的新任务放至等待队列,包括:采用钩子函数将spark服务接收到的新任务放至等待队列。5.如权利要求1所述的算子热更新方法,其特征在于,在所述重新启动spark服务之后,还包括:将等待队列中的任务提交到重新启动后的spark服务。6.一种算子热更新装置,其特征在于,包括:扫描模块,用于基于hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;热更新模块,用于若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务。7.如权利要求6所述的算子热更新装置,其特征在于,所述热更新模块还用于:在按照预设的频率扫描算子仓库之后,确定算子仓库中每个算子的属性信息及算子数量;根据每个算子的属性信息确定各个算子在预设时间段内是否被修改;与上一次扫描后的算子数量进行对比,确定是否有新增或删除的算子。8.如权利要求6所述的算子热更新装置,其特征在于,所述扫描模块具体用于:生成算子插件;所述算子插件包含spark服务涉及的算子的注册接口、解释接口、数据加载接口及计算接口;基于hadoop分布式文件系统根据算子插件建立算子仓库。9.如权利要求6所述的算子热更新装置,其特征在于,所述热更新模块具体用于:
采用钩子函数将spark服务接收到的新任务放至等待队列。10.如权利要求6所述的算子热更新装置,其特征在于,所述热更新模块还用于:在重新启动spark服务之后,将等待队列中的任务提交到重新启动后的spark服务。11.一种计算机设备,包括存储器、处理器及存储在存储器上并可在处理器上运行的计算机程序,其特征在于,所述处理器执行所述计算机程序时实现权利要求1至5任一所述方法。12.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质存储有计算机程序,所述计算机程序被处理器执行时实现权利要求1至5任一所述方法。13.一种计算机程序产品,其特征在于,所述计算机程序产品包括计算机程序,所述计算机程序被处理器执行时实现权利要求1至5任一所述方法。
技术总结
本发明公开了算子热更新方法及装置,方法包括:基于Hadoop分布式文件系统及spark服务涉及的算子建立算子仓库;按照预设的频率扫描算子仓库;若算子仓库中的算子被修改或数量改变,在spark服务执行当前任务时将spark服务接收到的新任务放至等待队列;轮巡spark服务的任务进度;在检测到spark服务的当前任务执行完成时,重新启动spark服务。本发明通过动态扫描的方式检测到算子变更,不需要将算子插件对应的依赖包重新打包到spark集群的镜像中,解耦了算子更新与镜像更新。检测当前正在执行的任务,不需要中断执行中的任务,当任务执行完成后重启spark服务,实现了算子热更新,提升了用户体验。用户体验。用户体验。
技术研发人员:林培峰 方景星
受保护的技术使用者:建信金融科技有限责任公司
技术研发日:2023.04.21
技术公布日:2023/7/18
版权声明
本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
飞行汽车 https://www.autovtol.com/
上一篇:一种石材幕墙伸缩缝结构的制作方法 下一篇:一种微型高精度步进柱塞泵的制作方法
