摘自:https://www.easyice.cn/archives/231
elasticsearch indices.recovery 流程分析与速度优化
目录 [隐藏]
基于版本:5.5.3
recovery 是 es 数据恢复,保持数据一致性的过程,触发条件包括:从快照备份恢复,节点加入和离开,索引的_open操作等.
recovery 由clusterChanged触发,进入到:
根据数据分片性质,分为主分片和副本分片恢复流程.
主分片从 translog 自我恢复,副本分片从主分片拉取数据进行恢复.
经历的阶段为:
init: Recovery has not started
index: Reading index meta-data and copying bytes from source to destination
start: Starting the engine; opening the index for use
translog: Replaying transaction log
finalize: Cleanup
done: Complete
从
触发,进入
每次处理一个 shard
主分片恢复流程
实现的主要思路是:系统的每次 flush 操作会清理相关 translog, 因此 translog 中存在的数据就是 lucene 索引中可能尚未刷入的数据,主分片的 recovery 就是把 translog 中的内容转移到 lucene.
具体做法是:把当前 translog 做快照,重放每条记录,调用标准的index 操作创建或更新 doc来恢复,然后再处理recovery期间新写入的数据.
路径:org/elasticsearch/index/shard/StoreRecoveryService.java
在新的线程池任务中执行:
然后会进入InternalEngine构造函数:
skipInitialTranslogRecovery一定为 false, 进入recoverFromTranslog,从 translog 做个快照,挨个恢复:
重放完毕后,如果重放写入的数据大于0,则 flush, 否则写一个 synced flush id:syncId
副本分片恢复流程
从主分片恢复到副本分片主要有两个阶段(在主分片节点执行):phase1
对比分段信息,如果 syncid 相同且 doc 数量相同,则跳过,否则复制整个分段phase2
将当前 translog 做快照,发送所有的 translog operation 到对端节点,不限速
恢复过程中的数据传输方向,主分片节点为 Source,副本分片节点为 Target
主要处理逻辑:副本分片节点为 RecoveryTarget类,主分片节点为 RecoverySource 类.
首先,副本分片的恢复也会启动一个新的线程池任务:
任务处理模块:indices/recovery/RecoveryTarget.java
在doRecovery函数中,将本次要恢复的 shard 相关信息,如 shardid,metadataSnapshot 重要的是metadataSnapshot中包含 syncid等,封装成 StartRecoveryRequest ,RPC 发送出去:
对端(主分片节点)处理模块:/indices/recovery/RecoverySource.java
入口:StartRecoveryTransportRequestHandler.messageReceived
主要处理逻辑:RecoverySourceHandler.recoverToTarget()
在第一阶段,值得注意的是关于 syncid 的处理,如果两个分片有一致的 syncid, 且 doc 数相同,则跳过第一阶段.
在第二阶段,从当前translogView进行快照后批量发送,
对端的处理模块:RecoveryTarget.TranslogOperationsRequestHandler
主要是调用 recoveryStatus.indexShard().performBatchRecovery
重放 translog
recovery 慢的原因分析
最慢的过程在于副本分片恢复的第一阶段,各节点单独执行分段合并逻辑,合并后的分段基本不会相同,所以拷贝 lucene 分段是最耗时的,其中有一些相关的限速配置:
即使关闭限速,这个阶段仍然可能非常漫长,目前最好的方式就是先执行 synced flush, 但是 syncd flush 并且本身也可能比较慢,因为我们常常为了优化写入速度而加大 translog 刷盘周期,也会延长 translog 恢复阶段时间
在 es 6.0中再次优化这个问题,思路是给每次写入成功的操作都分配一个序号,通过对比序号就可以计算出差异范围,在实现方式上, 添加了global checkpoint 和 local checkpoint,checkpoint,主分片负责维护global checkpoint,代表所有分片都已写入到了这个序号的位置,local checkpoint代表当前分片已写入成功的最新位置,恢复时通过对比两个序列号,计算出缺失的数据范围,然后通过translog重放这部分数据,同时 translog 会为此保留更长的时间.
参考:
https://www.elastic.co/blog/elasticsearch-sequence-ids-6-0
https://github.com/elastic/elasticsearch/issues/10708
synced flush 机制
es 为了解决副本分片恢复过程第一阶段的漫长过程引入synced flush,默认情况下5分钟没有写入操作的索引被标记为inactive,执行 synced flush,生成一个唯一的 syncid,写入到所有 shard, 这个 syncid是shard 级,拥有相同syncid的 shard具有相同的 lucene 索引.
synced flush的实现思路是先执行普通的 flush 操作,各分片 flush 成功后,他们理应有相同的 lucene 索引内容,无论分段是否一致.于是给大家分配一个 id, 表示数据一致.但是显然 synced flush 期间不能有新写入的内容,对于这种情况, es 的处理是:让 synced flush 失败,让写操作成功.在没有执行 flush 的情况下已有 syncid 不会失效.当某个 shard 上执行了普通 flush 操作会删除已有 syncid,因此,synced flush操作是一个不可靠操作,只适用于冷索引.
主要实现:
indexShard.syncFlush
只是写了一个 id 进去:
代码路径:
副分片如何做到和主分片一致的
index.recovery 的一个难题在于如何维护主副分片一致性。假设从副分片 recovery 之前到 recovery 完毕一致有写操作,他是如何实现一致的呢?
在2.0 版本之前,副本recovery 要经历三个阶段:
- phase1:将主分片的 lucene做快照,发送到 target。期间不阻塞索引操作,新增数据写到主分片的 translog
- phase2:将主分片 translog 做快照,发送到 target 重放,期间不阻塞索引操作。
- phase3:为主分片加写锁,将剩余的translog 发送到 target。此时数据量很小,写入过程的阻塞很短。
从第一阶段开始,就要阻止 lucene 执行commit 操作,避免 translog 被刷盘后清除。
本质上来说,只要流程上允许将写操作阻塞一段时间,实现主副一致是比较容易的。但是后来(从2.0开始)官方觉得不太好:
为了安全地完成 recoveries / relocations,我们必须在 recovery 开始后保持所有的operation全部 done,以便重放。目前我们实现这点是通过防止engine flush,从而确保操作operations都在 translog 中。这不是一个问题,因为我们确实需要这些operations。但是如果另一个 recovery 并发启动,可能会有不必要的长时间重试。另外如果我们在这个时候因为某种原因关闭了engine(比如一个节点重新启动),当我们回来的时候,我们需要恢复一个很大的 translog。
为了解决这个问题,translog被改为基于多个文件而不是一个文件。 这允许recovery保留所需的文件,同时允许engine执行flush,以及执行lucene的commit(这将创建一个新的translog文件)。
重构了 translog 文件管理模块,允许多个文件。
translog 维护一个引用文件的列表。包括未完成的recovery 以及那些包含尚未提交到 lucene 的operations的文件
引入了新的 translog.view概念,允许 recovery 获取一个引用,包括所有当前未提交的 translog 文件,以及所有未来新创建的 translog 文件,直到 view 关闭。他们可以使用这个 view 做operations的遍历操作
phase3被删除,这个阶段是重放operations,同时防止新的写入到engine。这是不必要的,因为自 recovery 开始,标准的 index 操作会发送所有的operations到正在recovery中的 shard。重放recovery 开始时获取的 view 中的所有operations足够保证不丢失任何operations。
从2.0开始,phase3被删除。对于如何做到主副一致的,描述的很模糊。分析完相关代码后,整理流程如下:
先创建一个 Translog.view,然后
- phase1:将主分片的 lucene 做快照,发送到 target。期间允许索引操作和 flush 操作。发送完毕后,告知 target 启动 engine,phase2开始之前,新的索引操作都会转发副分片正常执行。
- phase2:将主分片的 translog 做快照,发送到 target 去重放。
完整性:
phase2 对translog 的快照包含了从 phase1开始的新增操作,而 phase2开始之前,副分片已经可以正常处理写操作,只要把 phase2的 translog 重放,就可以保证副分片不丢数据
一致性:
由于没有了阻塞写操作的第三阶段,接下来的问题就是解决 phase1和 phase2之间的写操作,与 phase2重放操作之间的时序和冲突问题。在 phase1执行完毕后,副分片已经可以正常处理写请求,副分片的新增写操作和 translog 重放的写操作是并行执行的。如果 translog 重放慢,又把他写会老数据怎么办?
es 现在的机制是在写操作中做异常处理。
写操作有三种类型:新增、更新、删除,分别看一下处理机制:
新增:不存在冲突问题,不需要处理。
更新:判断本次操作的版本号是否小于 lucene 中 doc 的版本号,如果小于,则放弃本次操作。
Index,Delete,都继承自Operation,每个Operation都有一个版本号,这个版本号就是 doc 版本号。对于副分片的写流程来说,正常情况下是主分片写成功后,相应 doc 写入的版本号被放到转发写副分片的请求中。对于更新来说,就是主分片将原 doc 版本号+1后转发的副分片来的。在对比版本号的时候:
副分片在InternalEngine#index函数中通过plan判断是否写到 lucene:
在planIndexingAsNonPrimary函数中,通过
判断当前操作的版本号是否低于 lucene 中的版本号。
对比部分:
如果 translog 重放的操作在写一条老数据,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,
plan 的最终结果就是:plan = IndexingStrategy.skipAsStale,后面就会跳过写 lucene 和 translog 的逻辑。
删除:判断本次操作中的版本号是否小于 lucene 中 doc 的版本号,如果小于,放弃本次操作。
同样,在InternalEngine#delete函数中,
判断是否要从 lucene 删除:
通过compareOpToLuceneDocBasedOnVersions判断本次操作是否小于 lucenne 中 doc 的版本号,与 Index 操作时使用相同的函数。
如果 translog 重放的是一个老 的删除操作,compareOpToLuceneDocBasedOnVersions 会返回:OpVsLuceneDocStatus.OP_STALE_OR_EQUAL,
plan 的最终结果就是:plan = DeletionStrategy.processButSkipLucene,后面就会跳过从 lucene 删除的逻辑。
提升 recovery 速度的建议
使用 _forcemerge
由于 synced flush 不是可靠操作,以下操作都会将其打断:
1. 因写入过程被打断
2. 因普通 flush 被删除 syncdid
3. 因系统自行merge后 flush 删除syncdid
对于冷索引,可以考虑将 segment 强制合并为一个分段,这样各分片 segment 一致,可以跳过副本恢复的第一阶段.
执行:
集群 FullRestart 的建议操作过程
- 停止写入
- 禁用 shard allocation
- 执行 synced flush
- 重启集群
- 等待到 yellow 状态后,启用 allocation
- 等待 recovery 完毕
- 开启写入程序
一些用于查看 recovery 状态的命令
问题
副分片的恢复如何做到和主分片完全一致的?假设从副分片开始恢复之前,一致有数据持续写入,删除等操作,phase1阶段结束前 source 端 prepareTargetForTranslog 函数会向 target 端中发送启动 engine 的指令,这个指令执行完之后,target 端已经可以接受写入请求,那么从 phase2阶段的快照直接发给 target 去重放,会存在执行顺序的问题,2.x 之前的版本会有 phase3阶段,将主分片上的写操作阻塞一段时间,但5.x版本中没看到写阻塞过程,那么主副如何做到一致的?
参考:
https://www.elastic.co/guide/en/elasticsearch/reference/2.3/indices-optimize.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/indices-recovery.html
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/restart-upgrade.html
http://www.jianshu.com/p/0d0f3d2b9ecd
https://elasticsearch.cn/article/38
https://www.elastic.co/guide/en/elasticsearch/reference/5.5/shards-allocation.html
https://github.com/elastic/elasticsearch/pull/10624
ElasticSearch Recovery 分析
上周出现了一次故障,recovery的过程比较慢,然后发现Shard 在做恢复的过程一般都是卡在TRANSLOG阶段,所以好奇这块是怎么完成的,于是有了这篇文章
这是一篇源码分析类的文章,大家需要先建立一个整体的概念,建议参看这篇文章
另外你可能还需要了解下 Recovery 阶段迁移过程:
INIT -> INDEX -> VERIFY_INDEX -> TRANSLOG -> FINALIZE -> DONE
概览
Recovery 其实有两种:
- Primary的迁移/Replication的生成和迁移
- Primary的恢复
org.elasticsearch.indices.cluster.IndicesClusterStateService.clusterChanged 被触发后,会触发applyNewOrUpdatedShards 函数的调用,这里是我们整个分析的起点。大家可以跑进去看看,然后跟着文章打开对应的源码浏览。
阅读完这篇文章,我们能够得到:
- 熟悉整个recovery 流程
- 了解translog机制
- 掌握对应的代码体系结构
Primary的恢复
这个是一般出现故障集群重启的时候可能遇到的。首先需要从Store里进行恢复。
if (isPeerRecovery(shardRouting)) {
......
}
else {
//走的这个分支
indexService.shard(shardId).recoverFromStore(shardRouting,
new StoreRecoveryService.RecoveryListener() {
}
Primary 进行自我恢复,所以并不需要其他节点的支持。所以判定的函数叫做isPeerRecovery
其实还是挺合适的。
indexService.shard(shardId).recoverFromStore 调用的是 org.elasticsearch.index.shard.IndexShard的方法。
public void recoverFromStore(ShardRouting shard, StoreRecoveryService.RecoveryListener recoveryListener) {
......
final boolean shouldExist = shard.allocatedPostIndexCreate();
storeRecoveryService.recover(this, shouldExist, recoveryListener);
}
逻辑还是很清晰的,判断分片必须存在,接着将任务委托给 org.elasticsearch.index.shard.StoreRecoveryService.recover 方法,该方法有个细节需要了解下:
if (indexShard.routingEntry().restoreSource() != null) {
indexShard.recovering("from snapshot",
RecoveryState.Type.SNAPSHOT,
indexShard.routingEntry().restoreSource());
} else {
indexShard.recovering("from store",
RecoveryState.Type.STORE,
clusterService.localNode());
}
ES会根据restoreSource 决定是从SNAPSHOT或者从Store里进行恢复。这里的indexShard.recovering
并没有执行真正的recovering 操作,而是返回了一个recover的信息对象,里面包含了譬如节点之类的信息。
之后就将其作为一个任务提交出去了:
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
try {
final RecoveryState recoveryState = indexShard.recoveryState();
if (indexShard.routingEntry().restoreSource() != null) {
restore(indexShard, recoveryState);
} else {
recoverFromStore(indexShard, indexShouldExists, recoveryState);
}
这里我们只走一条线,也就是进入 recoverFromStore 方法,该方法会执行索引文件的恢复动作,本质上是进入了INDEX
Stage.
接着进行TranslogRecovery了
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
继续进入 indexShard.performTranslogRecovery 方法:
public Map<String, Mapping> performTranslogRecovery(boolean indexExists) {
if (indexExists == false) {
final RecoveryState.Translog translogStats = recoveryState().getTranslog();
translogStats.totalOperations(0);
translogStats.totalOperationsOnStart(0);
}
final Map<String, Mapping> recoveredTypes = internalPerformTranslogRecovery(false, indexExists);
return recoveredTypes;
}
这个方法里面,最核心的是 internalPerformTranslogRecovery方法,进入该方法后先进入 VERIFY_INDEX
Stage,进行索引的校验,校验如果没有问题,就会进入我们期待的TRANSLOG
状态了。
进入TRANSLOG
后,先进行一些设置:
engineConfig.setEnableGcDeletes(false);
engineConfig.setCreate(indexExists == false);
这里的GC 指的是tranlog日志的删除问题,也就是不允许删除translog,接着会创建一个新的InternalEngine了,然后返回调用org.elasticsearch.index.shard.TranslogRecoveryPerformer.getRecoveredTypes
不过你看这个代码会比较疑惑,其实我一开始看也觉得纳闷:
if (skipTranslogRecovery == false) {
markLastWrite();
}
createNewEngine(skipTranslogRecovery, engineConfig);
return engineConfig.getTranslogRecoveryPerformer().
getRecoveredTypes();
我们并没有看到做translog replay的地方,而从上层的调用方来看:
typesToUpdate = indexShard.performTranslogRecovery(indexShouldExists);
indexShard.finalizeRecovery();
performTranslogRecovery 返回后,就立马进入扫尾(finalizeRecovery)阶段。 里面唯一的动作是createNewEngine,并且传递了skipTranslogRecovery
参数。 也就说,真正的translog replay动作是在createNewEngine里完成,我们经过探索,发现是在InternalEngine 的初始化过程完成的,具体代码如下:
try {
if (skipInitialTranslogRecovery) {
commitIndexWriter(writer,
translog,
lastCommittedSegmentInfos.
getUserData().
get(SYNC_COMMIT_ID));
} else {
recoverFromTranslog(engineConfig, translogGeneration);
}
} catch (IOException | EngineException ex) {
.......
}
里面有个recoverFromTranslog,我们进去瞅瞅:
final TranslogRecoveryPerformer handler = engineConfig.getTranslogRecoveryPerformer();
try (Translog.Snapshot snapshot = translog.newSnapshot()) {
opsRecovered = handler.recoveryFromSnapshot(this, snapshot);
} catch (Throwable e) {
throw new EngineException(shardId, "failed to recover from translog", e);
}
目前来看,所有的Translog recovery 动作其实都是由 TranslogRecoveryPerformer 来完成的。当然这个名字也比较好,翻译过来就是 TranslogRecovery 执行者。先对translog 做一个snapshot,然后根据这个snapshot开始进行恢复,进入 recoveryFromSnapshot 方法我们查看细节,然后会引导你进入
下面的方法:
public void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates) {
try {
switch (operation.opType()) {
case CREATE:
Translog.Create create = (Translog.Create) operation;
Engine.Create engineCreate = IndexShard.prepareCreate(docMapper(create.type()),
source(create.source()).index(shardId.getIndex()).type(create.type()).id(create.id())
.routing(create.routing()).parent(create.parent()).timestamp(create.timestamp()).ttl(create.ttl()),
create.version(), create.versionType().versionTypeForReplicationAndRecovery(), Engine.Operation.Origin.RECOVERY, true, false);
maybeAddMappingUpdate(engineCreate.type(), engineCreate.parsedDoc().dynamicMappingsUpdate(), engineCreate.id(), allowMappingUpdates);
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [create] op of [{}][{}]", create.type(), create.id());
}
engine.create(engineCreate);
break;
终于看到了实际的translog replay 逻辑了。这里调用了标准的InternalEngine.create 等方法进行日志的恢复。其实比较有意思的是,我们在日志回放的过程中,依然会继续写translog。这里就会导致一个问题,如果我在做日志回放的过程中,服务器由当掉了(或者ES instance 重启了),那么就会导致translog 变多了。这个地方是否可以再优化下?
假设我们完成了Translog 回放后,如果确实有重放,那么就行flush动作,删除translog,否则就commit Index。具体逻辑由如下的代码来完成:
if (opsRecovered > 0) {
opsRecovered, translogGeneration == null ? null : translogGeneration.translogFileGeneration, translog
.currentFileGeneration());
flush(true, true);
} else if (translog.isCurrent(translogGeneration) == false) {
commitIndexWriter(indexWriter, translog, lastCommittedSegmentInfos.getUserData().get(Engine.SYNC_COMMIT_ID));
}
接着就进入了finalizeRecovery,然后,就没然后了。
indexShard.finalizeRecovery();
String indexName = indexShard.shardId().index().name();
for (Map.Entry<String, Mapping> entry : typesToUpdate.entrySet()) {
validateMappingUpdate(indexName, entry.getKey(), entry.getValue());
}
indexShard.postRecovery("post recovery from shard_store");
Primary的迁移/Replication的生成和迁移
一般这种recovery其实就是发生relocation或者调整副本的时候发生的。所以集群是在正常状态,一定有健康的primary shard存在,所以我们也把这种recovery叫做Peer Recovery。 入口和前面的Primary恢复是一样的,代码如下:
if (isPeerRecovery(shardRouting)) {
//走的这个分支
.....
RecoveryState.Type type = shardRouting.primary() ? RecoveryState.Type.RELOCATION : RecoveryState.Type.REPLICA;
recoveryTarget.startRecovery(indexShard, type, sourceNode, new PeerRecoveryListener(shardRouting, indexService, indexMetaData));
......
}
else {
......
}
核心代码自然是 recoveryTarget.startRecovery。这里的recoveryTarget的类型是: org.elasticsearch.indices.recovery.RecoveryTarget
startRecovery方法的核心代码是:
threadPool.generic().execute(new RecoveryRunner(recoveryId));
也是启动一个县城异步执行的。RecoveryRunner调用的是RecoveryTarget的 doRecovery
方法,在该方法里,会发出一个RPC请求:
final StartRecoveryRequest request = new StartRecoveryRequest(recoveryStatus.shardId(), recoveryStatus.sourceNode(), clusterService.localNode(), false, metadataSnapshot, recoveryStatus.state().getType(), recoveryStatus.recoveryId());
recoveryStatus.indexShard().prepareForIndexRecovery();
recoveryStatus.CancellableThreads().execute(new CancellableThreads.Interruptable() {
@Override
public void run() throws InterruptedException {
responseHolder.set(transportService.submitRequest(request.sourceNode(), RecoverySource.Actions.START_RECOVERY, request, new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet());
}
});
这个时候进入 INDEX Stage。 那谁接受处理的呢? 我们先看看现在的类名叫啥? RecoveryTarget。 我们想当然的想,是不是有RecoverySource呢? 发现确实有,而且该类确实也有一个处理类:
class StartRecoveryTransportRequestHandler extends TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
}
ES里这种通过Netty进行交互的方式,大家可以看看我之前写文章ElasticSearch Rest/RPC 接口解析。
这里我们进入RecoverSource对象的recover方法:
private RecoveryResponse recover(final StartRecoveryRequest request) {
.....
if (IndexMetaData.isOnSharedFilesystem(shard.indexSettings())) {
handler = new SharedFSRecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
} else {
handler = new RecoverySourceHandler(shard, request, recoverySettings, transportService, logger);
}
ongoingRecoveries.add(shard, handler);
try {
return handler.recoverToTarget();
} finally {
ongoingRecoveries.remove(shard, handler);
}
}
我们看到具体负责处理的类是RecoverySourceHandler,之后调用该类的recoverToTarget方法。我对下面的代码做了精简,方便大家看清楚。
public RecoveryResponse recoverToTarget() {
final Engine engine = shard.engine();
assert engine.getTranslog() != null : "translog must not be null";
try (Translog.View translogView = engine.getTranslog().newView()) {
final SnapshotIndexCommit phase1Snapshot;
phase1Snapshot = shard.snapshotIndex(false);
phase1(phase1Snapshot, translogView);
try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
phase2(phase2Snapshot);
} catch (Throwable e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery();
}
return response;
}
首先创建一个Translog的视图(创建视图的细节我现在也还没研究),接着的话对当前的索引进行snapshot。 然后进入phase1阶段,该阶段是把索引文件和请求的进行对比,然后得出有差异的部分,主动将数据推送给请求方。之后进入文件清理阶段,然后就进入translog 阶段:
protected void prepareTargetForTranslog(final Translog.View translogView) {
接着进入第二阶段:
try (Translog.Snapshot phase2Snapshot = translogView.snapshot()) {
phase2(phase2Snapshot);
}
对当前的translogView 进行一次snapshot,然后进行translog发送:
int totalOperations = sendSnapshot(snapshot);
具体的发送逻辑如下:
cancellableThreads.execute(new Interruptable() {
@Override
public void run() throws InterruptedException {
final RecoveryTranslogOperationsRequest translogOperationsRequest = new RecoveryTranslogOperationsRequest(
request.recoveryId(), request.shardId(), operations, snapshot.estimatedTotalOperations());
transportService.submitRequest(request.targetNode(), RecoveryTarget.Actions.TRANSLOG_OPS, translogOperationsRequest,
recoveryOptions, EmptyTransportResponseHandler.INSTANCE_SAME).txGet();
}
});
这里发的请求,都是被 RecoveryTarget的TranslogOperationsRequestHandler 处理器来完成的,具体代码是:
@Override
public void messageReceived(final RecoveryTranslogOperationsRequest request, final TransportChannel channel) throws Exception {
try (RecoveriesCollection.StatusRef statusRef = onGoingRecoveries.getStatusSafe(request.recoveryId(), request.shardId())) {
final ClusterStateObserver observer = new ClusterStateObserver(clusterService, null, logger);
final RecoveryStatus recoveryStatus = statusRef.status();
final RecoveryState.Translog translog = recoveryStatus.state().getTranslog();
translog.totalOperations(request.totalTranslogOps());
assert recoveryStatus.indexShard().recoveryState() == recoveryStatus.state();
try {
recoveryStatus.indexShard().performBatchRecovery(request.operations());
这里调用IndexShard.performBatchRecovery进行translog 的回放。
最后发送一个finalizeRecovery给target 节点,完成recovering操作。
关于Recovery translog 配置相关
在如下的类里有:
//org.elasticsearch.index.translog.TranslogService
INDEX_TRANSLOG_FLUSH_INTERVAL = "index.translog.interval";
INDEX_TRANSLOG_FLUSH_THRESHOLD_OPS = "index.translog.flush_threshold_ops";
INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE = "index.translog.flush_threshold_size";
INDEX_TRANSLOG_FLUSH_THRESHOLD_PERIOD = "index.translog.flush_threshold_period";
INDEX_TRANSLOG_DISABLE_FLUSH = "index.translog.disable_flush";
当服务器恢复时发现有存在的translog日志,就会进入TRANSLOG 阶段进行replay。translog 的recovery 是走的标准的InternalEngine.create/update等方法,并且还会再写translog,同时还有一个影响性能的地方是很多数据可能已经存在,会走update操作,所以性能还是非常差的。这个目前能够想到的解决办法是调整flush日志的频率,保证存在的translog 尽量的少。 上面的话可以看出有三个控制选项:
//每隔interval的时间,就去检查下面三个条件决定是不是要进行flush,
//默认5s。时间过长,会超出下面阈值比较大。
index.translog.interval
//超过多少条日志后需要flush,默认Int的最大值
index.translog.flush_threshold_ops
//定时flush,默认30m 可动态设置
index.translog.flush_threshold_period
//translog 大小超过多少后flush,默认512m
index.translog.flush_threshold_size
本质上translog的恢复速度和条数的影响关系更大些,所以建议大家设置下 index.translog.flush_threshold_ops,比如多少条就一定要flush,否则积累的太多,
出现故障,恢复就慢了。这些参数都可以动态设置,但建议放到配置文件。