MongoDB迁移工具Mongoshake

1.Mongoshake工具介绍

和一般的同步工具一样三种模式: all or full or incr

2.参数

# 通道模式。
tunnel = direct
# tunnel target resource url
# for rpc. this is remote receiver socket address
# for tcp. this is remote receiver socket address
# for file. this is the file path, for instance "data"
# for kafka. this is the topic and brokers address which split by comma, for
# instance: topic@brokers1,brokers2, default topic is "mongoshake"
# for mock. this is uesless
# for direct. this is target mongodb address which format is the same as `mongo_urls`. If
# the target is sharding, this should be the mongos address.
# direct模式用于直接写入MongoDB,其余模式用于一些分析,或者远距离传输场景,
# 注意,如果是非direct模式,需要通过receiver进行解析,具体参考FAQ文档。

# 此处配置通道的地址,格式与mongo_urls对齐。


# the number of collection concurrence

# 并发最大拉取的表个数,例如,6表示同一时刻shake最多拉取6个表。
full_sync.reader.collection_parallel = 8
# the number of document writer thread in each collection.
# 同一个表内并发写的线程数,例如,8表示对于同一个表,将会有8个写线程进行并发写入。
full_sync.reader.write_document_parallel = 8
# number of documents in a batch insert in a document concurrence
# 目的端写入的batch大小,例如,128表示一个线程将会一次聚合128个文档然后再写入。
full_sync.reader.document_batch_size = 128
# number of documents reading in single reader thread.
# do not enable when the _id has more than one type: e.g., ObjectId, string.
# 用于单表倾斜的优化,单个拉取线程读取的最多的文档数,默认0表示拉取是单线程拉取,非0情况下必须>=10000。
# 例如,表内有50000文档,设置10000则读取段拉取为5个线程(建议并发在1-32个线程)。
# 注意:对单个表来说,仅支持_id对应的value是同种类型,如果有不同类型请勿启用该配置项!
full_sync.reader.read_document_count = 0

# drop the same name of collection in dest mongodb in full synchronization
# 同步时如果目的库存在,是否先删除目的库再进行同步,true表示先删除再同步,false表示不删除。
full_sync.collection_exist_drop = false

# create index option.
# none: do not create indexes.
# foreground: create indexes when data sync finish in full sync stage.
# background: create indexes when starting.
# 全量期间数据同步完毕后,是否需要创建索引,none表示不创建,foreground表示创建前台索引,
# background表示创建后台索引。
full_sync.create_index = background

# convert insert to update when duplicate key found
# 如果_id存在在目的库,是否将insert语句修改为update语句。
full_sync.executor.insert_on_dup_update = false
# filter orphan document for source type is sharding.
# 源端是sharding,是否需要过滤orphan文档
full_sync.executor.filter.orphan_document = false
# enable majority write in full sync.
# the performance will degrade if enable.
# 全量阶段写入端是否启用majority write
full_sync.executor.majority_enable = false

# --------------------------- incrmental sync configuration ---------------------------
# fetch method:
# oplog: fetch oplog from source mongodb (default)
# change_stream: use change to receive change event from source mongodb, support MongoDB >= 4.0
incr_sync.mongo_fetch_method = change_stream

# After the document is updated, the fields that only need to be updated are set to false,
# and the contents of all documents are set to true
# 更新文档后,只需要更新的字段则设为false,需要全部文档内容则设为true
# 只在mongo_fetch_method = change_stream 模式下生效,且性能有所下降
incr_sync.change_stream.watch_full_document = false

# global id. used in active-active replication.
# this parameter is not supported on current open-source version.
# gid用于双活防止环形复制,目前只用于阿里云云上MongoDB,如果是阿里云云上实例互相同步
# 希望开启gid,请联系阿里云售后,sharding的有多个gid请以分号(;)分隔。
incr_sync.oplog.gids =

# distribute data to different worker by hash key to run in parallel.
# [auto]                decide by if there has unique index in collections.
#                               use `collection` if has unique index otherwise use `id`.
# [id]                  shard by ObjectId. handle oplogs in sequence by unique _id
# [collection]  shard by ns. handle oplogs in sequence by unique ns
# hash的方式,id表示按文档hash,collection表示按表hash,auto表示自动选择hash类型。
# 如果没有索引建议选择id达到非常高的同步性能,反之请选择collection。
incr_sync.shard_key = collection
#incr_sync.shard_key = auto
# if shard_key is collection, and users want to improve performance when some collections
# do not have unique key.
# 对于按collection哈希,如果某些表不具有唯一索引,则可以设置按_id哈希以提高并发度。
# 用户需要确认该表不会创建唯一索引,一旦检测发现存在唯一索引,则会立刻crash退出。
# 例如,db1.collection1;db2.collection2,不支持仅指定db
incr_sync.shard_by_object_id_whitelist =

