一种es数据实时同步方法、系统及存储介质
未命名
07-19
阅读:97
评论:0
1.本发明涉及elasticsearch数据同步技术领域,尤其是涉及一种es数据实时同步方法、系统及存储介质。
背景技术:
2.elasticsearch(简称es)数据同步主要针对需要将mysql集群数据同步到elasticsearch集群的互联网项目,实现能够以极少的代价以及延迟,将mysql数据变化实时更新到elasticsearch中。其主要目标是在互联网项目应用中,用户能够快速感知到数据的更新,并在使用搜索(该搜索需要通过elasticsearch来实现)功能时能快速搜索出,以增加用户体验。
3.目前会使用logstash做增量数据同步,但是因为logstash的高资源消耗和缓慢的处理速度并不适合在服务器运行期间将增量数据从mysql同步到es。logstash无法实时感知mysql数据的变化,只能根据架构师配置的固定时间间隔来同步数据,而这个时间的设定必定会带来一定的不合理性,设置的过短会导致服务器资源快速消耗和占用,过长会降低用户的好感和体验,并且在数据变化不大的时期依然按照固定的时间间隔进行数据同步的话,必然会导致服务器资源的不必要消耗。
4.而目前也会用canal来进行增量数据同步,但canal无法承担复杂的同步业务,不具有幂等性,无法灵活对数据进行二次处理和过滤。
技术实现要素:
5.本发明提供一种es数据实时同步方法、系统及存储介质,通过lostash、canal、kibana、kafka、java等技术来做到mysql集群数据到elasticsearch集群的实时同步,其中,使用canal做增量数据同步,克服了使用logstash做增量数据同步所带来的缺陷,还引入了java、canal客户端和es客户端,克服了现有采用canal做增量数据同步所带来的缺陷。
6.本说明书实施例的第一方面公开了一种es数据实时同步方法,包括如下步骤:s1.配置mysql集群环境,并开启mysql的binlog写入功能;s2.使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;s3.配置canal环境,连接mysql集群,订阅master主节点的binlog日志;s4.使用java编写canal客户端,获取canal订阅到的增量数据信息;s5.配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;s6.使用kafka连接es集群,编写消费逻辑,调用es开放的接口,将增量数据以http请求的方式发送给es端口进行数据同步;s7.使用es接收请求,将增量数据同步到指定索引中。
7.本说明书的一实施例中,s1包括:s11.安装多个mysql软件或服务器,从中选择一个作为主节点,即为主机,其余为从机;并在my.cnf配置文件中,将server-id属性设置为1,开启binlog日志功能,以记录主
节点上所有数据的修改变化;s12.为所有从机设置不同的server-id值,并配置从机连接主机的相关参数;s13.配置完成后需要进行连通性测试,必须保证主机上进行写操作时主机的binlog日志能够同步更新,且从机数据也能实时更新到;s14.使用mysqldump将主节点中已有数据全量同步到从机中,以保证数据的一致性;s15.通过模拟各种故障场景测试集群环境,并根据条件进行压测,以保证mysql集群在恶劣高压环境下依然能正常工作;s16.创建数据库、表、索引和视图。
8.本说明书的一实施例中,s2包括:s21.配置elasticsearch集群环境;s211.安装下载多个elasticsearch软件,区分节点类型为主节点、数据节点或主数据节点,以便各个节点在所处集群中分配到角色和任务;s212.为每个集群节点配置elasticsearch.yml配置文件,并定义集群名称、节点名称、网络地址和开启跨域访问,以让集群节点之间能够相互连通并进行数据同步;s213.启动各个elasticsearch节点,使用kibana客户端工具检测集群状态,并对集群环境做适当调整;s214.根据条件优化集群性能;s215.添加ik中文分词器,为中文文本做智能分词,以更好兼容中文数据,并添加扩展分词和停用词;s216.依据mysql集群中的表和字段,一一对应建立索引和映射,并给需要的字段添加analyzer和search_analyzer属性,以依据该字段做合适的倒排索引。
9.本说明书的一实施例中,s2还包括:s22.配置logstash环境,做全量数据同步;s221.下载安装logstash软件,检查版本是否对应;s222.上传对应的mysql驱动依赖,以让logstash能够连接数据库;s223.为每一个es索引创建logstash配置文件,其中需要配置连接mysql集群的环境、查询mysql数据的sql语句、es连接环境、对应的es索引名称、mysql主键字段以及日志相关参数;s224.编写pipelines.yml配置文件,为每一个es索引对应配置文件添加pipeline.id和path.config属性,以支持多表数据的同时插入;s225.启动logstash,检查并测试es数据的全量同步。
10.本说明书的一实施例中,s3包括:s31.下载canal软件,根据版本修改源码以及软件依赖,以支持项目使用的mysql版本和elasticsearch版本;s32.下载jdk,配置jdk环境;s33.修改canal deployer的配置文件,设置数据库地址、账号、密码以及salveid值,确保与mysql集群中各节点的server-id值不同;s34.修改mysql驱动版本,并修改驱动器权限;
s35.启动服务,观察日志变化,查看是否正常成功连接mysql并读取到binlog增量日志信息。
11.本说明书的一实施例中,s4包括:s41.安装配置java环境以及springboot环境;s42.上传canal客户端的maven依赖;s43.编写客户端连接代码,指定canal软件的连接地址、端口、实例名称和账号密码;s44.编写canal客户端消费逻辑,将得到的增量数据信息转换为json数据格式,并以http请求的方式发送出去。
12.本说明书的一实施例中,s5包括:s51.安装并解压kafka软件;s52.修改server.properties文件,配置kafka进程端口号、主机地址及名称和日志相关属性;s53.修改zookeeper.properties文件,配置客户端连接端口数据目录等属性;s54.启动zookeeper和kafka,检测是否有启动错误和异常;s55.创建topic,指定名称、zookeeper连接环境、分区和副本因子。
13.本说明书的一实施例中,s6包括:s61.创建kafka生产者,接收http请求,提取并转换数据格式,编写回调函数并对失败进行补偿处理,配置application.prpertise,自定义分区器和重试次数等属性,并将增量数据信息发送到topic中;s62.添加连接es的客户端依赖,并配置连接es集群的相关参数;s63.创建kafka消费者,指定监听的topic、partition、offset,配置application.prpertise,开启批量消费和自动应答等属性,接收topic内的增量数据;s64.以json数据格式拼接http请求,使用resthighlevelclient对象将增量数据以http请求的形式发送至es集群。
14.本说明书实施例的第二方面公开了一种es数据实时同步系统,包括:第一配置模块,用于配置mysql集群环境,并开启mysql的binlog写入功能;全量数据同步模块,用于使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;第二配置模块,用于配置canal环境,连接mysql集群,订阅master主节点的binlog日志;增量数据获取模块,用于使用java编写canal客户端,获取canal订阅到的增量数据信息;第三配置模块,用于配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;第一数据同步模块,用于使用kafka连接es集群,编写消费逻辑,调用es开放的接口,将增量数据以http请求的方式发送给es端口进行数据同步;第二数据同步模块,用于使用es接收请求,将增量数据同步到指定索引中。
15.本说明书实施例的第三方面公开了一种计算机可读存储介质,所述存储介质存储
计算机指令,当计算机读取所述计算机指令时,所述计算机执行上述的es数据实时同步方法。
16.综上所述,本发明至少具有以下有益效果:本发明通过lostash、canal、kibana、kafka,java等技术来做到mysql集群数据到elasticsearch集群的实时同步。logstash用于将mysql集群中已有的数据全部同步至elasticsearch。canal用于连接mysql集群并订阅binlog日志的增量日志数据。kibana用于查看和监控elasticsearch中数据的变化,编写java canal客户端代码用于连接canal,获取canal订阅到的增量日志数据,并推送至kafka消息队列。kafka用于接收canal客户端推送的数据,连接elasticsearch集群后将增量数据推送至elasticsearch集群中,以此做到mysql数据到elasticsearch的实时同步。
17.其中,使用logstash做全量数据同步:因为它提供了强大的数据过滤和转换功能,可以对数据进行处理、解析和转换,且插件化的架构使得它可以方便地扩展,满足各种数据处理需求。在系统开发和测试阶段,方便做系统功能测试和canal技术的引入、维护和有效性检测。
18.使用canal做增量数据同步,解决了使用logstash做增量数据同步所带来的实时性不足以及资源不合理消耗等问题;同时引入了java、canal客户端和es客户端,使用程序代码根据的业务需求对数据进行二次处理和包装,比如算数运算、日期格式转换、过滤等等,克服了现有采用canal做增量数据同步所带来的缺陷。
19.用java编写canal客户端的消费逻辑,而不直接使用canal本身自带的消费逻辑(canal在本发明中只使用了其订阅功能,而未使用消费功能):1、因为要使用canal消费功能在配置上相当繁琐,且现今canal还无法很好地支持elasticsearch7以上的版本;2、无法针对特殊场景进行定制化操作和处理,无法承担复杂业务,不具备数据重试、幂等性等机制。
20.使用kafka做为中间件连接canal客户端和es集群,而不直接让canal客户端对接es集群:1、可根据业务需求创建多topic、多分区,采用多消费者模式,增加多线程并发消费能力,提高系统同步性能。2、kafka是分布式流处理平台,更适合分布式很集群环境,能处理大规模的实时数据流;3、高性能、可扩展、可靠性、灵活性、可观测性、峰值处理能力;4、能够做到java程序和es集群之间的解耦,有助于控制和优化生产消息和消费消息的处理速度不一致问题,减轻服务器压力,并提高了系统的健壮性。
附图说明
21.为了更清楚地说明本发明实施例技术方案,下面将对实施例描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图仅仅是本发明的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
22.图1为本发明中所涉及的es数据实时同步方法的步骤示意图。
23.图2为本发明中所涉及的es数据实时同步方法的流程示意图。
24.图3为本发明中所涉及的es数据实时同步系统的模块示意图。
具体实施方式
25.在下文中,仅简单地描述了某些示例性实施例。正如本领域技术人员可认识到的那样,在不脱离本发明实施例的精神或范围的情况下,可通过各种不同方式修改所描述的实施例。因此,附图和描述被认为本质上是示例性的而非限制性的。
26.下文的公开提供了许多不同的实施方式或例子用来实现本发明实施例的不同结构。为了简化本发明实施例的公开,下文中对特定例子的部件和设置进行描述。当然,它们仅仅为示例,并且目的不在于限制本发明实施例。此外,本发明实施例可以在不同例子中重复参考数字和/或参考字母,这种重复是为了简化和清楚的目的,其本身不指示所讨论各种实施方式和/或设置之间的关系。
27.下面结合附图对本发明的实施例进行详细说明。
28.如图1和图2所示,本说明书实施例提供了一种es数据实时同步方法,包括:s1.配置mysql集群环境(这里我们使用mysql replication的集群方案,区分出主从机)。
29.在一些实施例中,s1包括:s11.安装多个mysql软件或服务器,从中选择一个作为主节点;在my.cnf配置文件中,将server-id属性设置为1,开启binlog日志功能,以记录主节点上所有数据的修改变化,并可做相应的性能调整;s12.为所有从机设置不同的server-id值,并配置从机连接主机的相关参数(master-host、master-user等);s13.配置完成后需要进行连通性测试,必须保证主机上进行写操作时主机的binlog日志能够同步更新,且从机数据也能实时更新到;s14.使用mysqldump等工具将主节点中已有数据全量同步到从机中,以保证数据的一致性;s15.测试集群环境,可通过模拟各种故障场景,并根据条件进行适当的压测,以保证mysql集群在恶劣高压环境下依然能正常工作(可采用nginx等技术做mysql集群的负载均衡,以减少单节点的压力,提高集群的可用性);s16.根据自身项目情况创建数据库、表、索引、视图等对象。
30.s2使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中。
31.在一些实施例中,s2包括:s21.配置elasticsearch集群环境:s211.安装下载多个elasticsearch软件,区分节点类型为主节点、数据节点或主数据节点,以便各个节点可以在所处集群中分配到角色和任务;s212.为每个集群节点配置elasticsearch.yml配置文件,需要定义集群名称,节点名称,网络地址,开启跨域访问等等属性,以让集群节点之间能够相互连通并进行数据同步;s213.启动各个elasticsearch节点,使用kibana客户端工具检测集群状态,并对集群环境做适当调整,比如可选择分片策略,重新分配分片,添加或删除节点;s214.根据条件优化集群性能,如决策分片数量,控制分片大小,横向扩展节点,提高副本数量等;
s215.添加ik中文分词器,为中文文本做智能分词,以更好兼容中文数据,并适当添加扩展分词和停用词,为用户提供更强大的搜索功能;s216.依据mysql集群中的表和字段,一一对应建立索引和映射,并给需要的字段添加analyzer和search_analyzer属性,以依据该字段做合适的倒排索引;在一些实施例中,s2还包括:s22.配置logstash环境,做全量数据同步:s221.下载安装logstash软件,检查版本是否对应(es、kibana、logstash版本需要对应正确);s222.上传对应的mysql驱动依赖,以让logstash能够连接数据库;s223.为每一个es索引创建logstash配置文件,其中需要配置连接mysql集群的环境、查询mysql数据的sql语句、es连接环境、对应的es索引名称、mysql主键字段以及日志相关参数;s224.编写pipelines.yml配置文件,为每一个es索引对应配置文件添加pipeline.id和path.config属性,以支持多表数据的同时插入;s225.启动logstash,检查并测试es数据的全量同步。
32.s3.配置canal环境,连接mysql集群,订阅master主节点的binlog日志。
33.在一些实施例中,s3包括:s31.下载canal软件,根据版本修改源码以及软件依赖,以支持项目使用的mysql版本和elasticsearch版本;s32.下载jdk,配置jdk环境(版本需对应);s33.修改canal deployer的配置文件,设置数据库地址、账号、密码以及salveid值,确保与mysql集群中各节点的server-id值不同;s34.修改mysql驱动版本(现今canal默认支持mysql5.7版本,若版本不一致,需重新上传mysql驱动以做相应的适配),并修改驱动器权限;s35.启动服务,观察日志变化,查看是否正常成功连接mysql并读取到binlog增量日志信息.s4.使用java编写canal客户端,获取canal订阅到的增量数据信息。
34.在一些实施例中,s4包括:s41.安装配置java环境以及springboot环境;s42.上传canal客户端的maven依赖;s43.编写客户端连接代码,指定canal软件的连接地址、端口、实例名称、账号密码等信息;s44.编写canal客户端消费逻辑,能够将得到的增量数据信息转换为json数据格式,并以http请求的方式发送出去。
35.s5.配置kafka环境。
36.在一些实施例中,s5包括:s51.安装解压kafka软件;s52.修改server.properties文件,配置kafka进程端口号、主机地址及名称、日志相关属性等;
s53.修改zookeeper.properties文件,配置客户端连接端口数据目录,并可根据条件做适当优化,如配置最大连接数、超时时间、允许最大同步进程数等,提高kafka工作的性能和效率;s54.启动zookeeper和kafka,检测是否有启动错误和异常;s55.创建topic,指定名称、zookeeper连接环境、分区、副本因子等信息。
37.s6.使用kafka连接es集群,编写生产消费逻辑,将增量数据发送至es集群。
38.在一些实施例中,s6包括:s61.创建kafka生产者,接收http请求,提取并转换数据格式,编写回调函数并对失败进行补偿处理,配置application.prpertise,自定义分区器和重试次数等属性,并将增量数据信息发送到topic中;s62.添加连接es的客户端依赖,并配置连接es集群的相关参数;s63.创建kafka消费者,指定监听的topic、partition、offset,配置application.prpertise,开启批量消费和自动应答等属性,接收topic内的增量数据;s64.以json数据格式拼接http请求,使用resthighlevelclient对象将增量数据以http请求的形式发送至es集群。
39.s7.es监听指定端口,接收请求,自动将增量数据同步到指定索引中。
40.如图3所示,本说明书实施例的第二方面公开了一种es数据实时同步系统,包括:第一配置模块,用于配置mysql集群环境,并开启mysql的binlog写入功能;全量数据同步模块,用于使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;第二配置模块,用于配置canal环境,连接mysql集群,订阅master主节点的binlog日志;增量数据获取模块,用于使用java编写canal客户端,获取canal订阅到的增量数据信息;第三配置模块,用于配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;第一数据同步模块,用于使用kafka连接es集群,编写消费逻辑,调用es开放的接口,将增量数据以http请求的方式发送给es端口进行数据同步;第二数据同步模块,用于使用es接收请求,将增量数据同步到指定索引中。
41.在一些实施例中,es数据实时同步系统还包括:处理器,与第一配置模块、全量数据同步模块、第二配置模块、增量数据获取模块、第三配置模块、第一数据同步模块和第二数据同步模块连接;存储器,与处理器连接,并存储有可在处理器上运行的计算机程序;其中,当处理器执行计算机程序时,处理器控制第一配置模块、全量数据同步模块、第二配置模块、增量数据获取模块、第三配置模块、第一数据同步模块和第二数据同步模块工作,以实现上述的es数据实时同步方法。
42.本说明书实施例的第三方面公开了一种计算机可读存储介质,存储介质存储计算机指令,当计算机读取计算机指令时,计算机执行上述的es数据实时同步方法。
43.以上所述实施例是用以说明本发明,并非用以限制本发明,所以举例数值的变更
或等效元件的置换仍应隶属本发明的范畴。
44.由以上详细说明,可使本领域普通技术人员明了本发明的确可达成前述目的,实已符合专利法的规定。
45.尽管已描述了本发明的优选实施例,但本领域内的技术人员一旦得知了基本创造性概念,则可对这些实施例作出另外的变更和修改。所以,所附权利要求意欲解释为包括优选实施例以及落入本发明范围的所有变更和修改。以上所述仅为本发明的较佳实施例而已,并不用以限制本发明,应当指出的是,凡在本发明的精神和原则之内所作的任何修改、等同替换和改进等,均应包含在本发明的保护范围之内。
46.应当注意的是,上述有关流程的描述仅仅是为了示例和说明,而不限定本说明书的适用范围。对于本领域技术人员来说,在本说明书的指导下可以对流程进行各种修正和改变。然而,这些修正和改变仍在本说明书的范围之内。
47.上文已对基本概念做了描述,显然,对于阅读此申请后的本领域的普通技术人员来说,上述发明披露仅作为示例,并不构成对本技术的限制。虽然此处并未明确说明,但本领域的普通技术人员可能会对本技术进行各种修改、改进和修正。该类修改、改进和修正在本技术中被建议,所以该类修改、改进、修正仍属于本技术示范实施例的精神和范围。
48.同时,本技术使用了特定词语来描述本技术的实施例。例如“一个实施例”、“一实施例”、和/或“一些实施例”意指与本技术至少一个实施例有关的某一特征、结构或特性。因此,应当强调并注意的是,本说明书中在不同位置两次或以上提及的“一实施例”或“一个实施例”或“一替代性实施例”并不一定是指同一实施例。此外,本技术的一个或多个实施例中的某些特征、结构或特点可以进行适当的组合。
49.此外,本领域的普通技术人员可以理解,本技术的各方面可以通过若干具有可专利性的种类或情况进行说明和描述,包括任何新的和有用的过程、机器、产品或物质的组合,或对其任何新的和有用的改进。因此,本技术的各个方面可以完全由硬件实施、可以完全由软件(包括固件、常驻软件、微代码等)实施、也可以由硬件和软件组合实施。以上硬件或软件均可被称为“单元”、“模块”或“系统”。此外,本技术的各方面可以采取体现在一个或多个计算机可读介质中的计算机程序产品的形式,其中计算机可读程序代码包含在其中。
50.本技术各部分操作所需的计算机程序代码可以用任意一种或以上程序设计语言编写,包括如java、scala、smalltalk、eiffel、jade、emerald、c++、c#、vb.net、python等的面向对象程序设计语言、如c程序设计语言、visualbasic、fortran2103、perl、cobol2102、php、abap的常规程序化程序设计语言、如python、ruby和groovy的动态程序设计语言或其它程序设计语言等。该程序代码可以完全在用户计算机上运行、或作为独立的软件包在用户计算机上运行、或部分在用户计算机上运行部分在远程计算机运行、或完全在远程计算机或服务器上运行。在后种情况下,远程计算机可以通过任何网络形式与用户计算机连接,比如局域网(lan)或广域网(wan),或连接至外部计算机(例如通过因特网),或在云计算环境中,或作为服务使用如软件即服务(saas)。
51.此外,除非权利要求中明确说明,本技术所述处理元素和序列的顺序、数字字母的使用、或其他名称的使用,并非用于限定本技术流程和方法的顺序。尽管上述披露中通过各种示例讨论了一些目前认为有用的发明实施例,但应当理解的是,该类细节仅起到说明的目的,附加的权利要求并不仅限于披露的实施例,相反,权利要求旨在覆盖所有符合本技术
实施例实质和范围的修正和等价组合。例如,尽管上述各种组件的实现可以体现在硬件设备中,但是它也可以实现为纯软件解决方案,例如,在现有服务器或移动设备上的安装。
52.同理,应当注意的是,为了简化本技术披露的表述,从而帮助对一个或多个发明实施例的理解,前文对本技术的实施例的描述中,有时会将多种特征归并至一个实施例、附图或对其的描述中。然而,本技术的该方法不应被解释为反映所申明的客体需要比每个权利要求中明确记载的更多特征的意图。相反,发明的主体应具备比上述单一实施例更少的特征。
技术特征:
1.一种es数据实时同步方法,其特征在于,包括如下步骤:s1.配置mysql集群环境,并开启mysql的binlog写入功能;s2.使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;s3.配置canal环境,连接mysql集群,订阅master主节点的binlog日志;s4.使用java编写canal客户端,获取canal订阅到的增量数据信息;s5.配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;s6.使用kafka连接es集群,编写消费逻辑,调用es开放的接口,将增量数据以http请求的方式发送给es端口进行数据同步;s7.使用es接收请求,将增量数据同步到指定索引中。2.根据权利要求1所述的es数据实时同步方法,其特征在于,s1包括如下步骤:s11.安装多个mysql软件或服务器,从中选择一个作为主节点,即为主机,其余为从机;并在my.cnf配置文件中,将server-id属性设置为1,开启binlog日志功能,以记录主节点上所有数据的修改变化;s12.为所有从机设置不同的server-id值,并配置从机连接主机的相关参数;s13.配置完成后需要进行连通性测试,必须保证主机上进行写操作时主机的binlog日志能够同步更新,且从机数据也能实时更新到;s14.使用mysqldump将主节点中已有数据全量同步到从机中,以保证数据的一致性;s15.通过模拟各种故障场景测试集群环境,并根据条件进行压测,以保证正常工作;s16.创建数据库、表、索引和视图。3.根据权利要求1所述的es数据实时同步方法,其特征在于,s2包括如下步骤:s21.配置elasticsearch集群环境;s211.安装下载多个elasticsearch软件,区分节点类型为主节点、数据节点或主数据节点,以便各个节点在所处集群中分配到角色和任务;s212.为每个集群节点配置elasticsearch.yml配置文件,并定义集群名称、节点名称、网络地址和开启跨域访问,以让集群节点之间能够相互连通并进行数据同步;s213.启动各个elasticsearch节点,使用kibana客户端工具检测集群状态,并对集群环境做调整;s214.根据条件优化集群性能;s215.添加ik中文分词器,为中文文本做智能分词,以更好兼容中文数据,并添加扩展分词和停用词;s216.依据mysql集群中的表和字段,一一对应建立索引和映射,并给需要的字段添加analyzer和search_analyzer属性,以依据该字段做合适的倒排索引。4.根据权利要求3所述的es数据实时同步方法,其特征在于,s2还包括如下步骤:s22.配置logstash环境,做全量数据同步;s221.下载安装logstash软件,检查版本是否对应;s222.上传对应的mysql驱动依赖,以让logstash能够连接数据库;s223.为每一个es索引创建logstash配置文件,其中需要配置连接mysql集群的环境、查询mysql数据的sql语句、es连接环境、对应的es索引名称、mysql主键字段以及日志相关参数;
s224.编写pipelines.yml配置文件,为每一个es索引对应配置文件添加pipeline.id和path.config属性,以支持多表数据的同时插入;s225.启动logstash,检查并测试es数据的全量同步。5.根据权利要求1所述的es数据实时同步方法,其特征在于,s3包括如下步骤:s31.下载canal软件,根据版本修改源码以及软件依赖,以支持项目使用的mysql版本和elasticsearch版本;s32.下载jdk,配置jdk环境;s33.修改canal deployer的配置文件,设置数据库地址、账号、密码以及salveid值,确保与mysql集群中各节点的server-id值不同;s34.修改mysql驱动版本,并修改驱动器权限;s35.启动服务,观察日志变化,查看是否正常成功连接mysql并读取到binlog增量日志信息。6.根据权利要求1所述的es数据实时同步方法,其特征在于,s4包括如下步骤:s41.安装配置java环境以及springboot环境;s42.上传canal客户端的maven依赖;s43.编写客户端连接代码,指定canal软件的连接地址、端口、实例名称和账号密码;s44.编写canal客户端消费逻辑,将得到的增量数据信息转换为json数据格式,并以http请求的方式发送出去。7.根据权利要求1所述的es数据实时同步方法,其特征在于,s5包括如下步骤:s51.安装并解压kafka软件;s52.修改server.properties文件,配置kafka进程端口号、主机地址及名称和日志相关属性;s53.修改zookeeper.properties文件,配置客户端连接端口数据目录;s54.启动zookeeper和kafka,检测是否有启动错误和异常;s55.创建topic,指定名称、zookeeper连接环境、分区和副本因子。8.根据权利要求1所述的es数据实时同步方法,其特征在于,s6包括如下步骤:s61.创建kafka生产者,接收http请求,提取并转换数据格式,编写回调函数并对失败进行补偿处理,配置application.prpertise,自定义分区器和重试次数,并将增量数据信息发送到topic中;s62.添加连接es的客户端依赖,并配置连接es集群的相关参数;s63.创建kafka消费者,指定监听的topic、partition、offset,配置application.prpertise,开启批量消费和自动应答,接收topic内的增量数据;s64.以json数据格式拼接http请求,使用resthighlevelclient对象将增量数据以http请求的形式发送至es集群。9.一种es数据实时同步系统,包括:第一配置模块,用于配置mysql集群环境,并开启mysql的binlog写入功能;全量数据同步模块,用于使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;第二配置模块,用于配置canal环境,连接mysql集群,订阅master主节点的binlog日
志;增量数据获取模块,用于使用java编写canal客户端,获取canal订阅到的增量数据信息;第三配置模块,用于配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;第一数据同步模块,用于使用kafka连接es集群,编写消费逻辑,调用es开放的接口,将增量数据以http请求的方式发送给es端口进行数据同步;第二数据同步模块,用于使用es接收请求,将增量数据同步到指定索引中。10.一种计算机可读存储介质,其特征在于,所述存储介质存储计算机指令,当计算机读取所述计算机指令时,所述计算机执行权利要求1至8中任一项所述的es数据实时同步方法。
技术总结
本发明提供了一种es数据实时同步方法、系统及存储介质,其方法包括如下:S1.配置mysql集群环境;S2.使用logstash连接mysql和es集群,将mysql中原有的数据全量同步到es中;S3.配置canal环境;S4.使用java编写canal客户端,获取增量数据信息;S5.配置kafka环境,并使用canal客户端将增量数据信息推送给kafka;S6.使用Kafka连接es集群,将增量数据以http请求的方式发送给es端口进行数据同步;S7.使用es接收请求,将增量数据同步到指定索引中。本发明使用canal做增量数据同步,克服了使用logstash做增量数据同步所带来的缺陷。logstash做增量数据同步所带来的缺陷。logstash做增量数据同步所带来的缺陷。
技术研发人员:魏培阳 周杰三 舒红平 刘魁 曹亮
受保护的技术使用者:成都信息工程大学
技术研发日:2023.06.12
技术公布日:2023/7/18
版权声明
本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)
飞行汽车 https://www.autovtol.com/
