MQTT和Kafka之间的数据交互方法、装置和电子设备与流程

未命名 09-07 阅读:122 评论:0

mqtt和kafka之间的数据交互方法、装置和电子设备
技术领域
1.本发明涉及数据通信的技术领域,尤其是涉及一种mqtt和kafka之间的数据交互方法、装置和电子设备。


背景技术:

2.随着智能车联网的快速发展,车联网需要连接海量的车辆终端,并处理实时的车况数据和控车指令。
3.流处理平台kafka是处理数据最普遍的模式。mqtt是车辆与云端的主要通信方式。kafka的侧重点在于数据的存储和读取,针对实时性比较高的流式数据处理场景;而mqtt的侧重点在于客户端和服务器的通信,适用于物联网设备的网络接入。
4.目前,如何在车联网场景下按照用户需求实现mqtt和kafka之间的高效、灵活和可靠的数据交互成为目前亟需解决的技术问题。


技术实现要素:

5.有鉴于此,本发明的目的在于提供一种mqtt和kafka之间的数据交互方法、装置和电子设备,以缓解现有技术无法在车联网场景下按照用户需求实现mqtt和kafka之间的高效、灵活和可靠的数据交互的技术问题。
6.第一方面,本发明实施例提供了一种mqtt和kafka之间的数据交互方法,应用于数据处理规则管理引擎,所述方法包括:
7.获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;
8.获取车辆终端发布的mqtt消息,并根据所述第一数据处理规则将所述mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;
9.获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;
10.拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对所述目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。
11.进一步的,所述基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值。
12.进一步的,所述数据下发规则包括:kafka消息的主题和kafka消息的转码操作规则。
13.进一步的,所述转码操作规则的数量至少有一条。
14.进一步的,获取用户配置的基于sql的业务规则,包括:
15.通过api的方式获取用户配置的所述基于sql的业务规则;
16.获取用户配置的数据下发规则,包括:
17.通过api的方式获取用户配置的数据下发规则。
18.进一步的,所述基于sql的业务规则和所述数据下发规则可根据用户需要进行调整。
19.第二方面,本发明实施例还提供了一种mqtt和kafka之间的数据交互装置,应用于数据处理规则管理引擎,所述装置包括:
20.第一获取单元,用于获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;
21.写入单元,用于获取车辆终端发布的mqtt消息,并根据所述第一数据处理规则将所述mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;
22.第二获取单元,用于获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;
23.转码操作单元,用于拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对所述目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。
24.进一步的,所述基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值。
25.第三方面,本发明实施例还提供了一种电子设备,包括存储器、处理器及存储在所述存储器上并可在所述处理器上运行的计算机程序,所述处理器执行所述计算机程序时实现上述第一方面任一项所述的方法的步骤。
26.第四方面,本发明实施例还提供了一种计算机可读存储介质,所述计算机可读存储介质存储有机器可运行指令,所述机器可运行指令在被处理器调用和运行时,所述机器可运行指令促使所述处理器运行上述第一方面任一项所述的方法。
27.在本发明实施例中,提供了一种mqtt和kafka之间的数据交互方法,应用于数据处理规则管理引擎,该方法包括:获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;获取车辆终端发布的mqtt消息,并根据第一数据处理规则将mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。通过上述描述可知,本发明的mqtt和kafka之间的数据交互方法中,能够根据用户配置的基于sql的业务规则将车辆终端发布的mqtt消息中的目标车辆数据写入对应的kafka,进而实现携带有目标车辆数据的kafka消息的上传,同时,能够根据用户配置的数据下发规则将kafka中目标主题的数据中的目标数据进行转码操作,并将转码操作后的数据进行目标mqtt消息重建,实现将重建得到的目标mqtt消息下发至对应的车辆终端,即能够根据用户的配置(也就是按照用户需求)实现mqtt和kafka之间的高效、灵活和可靠的数据交互,缓解了现有技术无法在车联网场景下按照用户需求实现mqtt和kafka之间的高效、灵活和可靠的数据交互的技术问题。
附图说明
28.为了更清楚地说明本发明具体实施方式或现有技术中的技术方案,下面将对具体实施方式或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图是本发明的一些实施方式,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
29.图1为本发明实施例提供的一种mqtt和kafka之间的数据交互方法的流程图;
30.图2为本发明实施例提供的目标车辆数据上传的示意图;
31.图3为本发明实施例提供的目标mqtt消息下发的示意图;
32.图4为本发明实施例提供的基于sql的业务规则的示意图;
33.图5为本发明实施例提供的数据下发规则的示意图;
34.图6为本发明实施例提供的两条转码操作规则的示意图;
35.图7为本发明实施例提供的一种mqtt和kafka之间的数据交互装置的示意图;
36.图8为本发明实施例提供的一种电子设备的示意图。
具体实施方式
37.下面将结合实施例对本发明的技术方案进行清楚、完整地描述,显然,所描述的实施例是本发明一部分实施例,而不是全部的实施例。基于本发明中的实施例,本领域普通技术人员在没有做出创造性劳动前提下所获得的所有其他实施例,都属于本发明保护的范围。
38.现有技术无法在车联网场景下按照用户需求实现mqtt和kafka之间的高效、灵活和可靠的数据交互。
39.基于此,本发明的mqtt和kafka之间的数据交互方法中,能够根据用户配置的基于sql的业务规则将车辆终端发布的mqtt消息中的目标车辆数据写入对应的kafka,进而实现携带有目标车辆数据的kafka消息的上传,同时,能够根据用户配置的数据下发规则将kafka中目标主题的数据中的目标数据进行转码操作,并将转码操作后的数据进行目标mqtt消息重建,实现将重建得到的目标mqtt消息下发至对应的车辆终端,即能够根据用户的配置(也就是按照用户需求)实现mqtt和kafka之间的高效、灵活和可靠的数据交互。
40.为便于对本实施例进行理解,首先对本发明实施例所公开的一种mqtt和kafka之间的数据交互方法进行详细介绍。
41.实施例一:
42.根据本发明实施例,提供了一种mqtt和kafka之间的数据交互方法的实施例,需要说明的是,在附图的流程图示出的步骤可以在诸如一组计算机可执行指令的计算机系统中执行,并且,虽然在流程图中示出了逻辑顺序,但是在某些情况下,可以以不同于此处的顺序执行所示出或描述的步骤。
43.图1是根据本发明实施例的一种mqtt和kafka之间的数据交互方法的流程图,如图1所示,该方法包括如下步骤:
44.步骤s102,获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;
45.在本发明实施例中,上述mqtt和kafka之间的数据交互方法可以应用于数据处理
规则管理引擎。
46.mqtt是基于发布/订阅范式的消息协议,kafka的生产、消费的流程也是属于发布/订阅范式。mqtt+kafka主题,是mqtt和kafka之间的通信通道,它们采用订阅发布方式交互使用,每个主题都有一个或多个规则,用于在消息发布时进行数据处理。
47.数据处理规则管理引擎,是一个用于管理基于sql的业务规则和数据下发规则的组件,它可以通过api的方式接收业务规则和数据下发规则,并将基于sql的业务规则翻译为kafka中对应的key、value和sink。
48.在进行数据交互之前,用户需要构建mqtt+kafka主题,具体包括创建主题(即mqtt消息的主题)和基于sql的业务规则,主题供mqtt和kafka采用订阅发布方式使用,基于sql的业务规则用于在mqtt消息发布时进行数据处理。
49.具体的,基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值,下文中再具体举例说明。
50.用户创建主题(即mqtt消息的主题)和基于sql的业务规则后,可以将其创建的基于sql的业务规则通过api的方式传入到数据处理规则管理引擎中,这样,数据处理规则管理引擎便能获取到用户配置的基于sql的业务规则,并将它们翻译为kafka中对应的key、value和sink,得到mqtt消息发布时的第一数据处理规则。
51.步骤s104,获取车辆终端发布的mqtt消息,并根据第一数据处理规则将mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;
52.上述步骤s104中的第一数据处理规则为与mqtt消息的主题匹配的数据处理规则。即将mqtt消息中的目标车辆数据写入kafka的指定主题(topic)中,并且以part_key(如果有)配置的字段的值为key,用于车辆数据上传(即将携带有目标车辆数据的kafka消息上传至目标单元)。
53.如图2所示,车辆终端发布的mqtt消息为图2中下方的最左侧小方框中的内容,图2中下方的中间小方框中的内容为基于sql的业务规则中的部分内容,图2中下方的最右侧小方框中的内容为携带有目标车辆数据(即topic为test/abc,payload为hello)的kafka消息,图2中最右侧的几个圆圈组成的图示表示kafka。可见,将mqtt消息中的目标车辆数据topic和payload写入了对应的kafka。
54.上述目标单元为kafka,本发明实施例对其不进行具体限制。
55.步骤s106,获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;
56.具体的,数据下发规则包括:kafka消息的主题和kafka消息的转码操作规则,下文中再具体举例说明。
57.步骤s108,拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。
58.上述目标主题为与数据下发规则即kafka消息发布时的第二数据处理规则中设置的主题相同的主题。
59.进而,按照对应的第二数据处理规则对目标主题的数据中的目标数据进行转码操
作,以key为mqtt topic,value为mqtt payload重新发布得到目标mqtt消息,多用于指令下发和业务通知。
60.如图3所示,用户向kafka发布数据,数据处理规则管理引擎拉取kafka中目标主题的数据,具体为图3中下方的最右侧小方框中的内容,图3中下方的中间小方框中的内容为数据下发规则中的部分内容,图3中下方的最左侧小方框中的内容为目标mqtt消息。可见,将kafka中目标主题的数据中的目标数据topic和payload进行目标mqtt消息重建,下发至对应的车辆终端。
61.下面以一个具体的举例对本发明的基于sql的业务规则和数据下发规则进行介绍:
62.基于sql的业务规则如图4所示,数据下发规则如图5所示:
63.图4中,id表示这条业务规则的标识;enabled表示是否启用这条业务规则;sql表示从以v/ul开头的主题对应的mqtt消息中提取from_client_id和payload(如果车辆终端发布的mqtt消息的主题是以v/ul开头的,那么符合这条业务规则,就将其中的from_client_id和payload路由到kafka指定的主题);type表示类型为kafka;broker_addr表示kafka的地址;topic表示kafka的主题(如果mqtt消息的主题是以v/ul开头的,那么将其中的from_client_id和payload路由到主题为mqtt-upload的kafka中)。
64.上述过程实现的是,mqtt消息为消息流,用户想要将其中部分数据录入到kafka中,需要从中筛选得到需要录入的那部分数据,筛选的过程就是由数据处理规则管理引擎实现的。
65.图5中,id表示这条数据下发规则的标识;enabled表示是否启用这条数据下发规则;qos表示以怎样的消息等级来发目标mqtt消息,所有符合这条数据下发规则的,发的目标mqtt消息的消息等级都是1;base64:[“test/#”]表示如果符合test/#这个主题,就对其进行base64转码;broker_addr表示mqtt的地址;topics表示需要拉取的kafka的主题。
[0066]
在本发明实施例中,转码操作规则的数量至少有一条。
[0067]
具体的,数据转码功能可以将从kafka系统拉取的数据按照匹配的规则进行转码,然后再作为mqtt消息发送。
[0068]
图6中示出了两条转码操作规则,其中一条表示符合test/#这个主题的,就对其进行base64转码;另一条表示符合“abc/+”这个主题的,就对其进行other_encoding转码。
[0069]
在本发明的一个可选实施例中,获取用户配置的基于sql的业务规则,包括:通过api的方式获取用户配置的基于sql的业务规则;
[0070]
获取用户配置的数据下发规则,包括:通过api的方式获取用户配置的数据下发规则。
[0071]
在本发明的一个可选实施例中,基于sql的业务规则和数据下发规则可根据用户需要进行调整。
[0072]
本发明的mqtt和kafka之间的数据交互方法实现了车辆数据的接入、数据处理、数据存储、数据分析、转码和指令下发等功能融合贯通,并且提供了灵活的编程接口,以便快速开发出满足符合车辆联网应用;提高了消息的实时处理能力,简化了车联网长链路流程,使云端处理上行数据,下发指令更简单容易。
[0073]
本发明的mqtt和kafka之间的数据交互方法中,基于sql的业务规则和数据下发规
则通过api的方式传入到数据处理规则管理引擎中,可以实现更灵活和高效的数据处理和转换;可以将经过sql以及模板处理后的数据写入kafka的指定topic中,并且以part_key(如果有)配置的字段的值为key,用于车辆数据上传;也可以将从kafka指定topics拉取的数据,以key为mqtt topic,value为mqtt payload重新发布,并支持转码操作,用于车辆指令下发,这样,可以实现数据在kafka和mqtt之间的双向流动和同步;在数据交互过程中,直接进行解码的方法,可以将从kafka系统拉取的数据按照匹配的规则进行转码,然后,再作为mqtt消息发送,简化了消息处理流程。
[0074]
实施例二:
[0075]
本发明实施例还提供了一种mqtt和kafka之间的数据交互装置,该mqtt和kafka之间的数据交互装置主要用于执行本发明实施例一中所提供的mqtt和kafka之间的数据交互方法,以下对本发明实施例提供的mqtt和kafka之间的数据交互装置做具体介绍。
[0076]
图7是根据本发明实施例的一种mqtt和kafka之间的数据交互装置的示意图,如图7所示,该装置主要包括:第一获取单元10、写入单元20、第二获取单元30和转码操作单元40,其中:
[0077]
第一获取单元,用于获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;
[0078]
写入单元,用于获取车辆终端发布的mqtt消息,并根据第一数据处理规则将mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;
[0079]
第二获取单元,用于获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;
[0080]
转码操作单元,用于拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。
[0081]
在本发明实施例中,提供了一种mqtt和kafka之间的数据交互装置,应用于数据处理规则管理引擎,该装置包括:获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;获取车辆终端发布的mqtt消息,并根据第一数据处理规则将mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。通过上述描述可知,本发明的mqtt和kafka之间的数据交互装置中,能够根据用户配置的基于sql的业务规则将车辆终端发布的mqtt消息中的目标车辆数据写入对应的kafka,进而实现携带有目标车辆数据的kafka消息的上传,同时,能够根据用户配置的数据下发规则将kafka中目标主题的数据中的目标数据进行转码操作,并将转码操作后的数据进行目标mqtt消息重建,实现将重建得到的目标mqtt消息下发至对应的车辆终端,即能够根据用户的配置(也就是按照用户需求)实现mqtt和kafka之间的高效、灵活和可靠的数据交互,缓解了现有技术无法在车联网场景下按照用户需求实现mqtt和kafka之间的高效、灵活和可靠的数据交互的技术问题。
[0082]
可选地,基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值。
[0083]
可选地,数据下发规则包括:kafka消息的主题和kafka消息的转码操作规则。
[0084]
可选地,转码操作规则的数量至少有一条。
[0085]
可选地,第一获取单元还用于:通过api的方式获取用户配置的基于sql的业务规则;第二获取单元还用于:通过api的方式获取用户配置的数据下发规则。
[0086]
可选地,基于sql的业务规则和数据下发规则可根据用户需要进行调整。
[0087]
本发明实施例所提供的装置,其实现原理及产生的技术效果和前述方法实施例相同,为简要描述,装置实施例部分未提及之处,可参考前述方法实施例中相应内容。
[0088]
如图8所示,本技术实施例提供的一种电子设备600,包括:处理器601、存储器602和总线,所述存储器602存储有所述处理器601可执行的机器可读指令,当电子设备运行时,所述处理器601与所述存储器602之间通过总线通信,所述处理器601执行所述机器可读指令,以执行如上述mqtt和kafka之间的数据交互方法的步骤。
[0089]
具体地,上述存储器602和处理器601能够为通用的存储器和处理器,这里不做具体限定,当处理器601运行存储器602存储的计算机程序时,能够执行上述mqtt和kafka之间的数据交互方法。
[0090]
处理器601可能是一种集成电路芯片,具有信号的处理能力。在实现过程中,上述方法的各步骤可以通过处理器601中的硬件的集成逻辑电路或者软件形式的指令完成。上述的处理器601可以是通用处理器,包括中央处理器(central processing unit,简称cpu)、网络处理器(network processor,简称np)等;还可以是数字信号处理器(digital signal processing,简称dsp)、专用集成电路(application specific integrated circuit,简称asic)、现成可编程门阵列(field-programmable gate array,简称fpga)或者其他可编程逻辑器件、分立门或者晶体管逻辑器件、分立硬件组件。可以实现或者执行本技术实施例中的公开的各方法、步骤及逻辑框图。通用处理器可以是微处理器或者该处理器也可以是任何常规的处理器等。结合本技术实施例所公开的方法的步骤可以直接体现为硬件译码处理器执行完成,或者用译码处理器中的硬件及软件模块组合执行完成。软件模块可以位于随机存储器,闪存、只读存储器,可编程只读存储器或者电可擦写可编程存储器、寄存器等本领域成熟的存储介质中。该存储介质位于存储器602,处理器601读取存储器602中的信息,结合其硬件完成上述方法的步骤。
[0091]
对应于上述mqtt和kafka之间的数据交互方法,本技术实施例还提供了一种计算机可读存储介质,所述计算机可读存储介质存储有机器可运行指令,所述计算机可运行指令在被处理器调用和运行时,所述计算机可运行指令促使所述处理器运行上述mqtt和kafka之间的数据交互方法的步骤。
[0092]
本技术实施例所提供的mqtt和kafka之间的数据交互装置可以为设备上的特定硬件或者安装于设备上的软件或固件等。本技术实施例所提供的装置,其实现原理及产生的技术效果和前述方法实施例相同,为简要描述,装置实施例部分未提及之处,可参考前述方法实施例中相应内容。所属领域的技术人员可以清楚地了解到,为描述的方便和简洁,前述描述的系统、装置和单元的具体工作过程,均可以参考上述方法实施例中的对应过程,在此不再赘述。
[0093]
在本技术所提供的实施例中,应该理解到,所揭露装置和方法,可以通过其它的方式实现。以上所描述的装置实施例仅仅是示意性的,例如,所述单元的划分,仅仅为一种逻辑功能划分,实际实现时可以有另外的划分方式,又例如,多个单元或组件可以结合或者可以集成到另一个系统,或一些特征可以忽略,或不执行。另一点,所显示或讨论的相互之间的耦合或直接耦合或通信连接可以是通过一些通信接口,装置或单元的间接耦合或通信连接,可以是电性,机械或其它的形式。
[0094]
再例如,附图中的流程图和框图显示了根据本技术的多个实施例的装置、方法和计算机程序产品的可能实现的体系架构、功能和操作。在这点上,流程图或框图中的每个方框可以代表一个模块、程序段或代码的一部分,所述模块、程序段或代码的一部分包含一个或多个用于实现规定的逻辑功能的可执行指令。也应当注意,在有些作为替换的实现方式中,方框中所标注的功能也可以以不同于附图中所标注的顺序发生。例如,两个连续的方框实际上可以基本并行地执行,它们有时也可以按相反的顺序执行,这依所涉及的功能而定。也要注意的是,框图和/或流程图中的每个方框、以及框图和/或流程图中的方框的组合,可以用执行规定的功能或动作的专用的基于硬件的系统来实现,或者可以用专用硬件与计算机指令的组合来实现。
[0095]
所述作为分离部件说明的单元可以是或者也可以不是物理上分开的,作为单元显示的部件可以是或者也可以不是物理单元,即可以位于一个地方,或者也可以分布到多个网络单元上。可以根据实际的需要选择其中的部分或者全部单元来实现本实施例方案的目的。
[0096]
另外,在本技术提供的实施例中的各功能单元可以集成在一个处理单元中,也可以是各个单元单独物理存在,也可以两个或两个以上单元集成在一个单元中。
[0097]
所述功能如果以软件功能单元的形式实现并作为独立的产品销售或使用时,可以存储在一个计算机可读取存储介质中。基于这样的理解,本技术的技术方案本质上或者说对现有技术做出贡献的部分或者该技术方案的部分可以以软件产品的形式体现出来,该计算机软件产品存储在一个存储介质中,包括若干指令用以使得一台电子设备(可以是个人计算机,服务器,或者网络设备等)执行本技术各个实施例所述车辆标记方法的全部或部分步骤。而前述的存储介质包括:u盘、移动硬盘、只读存储器(read-only memory,简称rom)、随机存取存储器(random access memory,简称ram)、磁碟或者光盘等各种可以存储程序代码的介质。
[0098]
应注意到:相似的标号和字母在下面的附图中表示类似项,因此,一旦某一项在一个附图中被定义,则在随后的附图中不需要对其进行进一步定义和解释,此外,术语“第一”、“第二”、“第三”等仅用于区分描述,而不能理解为指示或暗示相对重要性。
[0099]
最后应说明的是:以上所述实施例,仅为本技术的具体实施方式,用以说明本技术的技术方案,而非对其限制,本技术的保护范围并不局限于此,尽管参照前述实施例对本技术进行了详细的说明,本领域的普通技术人员应当理解:任何熟悉本技术领域的技术人员在本技术揭露的技术范围内,其依然可以对前述实施例所记载的技术方案进行修改或可轻易想到变化,或者对其中部分技术特征进行等同替换;而这些修改、变化或者替换,并不使相应技术方案的本质脱离本技术实施例技术方案的范围。都应涵盖在本技术的保护范围之内。因此,本技术的保护范围应以权利要求的保护范围为准。

技术特征:
1.一种mqtt和kafka之间的数据交互方法,其特征在于,应用于数据处理规则管理引擎,所述方法包括:获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;获取车辆终端发布的mqtt消息,并根据所述第一数据处理规则将所述mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对所述目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。2.根据权利要求1所述的方法,其特征在于,所述基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值。3.根据权利要求1所述的方法,其特征在于,所述数据下发规则包括:kafka消息的主题和kafka消息的转码操作规则。4.根据权利要求3所述的方法,其特征在于,所述转码操作规则的数量至少有一条。5.根据权利要求1所述的方法,其特征在于,获取用户配置的基于sql的业务规则,包括:通过api的方式获取用户配置的所述基于sql的业务规则;获取用户配置的数据下发规则,包括:通过api的方式获取用户配置的数据下发规则。6.根据权利要求1所述的方法,其特征在于,所述基于sql的业务规则和所述数据下发规则可根据用户需要进行调整。7.一种mqtt和kafka之间的数据交互装置,其特征在于,应用于数据处理规则管理引擎,所述装置包括:第一获取单元,用于获取用户配置的基于sql的业务规则,进而得到mqtt消息发布时的第一数据处理规则;写入单元,用于获取车辆终端发布的mqtt消息,并根据所述第一数据处理规则将所述mqtt消息中的目标车辆数据写入对应的kafka,进而将携带有目标车辆数据的kafka消息上传至目标单元;第二获取单元,用于获取用户配置的数据下发规则,得到kafka消息发布时的第二数据处理规则;转码操作单元,用于拉取kafka中目标主题的数据,并按照对应的第二数据处理规则对所述目标主题的数据中的目标数据进行转码操作,进而将转码操作后的数据进行目标mqtt消息重建,以将重建得到的目标mqtt消息下发至对应的车辆终端。8.根据权利要求7所述的装置,其特征在于,所述基于sql的业务规则包括:mqtt消息的主题、mqtt消息的数据处理规则、kafka消息的主题、kafka消息的关键字和kafka消息的值。9.一种电子设备,包括存储器、处理器及存储在所述存储器上并可在所述处理器上运行的计算机程序,其特征在于,所述处理器执行所述计算机程序时实现上述权利要求1至6中任一项所述的方法的步骤。10.一种计算机可读存储介质,其特征在于,所述计算机可读存储介质存储有机器可运
行指令,所述机器可运行指令在被处理器调用和运行时,所述机器可运行指令促使所述处理器运行上述权利要求1至6中任一项所述的方法。

技术总结
本发明提供了一种MQTT和Kafka之间的数据交互方法、装置和电子设备,该数据交互方法中,能够根据用户配置的基于SQL的业务规则将车辆终端发布的MQTT消息中的目标车辆数据写入对应的Kafka,进而实现携带有目标车辆数据的Kafka消息的上传,同时,能够根据用户配置的数据下发规则将Kafka中目标主题的数据中的目标数据进行转码操作,并将转码操作后的数据进行目标MQTT消息重建,实现将重建得到的目标MQTT消息下发至对应的车辆终端,即能够根据用户的配置(也就是按照用户需求)实现MQTT和kafka之间的高效、灵活和可靠的数据交互。灵活和可靠的数据交互。灵活和可靠的数据交互。


技术研发人员:刘瑞强 于昊
受保护的技术使用者:东软睿驰汽车技术(大连)有限公司
技术研发日:2023.06.15
技术公布日:2023/9/6
版权声明

本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

飞行汽车 https://www.autovtol.com/

分享:

扫一扫在手机阅读、分享本文

相关推荐