社区微信群开通啦,扫一扫抢先加入社区官方微信群
社区微信群
摘自:https://www.easyice.cn/archives/231
目录 [隐藏]
基于版本:5.5.3
recovery 是 es 数据恢复,保持数据一致性的过程,触发条件包括:从快照备份恢复,节点加入和离开,索引的_open操作等.
recovery 由clusterChanged触发,进入到:
1
2
3
4
|
applyNewOrUpdatedShards->
applyInitializingShard
|
根据数据分片性质,分为主分片和副本分片恢复流程.
主分片从 translog 自我恢复,副本分片从主分片拉取数据进行恢复.
经历的阶段为:
init: Recovery has not started
index: Reading index meta-data and copying bytes from source to destination
start: Starting the engine; opening the index for use
translog: Replaying transaction log
finalize: Cleanup
done: Complete
从
1
2
3
|
cluster.IndicesClusterStateService.clusterChanged
|
触发,进入
1
2
3
|
IndicesClusterStateService#applyInitializingShard
|
每次处理一个 shard
1
2
3
4
5
6
7
8
9
10
11
12
|
if (isPeerRecovery(shardRouting)) {// 从远程主分片恢复
try {
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
} else {//Primary 进行自我恢复,不需要其他节点的支持
indexService.shard(shardId).recoverFromStore(shardRouting, new StoreRecoveryService.RecoveryListener() {
...
});
}
}
|
实现的主要思路是:系统的每次 flush 操作会清理相关 translog, 因此 translog 中存在的数据就是 lucene 索引中可能尚未刷入的数据,主分片的 recovery 就是把 translog 中的内容转移到 lucene.
具体做法是:把当前 translog 做快照,重放每条记录,调用标准的index 操作创建或更新 doc来恢复,然后再处理recovery期间新写入的数据.
路径:org/elasticsearch/index/shard/StoreRecoveryService.java
在新的线程池任务中执行:
1
2
3
|
recoverFromStore(indexShard, indexShouldExists, recoveryState);
|
然后会进入InternalEngine构造函数:
1
2
3
4
5
6
7
8
|
if (skipInitialTranslogRecovery) {
// make sure we point at the latest translog from now on..
commitIndexWriter(writer, translog, lastCommittedSegmentInfos.getUserData().get(SYNC_COMMIT_ID));
} else {//具体的从 Translog 恢复的实现
recoverFromTranslog(engineConfig, translogGeneration);
}
|
skipInitialTranslogRecovery一定为 false, 进入recoverFromTranslog,从 translog 做个快照,挨个恢复:
1
2
3
4
5
6
7
8
|
while ((operation = snapshot.next()) != null) {
try {
performRecoveryOperation(engine, operation, true);
opsRecovered++;
}
}
|
重放完毕后,如果重放写入的数据大于0,则 flush, 否则写一个 synced flush id:syncId
1
2
3
4
5
6
7
|
if (opsRecovered > 0) {
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
|
从主分片恢复到副本分片主要有两个阶段(在主分片节点执行):phase1
对比分段信息,如果 syncid 相同且 doc 数量相同,则跳过,否则复制整个分段phase2
将当前 translog 做快照,发送所有的 translog operation 到对端节点,不限速
恢复过程中的数据传输方向,主分片节点为 Source,副本分片节点为 Target
主要处理逻辑:副本分片节点为 RecoveryTarget类,主分片节点为 RecoverySource 类.
首先,副本分片的恢复也会启动一个新的线程池任务:
1
2
3
4
|
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
|
任务处理模块:indices/recovery/RecoveryTarget.java
在doRecovery函数中,将本次要恢复的 shard 相关信息,如 shardid,metadataSnapshot 重要的是metadataSnapshot中包含 syncid等,封装成 StartRecoveryRequest ,RPC 发送出去:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
|
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(),
false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
recoveryStatus.indexShard().prepareForIndexRecovery();
recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
@Override
public void run() throws InterruptedException {
responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet());
}
});
|
对端(主分片节点)处理模块:/indices/recovery/RecoverySource.java
入口:StartRecoveryTransportRequestHandler.messageReceived
主要处理逻辑:RecoverySourceHandler.recoverToTarget()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
try (Translog.View translogView = engine.getTranslog().newView()) {
final SnapshotIndexCommit phase1Snapshot;
try {
phase1Snapshot = shard.snapshotIndex(false);//对当前索引做快照
}
try {//phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方
phase1(phase1Snapshot, translogView);
}
//当前的translogView 进行一次snapshot,然后发送
try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
phase2(phase2Snapshot);
}
finalizeRecovery();
}
|
在第一阶段,值得注意的是关于 syncid 的处理,如果两个分片有一致的 syncid, 且 doc 数相同,则跳过第一阶段.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
|
String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
final boolean recoverWithSyncId = recoverySourceSyncId != null &&
recoverySourceSyncId.equals(recoveryTargetSyncId);
if (recoverWithSyncId) {
final long numDocsTarget = request.metadataSnapshot().getNumDocs();
final long numDocsSource = recoverySourceMetadata.getNumDocs();
if (numDocsTarget != numDocsSource) {
throw new IllegalStateException("try to recover " + request.shardId() + " from primary shard with sync id but number of docs differ: " + numDocsTarget + " (" + request.sourceNode().getName() + ", primary) vs "+ numDocsSource + "(" + request.targetNode().getName() + ")");
}
}
else{
//计算 diff 并发送
}
|
在第二阶段,从当前translogView进行快照后批量发送,
对端的处理模块:RecoveryTarget.TranslogOperationsRequestHandler
主要是调用 recoveryStatus.indexShard().performBatchRecovery
重放 translog
最慢的过程在于副本分片恢复的第一阶段,各节点单独执行分段合并逻辑,合并后的分段基本不会相同,所以拷贝 lucene 分段是最耗时的,其中有一些相关的限速配置:
1
2
3
4
5
|
cluster.routing.allocation.node_concurrent_recoveries 单个节点最大并发进/出 recovery 数,默认2
indices.recovery.max_bytes_per_sec 默认40m
indices.recovery.concurrent_streams 单个节点恢复时可以打开的网络流数量,默认3
|
即使关闭限速,这个阶段仍然可能非常漫长,目前最好的方式就是先执行 synced flush, 但是 syncd flush 并且本身也可能比较慢,因为我们常常为了优化写入速度而加大 translog 刷盘周期,也会延长 translog 恢复阶段时间
在 es 6.0中再次优化这个问题,思路是给每次写入成功的操作都分配一个序号,通过对比序号就可以计算出差异范围,在实现方式上, 添加了global checkpoint 和 local checkpoint,checkpoint,主分片负责维护global checkpoint,代表所有分片都已写入到了这个序号的位置,local checkpoint代表当前分片已写入成功的最新位置,恢复时通过对比两个序列号,计算出缺失的数据范围,然后通过translog重放这部分数据,同时 translog 会为此保留更长的时间.
参考:
https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0
https://github.com/elastic/elasticsearch/issues/10708
es 为了解决副本分片恢复过程第一阶段的漫长过程引入synced flush,默认情况下5分钟没有写入操作的索引被标记为inactive,执行 synced flush,生成一个唯一的 syncid,写入到所有 shard, 这个 syncid是shard 级,拥有相同syncid的 shard具有相同的 lucene 索引.
synced flush的实现思路是先执行普通的 flush 操作,各分片 flush 成功后,他们理应有相同的 lucene 索引内容,无论分段是否一致.于是给大家分配一个 id, 表示数据一致.但是显然 synced flush 期间不能有新写入的内容,对于这种情况, es 的处理是:让 synced flush 失败,让写操作成功.在没有执行 flush 的情况下已有 syncid 不会失效.当某个 shard 上执行了普通 flush 操作会删除已有 syncid,因此,synced flush操作是一个不可靠操作,只适用于冷索引.
主要实现:
1
2
3
4
5
6
7
8
9
|
private PreSyncedFlushResponse performPreSyncedFlush(PreShardSyncedFlushRequest request) {
indexShard.flush(flushRequest);
}
private ShardSyncedFlushResponse performSyncedFlush(ShardSyncedFlushRequest request) {
Engine.SyncedFlushResult result = indexShard.syncFlush(request.syncId(),
}
|
indexShard.syncFlush
只是写了一个 id 进去:
代码路径:
1
2
3
4
|
InternalEngine#syncFlush
commitIndexWriter(indexWriter, translog, syncId);
|
index.recovery 的一个难题在于如何维护主副分片一致性。假设从副分片 recovery 之前到 recovery 完毕一致有写操作,他是如何实现一致的呢?
在2.0 版本之前,副本recovery 要经历三个阶段:
从第一阶段开始,就要阻止 lucene 执行commit 操作,避免 translog 被刷盘后清除。
本质上来说,只要流程上允许将写操作阻塞一段时间,实现主副一致是比较容易的。但是后来(从2.0开始)官方觉得不太好:
为了安全地完成 recoveries / relocations,我们必须在 recovery 开始后保持所有的operation全部 done,以便重放。目前我们实现这点是通过防止engine flush,从而确保操作operations都在 translog 中。这不是一个问题,因为我们确实需要这些operations。但是如果另一个 recovery 并发启动,可能会有不必要的长时间重试。另外如果我们在这个时候因为某种原因关闭了engine(比如一个节点重新启动),当我们回来的时候,我们需要恢复一个很大的 translog。
为了解决这个问题,translog被改为基于多个文件而不是一个文件。 这允许recovery保留所需的文件,同时允许engine执行flush,以及执行lucene的commit(这将创建一个新的translog文件)。
重构了 translog 文件管理模块,允许多个文件。
translog 维护一个引用文件的列表。包括未完成的recovery 以及那些包含尚未提交到 lucene 的operations的文件
引入了新的 translog.view概念,允许 recovery 获取一个引用,包括所有当前未提交的 translog 文件,以及所有未来新创建的 translog 文件,直到 view 关闭。他们可以使用这个 view 做operations的遍历操作
phase3被删除,这个阶段是重放operations,同时防止新的写入到engine。这是不必要的,因为自 recovery 开始,标准的 index 操作会发送所有的operations到正在recovery中的 shard。重放recovery 开始时获取的 view 中的所有operations足够保证不丢失任何operations。
版权声明:本文来源CSDN,感谢博主原创文章,遵循 CC 4.0 by-sa 版权协议,转载请附上原文出处链接和本声明。
原文链接:https://blog.csdn.net/weixin_33912638/article/details/90092379
站方申明:本站部分内容来自社区用户分享,若涉及侵权,请联系站方删除。
如果觉得我的文章对您有用,请随意打赏。你的支持将鼓励我继续创作!