分布式消息中间件的多机房容灾方法和系统以及客户端与流程

未命名 10-19 阅读:146 评论:0


1.本说明书一个或多个实施例涉及网络信息技术,尤其涉及针对rocketmq这种分布式消息中间件的多机房容灾方法和系统以及rocketmq客户端。


背景技术:

2.rocketmq是一款低延迟、高并发、高可用、高可靠的分布式消息中间件。rocketmq既可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。比如,在交易系中,每笔交易订单数据的产生会引起几百个下游业务系统的关注及更新,包括下游的物流系统、购物车系统、积分系统、流计算分析系统等等,整体业务系统庞大而且复杂,利用rocketmq来异步通知下游各个业务系统所关注的内容比如交易订单数据时,可实现异步通信和应用解耦,从而确保主站业务的连续性。
3.互联网业务尤其是带有金融属性的业务,对可用性要求是非常高的。通常会采用多机房的方式来部署针对rocketmq的应用,也就是说,设置多个机房,每一个机房中都包括rocketmq的一套完整组件,这样无论是业务应用还是中间件应用在出现机房级的故障时,能够通过切换业务机房的流量来保证应用整体仍能够对外提供服务。但是通常情况下,采用多机房方式部署的rocketmq在出现故障之后,需要管理人员手动将流量从一个机房中的rocketmq切换到另一个机房中的rocketmq,这样,在出现故障之后到完成流量的切换之前,中间都会有一个时间差,在这段时间内,故障机房的业务仍然处于不可用状态,这在互联网的高可用场景中是难以接受的。
4.因此,亟需一种针对rocketmq的多机房容灾方案,从而保证业务切换过程中服务的连续性,提升业务的无缝切换能力。


技术实现要素:

