消息处理方法、装置、设备、介质及产品与流程

未命名 10-19 阅读:96 评论:0


1.本公开涉及分布式技术领域,可以应用于金融科技技术领域,特别涉及一种消息处理方法、装置、设备、介质及产品。


背景技术:

2.现有分布式发布订阅消息中,消费者业务系统连接kafka拉取消息,再进行消息消费时,需要消费者开发代码去实现数据的入库、发邮件、发短信等操作,如果涉及数据同步、消息同步的数量较多,避免不了开发的重复工作量,也严重影响实现的效率。


技术实现要素:

3.有鉴于此,本公开的主要目的是提供一种消息处理方法、装置、设备、介质及产品,旨在至少部分解决分布式发布订阅消息中代码开发工作量大、实现效率低等技术问题。
4.为实现上述目的,本公开实施例的第一方面提供一种消息处理方法,应用于消息总线,所述消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,方法包括:获取kafka消息;获取对应于所述kafka消息的配置信息、解析类以及入库处理类;调用所述解析类对所述kafka消息进行解析,得到解析数据;根据所述解析数据确定解析过程是否正常;响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果;调用所述入库处理类对所述加工结果执行入库操作。
5.根据本公开的实施例,所述方法还包括:响应于所述解析结果异常,根据所述配置信息确定所述kafka消息是否为有序消息;响应于所述kafka消息为有序消息,保存所述kafka消息且停止所述消息总线中处理该kafka消息的消费线程;响应于所述kafka消息为无序消息,保存所述kafka消息且提交对应于所述kafka消息的位移。
6.根据本公开的实施例,所述方法还包括:确定所述入库操作是否操作成功;响应于所述入库操作失败,根据所述配置信息确定所述kafka消息是否为有序消息;响应于所述kafka消息为有序消息,保存所述kafka消息且停止所述消息总线中处理该kafka消息的消费线程;响应于所述kafka消息为无序消息,保存所述kafka消息且提交对应于所述kafka消息的位移。
7.根据本公开的实施例,在所述获取kafka消息之后,所述方法还包括:根据所述kafka消息的类别获取对应于所述kafka消息的消费处理类;调用所述消费处理类对所述kafka消息进行处理,得到处理结果;根据所述处理结果确定处理是否成功;响应于处理成功,提交对应于所述kafka消息的位移;响应于处理失败,保存处理失败的所述kafka消息;响应于处理失败的所述kafka消息不允许提交位移,停止所述消息总线中处理失败的所述kafka消息所在的消费线程。
8.根据本公开的实施例,所述保存处理失败的所述kafka消息包括:调用消费异常记录上送接口保存处理失败的所述kafka消息。
9.根据本公开的实施例,所述方法还包括:根据所述kafka消息和所述处理结果生成应用画像日志,其中,所述应用画像日志用于对所述kafka消息的消费状态进行排查追踪。
10.根据本公开的实施例,其中,所述获取对应于所述kafka消息的配置信息、解析类以及入库处理类包括:根据所述kafka消息的类别从预先配置的spring容器中获取对应于所述kafka消息的消费处理类;从预先配置的数据来源表中获取表征所述kafka消息来源的字段信息,其中,所述数据来源表存储有表征各种类型kafka消息来源的字段信息;从预先配置的数据加工表中获取用于对kafka消息中的字段进行加工的字段加工信息,所述数据加工表存储有用于加工各种类型kafka消息的字段加工信息;所述字段信息和字段加工信息组成所述配置信息;根据所述字段信息从所述spring容器中获取对应于所述kafka消息的解析类;从预先配置的订阅表中获取所述入库处理类。
11.根据本公开的实施例,其中,所述入库处理类的操作类型包括mysql入库或oralce入库或gauss入库或邮件发送或自定义入库。
12.根据本公开的实施例,其中,所述字段加工信息表征的加工类型包括字段转换、过滤字段、字段判空、删除字段、字符串转日期格式处理中的至少之一。
13.根据本公开的实施例,其中,所述入库处理类封装有用于执行结构化查询语句的java的应用程序接口。
14.本公开实施例的第二方面提供一种消息处理装置,应用于消息总线,所述消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,装置包括:第一获取模块,用于从kafka中获取kafka消息;第二获取模块,用于获取对应于所述kafka消息的配置信息、解析类以及入库处理类;解析模块,用于调用所述解析类对所述kafka消息进行解析,得到解析数据;第一确定模块,用于根据所述解析数据确定解析过程是否正常;加工模块,用于响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果;入库模块,用于调用所述入库处理类对所述加工结果执行入库操作。
15.根据本公开的实施例,还包括:第三获取模块,用于根据所述kafka消息的类别获取对应于所述kafka消息的消费处理类;处理模块,用于调用所述消费处理类对所述kafka消息进行处理,得到处理结果;第二确定模块,用于根据所述处理结果确定处理是否成功;提交模块,用于响应于处理成功,提交对应于所述kafka消息的位移;保存模块,用于响应于处理失败,保存处理失败的所述kafka消息;停止模块,用于响应于处理失败的所述kafka消息不允许提交位移,停止所述消息总线中处理失败的所述kafka消息所在的消费线程。
16.本公开实施例第三方面提供一种电子设备,包括:一个或多个处理器;存储装置,用于存储一个或多个程序,其中,当所述一个或多个程序被所述一个或多个处理器执行时,使得所述一个或多个处理器执行根据上述消息处理方法。
17.本公开实施例第四方面提供一种计算机可读存储介质,所述计算机可读存储介质上存储有可执行指令,该指令被处理器执行时使处理器执行根据上述消息处理方法。
18.本公开实施例第五方面提供一种计算机程序产品,包括计算机程序,所述计算机程序被处理器执行时实现根据上述消息处理方法。
19.根据本公开实施例提供的消息处理方法、装置、设备、介质及产品,至少具备以下有益效果:
20.基于kafka搭建的可扩展性消息总线来统一处理服务间的异步通信和数据同步,结合获取的配置信息、解析类以及入库处理类实现消息处理,由于配置信息、解析类以及入库处理类可以预先灵活配置,因此,业务系统只需要简单的配置,无需开发代码就可以实现表到表的自动数据入库和自动发邮件等操作,能够通过配置化实现消费方式多样化,不需要另外的开发成本,极大程度地减少了代码冗余和开发工作量,并且维护成本低、易操作。
21.在入库操作失败和解析结果异常的情况下,判定消息是有序消息还是无序消息,由于有序消息具有有序性不可重试,直接停止处理有序消息的消费线程,避免重复的无效操作浪费计算资源及网络资源。并且,通过提交kafka消息的位移来表征该kafka消息已被消费,便于后续排查追踪。
22.由于通过预先配置消费处理类,结合消息总线的可扩展性及统一处理服务间的异步通信和数据同步的能力,能够实现业务系统自定义消费数据和配置化自动消费两种场景,因此,能够进一步业务系统消费方式的多样化。
23.由于消费异常记录上送接口为异步调用,在通过调用消费异常记录上送接口保存处理失败的kafka消息时,无需等待接口返回直接执行下一步操作,因此,能够提高信息处理的效率。
24.由于应用画像日志能够直观地反应消息处理过程中的kafka消息的处理结果,异常消费性等数据,因此极大程度提升了后续排查最终的效率。
25.由于通过预先配置消息订阅表、数据来源表、数据加工表,因此,在消息处理时,能够直接调用这些表来实现kafka消息的自动消费。
26.由于自动数据入库支持mysqi/oracle/gauss等不同种类的数据库,一条消息可入多个同类型及不同类型的数据库,因此能够实现持多数据源自动入库功能。
27.通过配置多种字段加工方式,消息处理过程中可以灵活选择一种或多种字段加工方式对kafka消息的字段进行,提高了消息处理的灵活性。
28.由于入库处理类封装的用于执行结构化查询语句的java的应用程序接口具有分布式的特点,因此能够更好地满足分布式发布订阅消息的应用需求。
附图说明
29.为了更清楚地说明本公开实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本公开的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图示出的结构获得其他的附图。
30.图1示意性示出了根据本公开实施例的消息处理方法及装置的系统架构100;
31.图2示意性示出了根据本公开一实施例的消息处理方法的流程图;
32.图3示意性示出了根据本公开另一实施例的消息处理方法的流程图;
33.图4示意性示出了根据本公开实施例的配置化消费实现的逻辑图;
34.图5示意性示出了根据本公开又一实施例的消息处理方法的流程图;
35.图6示意性示出了根据本公开再一实施例的消息处理方法的流程图;
36.图7示意性示出了根据本公开实施例的自定义消费数据实现的逻辑图;
37.图8示意性示出了根据本公开实施例的操作s202中获取对应于kafka消息的配置
信息、解析类以及入库处理类的流程图;
38.图9示意性示出了根据本公开一实施例的消息处理装置的框图;
39.图10示意性示出了根据本公开另一实施例的消息处理装置的框图;
40.图11示意性示出了根据本公开实施例的适于实现上文描述的方法的电子设备的框图。
具体实施方式
41.以下,将参照附图来描述本公开的实施例。但是应该理解,这些描述只是示例性的,而并非要限制本公开的范围。在下面的详细描述中,为便于解释,阐述了许多具体的细节以提供对本公开实施例的全面理解。然而,明显地,一个或多个实施例在没有这些具体细节的情况下也可以被实施。此外,在以下说明中,省略了对公知结构和技术的描述,以避免不必要地混淆本公开的概念。
42.在此使用的术语仅仅是为了描述具体实施例,而并非意在限制本公开。在此使用的术语“包括”、“包含”等表明了所述特征、步骤、操作和/或部件的存在,但是并不排除存在或添加一个或多个其他特征、步骤、操作或部件。
43.在此使用的所有术语(包括技术和科学术语)具有本领域技术人员通常所理解的含义,除非另外定义。应注意,这里使用的术语应解释为具有与本说明书的上下文相一致的含义,而不应以理想化或过于刻板的方式来解释。
44.在使用类似于“a、b和c等中至少一个”这样的表述的情况下,一般来说应该按照本领域技术人员通常理解该表述的含义来予以解释(例如,“具有a、b和c中至少一个的系统”应包括但不限于单独具有a、单独具有b、单独具有c、具有a和b、具有a和c、具有b和c、和/或具有a、b、c的系统等)。在使用类似于“a、b或c等中至少一个”这样的表述的情况下,一般来说应该按照本领域技术人员通常理解该表述的含义来予以解释(例如,“具有a、b或c中至少一个的系统”应包括但不限于单独具有a、单独具有b、单独具有c、具有a和b、具有a和c、具有b和c、和/或具有a、b、c的系统等)。
45.附图中示出了一些方框图和/或流程图。应理解,方框图和/或流程图中的一些方框或其组合可以由计算机程序指令来实现。这些计算机程序指令可以提供给通用计算机、专用计算机或其他可编程数据一致性修复装置的处理器,从而这些指令在由该处理器执行时可以创建用于实现这些方框图和/或流程图中所说明的功能/操作的装置。本公开的技术可以硬件和/或软件(包括固件、微代码等)的形式来实现。另外,本公开的技术可以采取存储有指令的计算机可读存储介质上的计算机程序产品的形式,该计算机程序产品可供指令执行系统使用或者结合指令执行系统使用。
46.在本公开的技术方案中,所涉及的信息的收集、存储、使用、加工、传输、提供、公开和应用等处理,均符合相关法律法规的规定,采取了必要保密措施,且不违背公序良俗。
47.在本公开的技术方案中,若需要获取用户个人信息,在获取或采集用户个人信息之前,均获取了用户的授权或同意。
48.针对相关技术中存在的技术问题,本公开实施例提供了一种消息处理方法,应用于消息总线,消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,方法包括:从kafka中获取kafka消息。获取对应于kafka消息的配置信息、
解析类以及入库处理类。调用解析类对kafka消息进行解析,得到解析数据。根据解析数据确定解析过程是否正常。响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果。调用入库处理类对加工结果执行入库操作。
49.图1示意性示出了根据本公开实施例的消息处理方法及装置的系统架构100。需要注意的是,图1所示仅为可以应用本公开实施例的系统架构的示例,以帮助本领域技术人员理解本公开的技术内容,但并不意味着本公开实施例不可以用于其他设备、系统、环境或场景。
50.如图1所示,根据该实施例的系统架构100可以包括数据库101、网络102和业务系统103。数据库101与业务系统103之间通过网络102通信。
51.数据库101中可以设置关键词表格和检索结果表格,关键词表格可以用于存储提取的待匹配的关键词。检索结果表格可以用于存储基于待匹配的关键词进行检索后的检索结果。
52.网络102可以包括各种连接类型,例如有线、无线通信链路或者光纤电缆等等。其中有线方式例如可以是采用线缆及以下多种接口中的任一种连接:光纤通道、红外线接口、d型数据接口、串行接口、usb接口、usb type-c接口或dock接口,无线方式例如可以是采用无线通信方式连接的,其中的无线通信例如可采用蓝牙、wi-fi、infrared、zigbee等多个无线技术标准中的任一个。
53.业务系统103中基于kafka搭建了可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,消费总线可以由消息统一管理平台、消息构件、消费构件组成。业务系统103中的消息总线可以从kafka中获取kafka消息。获取对应于kafka消息的配置信息、解析类以及入库处理类。调用解析类对kafka消息进行解析,得到解析数据。根据解析数据确定解析过程是否正常。响应于解析结果正常,根据配置信息对kafka消息中的字段进行加工,得到加工结果。调用入库处理类对加工结果执行入库操作。
54.需要说明的是,本公开实施例所提供的消息处理方法可以由业务系统103执行。相应地,本公开实施例所提供的消息处理装置可以设置于业务系统103中。或者,本公开实施例所提供的消息处理方法也可以由不同于业务系统103且能够与数据库101和/或业务系统103进行本地通信的服务器或服务器集群执行。相应地,本公开实施例所提供的消息处理装置也可以设置于不同于业务系统103且能够与数据库101和/或转业务系统103进行本地通信的服务器或服务器集群中。或者,本公开实施例所提供的消息处理方法也可以部分由业务系统103执行,部分由数据库101执行。相应的
同质化
,本公开实施例所提供的消息处理装置也可以部分设置于业务系统103中,部分设置于数据库101中。
55.应该理解,图1中的数据库、网络、业务系统的数目仅仅是示意性的。根据实现需要,可以具有任意数目的数据库、网络、业务系统。
56.本公开实施例提供的消息处理方法,可以应用于金融科技领域。例如,对于银行而言,随着大数据技术的逐渐成熟与完善,越来越多的大数据技术被应用于银行系统,各个银行都在逐步构建数据集市、大数据平台等系统供用户订阅消息,传统金融业正在逐步向金融科技转变,但由于金融行业的安全性、实时性以及稳定性等要求,也对技术提出了更高的要求。目前,在大数据领域主流采用kafka(一个开源流处理平台,是一种高吞吐量的分布式发布订阅消息系统,可以处理消费者在网站中的所有动作流数据)实时或准实时消费数据,
主要是基于sparkstreaming(一个对实时数据流进行高通量、容错处理的流式处理系统)或者apache flink(一种开源流处理框架,其核心是用java和scala编写的分布式流数据流引擎)等大数据组件的实时流计算框架来实现。然而,基于大数据组件的实时流计算框架不仅接入门槛比较高,而且还需要消耗机器成本和付诸大量人力资源来进行代码开发及运维,从而导致用户采用kafka进行数据消费的成本高。基于此,采用本公开的实施例消息处理方法,业务系统只需要简单的配置,无需开发代码就可以实现表到表的自动数据入库和自动发邮件等操作,能够通过配置化实现消费方式多样化,不需要另外的开发成本,极大程度地减少了代码冗余和开发工作量,并且维护成本低、易操作。应当理解是,本公开实施例提供的消息处理方法不仅限于应用于金融科技领域,也可用于除金融领域之外的任意领域。上述描述只是示例性的,对于例涉及任何具有消息处理的领域,例如电子商务、物流等其他技术领域,都可以应用本公开实施例的消息处理方法。
57.以下将基于图1描述消息处理的场景,通过图2~图8对本公开实施例的消息处理方法进行详细描述。
58.图2示意性示出了根据本公开一实施例的消息处理方法的流程图。
59.如图2所示,该消息处理方法应用于消息总线,消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,该消息处理方法实现配置化自动消费,例如可以包括操作s201~操作s206。
60.在操作s201,从kafka中获取kafka消息。
61.在操作s202,获取对应于kafka消息的配置信息、解析类以及入库处理类。
62.在操作s203,调用解析类对kafka消息进行解析,得到解析数据。
63.在操作s204,根据解析数据确定解析过程是否正常。
64.在操作s205,响应于解析结果正常,根据配置信息对kafka消息中的字段进行加工,得到加工结果。
65.在操作s206,调用入库处理类对加工结果执行入库操作。
66.在本公开的实施例中,基于kafka搭建的可扩展性消息总线可以集成消息定制、消息发送、消息订阅、消息异常处理等功能。kafka可以理解为一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。kafka将消息分类,每一类的消息称之为一个主题。每条发布到kafka集群的消息都有一个类别,这个类别被称为topic。(物理上不同topic的消息分开存储,逻辑上一个topic的消息虽然保存于一个或多个broker上但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处)。
67.针对kafka的消息消费,消息总线的消费构件可提供全面的消费机制,消费构件可以包括消费线程初始化及保活、消息消费、消费重试、消费位移跳过等操作,能够通过配置化实现消费方式多样化,比如数据同步自动入库、自动发邮件、调用业务实现处理类等特色化需求,可以极大减少下游业务系统的开发和对接工作量。需要说明的是,本公开实施例中消息消费可以理解为将kafka消息数据按预设策略拉取出来后,依据拉取的kafka消息进行相应的操作,例如发邮件、数据同步入库等。kafka中的消息消费是一个不断轮询的过程,消费者可以重复调取拉取方法(poll()方法),而拉取方法返回的是所订阅的主题(分区)上
的一组消息。
68.根据本公开的实施例,基于kafka搭建的可扩展性消息总线来统一处理服务间的异步通信和数据同步,结合获取的配置信息、解析类以及入库处理类实现消息处理,由于配置信息、解析类以及入库处理类可以预先灵活配置,因此,业务系统只需要简单的配置,无需开发代码就可以实现表到表的自动数据入库和自动发邮件等操作,能够通过配置化实现消费方式多样化,不需要另外的开发成本,极大程度地减少了代码冗余和开发工作量,并且维护成本低、易操作。
69.图3示意性示出了根据本公开另一实施例的消息处理方法的流程图。
70.如图3所示,消息处理方法例如还可以包括操作s301~操作s303。
71.在操作s301,响应于解析结果异常,根据配置信息确定kafka消息是否为有序消息。
72.在操作s302,响应于kafka消息为有序消息,保存kafka消息且停止消息总线中处理该kafka消息的消费线程。
73.在操作s303,响应于kafka消息为无序消息,保存kafka消息且提交对应于kafka消息的位移。
74.解析异常例如可以是在网页浏览时,反馈网站错误等。
75.图4示意性示出了根据本公开实施例的配置化消费实现的逻辑图。
76.如图4所示,在解析异常的情况下,生成解析异常的告警信息并捕获异常信息,再根据topic取出缓存中的配置信息,判断kafka消息是否为有序消息。在确定kafka消息为有序消息的情况下,保存kafka消息到数据库中,有序消息具有有序性不可重试,发送报警后停掉该消费线程。在确定kafka消息为无序消息的情况下,保存kafka消息到数据库中,并且调度任务会间隔预设时间段拉取解析失败的kafka消息后重试处理操作。预设时间段例如可以是3秒,本公开不做限制。
77.根据本公开的实施例,在解析结果异常的情况下,判定消息是有序消息还是无序消息,由于有序消息具有有序性不可重试,直接停止处理有序消息的消费线程,避免重复的无效操作浪费计算资源及网络资源。并且,通过提交kafka消息的位移来表征该kafka消息已被消费,便于后续排查追踪。
78.图5示意性示出了根据本公开又一实施例的消息处理方法的流程图。
79.如图5所示,消息处理方法例如还可以包括操作s501~操作s504。
80.在操作s501,确定入库操作是否操作成功。
81.在操作s502,响应于入库操作失败,根据配置信息确定kafka消息是否为有序消息。
82.在操作s503,响应于kafka消息为有序消息,保存kafka消息且停止消息总线中处理该kafka消息的消费线程。
83.在操作s504,响应于kafka消息为无序消息,保存kafka消息且提交对应于kafka消息的位移。
84.继续参阅4,在入库操作失败的情况下,生成解析异常的告警信息并捕获异常信息,再根据topic取出缓存中的配置信息,判断kafka消息是否为有序消息。在确定kafka消息为有序消息的情况下,保存kafka消息到数据库中,有序消息具有有序性不可重试,发送
报警后停掉该消费线程。在确定kafka消息为无序消息的情况下,保存kafka消息到数据库中,并且调度任务会间隔预设时间段拉取解析失败的kafka消息后重试处理操作。位移可以理解为kafka偏移量,表示该数据已消费过。
85.根据本公开的实施例,在入库操作失败的情况下,判定消息是有序消息还是无序消息,由于有序消息具有有序性不可重试,直接停止处理有序消息的消费线程,避免重复的无效操作浪费计算资源及网络资源。并且,通过提交kafka消息的位移来表征该kafka消息已被消费,便于后续排查追踪。
86.前述介绍的是基于消费总线实现配置化自动消费的具体细节,消费总线中的消费构件支持数据自动入库/自动发邮件等功能,其中自动入库支持多数据源自动入库功能,即一条消息可入多个同类型及不同类型的数据库。配置化自动消费过程无需开发代码,仅需配置信息即可实现数据自动消费。
87.基于消费总线,本公开的实施例还可以实现自定义消费数据的场景自定义消费数据,是消费业务系统拉取kafka消息后,还需要做个性化的消息处理。消息总线的消费构件支持个性化定制的消费,只需要消费业务系统编写消息处理类,消息总线根据反射机制获取该程序消费数据。
88.图6示意性示出了根据本公开再一实施例的消息处理方法的流程图。
89.如图6所示,消息处理方法实现自定义消费数据,例如还可以包括操作s601~操作s606。
90.在操作s601,根据kafka消息的类别获取对应于kafka消息的消费处理类。
91.在操作s602,调用消费处理类对kafka消息进行处理,得到处理结果。
92.在操作s603,根据处理结果确定处理是否成功。
93.在操作s604,响应于处理成功,提交对应于kafka消息的位移。
94.在操作s605,响应于处理失败,保存处理失败的kafka消息。
95.在操作s606,响应于处理失败的kafka消息不允许提交位移,停止消息总线中处理失败的kafka消息所在的消费线程。
96.图7示意性示出了根据本公开实施例的自定义消费数据实现的逻辑图。
97.如图7所示,在本公开的实施例中,在拉取到kafka消息后才能消费,因此可以从kafka集群发起拉取指定topic数据请求获取数据,再判断是否拉取到kafka消息,若有消息,日志记录拉取消息开始记录记,进行下一步,若无数据,拉取自动阻塞一定时间,一定时间后再次发起拉取。一定时间例如可以是3秒,本公开不做限制。
98.在确定有消息的情况下,可以通过kafka消息的编号从缓存中取出消费处理类(消费处理类需要业务系统预先配置),调用消费处理类处理kafka消息。处理成功,则可以提交位移。处理失败,则可以保存失败信息并判断是否提交位移。判断是否提交位移的过程可以包括:通过应用群组编号从缓存中取出订阅信息。根据订阅信息是否重试参数与消息处理结果判断是否提交位移。处理成功或处理失败允许提交位移,继续从kafka拉取数据。处理失败并且不允许提交位移,表示该数据为有序数据,即使在重试也有可能一直处理失败,有序数据为保证消费有序不能提交位移,直接发送报警通知并停掉该消费线程。可以调用kafka手动提交位移方法提交位移。
99.根据本公开的实施例,由于通过预先配置消费处理类,结合消息总线的可扩展性
及统一处理服务间的异步通信和数据同步的能力,能够实现业务系统自定义消费数据和配置化自动消费两种场景,因此,能够进一步业务系统消费方式的多样化。
100.进一步地,可以调用消费异常记录上送(send consume exception record)接口保存处理失败的kafka消息。由于消费异常记录上送接口为异步调用,在通过调用消费异常记录上送接口保存处理失败的kafka消息时,无需等待接口返回直接执行下一步操作,因此,能够提高信息处理的效率。
101.进一步地,还可以根据kafka消息和处理结果生成应用画像日志,其中,应用画像日志用于对kafka消息的消费状态进行排查追踪。具体地,将消息处理的结果及kafka消息输出到日志里进行记录,生成应用画像日志。由于应用画像日志能够直观的反应消息处理过程中的kafka消息的处理结果,异常消费性等数据,因此极大程度提升了后续排查最终的效率。
102.在上述实施例的基础上,业务系统搭载的消息总线可以预先配置消息订阅表、数据来源表、数据加工表,实现配置化自动消费。
103.图8示意性示出了根据本公开实施例的操作s202中获取对应于kafka消息的配置信息、解析类以及入库处理类的流程图。
104.如图8所示,操作s202中获取对应于kafka消息的配置信息、解析类以及入库处理类例如可以包括操作s801~操作s805。
105.在操作s801,根据kafka消息的类别从预先配置的spring容器中获取对应于kafka消息的消费处理类。
106.在操作s803,从预先配置的数据来源表中获取表征kafka消息来源的字段信息,其中,数据来源表存储有表征各种类型kafka消息来源的字段信息。
107.在操作s803,从预先配置的数据加工表中获取用于对kafka消息中的字段进行加工的字段加工信息,数据加工表存储有用于加工各种类型kafka消息的字段加工信息;字段信息和字段加工信息组成配置信息。
108.在操作s804,根据字段信息从spring容器中获取对应于kafka消息的解析类。
109.在操作s805,从预先配置的订阅表中获取入库处理类。
110.例如,可以根据topic从预先配置的数据来源表与数据加工表中获取配置信息,为后续的数据加工及数据入库操作做准备。通过预先配置的数据来源表中的sync_type(消息来源类型)字段信息从spring容器中获取解析类,来解析对应的kafka消息。加工处理过程中,从缓存中取出字段加工信息,根据加工信息对相应的字段做加工处理,遍历拉取到的消息,取出字段与加工字段比较,相等的话根据加工类型调用相应的加工方法。
111.优选地,字段加工信息表征的加工类型包括字段转换、过滤字段、字段判空、删除字段、字符串转日期格式处理中的至少之一。
112.根据本公开的实施例,由于通过预先配置消息订阅表、数据来源表、数据加工表,因此,在消息处理时,能够直接调用这些表来实现kafka消息的自动消费。通过配置多种字段加工方式,消息处理过程中可以灵活选择一种或多种字段加工方式对kafka消息的字段进行,提高了消息处理的灵活性。
113.又例如,可以根据预先配置的数据订阅表中的operate_type字段来获取对应的操作类。优选地,入库处理类的操作类型包括mysql入库或oralce入库或gauss入库或邮件发
送或自定义入库。示例性的,日志操作类型为表示为自定义入库,需要根据订阅表中的custom_handle字段值从spring容器中来获取对应的操作实现类,使用方可根据该配置来扩展多数据源入库操作,或自定处理操作。
114.根据本公开的实施例,由于自动数据入库支持mysqi/oracle/gauss等不同种类的数据库,一条消息可入多个同类型及不同类型的数据库,因此能够实现持多数据源自动入库功能。
115.进一步地,入库处理类可以封装有用于执行结构化查询语句(sql)的java的应用程序接口,通过组装sql从而完成对数据的入库操作。
116.根据本公开的实施例,由于入库处理类封装的用于执行结构化查询语句的java的应用程序接口具有分布式的特点,因此能够更好地满足分布式发布订阅消息的应用需求。
117.综上所述,通过消息总线全面的可配置化消费机制,可实现业务系统自定义消费数据、配置化自动消费两种场景,满足业务系统多样化的消费方式。特别是自动消费的场景,业务系统只需要简单的配置,就可以实现表到表的自动数据入库和自动发邮件等操作,自动数据入库支持mysqi/oracle/gauss等不同种类的数据库,不需要另外的开发成本,极大程度地减少了代码冗余和开发工作量。
118.基于图2~图8所示的消息处理方法,本公开实施例还提供一种消息处理装置,以下将基于图1描述的场景,通过图9~图10对本公开实施例的消息处理装置进行描述。
119.图9示意性示出了根据本公开一实施例的消息处理装置的框图。
120.如图9所示,消息处理装置900可以包括第一获取模块910、第二获取模块920、解析模块930、第一确定模块940、加工模块950以及入库模块960。
121.第一获取模块910,用于从kafka中获取kafka消息。第一获取模块910可以用于执行前文描述的操作s201,在此不再赘述。
122.第二获取模块920,用于获取对应于kafka消息的配置信息、解析类以及入库处理类。第二获取模块920可以用于执行前文描述的操作s202,在此不再赘述。
123.解析模块930,用于调用解析类对kafka消息进行解析,得到解析数据。解析模块930可以用于执行前文描述的操作s203,在此不再赘述。
124.第一确定模块940,用于根据解析数据确定解析过程是否正常。第一确定模块940可以用于执行前文描述的操作s204,在此不再赘述。
125.加工模块950,用于响应于解析结果正常,根据配置信息对kafka消息中的字段进行加工,得到加工结果。加工模块950可以用于执行前文描述的操作s205,在此不再赘述。
126.入库模块960,用于调用入库处理类对加工结果执行入库操作。入库模块960可以用于执行前文描述的操作s206,在此不再赘述。
127.图10示意性示出了根据本公开另一实施例的消息处理装置的框图。
128.如图10所示,消息处理装置900例如还可以包括第三获取模块970、处理模块980、第二确定模块990、提交模块9100、保存模块9110以及停止模块9120。
129.第三获取模块970,用于根据kafka消息的类别获取对应于kafka消息的消费处理类。第三获取模块970可以用于执行前文描述的操作s601,在此不再赘述。
130.处理模块980,用于调用消费处理类对kafka消息进行处理,得到处理结果。处理模块980可以用于执行前文描述的操作s602,在此不再赘述。
131.第二确定模块990,用于根据处理结果确定处理是否成功。第二确定模块990可以用于执行前文描述的操作s603,在此不再赘述。
132.提交模块9100,用于响应于处理成功,提交对应于kafka消息的位移。提交模块9100可以用于执行前文描述的操作s604,在此不再赘述。
133.保存模块9110,用于响应于处理失败,保存处理失败的kafka消息。保存模块9110可以用于执行前文描述的操作s605,在此不再赘述。
134.停止模块9120,用于响应于处理失败的kafka消息不允许提交位移,停止消息总线中处理失败的kafka消息所在的消费线程。停止模块9120可以用于执行前文描述的操作s606,在此不再赘述。
135.根据本公开的实施例的模块、子模块、单元、子单元中的任意多个、或其中任意多个的至少部分功能可以在一个模块中实现。根据本公开实施例的模块、子模块、单元、子单元中的任意一个或多个可以被拆分成多个模块来实现。根据本公开实施例的模块、子模块、单元、子单元中的任意一个或多个可以至少被部分地实现为硬件电路,例如现场可编程门阵列(fpga)、可编程逻辑阵列(pla)、片上系统、基板上的系统、封装上的系统、专用集成电路(asic),或可以通过对电路进行集成或封装的任何其他的合理方式的硬件或固件来实现,或以软件、硬件以及固件三种实现方式中任意一种或以其中任意几种的适当组合来实现。或者,根据本公开实施例的模块、子模块、单元、子单元中的一个或多个可以至少被部分地实现为计算机程序模块,当该计算机程序模块被运行时,可以执行相应的功能。
136.例如,第一获取模块910、第二获取模块920、解析模块930、第一确定模块940、加工模块950、入库模块960、第三获取模块970、处理模块980、第二确定模块990、提交模块9100、保存模块9110以及停止模块9120中的任意多个可以合并在一个模块/单元/子单元中实现,或者其中的任意一个模块/单元/子单元可以被拆分成多个模块/单元/子单元。或者,这些模块/单元/子单元中的一个或多个模块/单元/子单元的至少部分功能可以与其他模块/单元/子单元的至少部分功能相结合,并在一个模块/单元/子单元中实现。根据本公开的实施例,第一获取模块910、第二获取模块920、解析模块930、第一确定模块940、加工模块950、入库模块960、第三获取模块970、处理模块980、第二确定模块990、提交模块9100、保存模块9110以及停止模块9120中的至少一个可以至少被部分地实现为硬件电路,例如现场可编程门阵列(fpga)、可编程逻辑阵列(pla)、片上系统、基板上的系统、封装上的系统、专用集成电路(asic),或可以通过对电路进行集成或封装的任何其他的合理方式等硬件或固件来实现,或以软件、硬件以及固件三种实现方式中任意一种或以其中任意几种的适当组合来实现。或者,第一获取模块910、第二获取模块920、解析模块930、第一确定模块940、加工模块950、入库模块960、第三获取模块970、处理模块980、第二确定模块990、提交模块9100、保存模块9110以及停止模块9120中的至少一个可以至少被部分地实现为计算机程序模块,当该计算机程序模块被运行时,可以执行相应的功能。
137.需要说明的是,本公开的实施例中消息处理装置部分与本公开的实施例中消息处理方法部分是相对应的,其具体实施细节及带来的技术效果也是相同的,在此不再赘述。
138.图11示意性示出了根据本公开实施例的适于实现上文描述的方法的电子设备的框图。图11示出的电子设备仅仅是一个示例,不应对本公开实施例的功能和使用范围带来任何限制。
139.如图11所示,根据本公开实施例的电子设备1100包括处理器1101,其可以根据存储在只读存储器(rom)1102中的程序或者从存储部分1108加载到随机访问存储器(ram)1103中的程序而执行各种适当的动作和处理。处理器1101例如可以包括通用微处理器(例如cpu)、指令集处理器和/或相关芯片组和/或专用微处理器(例如,专用集成电路(asic)),等等。处理器1101还可以包括用于缓存用途的板载存储器。处理器1101可以包括用于执行根据本公开实施例的方法流程的不同动作的单一处理单元或者是多个处理单元。
140.在ram 1103中,存储有电子设备1100操作所需的各种程序和数据。处理器1101、rom 1102以及ram1103通过总线1104彼此相连。处理器1101通过执行rom 1102和/或ram1 103中的程序来执行根据本公开实施例的方法流程的各种操作。需要注意,所述程序也可以存储在除rom 1102和ram 1103以外的一个或多个存储器中。处理器1101也可以通过执行存储在所述一个或多个存储器中的程序来执行根据本公开实施例的方法流程的各种操作。
141.根据本公开的实施例,电子设备1100还可以包括输入/输出(i/o)接口1105,输入/输出(i/o)接口1105也连接至总线1104。电子设备1100还可以包括连接至i/o接口1105的以下部件中的一项或多项:包括键盘、鼠标等的输入部分1106;包括诸如阴极射线管(crt)、液晶显示器(lcd)等以及扬声器等的输出部分1107;包括硬盘等的存储部分1108;以及包括诸如lan卡、调制解调器等的网络接口卡的通信部分1109。通信部分1109经由诸如因特网的网络执行通信处理。驱动器1110也根据需要连接至i/o接口1105。可拆卸介质11 11,诸如磁盘、光盘、磁光盘、半导体存储器等等,根据需要安装在驱动器1110上,以便于从其上读出的计算机程序根据需要被安装入存储部分1108。
142.根据本公开的实施例,根据本公开实施例的方法流程可以被实现为计算机软件程序。例如,本公开的实施例包括一种计算机程序产品,其包括承载在计算机可读存储介质上的计算机程序,该计算机程序包含用于执行流程图所示的方法的程序代码。在这样的实施例中,该计算机程序可以通过通信部分1109从网络上被下载和安装,和/或从可拆卸介质1111被安装。在该计算机程序被处理器1101执行时,执行本公开实施例的系统中限定的上述功能。根据本公开的实施例,上文描述的系统、设备、装置、模块、单元等可以通过计算机程序模块来实现。
143.本公开还提供了一种计算机可读存储介质,该计算机可读存储介质可以是上述实施例中描述的设备/装置/系统中所包含的;也可以是单独存在,而未装配入该设备/装置/系统中。上述计算机可读存储介质承载有一个或者多个程序,当上述一个或者多个程序被执行时,实现根据本公开实施例的方法。
144.根据本公开的实施例,计算机可读存储介质可以是非易失性的计算机可读存储介质。例如可以包括但不限于:便携式计算机磁盘、硬盘、随机访问存储器(ram)、只读存储器(rom)、可擦式可编程只读存储器(eprom或闪存)、便携式紧凑磁盘只读存储器(cd-rom)、光存储器件、磁存储器件、或者上述的任意合适的组合。在本公开中,计算机可读存储介质可以是任何包含或存储程序的有形介质,该程序可以被指令执行系统、装置或者器件使用或者与其结合使用。
145.例如,根据本公开的实施例,计算机可读存储介质可以包括上文描述的rom1102和/或ram 1103和/或rom 1102和ram 1103以外的一个或多个存储器。
146.附图中的流程图和框图,图示了按照本公开各种实施例的系统、方法和计算机程
序产品的可能实现的体系架构、功能和操作。在这点上,流程图或框图中的每个方框可以代表一个模块、程序段、或代码的一部分,上述模块、程序段、或代码的一部分包含一个或多个用于实现规定的逻辑功能的可执行指令。也应当注意,在有些作为替换的实现中,方框中所标注的功能也可以以不同于附图中所标注的顺序发生。例如,两个接连地表示的方框实际上可以基本并行地执行,它们有时也可以按相反的顺序执行,这依所涉及的功能而定。也要注意的是,框图或流程图中的每个方框、以及框图或流程图中的方框的组合,可以用执行规定的功能或操作的专用的基于硬件的系统来实现,或者可以用专用硬件与计算机指令的组合来实现。本领域技术人员可以理解,本公开的各个实施例和/或权利要求中记载的特征可以进行多种组合和/或结合,即使这样的组合或结合没有明确记载于本公开中。特别地,在不脱离本公开精神和教导的情况下,本公开的各个实施例和/或权利要求中记载的特征可以进行多种组合和/或结合。所有这些组合和/或结合均落入本公开的范围。

