Transaction Operations

Transact operations are similar to Batch operations, with the key differences being that the writes support the inclusion of condition checks, and they all must fail or succeed together.

Transaction operations are supported using context managers. Keep in mind that DynamoDB imposes limits on the number of items that a single transaction can contain.

Suppose you have defined a BankStatement model, like in the example below.

from pynamodb.models import Model
from pynamodb.attributes import BooleanAttribute, NumberAttribute, UnicodeAttribute

class BankStatement(Model):
    class Meta:
        table_name = 'BankStatement'

    user_id = UnicodeAttribute(hash_key=True)
    account_balance = NumberAttribute(default=0)
    is_active = BooleanAttribute()

Transact Writes

A TransactWrite can be initialized with the following parameters:

  • connection (required) - the Connection used to make the request (see Low Level API)

  • client_request_token - an idempotency key for the request (see ClientRequestToken in the DynamoDB API reference)

  • return_consumed_capacity - determines the level of detail about provisioned throughput consumption that is returned in the response (see ReturnConsumedCapacity in the DynamoDB API reference)

  • return_item_collection_metrics - determines whether item collection metrics are returned (see ReturnItemCollectionMetrics in the DynamoDB API reference)

Here’s an example of using a context manager for a TransactWrite operation:

from pynamodb.connection import Connection
from pynamodb.transactions import TransactWrite

# Two existing bank statements in the following states
user1_statement = BankStatement('user1', account_balance=2000, is_active=True)
user2_statement = BankStatement('user2', account_balance=0, is_active=True)

user1_statement.save()
user2_statement.save()

connection = Connection()

with TransactWrite(connection=connection, client_request_token='super-unique-key') as transaction:
    # attempting to transfer funds from user1's account to user2's
    transfer_amount = 1000
    transaction.update(
        BankStatement(user_id='user1'),
        actions=[BankStatement.account_balance.add(transfer_amount * -1)],
        condition=(
            (BankStatement.account_balance >= transfer_amount) &
            (BankStatement.is_active == True)
        )
    )
    transaction.update(
        BankStatement(user_id='user2'),
        actions=[BankStatement.account_balance.add(transfer_amount)],
        condition=(BankStatement.is_active == True)
    )

user1_statement.refresh()
user2_statement.refresh()

assert user1_statement.account_balance == 1000
assert user2_statement.account_balance == 1000

Now, say you make another attempt to debit one of the accounts when they don’t have enough money in the bank:

from pynamodb.exceptions import TransactWriteError

assert user1_statement.account_balance == 1000
assert user2_statement.account_balance == 1000

try:
    with TransactWrite(connection=connection, client_request_token='another-super-unique-key') as transaction:
        # attempting to transfer funds from user1's account to user2's
        transfer_amount = 2000
        transaction.update(
            BankStatement(user_id='user1'),
            actions=[BankStatement.account_balance.add(transfer_amount * -1)],
            condition=(
                (BankStatement.account_balance >= transfer_amount) &
                (BankStatement.is_active == True)
            ),
            return_values=ALL_OLD
        )
        transaction.update(
            BankStatement(user_id='user2'),
            actions=[BankStatement.account_balance.add(transfer_amount)],
            condition=(BankStatement.is_active == True)
        )
except TransactWriteError as e:
    # Because the condition check on the account balance failed,
    # the entire transaction should be cancelled
    assert e.cause_response_code == 'TransactionCanceledException'
    # the first 'update' was a reason for the cancellation
    assert e.cancellation_reasons[0].code == 'ConditionalCheckFailed'
    # when return_values=ALL_OLD, the old values can be accessed from the raw_item property
    assert BankStatement.from_dynamodb_dict(e.cancellation_reasons[0].raw_item) == user1_statement
    # the second 'update' wasn't a reason, but was cancelled too
    assert e.cancellation_reasons[1] is None

    user1_statement.refresh()
    user2_statement.refresh()
    # and both models should be unchanged
    assert user1_statement.account_balance == 1000
    assert user2_statement.account_balance == 1000

Condition Check

The ConditionCheck operation is used on a TransactWrite to check if the current state of a record you aren’t modifying within the overall transaction fits some criteria that, if it fails, would cause the entire transaction to fail. The condition argument is of type Conditional Operations.

  • model_cls (required)

  • hash_key (required)

  • range_key (optional)

  • condition (required) - of type Condition (see Conditional Operations)

with TransactWrite(connection=connection) as transaction:
    transaction.condition_check(BankStatement, 'user1', condition=(BankStatement.is_active == True))

Delete

The Delete operation functions similarly to Model.delete.

statement = BankStatement.get('user1')

with TransactWrite(connection=connection) as transaction:
    transaction.delete(statement, condition=(~BankStatement.is_active))

Save

The Put operation functions similarly to Model.save.

statement = BankStatement(user_id='user3', account_balance=20, is_active=True)

with TransactWrite(connection=connection) as transaction:
    transaction.save(statement, condition=(BankStatement.user_id.does_not_exist()))

Update

The Update operation functions similarly to Model.update.

user1_statement = BankStatement('user1')
with TransactWrite(connection=connection) as transaction:
    transaction.update(
        user1_statement,
        actions=[BankStatement.account_balance.set(0), BankStatement.is_active.set(False)]
        condition=(BankStatement.user_id.exists())
    )

Transact Gets

with TransactGet(connection=connection) as transaction:
    """ attempting to get records of users' bank statements """
    user1_statement_future = transaction.get(BankStatement, 'user1')
    user2_statement_future = transaction.get(BankStatement, 'user2')

user1_statement: BankStatement = user1_statement_future.get()
user2_statement: BankStatement = user2_statement_future.get()

The TransactGet operation currently only supports the Get method, which only takes the following parameters:

  • model_cls (required)

  • hash_key (required)

  • range_key (optional)

The .get returns a class of type _ModelFuture that acts as a placeholder for the record until the transaction completes.

To retrieve the resolved model, you say model_future.get(). Any attempt to access this model before the transaction is complete will result in a InvalidStateError.

Error Types

You can expect some new error types with transactions, such as: