一、在线迁移方式:
方式1:通过集群添加新节点、下架旧节点的方式完成迁移;
1.业务连续性:在某些情况下,通过逐步添加新节点并下架旧节点的方式,可以确保业务在迁移过程中的连续性,减少停机时间。
2.自动分片迁移:Elasticsearch集群具有自动分片迁移的能力,当新节点加入并稳定后,集群会自动将部分数据迁移到新节点上,从而减轻旧节点的负载。
3.减少数据丢失风险:由于数据在迁移过程中是实时同步的,因此减少了数据丢失的风险;
方式2:借助logstash来完成在线迁移的工作;
1.更灵活性:Logstash支持从多个数据源读取数据,并将其写入到多个目标系统中,包括Elasticsearch。这使得Logstash成为实现跨集群数据迁移的灵活工具。
2.实时性差:定时读取源端es增量数据,异步同步,所以实时性差点。
3. 易于监控:Logstash提供了丰富的日志和监控功能,可以方便地监控迁移过程的进度和状态
4. 回退方便:因为原来旧的集群还存活,相对于方案1回退更方便快捷;
5. 注意:文档个数为0的索引,默认是不同步的;
本编文档介绍logstash完成在线迁移
1)如何保证数据一致性(极端情况下无法保证一致性,一致性要求高的建议使用方式1)
默认情况下,如果Logstash尝试写入一个已经存在于Elasticsearch中的文档(即具有相同ID的文档),它将更新该文档;
为了防止在异常终止期间的数据丢失,Logstash提供了持久化队列功能。这一功能将事件队列存储在磁盘上,从而确保即使在Logstash异常终止的情况下,未处理的事件也不会丢失;
所以迁移一致性主要依赖于logstash的配置,保证源端和目标端使用相同的文档id,并且logstash是持久化队列,
queue.type: persisted #logstash开启持久化队列,在logstash.yml配置文件中
document_id => "%{[@metadata][_id]} #conf配置添加,保证源端和目标端文档id相同
doc_as_upsert=> true #源端es修改文档,logstash同步会更新目标端的相同id文档(删除然后新增文档)
2)logstash的主配置中配置开启持久化队列的相关参数:
queue.type: persisted:开启持久化队列,默认是memory,在内存中存放队列,一旦实例重启可能会丢数据;
queue.max_bytes:队列的总容量,以字节为单位。默认值为1024MB(1GB)。此设置限制了队列可以使用的最大磁盘空间。确保磁盘容量大于此值,以避免队列因空间不足而拒绝新事件。
queue.max_events:队列中允许的最大事件数。默认值为0(无限制)。这个设置通常与queue.max_bytes一起使用,以确定队列何时达到其容量限制。
path.queue:数据文件将被存储的目录路径。默认情况下,文件存储在path.data/queue中。可以根据需要更改此路径,以确保队列数据存储在合适的磁盘分区上。
queue.page_capacity:队列页面的最大大小(以字节为单位)。队列数据由称为“页面”的仅附加文件组成。默认大小为250MB。更改此值不太可能带来显著的性能优势。
queue.checkpoint.writes:Logstash在写入指定条数事件后,进行checkpoint(检查点)的频率。checkpoint是一种机制,用于在磁盘上记录队列的当前状态,以便在Logstash重启后能够恢复处理。在配置queue.checkpoint.writes时,需要考虑到Logstash的性能和资源使用情况。设置得太低可能会导致频繁地执行检查点,从而影响性能;设置得太高则可能在Logstash异常终止时丢失更多的数据
二、logstash完成es全量同步;
[@CentOS_txyz_92_52 /data/logstash-7.10.0]# cat es_to_es.conf
input {
elasticsearch {
hosts => ["10.18.34.173:9600","10.18.34.172:9600","10.18.34.171:9600"]
index => "*"
docinfo => true
size => 5000 #每批检索的文档数为5000
slices => 4 # 使用4个切片进行并行检索
scroll => "5m" #游标保持活动的时间为5分钟
user => "elastic"
password => "s3zjC2LVFE"
}
}
output {
elasticsearch {
hosts => ["10.18.92.98:9600","10.18.92.99:9600","10.18.92.100:9600"]
#hosts => ["http://10.18.92.98:9600","http://10.18.92.99:9600","http://10.18.92.100:9600"]
user => "elastic"
password => "s3zjC2LVFE111Dsf"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}" #保证源端和目标端文档id一致
}
}
启动logstash
nohup /data/logstash-7.10.0/bin/logstash -f /data/logstash-7.10.0/es_to_es.conf &
查看相关日志
[@CentOS_txyz_92_52 /data/logstash-7.10.0/logs]# tail -f /data/logstash-7.10.0/logs/logstash-plain.log
三、logstash完成es增量同步;
[@CentOS_txyz_92_52 /data/logstash-7.10.0]# cat es_to_es_incr.conf
input {
elasticsearch {
hosts => ["10.18.34.173:9600","10.18.34.172:9600","10.18.34.171:9600"]
index => "*"
user => "elastic"
password => "dfsf"
#查询这个索引前5分钟内每批5000条数据
query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
size => 5000
scroll => "5m"
docinfo => true
schedule => "* * * * *" #定时任务,每分钟执行一次
}
}
filter {
mutate {
remove_field => ["source", "@version"]
}
}
output {
elasticsearch {
hosts => ["10.18.92.98:9600","10.18.92.99:9600","10.18.92.100:9600"]
user => "elastic"
password => "sdfsf"
index => "%{[@metadata][_index]}"
document_type => "%{[@metadata][_type]}"
document_id => "%{[@metadata][_id]}"
doc_as_upsert=> true #如果业务不会修改索引文档,就没必要添加这个参数;
}
}
每分钟执行一次(同步实时性取决于这列的同步间隔),分批从源集群中拉取5分钟前到当前分钟的所有数据,同步到新的集群中;需要在output中配置document_id参数避免重复写入到新集群中。
启动logstash
nohup /data/logstash-7.10.0/bin/logstash -f /data/logstash-7.10.0/es_to_es_inc.conf &
查看相关日志
[@CentOS_txyz_92_52 /data/logstash-7.10.0/logs]# tail -f /data/logstash-7.10.0/logs/logstash-plain.log
四、总结:
对于测试环境,对一致性和实时性要求不高,不在乎停机时间,可以借助logstash完成迁移,针对生产环境对数据一致性和实时性要求高,尽量减少停机时间,建议使用增删节点的方式实现数据迁移。