5.本说明书一个或多个实施例描述了针对分布式消息中间件rocketmq的多机房容灾方法和系统以及rocketmq客户端,能够保证业务切换过程中服务的连续性。
6.根据第一方面,提供了一种针对rocketmq的多机房容灾方法,其中,每一个机房中均包括基于rocketmq协议的各个组件,该基于rocketmq协议的各个组件中包括预先设置的管理装置;该方法包括:
7.每一个机房中的name server获取所有机房中的broker的地址;
8.当一个机房中的rocketmq客户端需要传输消息时,该rocketmq客户端从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;
9.该rocketmq客户端根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;
10.该rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;
11.该rocketmq客户端根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。
12.其中,name server的地址信息中包括:该name server所属的机房的信息;
13.所述rocketmq客户端根据当前可用的各个name server的地址信息选择本次传输对应的name server,包括:
14.所述rocketmq客户端得到本地预先配置的该rocketmq客户端所属的机房的信息;
15.所述rocketmq客户端判断在当前可用的各个name server的地址信息中是否存在第一name server的地址信息;其中,第一name server的地址信息中所包括的该第一name server所属的机房的信息与该rocketmq客户端所属的机房的信息相同;
16.如果是,则选择第一name server作为本次传输对应的name server;
17.如果否,则根据当前可用的各个name server的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的name server。
18.其中,broker的地址信息中包括:该broker所属的机房的信息;
19.所述rocketmq客户端根据当前可用的各个broker的地址信息选择本次传输对应的broker,包括:
20.所述rocketmq客户端得到本地预先配置的该rocketmq客户端所属的机房的信息;
21.所述rocketmq客户端判断在当前可用的各个broker的地址信息中是否存在第一broker的地址信息;其中,第一broker的地址信息中所包括的该第一broker所属的机房的信息与该rocketmq客户端所属的机房的信息相同;
22.如果是,则选择第一broker作为本次传输对应的broker;
23.如果否,则根据当前可用的各个broker的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的broker。
24.该方法进一步包括:每一个机房中的管理装置定期探测所有机房中的name server是否处于可用状态;并在接收到本机房的rocketmq客户端的获取请求时,向该rocketmq客户端返回所有机房中当前可用的各个name server的地址信息;
25.和/或,
26.该方法进一步包括:
27.每一个机房中的name server接收所有机房中的broker周期性发送的心跳消息;
28.如果该name server在每一个周期内均接收到所有机房中的broker发来的心跳消息,则确定所有机房中的broker当前可用,并在接收到rocketmq客户端发来的请求时,向该rocketmq客户端返回注册到该name server的所有broker的地址信息;
29.如果该name server在一个周期内未接收到一个机房中的broker发来的心跳消息,则确定该机房中的broker当前不可用;则在接收到rocketmq客户端发来的请求时,从注册到该name server的所有broker的地址信息中删除该机房的broke的地址信息,然后向rocketmq客户端返回删除了该机房中的broke的地址信息后的各个broker的地址信息。
30.其中,所述rocketmq客户端为:rocketmq的消费者;
31.在所述rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息之后,进一步包括:
32.该rocketmq的消费者基于当前订阅的主题topic,利用除本机房之外的其他机房
中当前可用的各个broker的地址信息,查看在其他机房中当前可用的各个broker是否连接有rocketmq消费者;
33.如果存在一个其他机房中的broker未连接有rocketmq消费者,则将该其他机房中的broker需要发送的消息分配到进行查看的该rocketmq的消费者。
34.其中,所述每一个机房中的name server获取所有机房中的broker的地址,包括:每一个机房中的broker在开机后,从该机房中的管理装置处获取所有机房中的name server的地址,该broker根据所获取的所有机房中的name server的地址,分别向所有机房中的name server注册该broker的地址。
35.其中,所述基于rocketmq协议的组件包括:sofamq组件;则,所述broker为:sofamq的broker;所述name server为:sofamq的name server,所述rocketmq客户端为:sofamq客户端;所述管理装置为:acvip服务器。
36.根据第二方面,提供了rocketmq客户端,其中,该客户端包括:
37.第一交互处理模块,配置为从所在机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;
38.第二交互处理模块,配置为根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;
39.第三交互处理模块,配置为根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。
40.根据第三方面,提供了针对rocketmq的多机房容灾系统,该系统架构中包括n个机房,n为大于1的正整数,每一个机房中都部署有如下五种角色的组件:rocketmq客户端、位于服务端的name serverr、位于服务端的broke以及位于服务端的管理装置;其中,rocketmq客户端包括位于客户端的生产者以及位于客户端的消费者;
41.管理装置,配置为保存所有机房中name server的地址信息,向rocketmq客户端提供所有机房中当前可用的name server的地址信息;
42.rocketmq客户端,配置为从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输;
43.name server,配置为获取所有机房中的broker的地址,向rocketmq客户端返回所有机房中当前可用的各个broker的地址信息;
44.broker,用于与rocketmq客户端传输消息。
45.根据第四方面,提供了一种计算机可读存储介质,其上存储有计算机程序,当所述计算机程序在计算机中执行时,令计算机执行本说明书任一实施例所述的方法。
46.根据第五方面,提供了一种计算设备,包括存储器和处理器,所述存储器中存储有可执行代码,所述处理器执行所述可执行代码时,实现本说明书任一实施例所述的方法。
47.本说明书实施例提供的针对rocketmq这种分布式消息中间件的多机房容灾方法和系统以及rocketmq客户端,在rocketmq的系统架构中设置了管理装置,通过该管理装置
能够实时更新所有机房中当前可用的各个name server,从而将未发生故障即当前可用的各个name server的地址信息发送给rocketmq客户端,这样,即使rocketmq客户端所在机房中的name server发生故障,因为rocketmq客户端已经获取了其他机房中可用的name server的地址信息,因此,可以自动地、第一时间将其他机房中可用的name server选择为传输对应的name server,从而及时将业务切换到其他机房的name server;同时,因为每一个机房中的name server都获取了所有机房中broker的地址信息,并将未发生故障即当前可用的各个broker的地址信息发送给rocketmq客户端,这样,即使rocketmq客户端所在机房中的broker发生故障,因为rocketmq客户端已经获取了其他机房中可用的broker的地址信息,因此,可以自动地、第一时间将其他机房中可用的broker选择为传输对应的broker,从而及时将业务切换到其他机房中的broker。可见,本说明书实施例能够在name server或者broker故障时,保证业务切换过程中服务的连续性,从而提升了业务的无缝切换能力。
附图说明
48.为了更清楚地说明本说明书实施例或现有技术中的技术方案,下面将对实施例或现有技术描述中所需要使用的附图作简单地介绍,显而易见地,下面描述中的附图是本说明书的一些实施例,对于本领域普通技术人员来讲,在不付出创造性劳动的前提下,还可以根据这些附图获得其他的附图。
49.图1是rocketmq的系统架构示意图。
50.图2是本说明书一个实施例所应用的系统架构的示意图。
51.图3是本说明书一个实施例中针对rocketmq分布式消息中间件的多机房容灾方法的流程图。
52.图4是本说明书一个实施例中rocketmq客户端选择本次传输对应的name server的方法流程图。
53.图5是本说明书一个实施例中rocketmq客户端选择本次传输对应的broker的方法流程图。
54.图6是本说明书一个实施例中rocketmq客户端的结构示意图。
55.图7是本说明书另一个实施例中rocketmq客户端的结构示意图。
具体实施方式
56.参见图1,在rocketmq的系统架构中包括如下四种角色的组件:位于客户端的生产者、位于客户端的消费者、位于服务端的注册服务器(称为name server)以及位于服务端的消息服务器(称为broker)。
57.其中,name server:可集群部署,提供命名服务以及更新和发现本机房中的broker的服务。
58.broker:可集群部署,消息中转角色,负责存储消息以及转发消息。通常分为主broker(master broker)和从broker(slave broker),定期向name server上报topic路由信息。
59.生产者:负责生产并发布消息,定期从name server读取topic路由信息,并与提供topic服务的broker建立连接,将生产的消息发送给broker,且定时向broker发送心跳。
60.消费者:定期从name server拉取topic路由信息,并与提供topic服务的broker建立连接,从而通过该连接从broker中订阅生产者所发布的消息;定时向broker发送心跳。
61.在多机房的容灾方案中,在每一个机房中都部署有上述四种角色的组件。
62.但是如前所述,在现有技术中,在利用图1所示的rocketmq的系统架构实现多机房的容灾方案时,当提供服务的一个机房中的rocketmq组件发生故障时,需要管理人员手动切换至另一个机房中的rocketmq组件,从而导致业务切换过程中的服务中断。
63.下面结合附图,对本说明书提供的方案进行描述。
64.首先需要说明的是,在本发明实施例中使用的术语是仅仅出于描述特定实施例的目的,而非旨在限制本发明。在本发明实施例和所附权利要求书中所使用的单数形式的“一种”、“所述”和“该”也旨在包括多数形式,除非上下文清楚地表示其他含义。
65.应当理解,本文中使用的术语“和/或”仅仅是一种描述关联对象的关联关系,表示可以存在三种关系,例如,a和/或b,可以表示:单独存在a,同时存在a和b,单独存在b这三种情况。另外,本文中字符“/”,一般表示前后关联对象是一种“或”的关系。
66.取决于语境,如在此所使用的词语“如果”可以被解释成为“在
……
时”或“当
……
时”或“响应于确定”或“响应于检测”。类似地,取决于语境,短语“如果确定”或“如果检测(陈述的条件或事件)”可以被解释成为“当确定时”或“响应于确定”或“当检测(陈述的条件或事件)时”或“响应于检测(陈述的条件或事件)”。
67.为了方便对本说明书提供的方法进行理解,首先对本说明书所涉及和适用的系统架构进行描述。如图2中所示,该系统架构中主要包括n个机房,n为大于1的正整数,每一个机房中都部署有如下五种角色的组件:位于客户端的生产者、位于客户端的消费者、位于服务端的name serverr、位于服务端的broke以及位于服务端的管理装置。
68.name server可以集群部署,也就是说,在每一个机房中,可以部署多个name server。broke也可以集群部署,也就是说,在每一个机房中,可以部署多个broke,并且,在每一个机房中,可以部署有主broke以及从broke,从而实现broke的备份。当然,管理装置也可以集群部署。其中,一个集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。
69.应该理解,图2中的生产者、消费者、name server、broker以及管理装置的数目仅仅是示意性的。根据实现需要,可以选择和布设任意数目。
70.图3是本说明书一个实施例中针对rocketmq的多机房容灾方法的流程图。该方法的执行主体为针对rocketmq的多机房容灾系统。可以理解,该方法也可以通过任何具有计算、处理能力的装置、设备、平台、设备集群来执行。参见图2、图3,每一个机房中均包括基于rocketmq协议的各个组件,该基于rocketmq协议的各个组件中包括预先设置的管理装置;该方法包括:
71.步骤301:每一个机房中的name server预先获取所有机房中的broker的地址。
72.步骤303:当一个机房中的rocketmq客户端需要传输消息时,该rocketmq客户端从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息。
73.步骤305:该rocketmq客户端根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求。
74.步骤307:该rocketmq客户端接收所选择的name server发来的所有机房中当前可
用的各个broker的地址信息。
75.步骤309:该rocketmq客户端根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。
76.根据上述图3所示的流程可以看出,在本说明书实施例中,在rocketmq的系统架构中设置了管理装置,通过该管理装置能够实时更新所有机房中当前可用的各个name server,从而将未发生故障即当前可用的各个name server的地址信息发送给rocketmq客户端,这样,即使rocketmq客户端所在机房中的name server发生故障,因为rocketmq客户端已经获取了其他机房中可用的name server的地址信息,因此,可以自动地、第一时间将其他机房中可用的name server选择为传输对应的name server,从而及时将业务切换到其他机房的name server;同时,因为每一个机房中的name server都获取了所有机房中broker的地址信息,并将未发生故障即当前可用的各个broker的地址信息发送给rocketmq客户端,这样,即使rocketmq客户端所在机房中的broker发生故障,因为rocketmq客户端已经获取了其他机房中可用的broker的地址信息,因此,可以自动地、第一时间将其他机房中可用的broker选择为传输对应的broker,从而及时将业务切换到其他机房中的broker。可见,本说明书实施例能够在name server或者broker故障时,保证业务切换过程中服务的连续性,从而提升了业务的无缝切换能力。
77.在上述图3所示流程中,rocketmq客户端可以是rocketmq的生产者,这样,图3所示流程中的“传输消息”为发布消息,即rocketmq的生产者需要向broker发布消息。
78.在上述图3所示流程中,rocketmq客户端也可以是rocketmq的消费者,这样,图3所示流程中的“传输消息”为订阅消息,即rocketmq的消费者需要从broker中订阅消息。
79.图3所示的方法可以应用于基于rocketmq实现的任意一种分布式消息中间件的业务系统中。比如,目前出现了sofastack消息队列(sofastack mq,简称sofamq)。sofamq是基于rocketmq演进而来,即基于rocketmq构建的分布式消息中间件。sofamq与金融分布式架构sofastack深度集成,为分布式应用系统提供异步解耦和削峰填谷的能力,支持事务消息、顺序消息、定时消息等多种消息类型,同时具备高可靠、高吞吐、低延时等金融级特性。因此,当图3所示的方法应用于针对sofamq的多机房容灾时,在图3所示的流程中,基于rocketmq协议的组件包括:sofamq组件;则,broker为:sofamq的broker;name server为:sofamq的name server;rocketmq客户端为:sofamq客户端;管理装置可以为:sofamq中的acvip服务器。
80.为了更清楚地体现本说明书实施例的实现过程,下面结合图2,对图3中的每一个步骤分别进行说明。
81.首先在执行步骤301之前,可以进行预配置处理,比如包括步骤300:在管理装置中预先设置所有机房中的name server的地址信息。
82.在本说明书实施例中,每一个机房中的管理装置可以定期探测所有机房中的name server是否处于可用状态,从而及时获取当前可用的name server。
83.接下来对于步骤301:每一个机房中的name server获取所有机房中的broker的地址。
84.本步骤301也可以是预配置处理,即预先在name server设置所有机房中的broker的地址。
server。如果一共存在两个机房,则直接选择另一个机房中的name server,如果存在多于两个的机房,也就是说,虽然本机房中的name server当前不可用,但是,其他多个机房中的多个name server当前可用,则可以随机选择或者利用负载均衡的原则选择本次传输对应的name server。
98.接下来对于步骤307:该rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息。
99.如前所述,每一个name server可以通过所有机房中的broker主动注册的方式来获取到所有机房中的broker信息。并且,每一个name server可以周期性的探测各个机房中的broker是否在正常工作,即当前是否可用,从而在rocketmq客户端发来请求时,能够确定是否向rocketmq客户端返回一个机房中的broker的地址信息。具体实现过程包括:
100.步骤3071:每一个机房中的name server接收所有机房中的broker周期性发送的心跳消息;
101.步骤3073:如果该name server在每一个周期内均接收到所有机房中的broker发来的心跳消息,则确定所有机房中的broker当前可用,并在接收到rocketmq客户端发来的请求时,向该rocketmq客户端返回注册到该name server的所有broker的地址信息;
102.步骤3075:如果该name server在一个周期内未接收到一个机房比如机房1中的broker发来的心跳消息,则确定该机房1中的broker当前不可用;则在接收到rocketmq客户端发来的请求时,从注册到该name server的所有broker的地址信息中删除机房1的broke的地址信息,然后向rocketmq客户端返回删除了该机房1中的broke的地址信息后的各个broker的地址信息。
103.通过本步骤307的处理,name server可以在本机房中的broke发生故障时,将其他机房中的broke的地址信息发送给rocketmq客户端,从而保证rocketmq客户端能够与其他机房中正常工作的broke进行消息的传输,从而在本机房的broke发生故障时,实现容灾。
104.通过本步骤307,rocketmq客户端根据订阅的(rocketmq客户端为rocketmq消费者)或者发送的(rocketmq客户端为rocketmq生产者)topic,得到对应的broker或者broker集群的地址。
105.接下来对于步骤309:该rocketmq客户端根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。
106.在本说明书一个实施例中,broker的地址信息中包括:该broker所属的机房的信息。也就是说,rocketmq客户端从所选择的name serve中获取了关于当前可用的各个broker的地址列表,该列表中的每一行对应一个broker,每一列中,包括了对应broker的地址以及该broker所属的机房的信息,比如机房的编号。相应地,参见图5,本步骤309中,rocketmq客户端根据当前可用的各个broker的地址信息选择本次传输对应的broker,包括:
107.步骤3091:rocketmq客户端得到本地预先配置的该rocketmq客户端所属的机房的信息,比如机房的编号。
108.步骤3093:所述rocketmq客户端判断在所获取的当前可用的各个broker的地址信息中是否存在第一broker的地址信息;其中,第一broker的地址信息中所包括的该第一broker所属的机房的信息与该rocketmq客户端所属的机房的信息相同;如果是,则执行步
骤3095,否则执行步骤3097。
109.也就是说,在本步骤3093中,rocketmq客户端可以判断本机房中的broker当前是否可用,如果存在第一broker的地址信息,则说明本机房中的broker当前可用,则会基于本地优先的原则或者说本机房收敛的原则,选择本机房中的broker来进行消息的传输,从而保证消息的收敛。
110.步骤3095:rocketmq客户端选择第一broker即本机房中的broker作为本次传输对应的broker,结束当前选择本次传输对应的broker的流程。
111.步骤3097:rocketmq客户端根据当前可用的各个broker的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的broker。
112.执行到本步骤3097时,说明不存在第一broker的地址信息,则说明本机房中的broker发生了故障,当前不可用,因此,rocketmq客户端会选择其他机房中的broker。如果一共存在两个机房,则直接选择另一个机房中的broker,如果存在多于两个的机房,也就是说,虽然本机房中的broker当前不可用,但是,其他多个机房中的多个broker当前可用,则可以随机选择或者利用负载均衡的原则选择本次传输对应的broker。
113.通过上述图3所示的处理,一个机房中的rocketmq客户端能够感知到所有机房中的所有name serve和/或感知到所有机房中的所有broker,从而实现了各个机房的业务的打通。
114.进一步地,rocketmq客户端在从当前可用的name serve中选择本次传输对应的name serve时,优先选择同一机房中的name serve,从而在同一机房中的name serve未发生故障时,实现业务的本地收敛,即消息在同一机房内完成发布及订阅。
115.进一步地,rocketmq客户端在从当前可用的broker中选择本次传输对应的broker时,优先选择同一机房中的broker,从而在同一机房中的broker未发生故障时,实现业务的本地收敛,即消息在同一机房内完成发布及订阅。
116.基于上述本说明书实施例的方法,可以实现对name serve以及对broker的容灾。在本说明的另一个实施例中,还可以进一步实现对rocketmq的消费者的容灾。具体的实现方法包括:
117.在步骤307中,rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息,此后,可以进一步包括:
118.步骤600:该rocketmq的消费者得到除本机房之外的其他机房中当前可用的各个broker的地址信息;
119.步骤602:该rocketmq的消费者基于当前订阅的主题topic,利用其他机房中当前可用的各个broker的地址信息,查看在其他机房中当前可用的各个broker是否连接有rocketmq消费者;
120.步骤604:如果存在一个其他机房中的broker未连接有rocketmq消费者,则将该其他机房中的broker需要发送的消息分配到进行查看的该rocketmq的消费者。
121.当一个机房比如机房1中的所有rocketmq消费者发生故障,无法订阅消息时,该机房1中的rocketmq生产者却还在一直发布消息,这时按机房收敛的模式无消费者去消费机房1中broker内堆积的消息,长时间堆积很可能对业务产生影响。通过上述步骤400、步骤402以及步骤404的处理,比如机房2中的rocketmq消费者会感知其他在线的rocketmq消费
者,是否有某个机房比如机房1中不存在rocketmq消费者,如果不存在的话,那么机房2中的rocketmq消费者会去消费机房1中的消息,从而使消息不会堆积,实现了对rocketmq消费者的容灾。
122.如前所述,本说明书实施例的方法可以应用于针对sofamq的多机房容灾。下面结合sofamq,来分别说明针对sofamq的多机房容灾方法在各种业务场景下的实现过程。针对sofamq,管理装置即acvip定期获取了所有机房中当前可用的name server,name server预先获取了所有机房中所有broker的地址信息。并且,以总共有两个机房为例进行说明,如下:
123.业务场景1:机房中的各个组件均正常工作。
124.在该业务场景1中,以机房1为例,结合图3所示流程,针对sofamq的多机房容灾方法包括:机房1中的sofamq客户端需要传输消息时,从本机房的acvip处获取所有机房中当前可用的各个name server的地址信息;该sofamq客户端选择本地即机房1中的name server,向机房1中的name server发送请求;sofamq客户端接收name server发来的所有机房中当前可用的各个broker的地址信息;该rocketmq客户端选择本地即机房1中的broker,并与本地的broker进行消息的传输。
125.可见,在机房1中的各个sofamq组件均正常工作时,sofamq生产者发布消息时,会采用本地优先的原则,实现机房收敛,即消息在本机房1内完成发布;同样,sofamq消费者订阅消息时,会采用本地优先的原则,实现机房收敛,即消息在本机房1内完成订阅。
126.业务场景2:一个机房中的所有broker全部故障,无法正常工作。
127.在该业务场景2中,以机房1为例,结合图3所示流程,针对sofamq的多机房容灾方法包括:机房1中的sofamq客户端需要传输消息时,从本机房的acvip处获取所有机房中当前可用的各个name server的地址信息;该sofamq客户端选择本地即机房1中的name server,向机房1中的name server发送请求;因为机房1中的所有broker全部故障,这时机房1中的name server因为长时间未收到机房1中的broker发来的心跳消息,因此会剔除机房1中broker的地址信息,仅存在机房2中broker的地址信息,当机房1中的name server接收到机房1中的sofamq客户端发来的请求时,向该sofamq客户端提供机房2中broker的地址信息,从而使得机房1中的sofamq客户端与机房2中的broker进行消息的传输。
128.可见,在机房1中的broker故障,无法正常工作时,sofamq生产者发布消息时,无法再采用本地优先的原则,而是利用另一个机房中的broker来实现消息的发布;同样,sofamq消费者订阅消息时,无法再采用本地优先的原则,而是利用另一个机房中的broker来实现消息的订阅,从而实现在broker故障时的容灾处理。
129.业务场景3:一个机房中的所有broker和name server均故障,均无法正常工作。
130.在该业务场景3中,以机房1为例,结合图3所示流程,针对sofamq的多机房容灾方法包括:机房1中的sofamq客户端需要传输消息时,从本机房的acvip处获取所有机房中当前可用的各个name server的地址信息,因为机房1中的name server故障,acvip会探测到该故障,因此,会剔除机房1中的name server的地址信息,所以机房1中的sofamq客户端获取到的是机房2中的name server的地址信息;机房1中的sofamq客户端根据机房2中name server的地址信息,向机房2中的name server发送请求;因为机房1中的所有broker全部故障,这时机房2中的name server因为长时间未收到机房1中的broker发来的心跳消息,因此
会剔除机房1中broker的地址信息,仅存在机房2中broker的地址信息,当机房2中的name server接收到机房1中的sofamq客户端发来的请求时,向该sofamq客户端提供机房2中broker的地址信息,从而使得机房1中的sofamq客户端通过机房2中的name server与机房2中的broker进行消息的传输。
131.可见,在机房1中的broker和name server均故障,均无法正常工作时,sofamq生产者发布消息时,无法再采用本地优先的原则,而是利用另一个机房中的name server和broker来实现消息的发布;同样,sofamq消费者订阅消息时,无法再采用本地优先的原则,而是利用另一个机房中的name server和broker来实现消息的订阅,从而实现在name server以及broker均故障时的容灾处理。
132.业务场景4:一个机房中的所有sofamq消费者均故障。
133.在该业务场景4中,以机房1为例,结合图3所示流程,针对sofamq的多机房容灾方法包括:机房1中的sofamq生产者通过本机房1中的name server向本机房1中的broker发布了消息,但是由于sofamq消费者故障,无法正常工作,因此,消息在本机房1中的broker中进行了堆积。机房2中的sofamq消费者因为也能得到机房1中的broker的地址信息,因此,sofamq消费者基于当前订阅的主题topic,利用机房1中broker的地址信息,查看在机房1中的broker上是否连接有sofamq消费者;如果机房1中的broker未连接有sofamq消费者,则将需要从机房1中的broker处订阅的消息分配到机房2中的sofamq消费者,即由机房2中的sofamq消费者从机房1中的broker处订阅消息。
134.可见,在机房1中的sofamq消费者故障,无法正常工作时,机房2中的sofamq消费者能够从机房1中的broker处订阅消息,从而避免了消息在机房1中的broker处产生堆积,从而实现在消费者故障时的容灾处理。
135.在本说明书一个实施例中,可以通过修改对应rocketmq客户端比如sofamq客户端的软件开发工具包(software development kit,sdk)来实现sofamq客户端的功能。
136.在本说明书的一个实施例中,提供了一种rocketmq客户端,参见图6,该客户端包括:
137.第一交互处理模块601,配置为从所在机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;
138.第二交互处理模块602,配置为根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;
139.第三交互处理模块603,配置为根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。
140.在图6所示的rocketmq客户端的一个实施例中,name server的地址信息中包括:该name server所属的机房的信息;
141.第二交互处理模块602被配置为执行:
142.得到本地预先配置的该rocketmq客户端所属的机房的信息;
143.判断在当前可用的各个name server的地址信息中是否存在第一name server的地址信息;其中,第一name server的地址信息中所包括的该第一name server所属的机房的信息与该rocketmq客户端所属的机房的信息相同;
144.如果是,则选择第一name server作为本次传输对应的name server;
145.如果否,则根据当前可用的各个name server的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的name server。
146.在图6所示的rocketmq客户端的一个实施例中,broker的地址信息中包括:该broker所属的机房的信息;
147.第三交互处理模块603被配置为执行:
148.得到本地预先配置的该rocketmq客户端所属的机房的信息;
149.判断在当前可用的各个broker的地址信息中是否存在第一broker的地址信息;其中,第一broker的地址信息中所包括的该第一broker所属的机房的信息与该rocketmq客户端所属的机房的信息相同;
150.如果是,则选择第一broker作为本次传输对应的broker;
151.如果否,则根据当前可用的各个broker的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的broker。
152.在图6所示的rocketmq客户端的一个实施例中,进一步包括:心跳消息处理模块(图中未示出);
153.心跳消息处理模块,配置为在rocketmq客户端正常工作时,在每一个周期中向所有机房中的name serverr发送心跳消息。
154.参见图7,在本说明书的rocketmq客户端的一个实施例中,该rocketmq客户端为:rocketmq的消费者;且该rocketmq客户端进一步包括:消费者容灾处理模块701;
155.消费者容灾处理模块701被配置为执行:
156.基于当前订阅的主题topic,利用除本机房之外的其他机房中当前可用的各个broker的地址信息,查看在其他机房中当前可用的各个broker是否连接有rocketmq消费者;如果存在一个其他机房中的broker未连接有rocketmq消费者,则订阅该其他机房中的broker需要发送的消息到本rocketmq的消费者。
157.在本说明书一个实施例中,图6、7所示的所述rocketmq客户端为rocketmq的生产者,则传输消息为发布消息。
158.在本说明书一个实施例中,图6、7所示的所述rocketmq客户端为rocketmq的消费者,则传输消息为订阅消息。
159.在本说明书的一个实施例中,提供了一种针对rocketmq的多机房容灾系统,参见图2,该系统架构中包括n个机房,n为大于1的正整数,每一个机房中都部署有如下五种角色的组件:rocketmq客户端、位于服务端的name serverr、位于服务端的broke以及位于服务端的管理装置;其中,rocketmq客户端包括位于客户端的生产者以及位于客户端的消费者;
160.管理装置,配置为保存所有机房中name server的地址信息,并定期探测所有机房中当前可用的name server,向rocketmq客户端提供所有机房中当前可用的name server的地址信息;
161.rocketmq客户端,配置为从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;根据当前可用的各个broker的地址信息,选
择本次传输对应的broker,并与所选择的broker进行消息的传输;
162.name server,配置为获取所有机房中的broker的地址,向rocketmq客户端返回所有机房中当前可用的各个broker的地址信息;
163.broker,用于与rocketmq客户端传输消息。
164.其中,name server可以集群部署,也就是说,在每一个机房中,可以部署多个name server。broke也可以集群部署,也就是说,在每一个机房中,可以部署多个broke,并且,在每一个机房中,可以部署有主broke以及从broke,从而实现broke的备份。当然,管理装置也可以集群部署。其中,一个集群下包含多个生产者实例,可以是多台机器,也可以是一台机器的多个进程,或者一个进程的多个生产者对象。
165.图2所示系统中每一个组件的功能及连接关系还可以参考上述各个实施例中的说明,比如对图3、图4、图5所示流程的说明。
166.需要说明的是,上述各组件通常实现于服务器端,可以分别设置于独立的服务器,也可以其中部分或全部装置的组合设置于同一服务器。该服务器可以是单个的服务器,也可以是由多个服务器组成的服务器集群,服务器可以是云服务器,又称为云计算服务器或云主机,是云计算服务体系中的一项主机产品。上述各组件还可以实现于具有较强计算能力的计算机终端。
167.本说明书一个实施例提供了一种计算机可读存储介质,其上存储有计算机程序,当所述计算机程序在计算机中执行时,令计算机执行说明书中任一个实施例中的方法。
168.本说明书一个实施例提供了一种计算设备,包括存储器和处理器,所述存储器中存储有可执行代码,所述处理器执行所述可执行代码时,实现执行说明书中任一个实施例中的方法。
169.可以理解的是,本说明书实施例示意的结构并不构成对本说明书实施例的装置的具体限定。在说明书的另一些实施例中,上述装置可以包括比图示更多或者更少的部件,或者组合某些部件,或者拆分某些部件,或者不同的部件布置。图示的部件可以以硬件、软件或者软件和硬件的组合来实现。
170.本说明书中的各个实施例均采用递进的方式描述,各个实施例之间相同相似的部分互相参见即可,每个实施例重点说明的都是与其他实施例的不同之处。尤其,对于装置实施例而言,由于其基本相似于方法实施例,所以描述的比较简单,相关之处参见方法实施例的部分说明即可。
171.本领域技术人员应该可以意识到,在上述一个或多个示例中,本发明所描述的功能可以用硬件、软件、挂件或它们的任意组合来实现。当使用软件实现时,可以将这些功能存储在计算机可读介质中或者作为计算机可读介质上的一个或多个指令或代码进行传输。
172.以上所述的具体实施方式,对本发明的目的、技术方案和有益效果进行了进一步详细说明,所应理解的是,以上所述仅为本发明的具体实施方式而已,并不用于限定本发明的保护范围,凡在本发明的技术方案的基础之上,所做的任何修改、等同替换、改进等,均应包括在本发明的保护范围之内。

