扫二维码与项目经理沟通
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流
本篇内容主要讲解“relocating对Elasticsearch集群的影响是什么”,感兴趣的朋友不妨来看看。本文介绍的方法操作简单快捷,实用性强。下面就让小编来带大家学习“relocating对Elasticsearch集群的影响是什么”吧!
10年积累的网站设计、做网站经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站设计制作后付款的网站建设流程,更有讷河免费网站建设让你可以放心的选择与我们合作。
分片移动结束后,target 节点会向 master 发起一个 shard-started PRC,该 RPC 具有次高优先级:URGENT, master 对该 RPC 的处理比较慢,尤其是在集群分片数量到几万的级别,从而可能导致一些较低优先级的 RPC 长时间来不及执行,例如 put-mapping,进而导致对业务的明显影响。
由于 rebalance,allocation filtering,forced awareness等任意原因产生的 shard-started RPC 都会存在这个问题,例如扩容集群的时候,如果把 rebalance 并发开的比较大,对 master 的处理能力造成明显影响。因此对于分片数较多的集群,当你想要加大 rebalance 和 recovery 并发的时候要考虑到对 master 的影响。
当一个主分片被 rebalance 或者手工 move 的时候,可以想象必然存在一个时间段该主分片无法写入。
Elasticsearch 对主分片的 relocating 也是直接 move,不会先将主分片资格让给其他副分片,再进行 move,即便如此,也会存在一个时间点进行切换,无法响应写入。
这个时间段从 RPC 的时序来看的话,如下图所示,红色标记的区域为阻塞写入流程的时间段。在该时间段内,客户端的写入请求会被阻塞无返回,直到这部分处理完成。