# oplog transmit worker concurrent
# if the source is sharding, worker number must equal to shard numbers.
# 内部发送的worker数目,如果机器性能足够,可以提高worker个数。
incr_sync.worker = 16
# batched oplogs have block level checksum value using
# crc32 algorithm. and compressor for compressing content
# of oplog entry.
# supported compressor are : gzip,zlib,deflate
# Do not enable this option when tunnel type is "direct"
# 是否启用发送,非direct模式发送可以选择压缩以减少网络带宽消耗。
incr_sync.worker.oplog_compressor = none

# set the sync delay just like mongodb secondary slaveDelay parameter. unit second.
# 设置目的端的延迟,比如延迟源端20分钟,类似MongoDB本身主从同步slaveDelay参数,单位:秒
# 0表示不启用
incr_sync.target_delay = 0

# memory queue configuration, plz visit FAQ document to see more details.
# do not modify these variables if the performance and resource usage can
# meet your needs.
# 内部队列的配置参数,如果目前性能足够不建议修改,详细信息参考FAQ。
incr_sync.worker.batch_queue_size = 64
incr_sync.adaptive.batching_max_size = 1024
incr_sync.fetcher.buffer_capacity = 256

# --- direct tunnel only begin ---
# if tunnel type is direct, all the below variable should be set
# 下列参数仅用于tunnel为direct的情况。

# oplog changes to Insert while Update found non-exist (_id or unique-index)
# 如果_id不存在在目的库,是否将update语句修改为insert语句。
incr_sync.executor.upsert = false
# oplog changes to Update while Insert found duplicated key (_id or unique-index)
# 如果_id存在在目的库,是否将insert语句修改为update语句。
incr_sync.executor.insert_on_dup_update = false
# db. write duplicated logs to mongoshake_conflict
# sdk. write duplicated logs to sdk.
# 如果写入存在冲突,记录冲突的文档。
incr_sync.conflict_write_to = none

# enable majority write in incrmental sync.
# the performance will degrade if enable.
# 增量阶段写入端是否启用majority write
incr_sync.executor.majority_enable = false


incr_sync.mongo_fetch_method 

oplog方式可以获取noop oplog,

ChangeStream方式获取不到noop oplog(被MongoDB自身过滤掉了)。
所以oplog会更新,ChangeStream不会更新。
这个现在维持现状,等确定好的优化逻辑


3.AWS的documentDB不支持

因为DocumentDB不支持下面这个参数

cursor. noCursorTimeout ( )

Instructs the server to avoid closing a cursor automatically after a period of inactivity

3.Bug


这个原因是现在mongoshake全量阶段用的是secondaryPreferred,所以优先读的secondary。read concern没有进行配置,导致读secondary是available的方式,available和local方式不同的是前者不会过滤掉孤儿文档。

  • 对于3.6以下的版本,readPreferred建议改为primary。
  • 对于3.6+的版本,readConcern需要默认改为local。3.6添加了一个sharding_filter的stage用于过滤文档:mongod读到以后会判断该文档是否属于当前mongod,不是的话会过滤掉,从此避免孤儿文档。

此处MongoShake将会给出建议和优化:

  • 对于3.6以下版本,用户自己调整配置项: mongo_connect_mode = primary。这是因为考虑到MongoShake如果直接强制修改,可能不符合用户的预期,导致primary节点负载打高。
  • 对于3.6+的版本,默认readConcern改为local。

当然,用户也可以通过 full_sync.executor.insert_on_dup_update = true来进行过滤,不过建议还是采用上面方式更合理。
还有一种方式是全量开始前,先 清理一下孤儿文档。

参考:


亲测发现4.0版本,孤儿文档过滤不完善,即使配置了readConcern=local/majority,readPreferred=primary还是会碰到孤儿文档,这种情况建议手动清理一下。如果还有疑问请reopen当前issue


4.内存计算


  • FetcherBufferCapacity = 256
  • AdaptiveBatchingMaxSize = 16384. Since v2.0.7, we set the default value to 1024 to lower the memory usage. If the tunnel is direct, choose a small value won't decrease the performance a lot. But for others tunnels like tcp, rpc, kafka, set a big value will improve transmission performance.
  • WorkerBatchQueueSize = 64
  • Worker = 8


请使用浏览器的分享功能分享到微信等