技术特征:
1.针对rocketmq的多机房容灾方法,其中,每一个机房中均包括基于rocketmq协议的各个组件,该基于rocketmq协议的各个组件中包括预先设置的管理装置;该方法包括:每一个机房中的name server获取所有机房中的broker的地址;当一个机房中的rocketmq客户端需要传输消息时,该rocketmq客户端从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;该rocketmq客户端根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;该rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;该rocketmq客户端根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。2.根据权利要求1所述的方法,其中,name server的地址信息中包括:该name server所属的机房的信息;所述rocketmq客户端根据当前可用的各个name server的地址信息选择本次传输对应的name server,包括:所述rocketmq客户端得到本地预先配置的该rocketmq客户端所属的机房的信息;所述rocketmq客户端判断在当前可用的各个name server的地址信息中是否存在第一name server的地址信息;其中,第一name server的地址信息中所包括的该第一name server所属的机房的信息与该rocketmq客户端所属的机房的信息相同;如果是,则选择第一name server作为本次传输对应的name server;如果否,则根据当前可用的各个name server的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的name server。3.根据权利要求1所述的方法,其中,broker的地址信息中包括:该broker所属的机房的信息;所述rocketmq客户端根据当前可用的各个broker的地址信息选择本次传输对应的broker,包括:所述rocketmq客户端得到本地预先配置的该rocketmq客户端所属的机房的信息;所述rocketmq客户端判断在当前可用的各个broker的地址信息中是否存在第一broker的地址信息;其中,第一broker的地址信息中所包括的该第一broker所属的机房的信息与该rocketmq客户端所属的机房的信息相同;如果是,则选择第一broker作为本次传输对应的broker;如果否,则根据当前可用的各个broker的地址信息随机选择或者利用负载均衡的原则选择本次传输对应的broker。4.根据权利要求1所述的方法,其中,该方法进一步包括:每一个机房中的管理装置定期探测所有机房中的name server是否处于可用状态;并在接收到本机房的rocketmq客户端的获取请求时,向该rocketmq客户端返回所有机房中当前可用的各个name server的地址信息;和/或,该方法进一步包括:
每一个机房中的name server接收所有机房中的broker周期性发送的心跳消息;如果该name server在每一个周期内均接收到所有机房中的broker发来的心跳消息,则确定所有机房中的broker当前可用,并在接收到rocketmq客户端发来的请求时,向该rocketmq客户端返回注册到该name server的所有broker的地址信息;如果该name server在一个周期内未接收到一个机房中的broker发来的心跳消息,则确定该机房中的broker当前不可用;则在接收到rocketmq客户端发来的请求时,从注册到该name server的所有broker的地址信息中删除该机房的broke的地址信息,然后向rocketmq客户端返回删除了该机房中的broke的地址信息后的各个broker的地址信息。5.根据权利要求1所述的方法,其中,所述rocketmq客户端为:rocketmq的消费者;在所述rocketmq客户端接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息之后,进一步包括:该rocketmq的消费者基于当前订阅的主题topic,利用除本机房之外的其他机房中当前可用的各个broker的地址信息,查看在其他机房中当前可用的各个broker是否连接有rocketmq消费者;如果存在一个其他机房中的broker未连接有rocketmq消费者,则将该其他机房中的broker需要发送的消息分配到进行查看的该rocketmq的消费者。6.根据权利要求1所述的方法,其中,所述每一个机房中的name server获取所有机房中的broker的地址,包括:每一个机房中的broker在开机后,从该机房中的管理装置处获取所有机房中的name server的地址,该broker根据所获取的所有机房中的name server的地址,分别向所有机房中的name server注册该broker的地址。7.根据权利要求1所述的方法,其中,所述rocketmq客户端为rocketmq的生产者,则传输消息为发布消息;或者,所述rocketmq客户端为rocketmq的消费者,则传输消息为订阅消息。8.根据权利要求1所述的方法,其中,所述基于rocketmq协议的组件包括:sofamq组件;则,所述broker为:sofamq的broker;所述name server为:sofamq的name server,所述rocketmq客户端为:sofamq客户端;所述管理装置为:acvip服务器。9.rocketmq客户端,其中,该客户端包括:第一交互处理模块,配置为从所在机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;第二交互处理模块,配置为根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;第三交互处理模块,配置为根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输。10.针对rocketmq的多机房容灾系统,该系统架构中包括n个机房,n为大于1的正整数,每一个机房中都部署有如下五种角色的组件:rocketmq客户端、位于服务端的name serverr、位于服务端的broke以及位于服务端的管理装置;其中,rocketmq客户端包括位于
客户端的生产者以及位于客户端的消费者;管理装置,配置为保存所有机房中name server的地址信息,向rocketmq客户端提供所有机房中当前可用的name server的地址信息;rocketmq客户端,配置为从本机房的管理装置处获取所有机房中当前可用的各个name server的地址信息;根据当前可用的各个name server的地址信息,选择本次传输对应的name server,向所选择的name server发送请求;接收所选择的name server发来的所有机房中当前可用的各个broker的地址信息;根据当前可用的各个broker的地址信息,选择本次传输对应的broker,并与所选择的broker进行消息的传输;name server,配置为获取所有机房中的broker的地址,向rocketmq客户端返回所有机房中当前可用的各个broker的地址信息;broker,用于与rocketmq客户端传输消息。11.一种计算设备,包括存储器和处理器,所述存储器中存储有可执行代码,所述处理器执行所述可执行代码时,实现权利要求1-8中任一项所述的方法。