技术特征:
1.一种消息处理方法,应用于消息总线,所述消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,方法包括:获取kafka消息;获取对应于所述kafka消息的配置信息、解析类以及入库处理类;调用所述解析类对所述kafka消息进行解析,得到解析数据;根据所述解析数据确定解析过程是否正常;响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果;调用所述入库处理类对所述加工结果执行入库操作。2.根据权利要求1所述的消息处理方法,所述方法还包括:响应于所述解析结果异常,根据所述配置信息确定所述kafka消息是否为有序消息;响应于所述kafka消息为有序消息,保存所述kafka消息且停止所述消息总线中处理该kafka消息的消费线程;响应于所述kafka消息为无序消息,保存所述kafka消息且提交对应于所述kafka消息的位移。3.根据权利要求1所述的消息处理方法,所述方法还包括:确定所述入库操作是否操作成功;响应于所述入库操作失败,根据所述配置信息确定所述kafka消息是否为有序消息;响应于所述kafka消息为有序消息,保存所述kafka消息且停止所述消息总线中处理该kafka消息的消费线程;响应于所述kafka消息为无序消息,保存所述kafka消息且提交对应于所述kafka消息的位移。4.根据权利要求1所述的消息处理方法,在所述获取kafka消息之后,所述方法还包括:根据所述kafka消息的类别获取对应于所述kafka消息的消费处理类;调用所述消费处理类对所述kafka消息进行处理,得到处理结果;根据所述处理结果确定处理是否成功;响应于处理成功,提交对应于所述kafka消息的位移;响应于处理失败,保存处理失败的所述kafka消息;响应于处理失败的所述kafka消息不允许提交位移,停止所述消息总线中处理失败的所述kafka消息所在的消费线程。5.根据权利要求4所述的消息处理方法,所述保存处理失败的所述kafka消息包括:调用消费异常记录上送接口保存处理失败的所述kafka消息。6.根据权利要求4所述的消息处理方法,所述方法还包括:根据所述kafka消息和所述处理结果生成应用画像日志,其中,所述应用画像日志用于对所述kafka消息的消费状态进行排查追踪。7.根据权利要求1-6任一项所述的消息处理方法,其中,所述获取对应于所述kafka消息的配置信息、解析类以及入库处理类包括:根据所述kafka消息的类别从预先配置的spring容器中获取对应于所述kafka消息的消费处理类;
从预先配置的数据来源表中获取表征所述kafka消息来源的字段信息,其中,所述数据来源表存储有表征各种类型kafka消息来源的字段信息;从预先配置的数据加工表中获取用于对kafka消息中的字段进行加工的字段加工信息,所述数据加工表存储有用于加工各种类型kafka消息的字段加工信息;所述字段信息和字段加工信息组成所述配置信息;根据所述字段信息从所述spring容器中获取对应于所述kafka消息的解析类;从预先配置的订阅表中获取所述入库处理类。8.根据权利要求7所述的消息处理方法,其中,所述入库处理类的操作类型包括mysql入库或oralce入库或gauss入库或邮件发送或自定义入库。9.根据权利要求7所述的消息处理方法,其中,所述字段加工信息表征的加工类型包括字段转换、过滤字段、字段判空、删除字段、字符串转日期格式处理中的至少之一。10.根据权利要求1-6任一项所述的消息处理方法,其中,所述入库处理类封装有用于执行结构化查询语句的java的应用程序接口。11.一种消息处理装置,应用于消息总线,所述消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,装置包括:第一获取模块,用于从kafka中获取kafka消息;第二获取模块,用于获取对应于所述kafka消息的配置信息、解析类以及入库处理类;解析模块,用于调用所述解析类对所述kafka消息进行解析,得到解析数据;第一确定模块,用于根据所述解析数据确定解析过程是否正常;加工模块,用于响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果;入库模块,用于调用所述入库处理类对所述加工结果执行入库操作。12.根据权利要求11所述的消息处理装置,还包括:第三获取模块,用于根据所述kafka消息的类别获取对应于所述kafka消息的消费处理类;处理模块,用于调用所述消费处理类对所述kafka消息进行处理,得到处理结果;第二确定模块,用于根据所述处理结果确定处理是否成功;提交模块,用于响应于处理成功,提交对应于所述kafka消息的位移;保存模块,用于响应于处理失败,保存处理失败的所述kafka消息;停止模块,用于响应于处理失败的所述kafka消息不允许提交位移,停止所述消息总线中处理失败的所述kafka消息所在的消费线程。13.一种电子设备,包括:一个或多个处理器;存储装置,用于存储一个或多个程序,其中,当所述一个或多个程序被所述一个或多个处理器执行时,使得所述一个或多个处理器执行根据权利要求1~10中任一项所述的方法。14.一种计算机可读存储介质,所述计算机可读存储介质上存储有可执行指令,该指令被处理器执行时使处理器执行根据权利要求1~10中任一项所述的方法。15.一种计算机程序产品,包括计算机程序,所述计算机程序被处理器执行时实现根据
权利要求1~10中任一项所述的方法。

技术总结
本公开提供一种消息处理方法、装置、设备、介质及产品,应用于消息总线,消息总线为基于kafka搭建的可扩展性消息总线,用于统一处理服务间的异步通信和数据同步,涉及分布式技术领域,可以应用于金融科技技术领域,方法包括:获取kafka消息。获取对应于kafka消息的配置信息、解析类以及入库处理类。调用解析类对kafka消息进行解析,得到解析数据。根据解析数据确定解析过程是否正常。响应于所述解析结果正常,根据所述配置信息对所述kafka消息中的字段进行加工,得到加工结果。调用入库处理类对加工结果执行入库操作。加工结果执行入库操作。加工结果执行入库操作。


技术研发人员:黄剑佳 李伟 王妍 程鹏
受保护的技术使用者:中国工商银行股份有限公司
技术研发日:2023.07.11
技术公布日:2023/10/15
版权声明

本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

飞行汽车 https://www.autovtol.com/

分享:

扫一扫在手机阅读、分享本文

相关推荐