The following analysis is based on ES v5.3.3.
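ES data recovery happens at the shard (IndexShard) level. Shards are either primaries or replicas, and each type uses a different recovery strategy. Recovery can be classified by its recovery source (defined in RecoverySource.java):
- EMPTY_STORE, EXISTING_STORE, SNAPSHOT, LOCAL_SHARDS: used for primary shards
- PEER: used for replica shards
A primary shard can recover from several sources, whereas a replica has only one strategy: recovering from a peer node. Whichever method is used, recovery passes through the following stages (RecoveryState.java):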
INIT((byte) 0),
/**
* recovery of lucene files, either reusing local ones are copying new ones
*/
INDEX((byte) 1),
/**
* potentially running check index
*/
VERIFY_INDEX((byte) 2),
/**
* starting up the engine, replaying the translog
*/
TRANSLOG((byte) 3),
/**
* performing final task after all translog ops have been done
*/
FINALIZE((byte) 4),
DONE((byte) 5);
The corresponding shard states are:
CREATED((byte) 0),
RECOVERING((byte) 1),
POST_RECOVERY((byte) 2),
STARTED((byte) 3),
RELOCATED((byte) 4),
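CLOSED((byte) 5);
When the cluster state changes (for example, when a node restarts or shards are added), data recovery is triggered. The triggering call chain is: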

Because recovery happens per shard, the actual recovery work starts in IndexShard.startRecovery.
The key code is as follows:
public void startRecovery(RecoveryState recoveryState, PeerRecoveryTargetService recoveryTargetService,
PeerRecoveryTargetService.RecoveryListener recoveryListener, RepositoriesService repositoriesService,
BiConsumer<String, MappingMetaData> mappingUpdateConsumer,
IndicesService indicesService) {
assert recoveryState.getRecoverySource().equals(shardRouting.recoverySource());
switch (recoveryState.getRecoverySource().getType()) {
case EMPTY_STORE:
case EXISTING_STORE:
markAsRecovering("from store", recoveryState); // mark the shard as recovering on the cluster state thread
threadPool.generic().execute(() -> {
try {
if (recoverFromStore()) {
recoveryListener.onRecoveryDone(recoveryState);
}
} catch (Exception e) {
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
});
break;
case PEER:
try {
markAsRecovering("from " + recoveryState.getSourceNode(), recoveryState);
recoveryTargetService.startRecovery(this, recoveryState.getSourceNode(), recoveryListener);
} catch (Exception e) {
failShard("corrupted preexisting index", e);
recoveryListener.onRecoveryFailure(recoveryState, new RecoveryFailedException(recoveryState, null, e), true);
}
break;
case SNAPSHOT:
// omitted
break;
case LOCAL_SHARDS:
// omitted
break;
default:
throw new IllegalArgumentException("Unknown recovery source " + recoveryState.getRecoverySource());
}
}
Primary shard recovery
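Primary shard recovery is relatively simple. The process below uses the store source as an example:
Green marks shard states
Blue marks recovery stages
White marks code execution steps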

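The following Java sketch makes the legend concrete. It walks a primary shard through store recovery and records each (shard state, recovery stage) pair in order. The enum constants mirror IndexShardState and RecoveryState.Stage listed earlier. However, the StoreRecoverySketch class and the exact points where transitions happen are a simplified, assumed model, not ES code. Also note that STARTED is only reached after the master acknowledges that the shard has started.

```java
import java.util.ArrayList;
import java.util.List;

// Names mirror RecoveryState.Stage and IndexShardState; the class is hypothetical.
enum Stage { INIT, INDEX, VERIFY_INDEX, TRANSLOG, FINALIZE, DONE }

enum ShardState { CREATED, RECOVERING, POST_RECOVERY, STARTED, RELOCATED, CLOSED }

final class StoreRecoverySketch {
    private ShardState state = ShardState.CREATED;
    private Stage stage = Stage.INIT;
    final List<String> log = new ArrayList<>();

    private void record() {
        log.add(state + "/" + stage);
    }

    void run() {
        record();                          // freshly created shard
        state = ShardState.RECOVERING;     // markAsRecovering("from store", ...)
        record();
        // copy/reuse index files, optionally check the index, replay translog, finalize
        for (Stage s : new Stage[] {Stage.INDEX, Stage.VERIFY_INDEX, Stage.TRANSLOG, Stage.FINALIZE}) {
            stage = s;
            record();
        }
        stage = Stage.DONE;                // post-recovery step
        state = ShardState.POST_RECOVERY;
        record();
        state = ShardState.STARTED;        // after the master marks the shard started
        record();
    }

    public static void main(String[] args) {
        StoreRecoverySketch sketch = new StoreRecoverySketch();
        sketch.run();
        sketch.log.forEach(System.out::println);
    }
}
```

In this model the shard stays in RECOVERING for all working stages, and only DONE coincides with a state change.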
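Replica shard recovery
Replica recovery is considerably more complex. It involves network interaction and may require transferring a large amount of data.
Shard recovery involves two roles: RecoveryTarget, the shard being recovered, and RecoverySource, the source of the recovery data. Roughly, the process works as follows:
1. The target sends its local shard metadata to the source.
2. The source compares that metadata with a snapshot of its own data and streams the segments that need updating to the target as files.
3. The source also sends the translog operations that have not yet been committed.
4. The target receives and replays the translog, and recovery is complete.
See the figure below: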

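The order of messages in this exchange can be sketched in Java as a source-side driver that calls a target interface. TargetStub and its method names loosely follow the calls quoted later in this article (receiveFileInfo, cleanFiles, indexTranslogOperations, finalizeRecovery), but they are hypothetical and not the real ES signatures. RecordingTarget stands in for the RPC layer by simply logging each call.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical target-side interface; not the ES RecoveryTargetHandler API.
interface TargetStub {
    void receiveFileInfo(List<String> fileNames, int totalTranslogOps);
    void writeFileChunk(String fileName);
    void cleanFiles();
    void prepareForTranslog(int totalTranslogOps);
    void indexTranslogOperations(List<String> ops);
    void finalizeRecovery();
}

final class PeerRecoverySketch {
    // Source-side driver: phase1 ships files, phase2 ships translog operations.
    static void recoverToTarget(List<String> filesToSend, List<String> translogOps, TargetStub target) {
        // phase1: announce the files, stream them, then let the target drop stale files
        target.receiveFileInfo(filesToSend, translogOps.size());
        for (String file : filesToSend) {
            target.writeFileChunk(file);
        }
        target.cleanFiles();
        target.prepareForTranslog(translogOps.size());
        // phase2: send the translog operations for replay, then finish
        target.indexTranslogOperations(translogOps);
        target.finalizeRecovery();
    }

    // Test double that records each call instead of sending an RPC.
    static final class RecordingTarget implements TargetStub {
        final List<String> calls = new ArrayList<>();
        public void receiveFileInfo(List<String> fileNames, int ops) { calls.add("fileInfo " + fileNames); }
        public void writeFileChunk(String fileName) { calls.add("chunk " + fileName); }
        public void cleanFiles() { calls.add("cleanFiles"); }
        public void prepareForTranslog(int ops) { calls.add("prepareTranslog " + ops); }
        public void indexTranslogOperations(List<String> ops) { calls.add("translogOps " + ops.size()); }
        public void finalizeRecovery() { calls.add("finalize"); }
    }
}
```

Here the list of files to send is taken as given. Deciding which files differ is the job of the phase1 comparison discussed below.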
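The implementation involves the following core classes:
- PeerRecoveryTargetService: target-side logic and message listeners.
- PeerRecoverySourceService: source-side logic and message listeners.
- RemoteRecoveryTargetHandler: sends recovery messages to the target.
- RecoveryTarget: carries out the segment recovery on the target.
- RecoverySourceHandler: source-side message handling; drives the recovery.
- TranslogRecoveryPerformer: carries out the translog replay.
The target and the source communicate via RPC.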
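The target-side pattern can be illustrated with a minimal Java sketch: register the recovery under an id, run it on a worker pool, and block until the source answers. RecoveryRpcSketch, StartRequest, Response and the Function-based "transport" are hypothetical stand-ins, not ES classes. The daemon thread pool plays the role of the generic pool, and CompletableFuture.join plays the role of txGet().

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

final class RecoveryRpcSketch {
    static final class StartRequest {
        final long recoveryId;
        final String shardId;
        StartRequest(long recoveryId, String shardId) { this.recoveryId = recoveryId; this.shardId = shardId; }
    }

    static final class Response {
        final long recoveryId;
        final int filesCopied;
        Response(long recoveryId, int filesCopied) { this.recoveryId = recoveryId; this.filesCopied = filesCopied; }
    }

    private final AtomicLong nextId = new AtomicLong();
    private final Map<Long, String> onGoing = new ConcurrentHashMap<>(); // like onGoingRecoveries
    private final ExecutorService generic = Executors.newCachedThreadPool(r -> {
        Thread t = new Thread(r, "generic");
        t.setDaemon(true); // do not keep the JVM alive
        return t;
    });
    // Sends a request to the source node and returns the future response.
    private final Function<StartRequest, CompletableFuture<Response>> transport;

    RecoveryRpcSketch(Function<StartRequest, CompletableFuture<Response>> transport) {
        this.transport = transport;
    }

    CompletableFuture<Response> startRecovery(String shardId) {
        final long id = nextId.incrementAndGet();
        onGoing.put(id, shardId);
        CompletableFuture<Response> result = new CompletableFuture<>();
        generic.execute(() -> {
            Response r;
            try {
                r = transport.apply(new StartRequest(id, shardId)).join(); // blocking wait, like txGet()
            } catch (RuntimeException e) {
                onGoing.remove(id);
                result.completeExceptionally(e);
                return;
            }
            onGoing.remove(id); // like markRecoveryAsDone(recoveryId)
            result.complete(r);
        });
        return result;
    }

    int ongoingCount() { return onGoing.size(); }

    void shutdown() { generic.shutdown(); }
}
```

The recovery is removed from the registry before the caller's future completes. As a result, a caller that has seen the response never observes a stale entry.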
The whole process starts in PeerRecoveryTargetService.startRecovery:
public void startRecovery(final IndexShard indexShard, final DiscoveryNode sourceNode, final RecoveryListener listener) {
// create a new recovery status, and process...
final long recoveryId = onGoingRecoveries.startRecovery(indexShard, sourceNode, listener, recoverySettings.activityTimeout());
threadPool.generic().execute(new RecoveryRunner(recoveryId));
}
This method submits an asynchronous task that runs on a thread from the generic thread pool. The work of RecoveryRunner is implemented by PeerRecoveryTargetService.doRecovery, whose core is:
private void doRecovery(final long recoveryId) {
final StartRecoveryRequest request;
final CancellableThreads cancellableThreads;
final RecoveryState.Timer timer;
// some code omitted
// build the request to send to the source
request = new StartRecoveryRequest(recoveryTarget.shardId(),
recoveryTarget.indexShard().routingEntry().allocationId().getId(), recoveryTarget.sourceNode(),
clusterService.localNode(), metadataSnapshot, recoveryTarget.state().getPrimary(), recoveryTarget.recoveryId());
try {
logger.trace("{} starting recovery from {}", request.shardId(), request.sourceNode());
final AtomicReference<RecoveryResponse> responseHolder = new AtomicReference<>();
// submit the RPC request and block until the response arrives
cancellableThreads.execute(() -> responseHolder.set(
transportService.submitRequest(request.sourceNode(), PeerRecoverySourceService.Actions.START_RECOVERY, request,
new FutureTransportResponseHandler<RecoveryResponse>() {
@Override
public RecoveryResponse newInstance() {
return new RecoveryResponse();
}
}).txGet()));
final RecoveryResponse recoveryResponse = responseHolder.get();
assert recoveryResponse != null;
final TimeValue recoveryTime = new TimeValue(timer.time());
// do this through ongoing recoveries to remove it from the collection
onGoingRecoveries.markRecoveryAsDone(recoveryId);
} catch (Exception e) {
// failure handling omitted
}
}
PeerRecoverySourceService receives the request and passes it to a handler:
class StartRecoveryTransportRequestHandler implements TransportRequestHandler<StartRecoveryRequest> {
@Override
public void messageReceived(final StartRecoveryRequest request, final TransportChannel channel) throws Exception {
RecoveryResponse response = recover(request);
channel.sendResponse(response);
}
}
private RecoveryResponse recover(StartRecoveryRequest request) throws IOException {
// some code omitted
RecoverySourceHandler handler = ongoingRecoveries.addNewRecovery(request, shard);
logger.trace("[{}][{}] starting recovery to {}", request.shardId().getIndex().getName(), request.shardId().id(), request.targetNode());
try {
return handler.recoverToTarget();
} finally {
ongoingRecoveries.remove(shard, handler);
}
}
The handler works in two phases:
public RecoveryResponse recoverToTarget() throws IOException {
try (Translog.View translogView = shard.acquireTranslogView()) {
logger.trace("captured translog id [{}] for recovery", translogView.minTranslogGeneration());
final IndexCommit phase1Snapshot;
try {
phase1Snapshot = shard.acquireIndexCommit(false);
} catch (Exception e) {
IOUtils.closeWhileHandlingException(translogView);
throw new RecoveryEngineException(shard.shardId(), 1, "Snapshot failed", e);
}
try {
phase1(phase1Snapshot, translogView);
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 1, "phase1 failed", e);
} finally {
try {
shard.releaseIndexCommit(phase1Snapshot);
} catch (IOException ex) {
logger.warn("releasing snapshot caused exception", ex);
}
}
logger.trace("{} snapshot translog for recovery. current size is [{}]", shard.shardId(), translogView.totalOperations());
try {
phase2(translogView.snapshot());
} catch (Exception e) {
throw new RecoveryEngineException(shard.shardId(), 2, "phase2 failed", e);
}
finalizeRecovery();
}
return response;
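}
First, the handler acquires a translog view (translogView), which keeps the translog from being trimmed while recovery runs. It then takes a Lucene commit snapshot (phase1Snapshot). Next, phase1 compares that snapshot with the target's metadata and sends the differing files. The operations retained by translogView are sent later, in phase2.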
public void phase1(final IndexCommit snapshot, final Translog.View translogView) {
// some code omitted
try {
StopWatch stopWatch = new StopWatch().start();
final Store.MetadataSnapshot recoverySourceMetadata;
try {
recoverySourceMetadata = store.getMetadata(snapshot);
} catch (CorruptIndexException | IndexFormatTooOldException | IndexFormatTooNewException ex) {
shard.failShard("recovery", ex);
throw ex;
}
// Generate a "diff" of all the identical, different, and missing
// segment files on the target node, using the existing files on
// the source node
// first, compare the syncIds
String recoverySourceSyncId = recoverySourceMetadata.getSyncId();
String recoveryTargetSyncId = request.metadataSnapshot().getSyncId();
final boolean recoverWithSyncId = recoverySourceSyncId != null &&
recoverySourceSyncId.equals(recoveryTargetSyncId);
// identical syncIds mean both copies hold the same data, so copying files is skipped
if (recoverWithSyncId) {
// some code omitted
} else {
// otherwise compare in detail at segment granularity: a segment counts as identical only if its lengths and checksums all match; the result holds three lists: identical, different, missing
final Store.RecoveryDiff diff = recoverySourceMetadata.recoveryDiff(request.metadataSnapshot());
// after the comparison, send receiveFileInfo to the target, then start sendFiles
cancellableThreads.execute(() ->
recoveryTarget.receiveFileInfo(response.phase1FileNames, response.phase1FileSizes, response.phase1ExistingFileNames,
response.phase1ExistingFileSizes, translogView.totalOperations()));
// How many bytes we've copied since we last called RateLimiter.pause
final Function<StoreFileMetaData, OutputStream> outputStreamFactories =
md -> new BufferedOutputStream(new RecoveryOutputStream(md, translogView), chunkSizeInBytes);
// send the differing files to the target; indices.recovery.max_bytes_per_sec throttles this step
sendFiles(store, phase1Files.toArray(new StoreFileMetaData[phase1Files.size()]), outputStreamFactories);
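try {
cancellableThreads.executeIO(() -> recoveryTarget.cleanFiles(translogView.totalOperations(), recoverySourceMetadata));
} catch (RemoteTransportException | IOException targetException) {
// omitted: corruption handling, after which the exception is rethrown
}
}
// prepare the target for translog replay (the target is notified)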
prepareTargetForTranslog(translogView.totalOperations(), shard.segmentStats(false).getMaxUnsafeAutoIdTimestamp());
logger.trace("[{}][{}] recovery [phase1] to {}: took [{}]", indexName, shardId, request.targetNode(), stopWatch.totalTime());
response.phase1Time = stopWatch.totalTime().millis();
} catch (Exception e) {
throw new RecoverFilesRecoveryException(request.shardId(), response.phase1FileNames.size(), new ByteSizeValue(totalSize), e);
} finally {
store.decRef();
}
}
In the second phase, the translog operations are sent to the target via RPC:
cancellableThreads.execute(() -> recoveryTarget.indexTranslogOperations(operations, snapshot.totalOperations()));
RecoveryTarget does the actual work by delegating to IndexShard:
public void indexTranslogOperations(List<Translog.Operation> operations, int totalTranslogOps)
throws TranslogRecoveryPerformer.BatchOperationException {
final RecoveryState.Translog translog = state().getTranslog();
translog.totalOperations(totalTranslogOps);
assert indexShard().recoveryState() == state();
indexShard().performBatchRecovery(operations);
}
public int performBatchRecovery(Iterable<Translog.Operation> operations) {
if (state != IndexShardState.RECOVERING) {
throw new IndexShardNotRecoveringException(shardId, state);
}
// We set active because we are now writing operations to the engine; this way, if we go idle after some time and become inactive,
// we still invoke any onShardInactive listeners ... we won't sync'd flush in this case because we only do that on primary and this
// is a replica
active.set(true);
Engine engine = getEngine();
return engine.config().getTranslogRecoveryPerformer().performBatchRecovery(engine, operations);
}
TranslogRecoveryPerformer replays the translog:
private void performRecoveryOperation(Engine engine, Translog.Operation operation, boolean allowMappingUpdates, Engine.Operation.Origin origin) throws IOException {
try {
switch (operation.opType()) {
case INDEX:
Translog.Index index = (Translog.Index) operation;
// we set canHaveDuplicates to true all the time such that we de-optimze the translog case and ensure that all
// autoGeneratedID docs that are coming from the primary are updated correctly.
Engine.Index engineIndex = IndexShard.prepareIndex(docMapper(index.type()),
source(shardId.getIndexName(), index.type(), index.id(), index.source(), XContentFactory.xContentType(index.source()))
.routing(index.routing()).parent(index.parent()).timestamp(index.timestamp()).ttl(index.ttl()),
index.version(), index.versionType().versionTypeForReplicationAndRecovery(), origin, index.getAutoGeneratedIdTimestamp(), true);
maybeAddMappingUpdate(engineIndex.type(), engineIndex.parsedDoc().dynamicMappingsUpdate(), engineIndex.id(), allowMappingUpdates);
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [index] op of [{}][{}]", index.type(), index.id());
}
index(engine, engineIndex);
break;
case DELETE:
Translog.Delete delete = (Translog.Delete) operation;
Uid uid = Uid.createUid(delete.uid().text());
if (logger.isTraceEnabled()) {
logger.trace("[translog] recover [delete] op of [{}][{}]", uid.type(), uid.id());
}
final Engine.Delete engineDelete = new Engine.Delete(uid.type(), uid.id(), delete.uid(), delete.version(),
delete.versionType().versionTypeForReplicationAndRecovery(), origin, System.nanoTime());
delete(engine, engineDelete);
break;
default:
throw new IllegalStateException("No operation defined for [" + operation + "]");
}
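} catch (ElasticsearchException e) {
// omitted: the exception is rethrown unless it may be ignored during recovery
}
operationProcessed();
}
At this point, replica recovery is complete.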
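The replay loop can be approximated in Java as a switch over INDEX and DELETE operations, applied to an in-memory map that stands in for the engine. Op, TranslogReplaySketch and the "skip operations whose version is not newer" rule are hypothetical simplifications. In real ES, version resolution happens inside the Engine, using versionTypeForReplicationAndRecovery().

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class TranslogReplaySketch {
    enum OpType { INDEX, DELETE }

    static final class Op {
        final OpType type;
        final String id;
        final long version;
        final String source; // null for deletes

        Op(OpType type, String id, long version, String source) {
            this.type = type;
            this.id = id;
            this.version = version;
            this.source = source;
        }
    }

    final Map<String, String> docs = new HashMap<>();   // doc id -> source
    final Map<String, Long> versions = new HashMap<>(); // doc id -> last applied version
    int processed;

    void performBatchRecovery(List<Op> ops) {
        for (Op op : ops) {
            // an operation that is not newer than what was already applied is stale: skip it
            if (op.version > versions.getOrDefault(op.id, 0L)) {
                switch (op.type) {
                    case INDEX:
                        docs.put(op.id, op.source);
                        break;
                    case DELETE:
                        docs.remove(op.id);
                        break;
                    default:
                        throw new IllegalStateException("No operation defined for [" + op.type + "]");
                }
                versions.put(op.id, op.version);
            }
            processed++; // counterpart of operationProcessed()
        }
    }
}
```

Counting every operation, including skipped stale ones, matches how the recovery progress counts the total number of translog operations received.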
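Summary
The walkthrough shows that recovery consists of segment recovery and translog recovery. Segments are Lucene index files already persisted to disk, and they are transferred as file streams. The translog records operations that clients have written but that are not yet part of a Lucene commit. These operations are sent in batches and replayed one at a time.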
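The segment side of recovery depends on the three-way comparison performed in phase1. The Java sketch below classifies files as identical, different or missing using only length and checksum. FileMeta, Diff and RecoveryDiffSketch are hypothetical types. The sketch works file by file, whereas ES's Store.MetadataSnapshot.recoveryDiff works at segment granularity, as described in phase1: a segment counts as identical only if all of its files match.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class RecoveryDiffSketch {
    static final class FileMeta {
        final long length;
        final String checksum;

        FileMeta(long length, String checksum) {
            this.length = length;
            this.checksum = checksum;
        }

        boolean isSame(FileMeta other) {
            return other != null && length == other.length && checksum.equals(other.checksum);
        }
    }

    static final class Diff {
        final List<String> identical = new ArrayList<>();
        final List<String> different = new ArrayList<>();
        final List<String> missing = new ArrayList<>();
    }

    // Compare the source snapshot against the metadata the target reported.
    static Diff diff(Map<String, FileMeta> source, Map<String, FileMeta> target) {
        Diff d = new Diff();
        for (Map.Entry<String, FileMeta> e : new TreeMap<>(source).entrySet()) { // sorted for stable output
            FileMeta onTarget = target.get(e.getKey());
            if (onTarget == null) {
                d.missing.add(e.getKey());
            } else if (e.getValue().isSame(onTarget)) {
                d.identical.add(e.getKey());
            } else {
                d.different.add(e.getKey());
            }
        }
        return d;
    }

    // Files phase1 would transfer: everything that is not identical.
    static List<String> filesToSend(Diff d) {
        List<String> files = new ArrayList<>(d.different);
        files.addAll(d.missing);
        return files;
    }
}
```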
With the recovery source code analyzed, the recovery settings ES provides are easy to understand:
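#Maximum bytes sent per second. It takes effect while sendFiles streams segment files and is applied through
#a rate limiter shared by the node's recoveries. It affects both network IO and disk IO. The default is 40mb,
#i.e. at most 40 MB of data per second. Note that the unit is bytes, not bits.
indices.recovery.max_bytes_per_sec: 40mb
#These settings control how many shards can recover at the same time
cluster.routing.allocation.node_initial_primaries_recoveries: 4
cluster.routing.allocation.node_concurrent_recoveries: 2
cluster.routing.allocation.cluster_concurrent_rebalance: 2
Cluster-wide recovery therefore has to be tuned along two dimensions: the number of concurrent recoveries and the transfer-rate limit. Note in particular that max_bytes_per_sec is measured in bytes. The default 40mb means a limit of roughly 40,000 KB/s (you can confirm this with the sar command), so reaching that limit requires at least a gigabit NIC.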
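A short Java calculation shows why a gigabit NIC is needed. It assumes that ES byte-size units are binary (1mb = 1024 * 1024 bytes). The 10 GiB shard in main() is a made-up example, not a measured figure.

```java
final class RecoveryBandwidth {
    // ES-style "mb" to bytes per second (binary units).
    static long toBytesPerSec(long megabytes) {
        return megabytes * 1024L * 1024L;
    }

    // NIC line rates are quoted in decimal megabits per second.
    static double toMegabitsPerSec(long bytesPerSec) {
        return bytesPerSec * 8 / 1_000_000.0;
    }

    static double secondsToCopy(long totalBytes, long bytesPerSec) {
        return (double) totalBytes / bytesPerSec;
    }

    public static void main(String[] args) {
        long limit = toBytesPerSec(40);
        System.out.println(limit + " bytes/s");                      // 41943040 bytes/s
        System.out.println(toMegabitsPerSec(limit) + " Mbit/s");     // 335.54432 Mbit/s
        System.out.println(secondsToCopy(10L * 1024 * 1024 * 1024, limit) + " s"); // 256.0 s
    }
}
```

About 335.5 Mbit/s is already more than three times what a 100 Mbit NIC can carry. At this default limit, the segment files of a 10 GiB shard would need a little over four minutes just for the transfer.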
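The official rolling upgrade procedure includes a
POST _flush/synced step. This step flushes the shard copies, persisting what the translog holds into a Lucene commit, and stamps a common sync_id on copies whose contents are identical. A replica that restarts with a matching sync_id can then skip copying segment files in phase1. Afterwards, you can check the sync state of primaries and replicas with
GET /_cat/shards?v&h=index,shard,prirep,docs,ip,sync_id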
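The following Java sketch shows why a synced flush speeds up restarts. It models the sync_id shortcut at the start of phase1: file copying is skipped only when both copies report the same non-null sync_id. SyncIdShortcut is a hypothetical class. The doc-count sanity check is an assumption based on the ES 5.x source, which refuses a sync_id match when document counts differ.

```java
final class SyncIdShortcut {
    // Returns true when phase1 can skip copying segment files.
    static boolean canSkipFileCopy(String sourceSyncId, String targetSyncId,
                                   long sourceNumDocs, long targetNumDocs) {
        boolean sameSyncId = sourceSyncId != null && sourceSyncId.equals(targetSyncId);
        if (sameSyncId && sourceNumDocs != targetNumDocs) {
            // matching sync ids should imply identical contents; anything else is inconsistent
            throw new IllegalStateException("sync id matches but number of docs differ: source="
                + sourceNumDocs + ", target=" + targetNumDocs);
        }
        return sameSyncId;
    }
}
```

Without a synced flush, the copies usually lack a common sync_id. Phase1 then falls back to the per-segment comparison and copies whatever differs.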