一种基于Spark的离线任务执行进度计算与获取方法及装置与流程
未命名
09-14
阅读:137
评论:0
一种基于spark的离线任务执行进度计算与获取方法及装置
技术领域
1.本技术涉及数据处理技术领域,尤其是一种基于spark的离线任务执行进度计算与获取方法及装置。
背景技术:
2.在spark提供的众多原生指标里,目前还没有一个指标是直接描述任务运行进度情况。和任务执行进度相关的指标有以下两个:application_xx_driver_dagscheduler_job_alljobs和application_xx_driver_dagscheduler_job_activejobs。理想情况下,可以通过对上述两个指标进行相除,即可得到spark离线任务的执行进度。但是,application_xx_driver_dagscheduler_job_alljobs的值并不是spark任务从一开始运行就已经计算好所有的job的数量。实际情况是,当一个action触发一个job的时候,上述指标会不断递增,直到没有action算子为止。因此,不能通过以上两个指标来计算任务执行进度。
3.spark本身也提供了不完善的任务进度条展示功能。spark中的配置spark.ui.showconsoleprogress的值默认为true,sparkcontext在初始化的时候会创建consoleprogressbar对象。consoleprogressbar对象会周期性的会从appstatusstore中获取到已完成的任务数numcompletetasks以及单个stage总的任务数numtasks,两者相除即可得到每个stage的任务执行进度情况。但是该方式只会在spark sql repl模式下将进度条信息打印出来,并且该进度条信息是stage级别,如果用户要想获取整个spark离线任务运行过程中整体的执行进度,目前为止,spark还没有提供对应的解决方案。
技术实现要素:
4.本技术的目的在于克服用户在使用spark sql运行离线任务时不能直观的获取到离线任务整体运行进度的问题,提供一种基于spark的离线任务执行进度计算与获取方法及装置。
5.第一方面,提供了一种基于spark的离线任务执行进度计算与获取方法,包括:计算离线任务中运行的sql总数;通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法和用于在每个job完成后计算并更新当前离线任务进度的onjobend方法。
6.还包括:通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
7.进一步的,计算离线任务中运行的sql总数,包括:获取离线平台中本次离线任务的sql字符串;将sql字符串按照连接的规则拆分成独立的sql字符串;计算出独立的sql字符串的数量即为本次离线任务中运行的sql总数。
8.进一步的,所述jobprogresssource实现spark的org.apache.spark.metrics.source.source特质。
9.进一步的,通过调用registergauge方法将所述指标注册到spark内部管理的指标管理系统中。
10.进一步的,所述onjobstart方法包括:计算每个job下stage的数量;计算每个stage下task的数量;将当前job下所有stage下task的数量相加即得当前job下所有的task数量。
11.进一步的,所述onjobend方法包括:通过公式1计算job完成后的离线任务进度并更新:
ꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀ
(1)其中,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,s为sql的总数。
12.进一步的,所述ontaskend方法包括:通过公式2计算task完成后的离线任务进度并更新:
ꢀꢀ
(2)
13.其中,p
t
为第i条sql的第n个job中完成x个task后的离线任务进度,p
(i,n-1)
为第i条sql中完成n-1个job后的离线任务进度,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,t为当前job中task的数量。
14.第二方面,提供了一种基于spark的离线任务执行进度计算与获取装置,包括:sql数计算模块,用于计算离线任务中运行的sql总数;指标注册模块,用于通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;jobprogresslistener注册模块,用于自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个job完成后计算并更新当前离线任务进度的onjobend方法和用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法。
15.还包括:指标拉取模块,用于通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
16.本技术具有如下有益效果:该方法实现了将spark离线任务执行进度情况更细粒
度的计算,并将计算结果交由spark的指标管理系统进行统一管理,最后由spark将指标推送到pushgateway,由prometheus实时拉取,最后离线平台周期性的从prometheus中获取任务进度条指标并展示到离线平台上,使得用户可以直观的了解当前离线任务的执行进度情况。
附图说明
17.构成本技术的一部分的附图用于来提供对本技术的进一步理解,本技术的示意性实施例及其说明用于解释本技术,并不构成对本技术的不当限定。
18.为了更清楚地说明本技术实施例中的技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本技术的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
19.图1是本技术实施例1的基于spark的离线任务执行进度计算与获取方法的流程图;图2是本技术实施例2的基于spark的离线任务执行进度计算与获取装置的结构框图。
20.附图标记:100、sql数计算模块;200、指标注册模块;300、jobprogresslistener注册模块;400、指标拉取模块。
具体实施方式
21.下面将结合本发明实施例中的附图,对本发明实施例中的技术方案进行清楚、完整地描述,显然,所描述的实施例仅仅是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有作出创造性劳动前提下所获得的所有其它实施例,都属于本发明保护的范围。
22.实施例1
23.本技术实施例1所涉及的一种基于spark的离线任务执行进度计算与获取方法,包括:计算离线任务中运行的sql总数;通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法和用于在每个job完成后计算并更新当前离线任务进度的onjobend方法;通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示,该方法实现了将spark离线任务执行进度情况更细粒度的计算,并将计算结果交由spark的指标管理系统进行统一管理,最后由spark将指标推送到pushgateway,由prometheus实时拉取,最后离线平台周期性的从prometheus中获取任务进度条指标并展示到离线平台上,使得用户可以直观的了解当前离线任务的执行进度情
况。
24.具体的,图1示出了申请实施例1中的基于spark的离线任务执行进度计算与获取方法的流程图,包括:s101、计算离线任务中运行的sql总数s;在离线平台中会将用户的多个sql以分号分隔连接成一个字符串,因此在接收到这个字符串后,先将接收到的sql字符串根据分号进行拆分,以拆分出一条条sql。
25.具体的,获取离线平台中本次离线任务的sql字符串;将sql字符串按照连接的规则拆分成独立的sql字符串;计算出独立的sql字符串的数量即为本次离线任务中运行的sql总数。
26.s102、通过自定义jobprogresssource来新增用于描述离线任务进度的指标,其中,所述jobprogresssource实现spark的org.apache.spark.metrics.source.source特质,通过调用registergauge方法将所述指标注册到spark内部管理的指标管理系统中;在spark中需要新增一个或者一类指标时,需要自定义一个类,并且自定义的这个类实现source特质,这样就可以把这个指标纳入到spark内部的指标管理系统进行管理。
27.其中,source内部定义了sourcename和metricregistry两个方法,当实现source特质时,就要重写这两个方法;当需要新增一个指标或者一类指标,会自定义source,比如说jobprogresssource,并且jobprogresssource实现source特质,重写sourcename和metricregistry两个方法。除此之外,jobprogresssource还有一个registergauge方法,比如说jobprogresssource,sourcename的值为jobprogress,metricregistry指向一个新创建出来的metricregistry对象。metricregistry可以理解成是jobprogresssource的一个代理。
28.需要说明的是,spark内部的指标管理系统是基于dropwizar这个度量库来构建的,metricregistry是这个库的一个核心组件,用来对各个指标进行管理。
29.在自定义jobprogresssource的过程中,创建了一个metricregistry对象。spark内部的所有指标的管理都是由一个个的metricregistry对象进行管理,此时,如果需要新增一个指标,就需要调用metricregistry的register方法,将指标注册到metricregistry中,让metricregistry进行管理,registergauge方法就是调用了metricregistry的register方法s103、自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法和用于在每个job完成后计算并更新当前离线任务进度的onjobend方法。
30.其中,所述onjobstart方法包括:计算每个job下stage的数量;计算每个stage下task的数量;将当前job下所有stage下task的数量相加即得当前job下所有的task数量t。
31.需要说明的是,在一个job下面会被切分成一个个stage,每个stage下也有一个个task。onjobstart、onjobend、ontaskend这几个方法可以理解成是一个钩子方法,当一个job开始执行,spark就会调用onjobstart方法,onjobstart方法有一个入参为
sparklistenerjobstart对象,从这个对象就可以获取到这个job下所有stageinfo对象。而stageinfo对象下的numtasks成员变量就记录了一个stage下的所有task数量。
32.所述onjobend方法包括:通过公式1计算job完成后的离线任务进度并更新:
ꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀ
(1)其中,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,s为sql的总数。
33.当一个job执行完毕,spark就会调用onjobend方法,我们是可以计算到每个job完成之后进度条的百分比的,onjobend方法就是在这个job结束之后,更新进度条的百分到对应的值。
34.所述ontaskend方法包括:通过公式2计算task完成后的离线任务进度并更新:
ꢀꢀ
(2)
35.其中,p
t
为第i条sql的第n个job中完成x个task后的离线任务进度,p
(i,n-1)
为第i条sql中完成n-1个job后的离线任务进度,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,t为当前job中task的数量。
36.需要说明的是,当一个task执行完毕,spark就会调用ontaskend方法。ontaskend方法核心的算法是计算出一个task完成后整个进度条的百分比是多少并对进度条进行更新。
37.在一个具体的实施例中,该离线任务中共有两条sql,即s=2:在第一条sql完成之后,进度条就到达到50%,第二条sql完成之后,进度条就会达到100%;在运行第一条sql过程中,首先会运行第一个job,调用onjobstart方法,计算出这个job下一共有2个task,即t=2,在运行第一个job的过程中,虽然job还没运行结束,但是通过onjobend方法可以计算出第一个job完成后的进度百分比为:,等到第一个job运行结束后,会调用onjobend方法,此时在onjobend方法中直接将进度条百分比的值更新为25%,当第一个task完成后,即i=1,n=1,x=1,t=2,通过ontaskend方法可以计算出第一个task完成后的进度百分比为:;当第二个task完成后,即i=1,n=1,x=2,t=2,通过ontaskend方法可以计算出第二个task完成后的进度百分比为:;在运行第一条sql过程中,在第一个job完成后,会运行第二个job,调用onjobstart方法,计算出这个job下一共有2个task,即t=2,由于无法得知一条sql中有多少个job,因此,需要先通过onjobend方法计算出第二个job完成后的进度百分比为:
,等到第二个job运行结束后,会调用onjobend方法,此时在onjobend方法中直接将进度条百分比的值更新为37.5%,当第一个task完成后,即i=1,n=2,x=1,t=1,通过ontaskend方法可以计算出第一个task完成后的进度百分比为:;当第二个task完成后,即i=1,n=2,x=2,t=2,通过ontaskend方法可以计算出第二个task完成后的进度百分比为:,由于第二个job只有两个task,所以在第二个task完成后第二个job也完成了,所以通过onjobend方法和ontaskend方法计算出的离线任务的进度百分比都为37.5%;以此类推,可以在某一个job或某一个task完成后自动计算出离线任务的进度条的百分比。
38.需要说明的是,jobprogresslistener的作用是计算离线任务进度,jobprogresssource的作用是将此时此刻指标的值(即jobprogresslistener所计算出的离线任务进度)注册到spark内部的指标管理系统中,后续spark内部的指标管理系统会每隔一段时间将指标值push到destination,比如pushgateway。
39.s104、通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
40.需要说明的是,prometheus是一个开源的监控和报警系统,prometheus可以收集并保存指标数据,一般来说,prometheus可以每隔一段时间自动将源头的指标数据拉取过来。但是有一些特殊情况,比如说针对运行时间比较短的任务,prometheus还没来得及拉取数据,任务就结束了。像这种任务较优的方式是将指标先推送到pushgateway,此时,pushgateway可以理解成是一个中间商,我们可以先把指标数据推送到pushgateway,然后prometheus去pushgateway将指标数据拉取过来,最后再由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
41.实施例2
42.如图2所示,本技术实施例2所涉及的一种基于spark的离线任务执行进度计算与获取装置,包括:sql数计算模块100,用于计算离线任务中运行的sql总数;指标注册模块200,用于通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;jobprogresslistener注册模块300,用于自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个job完成后计算并更新当前离线任务进度的onjobend方法和用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法。
43.还包括:指标拉取模块400,用于通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平
台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
44.本实施例的系统实现了将spark离线任务执行进度情况更细粒度的计算,并将计算结果交由spark的指标管理系统进行统一管理,最后由spark将指标推送到pushgateway,由prometheus实时拉取,最后离线平台周期性的从prometheus中获取任务进度条指标并展示到离线平台上,使得用户可以直观的了解当前离线任务的执行进度情况。
45.以上,仅为本技术较佳的具体实施方式;但本技术的保护范围并不局限于此。任何熟悉本技术领域的技术人员在本技术揭露的技术范围内,根据本技术的技术方案及其改进构思加以等同替换或改变,都应涵盖在本技术的保护范围内。
技术特征:
1.一种基于spark的离线任务执行进度计算与获取方法,其特征在于,包括:计算离线任务中运行的sql总数;通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法和用于在每个job完成后计算并更新当前离线任务进度的onjobend方法。2.根据权利要求1所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,还包括:通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。3.根据权利要求1所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,计算离线任务中运行的sql总数,包括:获取离线平台中本次离线任务的sql字符串;将sql字符串按照连接的规则拆分成独立的sql字符串;计算出独立的sql字符串的数量即为本次离线任务中运行的sql总数。4.根据权利要求1所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,所述jobprogresssource实现spark的org.apache.spark.metrics.source.source特质。5.根据权利要求1所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,通过调用registergauge方法将所述指标注册到spark内部管理的指标管理系统中。6.根据权利要求1所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,所述onjobstart方法包括:计算每个job下stage的数量;计算每个stage下task的数量;将当前job下所有stage下task的数量相加即得当前job下所有的task数量。7.根据权利要求6所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,所述onjobend方法包括:通过公式1计算job完成后的离线任务进度并更新:
ꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀꢀ
(1)其中,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,s为sql的总数。8.根据权利要求7所述的基于spark的离线任务执行进度计算与获取方法,其特征在于,所述ontaskend方法包括:通过公式2计算task完成后的离线任务进度并更新:
ꢀꢀꢀꢀꢀꢀꢀꢀ
(2)其中,p
t
为第i条sql的第n个job中完成x个task后的离线任务进度,p
(i,n-1)
为第i条sql
中完成n-1个job后的离线任务进度,p
(i,n)
为第i条sql中完成n个job后的离线任务进度,t为当前job中task的数量。9.一种基于spark的离线任务执行进度计算与获取装置,其特征在于,包括:sql数计算模块,用于计算离线任务中运行的sql总数;指标注册模块,用于通过自定义jobprogresssource来新增用于描述离线任务进度的指标,将所述指标注册到spark内部管理系统中的指标管理系统中;jobprogresslistener注册模块,用于自定义jobprogresslistener,并将所述jobprogresslistener注册到spark内部管理系统中的listenerbus,其中,所述jobprogresslistener包括用于计算当前job下所有的task数量的onjobstart方法、用于在每个job完成后计算并更新当前离线任务进度的onjobend方法和用于在每个task完成后计算并更新当前离线任务进度的ontaskend方法。10.根据权利要求9所述的基于spark的离线任务执行进度计算与获取装置,其特征在于,还包括:指标拉取模块,用于通过spark内部管理系统将离线任务进度的指标推送到pushgateway,由prometheus从pushgateway中拉取离线任务进度的指标,由离线平台周期性的从prometheus中获取离线任务进度的指标并将离线任务进度在离线平台上进行展示。
技术总结
本申请涉及数据处理技术领域,解决了用户在使用Spark SQL运行离线任务时不能直观的获取到离线任务整体运行进度的问题,公开了一种基于Spark的离线任务执行进度计算与获取方法及装置,包括:计算离线任务中运行的SQL总数,通过自定义JobProgressSource来新增用于描述离线任务进度的指标,将所述指标注册到Spark内部管理系统中的指标管理系统中,自定义JobProgressListener,并将所述JobProgressListener注册到Spark内部管理系统中的ListenerBus,能够将Spark离线任务执行进度情况更细粒度的计算,并将计算结果交由Spark的指标管理系统进行统一管理。Spark的指标管理系统进行统一管理。Spark的指标管理系统进行统一管理。
技术研发人员:梁伟雄 陈吉平 杨思枢
受保护的技术使用者:杭州玳数科技有限公司
技术研发日:2023.08.14
技术公布日:2023/9/13
版权声明
本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
飞行汽车 https://www.autovtol.com/
上一篇:一种自动化加工装置的制作方法 下一篇:一种养猪场用猪舍自动通风装置的制作方法