技术总结
本说明书实施例提供了一种针对RocketMQ的多机房容灾方法和系统以及RocketMQ客户端。该方法包括:每一个机房中的Name Server获取所有机房中的Broker的地址;当一个机房中的RocketMQ客户端需要传输消息时,该RocketMQ客户端从本机房的管理装置处获取所有机房中当前可用的各个Name Server的地址信息;该RocketMQ客户端选择本次传输对应的Name Server,向所选择的Name Server发送请求;该RocketMQ客户端接收所选择的Name Server发来的所有机房中当前可用的各个Broker的地址信息,选择本次传输对应的Broker,并与所选择的Broker进行消息的传输。本说明书能够保证业务切换过程中的服务的连续性。切换过程中的服务的连续性。切换过程中的服务的连续性。


技术研发人员:汤天玮 李成宇
受保护的技术使用者:蚂蚁区块链科技(上海)有限公司
技术研发日:2023.07.14
技术公布日:2023/10/15
版权声明

本文仅代表作者观点,不代表航空之家立场。
本文系作者授权航家号发表,未经原创作者书面授权,任何单位或个人不得引用、复制、转载、摘编、链接或以其他任何方式复制发表。任何单位或个人在获得书面授权使用航空之家内容时,须注明作者及来源 “航空之家”。如非法使用航空之家的部分或全部内容的,航空之家将依法追究其法律责任。(航空之家官方QQ:2926969996)

飞行汽车 https://www.autovtol.com/

分享:

扫一扫在手机阅读、分享本文

相关推荐