以手工 move 为例,当一个主分片从 node-idea move 到 node1时,主要有以下几个阶段完成:
首先会通过发布新的集群状态,将该分片标记为 RELOCATING状态,更新 routing_table 和 routing_nodes
数据节点收到集群状态后,开始执行 recovery 流程,该流程自 target 节点发送 start_recovery开始,至 Source 阶段返回 start_recovery response 结束。
recovery 完成后,target发送 shards-started RPC 给 master 节点,master 节点再次下发集群状态,将该分片标记为 STARTED 状态。
阻塞客户端 bulk 写入的阶段,就在 source 节点执行 finalize 阶段时,准备发送 handoff RPC 时开始,直到收到 master 新的集群状态,将该分片标记为 STARTED,每个节点应用集群状态的时间点略有差异,所以每个节点停止阻塞写入的时间点也会有微小差异。
handoff 的 RPC的作用是告知 target 节点该分片可以切换主分片状态,对 handoff RPC 的处理都是一些计算操作,其中涉及到几个锁,一般会很快完成。
整个主分片的 relocating 过程对写入的影响是很复杂的处理过程,我将他们划分为几个阶段,下面是详细过程。
在分片的整个 relocating 过程中,对 target 节点的请求都会被转发到 source 节点,直到 target 节点应用了 master 下发的分片变为 STARTED 的集群状态。
起初,写入请求到达 source 节点,像往常一样正常写入到主分片中,直到复制阶段。
此阶段将收到的写请求复制给 target 节点。他从收到 prepare_translog 的 response之后开始,直到 handoff 阶段开始之前。
1.将 target 分片加入到 replication group
收到 prepare_translog response之后,recovery 的 phase1阶段已结束,segments 文件发送完毕,target 节点的 engine 已启动。通过 shard.initiateTracking将 target 分片加入到复制组,后续的写入操作都会复制给 target 节点。
prepareEngineStep.whenComplete(prepareEngineTime -> {
runUnderPrimaryPermit(() -> shard.initiateTracking(request.targetAllocationId()),
shardId + " initiating tracking of " + request.targetAllocationId(), shard, cancellableThreads, logger);
final Translog.Snapshot phase2Snapshot = shard.getHistoryOperations("peer-recovery", historySource, startingSeqNo);
});
复制组信息在 ReplicationTracker 的 replicationGroup成员中维护。
2.在写入过程中,将写请求复制给 target
在写入流程的 performOnPrimary#doRun() 函数的 finishRequest方法中,最终调用 ReplicationOperation#handlePrimaryResult函数的 performOnReplicas方法将 index 发送给 target 节点。
handoff阶段是为了完成主分片状态的转移,该阶段对收到的写入请求放到队列中,待主分片状态转移完成后继续执行,最多阻塞30分钟。期间对 target 节点的写入也会被转发的 source 节点执行。此阶段从 source 节点收到 finalize 的 response之后开始,直到下一阶段。
1.recovery 过程设置阻塞标记
在 source 节点收到 finalize 的 response之后,通过 handoff RPC 交接主分片状态,这个交接过程通过 indexShardOperationPermits.blockOperations 对此时收到的写入请求进行阻塞,最多30分钟。
public void relocated(final String targetAllocationId, final Consumer consumer){
try {
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {//block 的范围就是这段括号里的代码
final ReplicationTracker.PrimaryContext primaryContext = replicationTracker.startRelocationHandoff(targetAllocationId);
try {//下面调用 RemoteRecoveryTargetHandler.handoffPrimaryContext
consumer.accept(primaryContext);//执行这里之后发送和响应了 handoff_primary_context RPC
}
});
}
}
indexShardOperationPermits.blockOperations 中的阻塞操作只是 queuedBlockOperations 加1,后面写入流程会检查 queuedBlockOperations 的值是否为0
2.写入流程的处理
产生在写入链路的 acquire函数:
acquire:255, IndexShardOperationPermits (org.elasticsearch.index.shard)
acquirePrimaryOperationPermit:2764, IndexShard (org.elasticsearch.index.shard)
handlePrimaryRequest:256, TransportReplicationAction
该函数判断 queuedBlockOperations 大于0,于是将他添加到 delayedOperations,写入流程到此结束。delayedOperations中的操作会在本阶段的阻塞过程结束后处理。
失败重试阶段自source 节点处理完 handoff response之后开始,直到 master 下发了将分片标记为 STARTED 的集群状态,数据节点应用了集群状态之后结束,在应用这个集群状态之前,分片处于 relocating 状态,期间的写入操作由 source 节点执行,并且会写入失败,抛出 ShardNotInPrimaryModeException 异常,并捕获该异常,等待1分钟后重试写入,再次失败则退出;如果1分钟内收到了新的集群状态,也会重试写入,然后写入成功。
写入流程中,执行 acquire 函数时,发现 queuedBlockOperations为0,执行onAcquired.onResponse(releasable),调用到 wrapPrimaryOperationPermitListener函数时,发现分片已经不是主分片状态,抛出 ShardNotInPrimaryModeException 异常。
if (replicationTracker.isPrimaryMode()) {
l.onResponse(r);
} else {
r.close();
l.onFailure(new ShardNotInPrimaryModeException(shardId, state));
}
对于上者返回的异常,写入流程 acquirePrimaryOperationPermit 设置的 Listener会区分异常类型,如果是 ShardNotInPrimaryModeException 异常,则等待1分钟后进行重试。
副分片的 move 过程与主分片的 move 过程相似,也是下发两次集群状态,通过 recovery 复制到 target 节点。
需要注意的是,recovery 复制分片到 target 节点,并非从副分片当前所在节点复制到 target,而是从主分片复制到 target。例如,主分片在节点 node-idea 上,副分片在 node-1,当我们从 node-1 move 到 node-2时,recovery 是从 node-idea 复制到 node-2。当 node-1应用主节点发布的分片状态变为 STARTED 的集群状态时,发现分片已经属于自己,于是删除本节点的分片。
与主分片的 relocating 类似的是,在复制阶段,会将正在 recovery 的分片添加到 replication group,这样 replication group 中就有了三个分片,主分片,原副分片,以及 move 到 target 节点的新的副分片。在复制阶段,写入操作会写入到这三个分片。
以手工 move 一个副分片为例,从 node-1 move 到 node-2时,当收到第一个第一个集群状态,分片被标记为 relocating,于是数据从 node-idea recovery 到 node-2。在此期间,新的 index 操作会由主分片节点复制到另外两个节点。如下图所示。
这个过程相当于先增加了一个副分片,执行副分片的 recovery 流程复制到新的 target。
当 recovery 执行完毕,master 下发第二个集群状态,分片被标记为 STARTED,node-1原来持有的分片被删除,如下图所示。
副分片的 relocating 过程不会有 handoff 阶段,整个 relocating 过程如同副分片的 recovery 过程一样,没有对写入进行阻塞的阶段。
到此,相信大家对“relocating对Elasticsearch集群的影响是什么”有了更深的了解,不妨来实际操作一番吧!这里是创新互联网站,更多相关内容可以进入相关频道进行查询,关注我们,继续学习!
我们在微信上24小时期待你的声音
解答本文疑问/技术咨询/运营咨询/技术建议/互联网交流