回调API和核心API
回调 API :
-
启动一个事务,执行指定的操作,并提交(或出错时中止)。
-
自动包含
"TransientTransactionError"和"UnknownTransactionCommitResult"的错误处理逻辑。
核心 API :
-
需要显式调用来启动事务并提交事务。
-
不包含 "TransientTransactionError" 和 "UnknownTransactionCommitResult" 的错误处理逻辑,而是为这些错误提供了包含自定义错误处理的灵活性。
回调API
回调 API 包含以下逻辑:
-
如果事务遇到
"TransientTransactionError",则作为一个整体重试事务。 -
如果提交遇到
"UnknownTransactionCommitResult",则重新这个提交操作。
示例:
该示例使用新的回调 API 来处理事务,它启动事务、执行指定的操作并提交(或在出错时中止)。新的回调 API 包
含
"TransientTransactionError"或
"UnknownTransactionCommitResult"
提交错误的重试逻辑。
重要
-
推荐。 使用针对 MongoDB 部署版本更新的 MongoDB 驱动程序。对于 MongoDB 4.2 部署(副本集和分片集群)上的事务,客户端 必须使用为 MongoDB 4.2 更新的 MongoDB 驱动程序。
-
使用驱动程序时,事务中的每个操作 必须与会话相关联(即将会话传递给每个操作)。
-
在 MongoDB 4.2 及更早版本中,你无法在事务中创建集合。如果在事务内部运行,导致文档插入的写操作(例如
insert或带有upsert: true的更新操作)必须在 已有的 集合上执行。 -
从 MongoDB 4.4 开始,你可以隐式或显式地在事务中创建集合。但是,你比须使用针对 4.4 更新的 MongoDB 驱动程序。有关详细信息,请参阅 在事务中创建集合和索引 。
// WithTransactionExample is an example of using the Session.WithTransaction function.func WithTransactionExample() {ctx := context.Background()// For a replica set, include the replica set name and a seedlist of the members in the URI string; e.g.// uri := "mongodb://mongodb0.example.com:27017,mongodb1.example.com:27017/?replicaSet=myRepl"// For a sharded cluster, connect to the mongos instances; e.g.// uri := "mongodb://mongos0.example.com:27017,mongos1.example.com:27017/"var uri stringclientOpts := options.Client().ApplyURI(uri)client, err := mongo.Connect(ctx, clientOpts)if err != nil {panic(err)}defer func() { _ = client.Disconnect(ctx) }()// Prereq: Create collections.wcMajority := writeconcern.New(writeconcern.WMajority(), writeconcern.WTimeout(1*time.Second))wcMajorityCollectionOpts := options.Collection().SetWriteConcern(wcMajority)fooColl := client.Database("mydb1").Collection("foo", wcMajorityCollectionOpts)barColl := client.Database("mydb1").Collection("bar", wcMajorityCollectionOpts)// Step 1: Define the callback that specifies the sequence of operations to perform inside the transaction.callback := func(sessCtx mongo.SessionContext) (interface{}, error) {// Important: You must pass sessCtx as the Context parameter to the operations for them to be executed in the// transaction.if _, err := fooColl.InsertOne(sessCtx, bson.D{{"abc", 1}}); err != nil {return nil, err}if _, err := barColl.InsertOne(sessCtx, bson.D{{"xyz", 999}}); err != nil {return nil, err}return nil, nil}// Step 2: Start a session and run the callback using WithTransaction.session, err := client.StartSession()if err != nil {panic(err)}defer session.EndSession(ctx)result, err := session.WithTransaction(ctx, callback)if err != nil {panic(err)}fmt.Printf("result: %v\n", result)}
核心API
核心事务 API 不包含标记错误的重试逻辑:
-
"TransientTransactionError"。如果事务中的操作返回标记为"TransientTransactionError"的错误,则事务会被作为一个整体进行重试。 -
"UnknownTransactionCommitResult"。如果提交返回标记为"UnknownTransactionCommitResult"的错误,提交会被重试。为了处理
"UnknownTransactionCommitResult",应用程序应该明确地包含错误的重试逻辑。
示例:
runTransactionWithRetry := func (sctx mongo.SessionContext, txnFn func(mongo.SessionContext) error) error {for {err := txnFn(sctx) // Performs transaction.if err == nil {return nil}log.Println( "Transaction aborted. Caught exception during transaction.")// If transient error, retry the whole transactionif cmdErr, ok := err.(mongo.CommandError); ok && cmdErr.HasErrorLabel( "TransientTransactionError") {log.Println( "TransientTransactionError, retrying transaction...")continue}return err}}commitWithRetry := func (sctx mongo.SessionContext) error {for {err := sctx.CommitTransaction(sctx)switch e := err.( type) {case nil:log.Println( "Transaction committed.")return nilcase mongo.CommandError:// Can retry commitif e.HasErrorLabel( "UnknownTransactionCommitResult") {log.Println( "UnknownTransactionCommitResult, retrying commit operation...")continue}log.Println( "Error during commit...")return edefault:log.Println( "Error during commit...")return e}}}// Updates two collections in a transaction.updateEmployeeInfo := func (sctx mongo.SessionContext) error {employees := client.Database( "hr").Collection( "employees")events := client.Database( "reporting").Collection( "events")err := sctx.StartTransaction(options.Transaction().SetReadConcern(readconcern.Snapshot()).SetWriteConcern(writeconcern.New(writeconcern.WMajority())),)if err != nil {return err}_, err = employees.UpdateOne(sctx, bson.D{{ "employee", 3}}, bson.D{{ "$set", bson.D{{ "status", "Inactive"}}}})if err != nil {sctx.AbortTransaction(sctx)log.Println( "caught exception during transaction, aborting.")return err}_, err = events.InsertOne(sctx, bson.D{{ "employee", 3}, { "status", bson.D{{ "new", "Inactive"}, { "old", "Active"}}}})if err != nil {sctx.AbortTransaction(sctx)log.Println( "caught exception during transaction, aborting.")return err}return commitWithRetry(sctx)}return client.UseSessionWithOptions(ctx, options.Session().SetDefaultReadPreference(readpref.Primary()),func (sctx mongo.SessionContext) error {return runTransactionWithRetry(sctx, updateEmployeeInfo)},)}
驱动程序版本
对于 MongoDB 4.2 部署(副本集和分片集群)上的事务 ,客户端 必须使用为 MongoDB 4.2 更新的 MongoDB 驱动程序:
| C 1.15.0 C# 2.9.0 Go 1.1 | Java 3.11.0 Node 3.3.0 Perl 2.2.0 | Python 3.9.0 Ruby 2.10.0 Scala 2.7.0 |
|---|---|---|
|
|
|
|
对于 MongoDB 4.0 副本集上的事务,客户端需要为 MongoDB 4.0 或更高版本更新 MongoDB 驱动程序。
| Java 3.8.0Python 3.7.0C 1.11.0 | C# 2.7Node 3.1.0Ruby 2.6.0 | Perl 2.0.0PHP (PHPC) 1.5.0Scala 2.4.0 |
|---|---|---|
|
|
|
|
"TransientTransactionError"
无论
retryWrites
的值是多少,事务内部的单个写操作都不可重试。如果操作遇到一个错误
与标签相关
"TransientTransactionError"
,比如当主节点降级,事务会作为一个整体被重试。
-
回调 API 包含了
"TransientTransactionError"的重试逻辑。 -
核心事务 API 不包含
"TransientTransactionError"的重试逻辑。为了处理"TransientTransactionError",应用程序应该明确地包含错误的重试逻辑。
"UnknownTransactionCommitResult"
提交操作是
可重试的写操作
。如果提交操作遇到错误,无论
retryWrites
的值是多少,MongoDB 驱动程序都会重试提交。
如果提交操作遇到标记为
"UnknownTransactionCommitResult"
的错误,提交可以被重试。
-
回调 API 包含了
"UnknownTransactionCommitResult"的重试逻辑。 -
核心事务 API 不包含
"UnknownTransactionCommitResult"的重试逻辑。为了处理"UnknownTransactionCommitResult",应用程序应该明确地包含错误的重试逻辑。
驱动程序版本错误
| Error Code | Error Message |
|---|---|
| 251 |
cannot continue txnId -1 for session ... with txnId 1 |
| 50940 |
cannot commit with no participants |
对于 MongoDB 4.2 部署(副本集和分片集群)上的事务,使用为 MongoDB 4.2 更新的 MongoDB 驱动程序。
附加信息
mongo Shell 示例
mongo
shell 方法可用于事务:注释
mongo
shell 示例为了简单起见省略了重试逻辑和强大的错误处理。有关在应用程序中包含事务的更实际示例,请参阅
事务错误处理
。
// Create collections:db.getSiblingDB( "mydb1").foo.insert( { abc: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } );db.getSiblingDB( "mydb2").bar.insert( { xyz: 0}, { writeConcern: { w: "majority", wtimeout: 2000 } } );// Start a session.session = db.getMongo().startSession( { readPreference: { mode: "primary" } } );coll1 = session.getDatabase( "mydb1").foo;coll2 = session.getDatabase( "mydb2").bar;// Start a transactionsession.startTransaction( { readConcern: { level: "local" }, writeConcern: { w: "majority" } } );// Operations inside the transactiontry {coll1.insertOne( { abc: 1 } );coll2.insertOne( { xyz: 999 } );} catch (error) {// Abort transaction on errorsession.abortTransaction();throw error;}// Commit the transaction using write concern set at transaction startsession.commitTransaction();session.endSession();
原文链接:
https://docs.mongodb.com/manual/core/transactions-in-applications/
现阶段对开源分布式数据库、云计算等领域有很大兴趣;平时喜欢打羽毛球、看电影等。