Skip to content

Ethereum Test Types package

Common definitions and types.

EOA

Bases: Address

An Externally Owned Account (EOA) is an account controlled by a private key.

The EOA is defined by its address and (optionally) by its corresponding private key.

Source code in src/ethereum_test_types/account_types.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
class EOA(Address):
    """
    Address of an Externally Owned Account, i.e. an account controlled by a private key.

    Besides the address bytes, an instance carries the (optional) private
    key and a nonce counter.
    """

    key: Hash | None
    nonce: Number

    def __new__(
        cls,
        address: "FixedSizeBytesConvertible | Address | EOA | None" = None,
        *,
        key: FixedSizeBytesConvertible | None = None,
        nonce: NumberConvertible = 0,
    ):
        """
        Build an EOA from an explicit address, or derive the address from `key`.

        An `address` that already is an EOA is returned unchanged; `key` and
        `nonce` are ignored in that case. When `address` is omitted it is
        computed from the public key belonging to `key`.

        Raises:
            ValueError: if neither `address` nor `key` is given.
        """
        if isinstance(address, EOA):
            return address
        if address is None:
            if key is None:
                raise ValueError("impossible to initialize EOA without address")
            uncompressed = PrivateKey(Hash(key)).public_key.format(compressed=False)
            # Hash everything after the first byte and drop the first 12 (32 - 20)
            # bytes of the digest.
            address = Address(keccak256(uncompressed[1:])[12:])
        eoa = super().__new__(cls, address)
        eoa.key = None if key is None else Hash(key)
        eoa.nonce = Number(nonce)
        return eoa

    def get_nonce(self) -> Number:
        """Return the current nonce and advance the stored nonce by one."""
        current = self.nonce
        self.nonce = Number(current + 1)
        return current

    def copy(self) -> "EOA":
        """Return a new EOA with the same address, key and nonce."""
        # Go through a plain Address: handing an EOA to the constructor would
        # return that very instance instead of a new object.
        plain_address = Address(self)
        return EOA(plain_address, key=self.key, nonce=self.nonce)

__new__(address=None, *, key=None, nonce=0)

Init the EOA.

Source code in src/ethereum_test_types/account_types.py
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __new__(
    cls,
    address: "FixedSizeBytesConvertible | Address | EOA | None" = None,
    *,
    key: FixedSizeBytesConvertible | None = None,
    nonce: NumberConvertible = 0,
):
    """
    Build an EOA from an explicit address, or derive the address from `key`.

    An `address` that already is an EOA is returned unchanged; `key` and
    `nonce` are ignored in that case. When `address` is omitted it is
    computed from the public key belonging to `key`.

    Raises:
        ValueError: if neither `address` nor `key` is given.
    """
    if isinstance(address, EOA):
        return address
    if address is None:
        if key is None:
            raise ValueError("impossible to initialize EOA without address")
        uncompressed = PrivateKey(Hash(key)).public_key.format(compressed=False)
        # Hash everything after the first byte and drop the first 12 (32 - 20)
        # bytes of the digest.
        address = Address(keccak256(uncompressed[1:])[12:])
    eoa = super(EOA, cls).__new__(cls, address)
    eoa.key = None if key is None else Hash(key)
    eoa.nonce = Number(nonce)
    return eoa

get_nonce()

Return the current nonce of the EOA, then increment it by one.

Source code in src/ethereum_test_types/account_types.py
63
64
65
66
67
def get_nonce(self) -> Number:
    """Return the current nonce and advance the stored nonce by one."""
    current = self.nonce
    self.nonce = Number(current + 1)
    return current

copy()

Return copy of the EOA.

Source code in src/ethereum_test_types/account_types.py
69
70
71
def copy(self) -> "EOA":
    """Return a new EOA with the same address, key and nonce."""
    # Go through a plain Address: handing an EOA to the constructor would
    # return that very instance instead of a new object.
    plain_address = Address(self)
    return EOA(plain_address, key=self.key, nonce=self.nonce)

Alloc

Bases: Alloc

Allocation of accounts in the state, pre and post test execution.

Source code in src/ethereum_test_types/account_types.py
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
class Alloc(BaseAlloc):
    """
    Allocation of accounts in the state, pre and post test execution.

    The accounts are stored in `self.root`, keyed by `Address`. A value of
    `None` is a valid entry; `verify_post_alloc` treats it as "this account
    must not exist".
    """

    _eoa_fund_amount_default: int = PrivateAttr(10**21)

    @dataclass(kw_only=True)
    class UnexpectedAccountError(Exception):
        """Unexpected account found in the allocation."""

        address: Address
        account: Account | None

        def __init__(self, address: Address, account: Account | None, *args):
            """
            Initialize the exception.

            `address` and `account` are stored as attributes and, together
            with any extra positional `args`, forwarded to `Exception`.
            """
            # Forward the actual values instead of the packed `args` tuple so
            # that `self.args` mirrors the constructor call; this is what
            # copying/pickling uses to rebuild the exception.
            super().__init__(address, account, *args)
            self.address = address
            self.account = account

        def __str__(self):
            """Print exception string."""
            return f"unexpected account in allocation {self.address}: {self.account}"

    @dataclass(kw_only=True)
    class MissingAccountError(Exception):
        """Expected account not found in the allocation."""

        address: Address

        def __init__(self, address: Address, *args):
            """
            Initialize the exception.

            `address` is stored as an attribute and, together with any extra
            positional `args`, forwarded to `Exception`.
            """
            # See UnexpectedAccountError.__init__: keep `self.args` in sync
            # with the constructor arguments.
            super().__init__(address, *args)
            self.address = address

        def __str__(self):
            """Print exception string."""
            return f"Account missing from allocation {self.address}"

    @classmethod
    def merge(
        cls, alloc_1: "Alloc", alloc_2: "Alloc", allow_key_collision: bool = True
    ) -> "Alloc":
        """
        Return merged allocation of two sources.

        Starts from a dump of `alloc_1` and merges every entry of `alloc_2`
        into it through `Account.merge`. An address whose merge result is
        falsy is removed from the result.

        Raises:
            Exception: if both allocations share an address and
                `allow_key_collision` is False.
        """
        overlapping_keys = alloc_1.root.keys() & alloc_2.root.keys()
        if overlapping_keys and not allow_key_collision:
            raise Exception(
                f"Overlapping keys detected: {[key.hex() for key in overlapping_keys]}"
            )
        merged = alloc_1.model_dump()

        for address, other_account in alloc_2.root.items():
            merged_account = Account.merge(merged.get(address, None), other_account)
            if merged_account:
                merged[address] = merged_account
            elif address in merged:
                merged.pop(address, None)

        return Alloc(merged)

    def __iter__(self):
        """Return iterator over the addresses in the allocation."""
        return iter(self.root)

    def items(self):
        """Return the `(address, account)` items view of the allocation."""
        return self.root.items()

    def __getitem__(self, address: Address | FixedSizeBytesConvertible) -> Account | None:
        """
        Return account associated with an address.

        Values that are not already an `Address` are converted first. The
        lookup is a plain `self.root[...]` subscript.
        """
        if not isinstance(address, Address):
            address = Address(address)
        return self.root[address]

    def __setitem__(self, address: Address | FixedSizeBytesConvertible, account: Account | None):
        """Set account associated with an address (converted to `Address` if needed)."""
        if not isinstance(address, Address):
            address = Address(address)
        self.root[address] = account

    def __delitem__(self, address: Address | FixedSizeBytesConvertible):
        """
        Delete account associated with an address.

        Deleting an address that is not present is a silent no-op.
        """
        if not isinstance(address, Address):
            address = Address(address)
        self.root.pop(address, None)

    def __eq__(self, other) -> bool:
        """Return True if `other` is an `Alloc` whose `root` equals this one's."""
        if not isinstance(other, Alloc):
            return False
        return self.root == other.root

    def __contains__(self, address: Address | FixedSizeBytesConvertible) -> bool:
        """Check if an address (converted to `Address` if needed) is in the allocation."""
        if not isinstance(address, Address):
            address = Address(address)
        return address in self.root

    def empty_accounts(self) -> List[Address]:
        """Return list of addresses whose account entry is falsy (e.g. `None`)."""
        return [address for address, account in self.root.items() if not account]

    def state_root(self) -> Hash:
        """
        Return state root of the allocation.

        Every non-`None` account is written into a fresh `State`, with unset
        nonce, balance and code replaced by zero/zero/empty, followed by its
        storage slots if the account has a storage object.
        """
        state = State()
        for address, account in self.root.items():
            if account is None:
                continue
            set_account(
                state=state,
                address=FrontierAddress(address),
                account=FrontierAccount(
                    nonce=Uint(account.nonce) if account.nonce is not None else Uint(0),
                    balance=(U256(account.balance) if account.balance is not None else U256(0)),
                    code=account.code if account.code is not None else b"",
                ),
            )
            if account.storage is not None:
                for key, value in account.storage.root.items():
                    set_storage(
                        state=state,
                        address=FrontierAddress(address),
                        key=Bytes32(Hash(key)),
                        value=U256(value),
                    )
        # Inside a method body `state_root` is looked up in module scope, so this
        # calls the module-level function of that name, not this method.
        return Hash(state_root(state))

    def verify_post_alloc(self, got_alloc: "Alloc"):
        """
        Verify that the allocation matches the expected post in the test.

        `self` holds the expectations and `got_alloc` the actual result.
        Addresses only present in `got_alloc` are not inspected.

        Raises:
            Alloc.UnexpectedAccountError: an address expected to be absent
                (`None`) has a non-`None` account in `got_alloc`.
            Alloc.MissingAccountError: an expected account is not in `got_alloc`.
        """
        assert isinstance(got_alloc, Alloc), f"got_alloc is not an Alloc: {got_alloc}"
        for address, account in self.root.items():
            if account is None:
                # Account must not exist
                if address in got_alloc.root and got_alloc.root[address] is not None:
                    raise Alloc.UnexpectedAccountError(address, got_alloc.root[address])
            else:
                if address in got_alloc.root:
                    got_account = got_alloc.root[address]
                    assert isinstance(got_account, Account)
                    assert isinstance(account, Account)
                    account.check_alloc(address, got_account)
                else:
                    raise Alloc.MissingAccountError(address)

    def deploy_contract(
        self,
        code: BytesConvertible,
        *,
        storage: Storage | StorageRootType | None = None,
        balance: NumberConvertible = 0,
        nonce: NumberConvertible = 1,
        address: Address | None = None,
        evm_code_type: EVMCodeType | None = None,
        label: str | None = None,
    ) -> Address:
        """
        Deploy a contract to the allocation.

        Placeholder only: this class always raises `NotImplementedError`.
        """
        raise NotImplementedError("deploy_contract is not implemented in the base class")

    def fund_eoa(
        self,
        amount: NumberConvertible | None = None,
        label: str | None = None,
        storage: Storage | None = None,
        delegation: Address | Literal["Self"] | None = None,
        nonce: NumberConvertible | None = None,
    ) -> EOA:
        """
        Add a previously unused EOA to the pre-alloc with the balance specified by `amount`.

        Placeholder only: this class always raises `NotImplementedError`.
        """
        raise NotImplementedError("fund_eoa is not implemented in the base class")

    def fund_address(self, address: Address, amount: NumberConvertible):
        """
        Fund an address with a given amount.

        If the address is already present in the pre-alloc the amount will be
        added to its existing balance.

        Placeholder only: this class always raises `NotImplementedError`.
        """
        raise NotImplementedError("fund_address is not implemented in the base class")

    def empty_account(self) -> Address:
        """
        Return a previously unused account guaranteed to be empty.

        This ensures the account has zero balance, zero nonce, no code, and no storage.
        The account is not a precompile or a system contract.

        Placeholder only: this class always raises `NotImplementedError`.
        """
        raise NotImplementedError("empty_account is not implemented in the base class")

UnexpectedAccountError dataclass

Bases: Exception

Unexpected account found in the allocation.

Source code in src/ethereum_test_types/account_types.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@dataclass(kw_only=True)
class UnexpectedAccountError(Exception):
    """Unexpected account found in the allocation."""

    address: Address
    account: Account | None

    def __init__(self, address: Address, account: Account | None, *args):
        """
        Initialize the exception.

        `address` and `account` are stored as attributes and, together
        with any extra positional `args`, forwarded to `Exception`.
        """
        # Forward the actual values instead of the packed `args` tuple so
        # that `self.args` mirrors the constructor call; this is what
        # copying/pickling uses to rebuild the exception.
        super().__init__(address, account, *args)
        self.address = address
        self.account = account

    def __str__(self):
        """Print exception string."""
        return f"unexpected account in allocation {self.address}: {self.account}"

__init__(address, account, *args)

Initialize the exception.

Source code in src/ethereum_test_types/account_types.py
86
87
88
89
90
def __init__(self, address: Address, account: Account | None, *args):
    """
    Initialize the exception.

    `address` and `account` are stored as attributes and, together
    with any extra positional `args`, forwarded to `Exception`.
    """
    # Forward the actual values instead of the packed `args` tuple so
    # that `self.args` mirrors the constructor call; this is what
    # copying/pickling uses to rebuild the exception.
    super().__init__(address, account, *args)
    self.address = address
    self.account = account

__str__()

Print exception string.

Source code in src/ethereum_test_types/account_types.py
92
93
94
def __str__(self):
    """Return the message naming the unexpected address and its account."""
    message = f"unexpected account in allocation {self.address}: {self.account}"
    return message

MissingAccountError dataclass

Bases: Exception

Expected account not found in the allocation.

Source code in src/ethereum_test_types/account_types.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
@dataclass(kw_only=True)
class MissingAccountError(Exception):
    """Expected account not found in the allocation."""

    address: Address

    def __init__(self, address: Address, *args):
        """
        Initialize the exception.

        `address` is stored as an attribute and, together with any extra
        positional `args`, forwarded to `Exception`.
        """
        # Forward the actual values instead of the packed `args` tuple so
        # that `self.args` mirrors the constructor call; this is what
        # copying/pickling uses to rebuild the exception.
        super().__init__(address, *args)
        self.address = address

    def __str__(self):
        """Print exception string."""
        return f"Account missing from allocation {self.address}"

__init__(address, *args)

Initialize the exception.

Source code in src/ethereum_test_types/account_types.py
102
103
104
105
def __init__(self, address: Address, *args):
    """
    Initialize the exception.

    `address` is stored as an attribute and, together with any extra
    positional `args`, forwarded to `Exception`.
    """
    # Forward the actual values instead of the packed `args` tuple so
    # that `self.args` mirrors the constructor call; this is what
    # copying/pickling uses to rebuild the exception.
    super().__init__(address, *args)
    self.address = address

__str__()

Print exception string.

Source code in src/ethereum_test_types/account_types.py
107
108
109
def __str__(self):
    """Return the message naming the address that is missing."""
    message = f"Account missing from allocation {self.address}"
    return message

merge(alloc_1, alloc_2, allow_key_collision=True) classmethod

Return merged allocation of two sources.

Source code in src/ethereum_test_types/account_types.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
@classmethod
def merge(
    cls, alloc_1: "Alloc", alloc_2: "Alloc", allow_key_collision: bool = True
) -> "Alloc":
    """
    Combine two allocations into a new one.

    The result starts as a dump of `alloc_1`; each entry of `alloc_2` is then
    merged in through `Account.merge`. Addresses whose merge result is falsy
    are left out of the result.

    Raises:
        Exception: if the allocations share an address while
            `allow_key_collision` is False.
    """
    shared = alloc_1.root.keys() & alloc_2.root.keys()
    if shared and not allow_key_collision:
        raise Exception(f"Overlapping keys detected: {[key.hex() for key in shared]}")

    combined = alloc_1.model_dump()
    for address, incoming in alloc_2.root.items():
        result = Account.merge(combined.get(address), incoming)
        if result:
            combined[address] = result
        else:
            # Falsy merge result: make sure no stale entry survives.
            combined.pop(address, None)

    return Alloc(combined)

__iter__()

Return iterator over the allocation.

Source code in src/ethereum_test_types/account_types.py
132
133
134
def __iter__(self):
    """Iterate over the addresses stored in the allocation."""
    addresses = self.root
    return iter(addresses)

items()

Return iterator over the allocation items.

Source code in src/ethereum_test_types/account_types.py
136
137
138
def items(self):
    """Return the `(address, account)` items view of the allocation."""
    accounts = self.root
    return accounts.items()

__getitem__(address)

Return account associated with an address.

Source code in src/ethereum_test_types/account_types.py
140
141
142
143
144
def __getitem__(self, address: Address | FixedSizeBytesConvertible) -> Account | None:
    """
    Look up the account stored for `address`.

    Values that are not already an `Address` are converted first; the lookup
    itself is a plain `self.root[...]` subscript.
    """
    lookup_key = address if isinstance(address, Address) else Address(address)
    return self.root[lookup_key]

__setitem__(address, account)

Set account associated with an address.

Source code in src/ethereum_test_types/account_types.py
146
147
148
149
150
def __setitem__(self, address: Address | FixedSizeBytesConvertible, account: Account | None):
    """Store `account` under `address`, converting the key to `Address` if needed."""
    target = address if isinstance(address, Address) else Address(address)
    self.root[target] = account

__delitem__(address)

Delete account associated with an address.

Source code in src/ethereum_test_types/account_types.py
152
153
154
155
156
def __delitem__(self, address: Address | FixedSizeBytesConvertible):
    """
    Remove the entry stored for `address`.

    Removing an address that is not present is a silent no-op.
    """
    target = address if isinstance(address, Address) else Address(address)
    self.root.pop(target, None)

__eq__(other)

Return True if both allocations are equal.

Source code in src/ethereum_test_types/account_types.py
158
159
160
161
162
def __eq__(self, other) -> bool:
    """Return True only if `other` is an `Alloc` whose `root` equals this one's."""
    if isinstance(other, Alloc):
        return self.root == other.root
    return False

__contains__(address)

Check if an account is in the allocation.

Source code in src/ethereum_test_types/account_types.py
164
165
166
167
168
def __contains__(self, address: Address | FixedSizeBytesConvertible) -> bool:
    """Tell whether `address` (converted to `Address` if needed) has an entry."""
    probe = address if isinstance(address, Address) else Address(address)
    return probe in self.root

empty_accounts()

Return list of addresses of empty accounts.

Source code in src/ethereum_test_types/account_types.py
170
171
172
def empty_accounts(self) -> List[Address]:
    """Collect the addresses whose account entry is falsy (e.g. `None`)."""
    empties: List[Address] = []
    for address, account in self.root.items():
        if not account:
            empties.append(address)
    return empties

state_root()

Return state root of the allocation.

Source code in src/ethereum_test_types/account_types.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
def state_root(self) -> Hash:
    """
    Compute the state root of the allocation.

    Every non-`None` account is written into a fresh `State`, with unset
    nonce, balance and code replaced by zero/zero/empty, followed by its
    storage slots if the account has a storage object.
    """
    state = State()
    for address, account in self.root.items():
        if account is None:
            continue

        nonce = Uint(0) if account.nonce is None else Uint(account.nonce)
        balance = U256(0) if account.balance is None else U256(account.balance)
        code = b"" if account.code is None else account.code
        set_account(
            state=state,
            address=FrontierAddress(address),
            account=FrontierAccount(nonce=nonce, balance=balance, code=code),
        )

        if account.storage is None:
            continue
        for slot, slot_value in account.storage.root.items():
            set_storage(
                state=state,
                address=FrontierAddress(address),
                key=Bytes32(Hash(slot)),
                value=U256(slot_value),
            )
    root = state_root(state)
    return Hash(root)

verify_post_alloc(got_alloc)

Verify that the allocation matches the expected post in the test. Raises exception on unexpected values.

Source code in src/ethereum_test_types/account_types.py
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def verify_post_alloc(self, got_alloc: "Alloc"):
    """
    Check `got_alloc` against the expectations held by this allocation.

    Only addresses listed in `self` are inspected; addresses that appear
    only in `got_alloc` are ignored.

    Raises:
        Alloc.UnexpectedAccountError: an address expected to be absent
            (`None`) has a non-`None` account in `got_alloc`.
        Alloc.MissingAccountError: an expected account is not in `got_alloc`.
    """
    assert isinstance(got_alloc, Alloc), f"got_alloc is not an Alloc: {got_alloc}"
    for address, expected in self.root.items():
        present = address in got_alloc.root

        if expected is None:
            # The account must not exist in the result.
            if present and got_alloc.root[address] is not None:
                raise Alloc.UnexpectedAccountError(address, got_alloc.root[address])
            continue

        if not present:
            raise Alloc.MissingAccountError(address)

        actual = got_alloc.root[address]
        assert isinstance(actual, Account)
        assert isinstance(expected, Account)
        expected.check_alloc(address, actual)

deploy_contract(code, *, storage=None, balance=0, nonce=1, address=None, evm_code_type=None, label=None)

Deploy a contract to the allocation.

Source code in src/ethereum_test_types/account_types.py
219
220
221
222
223
224
225
226
227
228
229
230
231
def deploy_contract(
    self,
    code: BytesConvertible,
    *,
    storage: Storage | StorageRootType | None = None,
    balance: NumberConvertible = 0,
    nonce: NumberConvertible = 1,
    address: Address | None = None,
    evm_code_type: EVMCodeType | None = None,
    label: str | None = None,
) -> Address:
    """
    Deploy a contract to the allocation.

    Placeholder only: this implementation ignores its arguments and always
    raises `NotImplementedError`.
    NOTE(review): presumably a subclass provides the real implementation — confirm.
    """
    raise NotImplementedError("deploy_contract is not implemented in the base class")

fund_eoa(amount=None, label=None, storage=None, delegation=None, nonce=None)

Add a previously unused EOA to the pre-alloc with the balance specified by amount.

Source code in src/ethereum_test_types/account_types.py
233
234
235
236
237
238
239
240
241
242
def fund_eoa(
    self,
    amount: NumberConvertible | None = None,
    label: str | None = None,
    storage: Storage | None = None,
    delegation: Address | Literal["Self"] | None = None,
    nonce: NumberConvertible | None = None,
) -> EOA:
    """
    Add a previously unused EOA to the pre-alloc with the balance specified by `amount`.

    Placeholder only: this implementation ignores its arguments and always
    raises `NotImplementedError`.
    NOTE(review): presumably a subclass provides the real implementation — confirm.
    """
    raise NotImplementedError("fund_eoa is not implemented in the base class")

fund_address(address, amount)

Fund an address with a given amount.

If the address is already present in the pre-alloc the amount will be added to its existing balance.

Source code in src/ethereum_test_types/account_types.py
244
245
246
247
248
249
250
251
def fund_address(self, address: Address, amount: NumberConvertible):
    """
    Fund an address with a given amount.

    If the address is already present in the pre-alloc the amount will be
    added to its existing balance.

    Placeholder only: this implementation ignores its arguments and always
    raises `NotImplementedError`.
    NOTE(review): presumably a subclass provides the real implementation — confirm.
    """
    raise NotImplementedError("fund_address is not implemented in the base class")

empty_account()

Return a previously unused account guaranteed to be empty.

This ensures the account has zero balance, zero nonce, no code, and no storage. The account is not a precompile or a system contract.

Source code in src/ethereum_test_types/account_types.py
253
254
255
256
257
258
259
260
def empty_account(self) -> Address:
    """
    Return a previously unused account guaranteed to be empty.

    This ensures the account has zero balance, zero nonce, no code, and no storage.
    The account is not a precompile or a system contract.

    Placeholder only: this implementation always raises `NotImplementedError`.
    NOTE(review): presumably a subclass provides the real implementation — confirm.
    """
    raise NotImplementedError("empty_account is not implemented in the base class")

Environment

Bases: EnvironmentGeneric[ZeroPaddedHexNumber]

Structure used to keep track of the context in which a block must be executed.

Source code in src/ethereum_test_types/block_types.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
class Environment(EnvironmentGeneric[ZeroPaddedHexNumber]):
    """
    Context in which a block must be executed.

    Adds parent/blob related fields, known block hashes, ommers, withdrawals
    and `extra_data` on top of `EnvironmentGeneric`.
    """

    blob_gas_used: ZeroPaddedHexNumber | None = Field(None, alias="currentBlobGasUsed")
    parent_ommers_hash: Hash = Field(Hash(EmptyOmmersRoot), alias="parentUncleHash")
    parent_blob_gas_used: ZeroPaddedHexNumber | None = Field(None)
    parent_excess_blob_gas: ZeroPaddedHexNumber | None = Field(None)
    parent_beacon_block_root: Hash | None = Field(None)

    block_hashes: Dict[Number, Hash] = Field(default_factory=dict)
    ommers: List[Hash] = Field(default_factory=list)
    withdrawals: List[Withdrawal] | None = Field(None)
    extra_data: Bytes = Field(Bytes(b"\x00"), exclude=True)

    @computed_field  # type: ignore[misc]
    @cached_property
    def parent_hash(self) -> Hash | None:
        """
        Return the hash stored for the highest block number in `block_hashes`,
        or None when no block hashes are known.
        """
        if not self.block_hashes:
            return None

        newest_number = max(self.block_hashes)
        return Hash(self.block_hashes[newest_number])

    def set_fork_requirements(self, fork: Fork) -> "Environment":
        """
        Return a copy with the fields required by `fork` filled in.

        A default is only supplied for a field that is still unset (and, where
        checked, whose parent counterpart is unset too). The exception is
        `difficulty`, which is forced to zero whenever the fork requires zero
        difficulty.
        """
        number, timestamp = self.number, self.timestamp
        overrides: Dict[str, Any] = {}

        needs_prev_randao = (
            fork.header_prev_randao_required(number, timestamp) and self.prev_randao is None
        )
        if needs_prev_randao:
            overrides["prev_randao"] = 0

        needs_withdrawals = (
            fork.header_withdrawals_required(number, timestamp) and self.withdrawals is None
        )
        if needs_withdrawals:
            overrides["withdrawals"] = []

        needs_base_fee = (
            fork.header_base_fee_required(number, timestamp)
            and self.base_fee_per_gas is None
            and self.parent_base_fee_per_gas is None
        )
        if needs_base_fee:
            overrides["base_fee_per_gas"] = DEFAULT_BASE_FEE

        if fork.header_zero_difficulty_required(number, timestamp):
            overrides["difficulty"] = 0
        elif self.difficulty is None and self.parent_difficulty is None:
            overrides["difficulty"] = 0x20000

        needs_excess_blob_gas = (
            fork.header_excess_blob_gas_required(number, timestamp)
            and self.excess_blob_gas is None
            and self.parent_excess_blob_gas is None
        )
        if needs_excess_blob_gas:
            overrides["excess_blob_gas"] = 0

        needs_blob_gas_used = (
            fork.header_blob_gas_used_required(number, timestamp)
            and self.blob_gas_used is None
            and self.parent_blob_gas_used is None
        )
        if needs_blob_gas_used:
            overrides["blob_gas_used"] = 0

        needs_beacon_root = (
            fork.header_beacon_root_required(number, timestamp)
            and self.parent_beacon_block_root is None
        )
        if needs_beacon_root:
            overrides["parent_beacon_block_root"] = 0

        return self.copy(**overrides)

    def __hash__(self) -> int:
        """
        Hash the environment.

        The value is the first 8 bytes (big endian) of the SHA-256 of the
        string form of the sorted, alias-keyed model dump without None values.
        """
        dumped = self.model_dump(exclude_none=True, by_alias=True)
        canonical = str(sorted(dumped.items()))
        digest = hashlib.sha256(canonical.encode("utf-8")).digest()
        return int.from_bytes(digest[:8], byteorder="big")

    def __eq__(self, other) -> bool:
        """
        Compare two environments by their model dumps plus `extra_data`.

        `extra_data` is added to the dumps explicitly as a hex string
        (the field is declared with `exclude=True`).
        """
        if not isinstance(other, Environment):
            return False

        def comparable(env: "Environment") -> Dict[str, Any]:
            dumped = env.model_dump(exclude_none=True, by_alias=True)
            dumped["extra_data"] = env.extra_data.hex()
            return dumped

        return comparable(self) == comparable(other)

parent_hash: Hash | None cached property

Obtains the latest hash according to the highest block number in block_hashes.

set_fork_requirements(fork)

Fill required fields in an environment depending on the fork.

Source code in src/ethereum_test_types/block_types.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def set_fork_requirements(self, fork: Fork) -> "Environment":
    """
    Return a copy with the fields required by `fork` filled in.

    A default is only supplied for a field that is still unset (and, where
    checked, whose parent counterpart is unset too). The exception is
    `difficulty`, which is forced to zero whenever the fork requires zero
    difficulty.
    """
    number, timestamp = self.number, self.timestamp
    overrides: Dict[str, Any] = {}

    needs_prev_randao = (
        fork.header_prev_randao_required(number, timestamp) and self.prev_randao is None
    )
    if needs_prev_randao:
        overrides["prev_randao"] = 0

    needs_withdrawals = (
        fork.header_withdrawals_required(number, timestamp) and self.withdrawals is None
    )
    if needs_withdrawals:
        overrides["withdrawals"] = []

    needs_base_fee = (
        fork.header_base_fee_required(number, timestamp)
        and self.base_fee_per_gas is None
        and self.parent_base_fee_per_gas is None
    )
    if needs_base_fee:
        overrides["base_fee_per_gas"] = DEFAULT_BASE_FEE

    if fork.header_zero_difficulty_required(number, timestamp):
        overrides["difficulty"] = 0
    elif self.difficulty is None and self.parent_difficulty is None:
        overrides["difficulty"] = 0x20000

    needs_excess_blob_gas = (
        fork.header_excess_blob_gas_required(number, timestamp)
        and self.excess_blob_gas is None
        and self.parent_excess_blob_gas is None
    )
    if needs_excess_blob_gas:
        overrides["excess_blob_gas"] = 0

    needs_blob_gas_used = (
        fork.header_blob_gas_used_required(number, timestamp)
        and self.blob_gas_used is None
        and self.parent_blob_gas_used is None
    )
    if needs_blob_gas_used:
        overrides["blob_gas_used"] = 0

    needs_beacon_root = (
        fork.header_beacon_root_required(number, timestamp)
        and self.parent_beacon_block_root is None
    )
    if needs_beacon_root:
        overrides["parent_beacon_block_root"] = 0

    return self.copy(**overrides)

__hash__()

Hashes the environment object.

Source code in src/ethereum_test_types/block_types.py
177
178
179
180
181
182
183
184
185
def __hash__(self) -> int:
    """
    Hash the environment.

    The value is the first 8 bytes (big endian) of the SHA-256 of the
    string form of the sorted, alias-keyed model dump without None values.
    """
    dumped = self.model_dump(exclude_none=True, by_alias=True)
    canonical = str(sorted(dumped.items()))
    digest = hashlib.sha256(canonical.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], byteorder="big")

__eq__(other)

Check if two environment objects are equal.

Source code in src/ethereum_test_types/block_types.py
187
188
189
190
191
192
193
194
195
196
197
198
def __eq__(self, other) -> bool:
    """
    Compare two environments by their model dumps plus `extra_data`.

    `extra_data` is added to the dumps explicitly, as a hex string, before
    the two dictionaries are compared.
    """
    if not isinstance(other, Environment):
        return False

    def comparable(env):
        dumped = env.model_dump(exclude_none=True, by_alias=True)
        dumped["extra_data"] = env.extra_data.hex()
        return dumped

    return comparable(self) == comparable(other)

EnvironmentDefaults dataclass

Default environment values.

Source code in src/ethereum_test_types/block_types.py
31
32
33
34
35
36
37
38
@dataclass
class EnvironmentDefaults:
    """
    Default environment values.

    The defaults live on the class so that they can be changed in one place
    by assigning to the class attribute.
    """

    # By default, the constant `DEFAULT_BLOCK_GAS_LIMIT` is used.
    # Other libraries (pytest plugins) may override this value by modifying the
    # `EnvironmentDefaults.gas_limit` class attribute.
    gas_limit: int = DEFAULT_BLOCK_GAS_LIMIT

Withdrawal

Bases: WithdrawalGeneric[HexNumber]

Withdrawal type.

Source code in src/ethereum_test_types/block_types.py
70
71
72
73
class Withdrawal(WithdrawalGeneric[HexNumber]):
    """Withdrawal type: `WithdrawalGeneric` specialised with `HexNumber`."""

TestParameterGroup

Bases: BaseModel

Base class for grouping test parameters in a dataclass. Provides a generic repr method to generate clean test ids, including only non-default optional fields.

Source code in src/ethereum_test_types/helpers.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
class TestParameterGroup(BaseModel):
    """
    Base class for grouping test parameters in a frozen model.

    Its `__repr__` is meant to produce clean test ids: it lists only the
    fields that were set to a non-default value.
    """

    __test__ = False  # explicitly prevent pytest collecting this class

    model_config = ConfigDict(frozen=True, validate_default=True)

    def __repr__(self):
        """
        Build the test id string: the class name followed by `field_value`
        pairs joined with `-`, for fields set to a non-default value.
        """
        # Unset fields and fields equal to their default are left out of the dump.
        dumped = self.model_dump(exclude_defaults=True, exclude_unset=True)
        field_strings = [f"{field}_{value}" for field, value in dumped.items()]
        class_name = self.__class__.__name__

        return f"{class_name}_{'-'.join(field_strings)}"

__repr__()

Generate repr string, intended to be used as a test id, based on the class name and the values of the non-default optional fields.

Source code in src/ethereum_test_types/helpers.py
107
108
109
110
111
112
113
114
115
116
117
118
119
def __repr__(self):
    """
    Build the test-id string `<ClassName>_<field>_<value>-<field>_<value>...`
    from the fields that differ from their defaults.
    """
    # Only fields that were explicitly set to a non-default value are dumped.
    dumped = self.model_dump(exclude_defaults=True, exclude_unset=True)
    joined_fields = "-".join(f"{name}_{val}" for name, val in dumped.items())
    return f"{self.__class__.__name__}_{joined_fields}"

add_kzg_version(b_hashes, kzg_version)

Add the KZG version to each blob hash.

Source code in src/ethereum_test_types/helpers.py
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
def add_kzg_version(
    b_hashes: List[bytes | SupportsBytes | int | str], kzg_version: int
) -> List[Hash]:
    """
    Add the KZG version to each blob hash.

    Every entry is first normalised through `Hash`, then its first byte is
    replaced by `kzg_version`.

    Args:
        b_hashes: Blob hashes in any form accepted by `Hash`.
        kzg_version: Version byte to place at the start of each hash; must fit
            in a single byte (`bytes([kzg_version])` raises `ValueError` otherwise).

    Returns:
        The versioned hashes, in the same order as `b_hashes`.

    """
    kzg_version_byte = bytes([kzg_version])
    # `Hash(...)` already performs the conversion (and rejects unsupported
    # inputs), so after `bytes(Hash(...))` every item is plain `bytes`. The
    # former per-type branches and trailing `TypeError` were unreachable.
    return [Hash(kzg_version_byte + bytes(Hash(b_hash))[1:]) for b_hash in b_hashes]

ceiling_division(a, b)

Calculate ceil without using floating point. Used by many of the EVM's formulas.

Source code in src/ethereum_test_types/helpers.py
20
21
22
23
24
25
def ceiling_division(a: int, b: int) -> int:
    """
    Return `a / b` rounded up to the nearest integer, using only integer
    arithmetic (no floating point). Used by many of the EVM's formulas.
    """
    quotient, remainder = divmod(a, b)
    # Floor division rounds down; bump by one whenever something was left over.
    return quotient + 1 if remainder else quotient

compute_create2_address(address, salt, initcode)

Compute address of the resulting contract created using the CREATE2 opcode.

Source code in src/ethereum_test_types/helpers.py
55
56
57
58
59
60
61
62
63
64
65
def compute_create2_address(
    address: FixedSizeBytesConvertible, salt: FixedSizeBytesConvertible, initcode: BytesConvertible
) -> Address:
    """
    Compute the address of the contract created by the `CREATE2` opcode.

    The result is the last 20 bytes of
    `keccak256(0xff ++ address ++ salt ++ keccak256(initcode))`.
    """
    creator = Address(address)
    salt_word = Hash(salt)
    initcode_hash = Bytes(initcode).keccak256()
    preimage = b"\xff" + creator + salt_word + initcode_hash
    return Address(Bytes(preimage).keccak256()[-20:])

compute_create_address(*, address, nonce=None, salt=0, initcode=b'', opcode=Op.CREATE)

Compute address of the resulting contract created using a transaction or the CREATE opcode.

Source code in src/ethereum_test_types/helpers.py
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
def compute_create_address(
    *,
    address: FixedSizeBytesConvertible | EOA,
    nonce: int | None = None,
    salt: int = 0,
    initcode: BytesConvertible = b"",
    opcode: Op = Op.CREATE,
) -> Address:
    """
    Compute the address of a contract created by a transaction, by the
    `CREATE` opcode, or by the `CREATE2` opcode.

    For `Op.CREATE` the address is derived from the RLP encoding of the creator
    address and nonce. When `nonce` is omitted it is taken from `address` if
    that is an `EOA`, and is 0 otherwise. For `Op.CREATE2` the call is delegated
    to `compute_create2_address` with `salt` and `initcode`.

    Raises:
        ValueError: If `opcode` is neither `Op.CREATE` nor `Op.CREATE2`.

    """
    if opcode == Op.CREATE:
        if not isinstance(address, EOA):
            address = Address(address)
        elif nonce is None:
            # An EOA carries its own nonce; use it when none was given.
            nonce = address.nonce
        effective_nonce = 0 if nonce is None else nonce
        encoded = eth_rlp.encode([address, int_to_bytes(effective_nonce)])
        return Address(Bytes(encoded).keccak256()[-20:])

    if opcode == Op.CREATE2:
        return compute_create2_address(address, salt, initcode)

    raise ValueError("Unsupported opcode")

compute_eofcreate_address(address, salt)

Compute address of the resulting contract created using the EOFCREATE opcode.

Source code in src/ethereum_test_types/helpers.py
68
69
70
71
72
73
def compute_eofcreate_address(
    address: FixedSizeBytesConvertible, salt: FixedSizeBytesConvertible
) -> Address:
    """
    Compute the address of the contract created by the `EOFCREATE` opcode.

    The result is the last 20 bytes of
    `keccak256(0xff ++ 12 zero bytes ++ address ++ salt)`.
    """
    prefix = b"\xff" + b"\x00" * 12
    preimage = prefix + Address(address) + Hash(salt)
    return Address(Bytes(preimage).keccak256()[-20:])

TransactionReceipt

Bases: CamelModel

Transaction receipt.

Source code in src/ethereum_test_types/receipt_types.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
class TransactionReceipt(CamelModel):
    """
    Transaction receipt.

    Every field is optional and defaults to `None`, so an instance may carry
    any subset of the receipt fields.
    """

    transaction_hash: Hash | None = None
    gas_used: HexNumber | None = None
    root: Bytes | None = None
    status: HexNumber | None = None
    cumulative_gas_used: HexNumber | None = None
    logs_bloom: Bloom | None = None
    logs: List[TransactionLog] | None = None
    contract_address: Address | None = None
    effective_gas_price: HexNumber | None = None
    block_hash: Hash | None = None
    transaction_index: HexNumber | None = None
    blob_gas_used: HexNumber | None = None
    blob_gas_price: HexNumber | None = None
    delegations: List[ReceiptDelegation] | None = None

ConsolidationRequest

Bases: RequestBase, CamelModel

Consolidation Request type.

Source code in src/ethereum_test_types/request_types.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
class ConsolidationRequest(RequestBase, CamelModel):
    """Consolidation request (request type 2)."""

    source_address: Address = Address(0)
    """
    The address of the execution layer account that made the consolidation request.
    """
    source_pubkey: BLSPublicKey
    """
    The public key of the source validator as it currently is in the beacon state.
    """
    target_pubkey: BLSPublicKey
    """
    The public key of the target validator as it currently is in the beacon state.
    """

    type: ClassVar[int] = 2

    def __bytes__(self) -> bytes:
        """Serialize as `source_address ++ source_pubkey ++ target_pubkey`."""
        parts = (self.source_address, self.source_pubkey, self.target_pubkey)
        return b"".join(bytes(part) for part in parts)

source_address: Address = Address(0) class-attribute instance-attribute

The address of the execution layer account that made the consolidation request.

source_pubkey: BLSPublicKey instance-attribute

The public key of the source validator as it currently is in the beacon state.

target_pubkey: BLSPublicKey instance-attribute

The public key of the target validator as it currently is in the beacon state.

__bytes__()

Return consolidation's attributes as bytes.

Source code in src/ethereum_test_types/request_types.py
112
113
114
def __bytes__(self) -> bytes:
    """Serialize as `source_address ++ source_pubkey ++ target_pubkey`."""
    parts = (self.source_address, self.source_pubkey, self.target_pubkey)
    return b"".join(bytes(part) for part in parts)

DepositRequest

Bases: RequestBase, CamelModel

Deposit Request type.

Source code in src/ethereum_test_types/request_types.py
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
class DepositRequest(RequestBase, CamelModel):
    """Deposit request (request type 0)."""

    pubkey: BLSPublicKey
    """
    The public key of the beacon chain validator.
    """
    withdrawal_credentials: Hash
    """
    The withdrawal credentials of the beacon chain validator.
    """
    amount: HexNumber
    """
    The amount in gwei of the deposit.
    """
    signature: BLSSignature
    """
    The signature of the deposit using the validator's private key that matches the
    `pubkey`.
    """
    index: HexNumber
    """
    The index of the deposit.
    """

    type: ClassVar[int] = 0

    def __bytes__(self) -> bytes:
        """
        Serialize as `pubkey ++ withdrawal_credentials ++ amount ++ signature ++ index`,
        with `amount` and `index` encoded as 8-byte little-endian integers.
        """
        parts = (
            bytes(self.pubkey),
            bytes(self.withdrawal_credentials),
            self.amount.to_bytes(8, "little"),
            bytes(self.signature),
            self.index.to_bytes(8, "little"),
        )
        return b"".join(parts)

pubkey: BLSPublicKey instance-attribute

The public key of the beacon chain validator.

withdrawal_credentials: Hash instance-attribute

The withdrawal credentials of the beacon chain validator.

amount: HexNumber instance-attribute

The amount in gwei of the deposit.

signature: BLSSignature instance-attribute

The signature of the deposit using the validator's private key that matches the pubkey.

index: HexNumber instance-attribute

The index of the deposit.

__bytes__()

Return deposit's attributes as bytes.

Source code in src/ethereum_test_types/request_types.py
56
57
58
59
60
61
62
63
64
def __bytes__(self) -> bytes:
    """
    Serialize as `pubkey ++ withdrawal_credentials ++ amount ++ signature ++ index`,
    with `amount` and `index` encoded as 8-byte little-endian integers.
    """
    parts = (
        bytes(self.pubkey),
        bytes(self.withdrawal_credentials),
        self.amount.to_bytes(8, "little"),
        bytes(self.signature),
        self.index.to_bytes(8, "little"),
    )
    return b"".join(parts)

Requests

Requests for the transition tool.

Source code in src/ethereum_test_types/request_types.py
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
class Requests:
    """Collection of serialized request lists for the transition tool."""

    requests_list: List[Bytes]

    def __init__(
        self,
        *requests: RequestBase,
        requests_lists: List[List[RequestBase] | Bytes] | None = None,
    ):
        """
        Build the serialized request lists.

        Either pass individual requests positionally, in which case they are
        grouped by their `type` and each group is serialized with its type byte
        in front, in ascending type order; or pass `requests_lists`, in which
        case every entry is serialized as-is through `requests_list_to_bytes`.
        Positional requests must be empty when `requests_lists` is given.
        """
        if requests_lists is None:
            grouped: Dict[int, List[RequestBase]] = defaultdict(list)
            for request in requests:
                grouped[request.type].append(request)

            self.requests_list = [
                Bytes(bytes([type_id]) + requests_list_to_bytes(grouped[type_id]))
                for type_id in sorted(grouped)
            ]
            return

        assert len(requests) == 0, "requests must be empty if list is provided"
        self.requests_list = [requests_list_to_bytes(entry) for entry in requests_lists]

    def __bytes__(self) -> bytes:
        """Return the requests hash: sha256 over the concatenated sha256 of each list."""
        digests = [entry.sha256() for entry in self.requests_list]
        return Bytes(b"".join(digests)).sha256()

__init__(*requests, requests_lists=None)

Initialize requests object.

Source code in src/ethereum_test_types/request_types.py
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
def __init__(
    self,
    *requests: RequestBase,
    requests_lists: List[List[RequestBase] | Bytes] | None = None,
):
    """
    Build the serialized request lists.

    Either pass individual requests positionally, in which case they are
    grouped by their `type` and each group is serialized with its type byte
    in front, in ascending type order; or pass `requests_lists`, in which
    case every entry is serialized as-is through `requests_list_to_bytes`.
    Positional requests must be empty when `requests_lists` is given.
    """
    if requests_lists is None:
        grouped: Dict[int, List[RequestBase]] = defaultdict(list)
        for request in requests:
            grouped[request.type].append(request)

        self.requests_list = [
            Bytes(bytes([type_id]) + requests_list_to_bytes(grouped[type_id]))
            for type_id in sorted(grouped)
        ]
        return

    assert len(requests) == 0, "requests must be empty if list is provided"
    self.requests_list = [requests_list_to_bytes(entry) for entry in requests_lists]

__bytes__()

Return requests hash.

Source code in src/ethereum_test_types/request_types.py
151
152
153
154
def __bytes__(self) -> bytes:
    """Return the requests hash: sha256 over the concatenated sha256 of each list."""
    digests = [entry.sha256() for entry in self.requests_list]
    return Bytes(b"".join(digests)).sha256()

WithdrawalRequest

Bases: RequestBase, CamelModel

Withdrawal Request type.

Source code in src/ethereum_test_types/request_types.py
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
class WithdrawalRequest(RequestBase, CamelModel):
    """Withdrawal request (request type 1)."""

    source_address: Address = Address(0)
    """
    The address of the execution layer account that made the withdrawal request.
    """
    validator_pubkey: BLSPublicKey
    """
    The current public key of the validator as it currently is in the beacon state.
    """
    amount: HexNumber
    """
    The amount in gwei to be withdrawn on the beacon chain.
    """

    type: ClassVar[int] = 1

    def __bytes__(self) -> bytes:
        """
        Serialize as `source_address ++ validator_pubkey ++ amount`, with
        `amount` encoded as an 8-byte little-endian integer.
        """
        parts = (
            bytes(self.source_address),
            bytes(self.validator_pubkey),
            self.amount.to_bytes(8, "little"),
        )
        return b"".join(parts)

source_address: Address = Address(0) class-attribute instance-attribute

The address of the execution layer account that made the withdrawal request.

validator_pubkey: BLSPublicKey instance-attribute

The current public key of the validator as it currently is in the beacon state.

amount: HexNumber instance-attribute

The amount in gwei to be withdrawn on the beacon chain.

__bytes__()

Return withdrawal's attributes as bytes.

Source code in src/ethereum_test_types/request_types.py
85
86
87
88
89
90
91
def __bytes__(self) -> bytes:
    """
    Serialize as `source_address ++ validator_pubkey ++ amount`, with
    `amount` encoded as an 8-byte little-endian integer.
    """
    parts = (
        bytes(self.source_address),
        bytes(self.validator_pubkey),
        self.amount.to_bytes(8, "little"),
    )
    return b"".join(parts)

AuthorizationTuple

Bases: AuthorizationTupleGeneric[HexNumber]

Authorization tuple for transactions.

Source code in src/ethereum_test_types/transaction_types.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
class AuthorizationTuple(AuthorizationTupleGeneric[HexNumber]):
    """
    Authorization tuple for transactions.

    On construction the tuple is signed with `secret_key` (or the key of
    `signer`) unless any of `v`, `r`, `s` was given explicitly.
    """

    signer: EOA | None = None
    secret_key: Hash | None = None

    def model_post_init(self, __context: Any) -> None:
        """Run the parent post-init hook, then call `sign()`."""
        super().model_post_init(__context)
        self.sign()

    def sign(self: "AuthorizationTuple"):
        """
        Sign the authorization tuple and/or recover its signer.

        If none of `v`, `r`, `s` were explicitly set, the tuple is signed with
        `secret_key`, falling back to the key of `signer`; one of them must
        provide a key. Afterwards, if `signer` is still unset, it is recovered
        from the signature on a best-effort basis.
        """
        payload = self.rlp_signing_bytes()
        raw_signature: bytes | None = None

        signature_fields = ("v", "r", "s")
        if not any(name in self.model_fields_set for name in signature_fields):
            key: Hash | None = None
            if self.secret_key is not None:
                key = self.secret_key
            elif self.signer is not None:
                key = self.signer.key
            assert key is not None, "secret_key or signer must be set"

            raw_signature = PrivateKey(secret=key).sign_recoverable(payload, hasher=keccak256)
            # Recoverable signature layout: r (32 bytes) ++ s (32 bytes) ++ v (1 byte).
            v_value = HexNumber(raw_signature[64])
            r_value = HexNumber(int.from_bytes(raw_signature[0:32], byteorder="big"))
            s_value = HexNumber(int.from_bytes(raw_signature[32:64], byteorder="big"))
            self.v, self.r, self.s = v_value, r_value, s_value
            self.model_fields_set.update(signature_fields)

        if self.signer is not None:
            return

        try:
            if not raw_signature:
                # Rebuild the recoverable signature from the explicitly provided values.
                raw_signature = (
                    int(self.r).to_bytes(32, byteorder="big")
                    + int(self.s).to_bytes(32, byteorder="big")
                    + bytes([self.v])
                )
            public_key = PublicKey.from_signature_and_message(
                raw_signature, payload.keccak256(), hasher=None
            )
            uncompressed = public_key.format(compressed=False)
            self.signer = EOA(address=Address(keccak256(uncompressed[1:])[32 - 20 :]))
        except Exception:
            # Signer remains `None` in this case
            pass

model_post_init(__context)

Automatically signs the authorization tuple if a secret key or signer is provided.

Source code in src/ethereum_test_types/transaction_types.py
106
107
108
109
def model_post_init(self, __context: Any) -> None:
    """Run the parent post-init hook, then call `sign()` on the authorization tuple."""
    super().model_post_init(__context)
    self.sign()

sign()

Signs the authorization tuple with a private key.

Source code in src/ethereum_test_types/transaction_types.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
def sign(self: "AuthorizationTuple"):
    """
    Sign the authorization tuple and/or recover its signer.

    If none of `v`, `r`, `s` were explicitly set, the tuple is signed with
    `secret_key`, falling back to the key of `signer`; one of them must
    provide a key. Afterwards, if `signer` is still unset, it is recovered
    from the signature on a best-effort basis.
    """
    payload = self.rlp_signing_bytes()
    raw_signature: bytes | None = None

    signature_fields = ("v", "r", "s")
    if not any(name in self.model_fields_set for name in signature_fields):
        key: Hash | None = None
        if self.secret_key is not None:
            key = self.secret_key
        elif self.signer is not None:
            key = self.signer.key
        assert key is not None, "secret_key or signer must be set"

        raw_signature = PrivateKey(secret=key).sign_recoverable(payload, hasher=keccak256)
        # Recoverable signature layout: r (32 bytes) ++ s (32 bytes) ++ v (1 byte).
        v_value = HexNumber(raw_signature[64])
        r_value = HexNumber(int.from_bytes(raw_signature[0:32], byteorder="big"))
        s_value = HexNumber(int.from_bytes(raw_signature[32:64], byteorder="big"))
        self.v, self.r, self.s = v_value, r_value, s_value
        self.model_fields_set.update(signature_fields)

    if self.signer is not None:
        return

    try:
        if not raw_signature:
            # Rebuild the recoverable signature from the explicitly provided values.
            raw_signature = (
                int(self.r).to_bytes(32, byteorder="big")
                + int(self.s).to_bytes(32, byteorder="big")
                + bytes([self.v])
            )
        public_key = PublicKey.from_signature_and_message(
            raw_signature, payload.keccak256(), hasher=None
        )
        uncompressed = public_key.format(compressed=False)
        self.signer = EOA(address=Address(keccak256(uncompressed[1:])[32 - 20 :]))
    except Exception:
        # Signer remains `None` in this case
        pass

Blob

Bases: CamelModel

Class representing a full blob.

Source code in src/ethereum_test_types/blob_types.py
 9
10
11
12
13
14
15
16
17
18
19
class Blob(CamelModel):
    """Full blob: its data, KZG commitment and optional proof(s)."""

    data: Bytes
    kzg_commitment: Bytes
    kzg_proof: Bytes | None = None
    kzg_cell_proofs: List[Bytes] | None = None

    def versioned_hash(self, version: int = 1) -> Hash:
        """
        Return the blob's versioned hash: the sha256 digest of the KZG
        commitment with its first byte replaced by `version`.
        """
        version_byte = bytes([version])
        commitment_digest = sha256(self.kzg_commitment).digest()
        return Hash(version_byte + commitment_digest[1:])

versioned_hash(version=1)

Calculate versioned hash for a given blob.

Source code in src/ethereum_test_types/blob_types.py
17
18
19
def versioned_hash(self, version: int = 1) -> Hash:
    """
    Return the blob's versioned hash: the sha256 digest of the KZG
    commitment with its first byte replaced by `version`.
    """
    version_byte = bytes([version])
    commitment_digest = sha256(self.kzg_commitment).digest()
    return Hash(version_byte + commitment_digest[1:])

NetworkWrappedTransaction

Bases: CamelModel, RLPSerializable

Network wrapped transaction as defined in EIP-4844.

Source code in src/ethereum_test_types/transaction_types.py
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
class NetworkWrappedTransaction(CamelModel, RLPSerializable):
    """
    Network wrapped transaction as defined in
    [EIP-4844](https://eips.ethereum.org/EIPS/eip-4844#networking).

    Wraps a transaction together with its blobs and exposes the blob data,
    commitments and proofs as computed fields used for RLP serialization.
    """

    tx: Transaction
    wrapper_version: Literal[1] | None = None
    blobs: Sequence[Blob]

    @computed_field  # type: ignore[prop-decorator]
    @property
    def blob_data(self) -> Sequence[Bytes]:
        """Return the raw data of every blob, in order."""
        return [entry.data for entry in self.blobs]

    @computed_field  # type: ignore[prop-decorator]
    @property
    def blob_kzg_commitments(self) -> Sequence[Bytes]:
        """Return the KZG commitment of every blob, in order."""
        return [entry.kzg_commitment for entry in self.blobs]

    @computed_field  # type: ignore[prop-decorator]
    @property
    def blob_kzg_proofs(self) -> Sequence[Bytes]:
        """
        Return the KZG proofs of all blobs as one flat list.

        For each blob the single `kzg_proof` is used when present; otherwise
        its `kzg_cell_proofs` (if any) are appended.
        """
        collected: List[Bytes] = []
        for entry in self.blobs:
            if entry.kzg_proof is not None:
                collected.append(entry.kzg_proof)
                continue
            if entry.kzg_cell_proofs is not None:
                collected.extend(entry.kzg_cell_proofs)
        return collected

    def get_rlp_fields(self) -> List[str]:
        """
        Return the ordered list of field names included in RLP serialization.

        The list is `tx`, then `wrapper_version` only when it is set, followed
        by `blob_data`, `blob_kzg_commitments` and `blob_kzg_proofs`.
        """
        names = ["tx"]
        if self.wrapper_version is not None:
            names.append("wrapper_version")
        names.extend(("blob_data", "blob_kzg_commitments", "blob_kzg_proofs"))
        return names

    def get_rlp_prefix(self) -> bytes:
        """
        Return the transaction type as a single byte to be prepended to the
        serialized transaction, or empty bytes when the type is 0.
        """
        tx_type = self.tx.ty
        return bytes([tx_type]) if tx_type > 0 else b""

blob_data: Sequence[Bytes] property

Return a list of blobs as bytes.

blob_kzg_commitments: Sequence[Bytes] property

Return a list of kzg commitments.

blob_kzg_proofs: Sequence[Bytes] property

Return a list of kzg proofs.

get_rlp_fields()

Return an ordered list of field names to be included in RLP serialization.

Function can be overridden to customize the logic to return the fields.

By default, rlp_fields class variable is used.

The list can be nested list up to one extra level to represent nested fields.

Source code in src/ethereum_test_types/transaction_types.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
def get_rlp_fields(self) -> List[str]:
    """
    Return the ordered list of field names included in RLP serialization.

    The list is `tx`, then `wrapper_version` only when it is set, followed
    by `blob_data`, `blob_kzg_commitments` and `blob_kzg_proofs`.
    """
    names = ["tx"]
    if self.wrapper_version is not None:
        names.append("wrapper_version")
    names.extend(("blob_data", "blob_kzg_commitments", "blob_kzg_proofs"))
    return names

get_rlp_prefix()

Return the transaction type as bytes to be prepended to the serialized transaction if type is not 0.

Source code in src/ethereum_test_types/transaction_types.py
700
701
702
703
704
705
706
707
def get_rlp_prefix(self) -> bytes:
    """
    Return the transaction type as a single byte to be prepended to the
    serialized transaction, or empty bytes when the type is 0.
    """
    tx_type = self.tx.ty
    return bytes([tx_type]) if tx_type > 0 else b""

Transaction

Bases: TransactionGeneric[HexNumber], TransactionTransitionToolConverter, SignableRLPSerializable

Generic object that can represent all Ethereum transaction types.

Source code in src/ethereum_test_types/transaction_types.py
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
class Transaction(
    TransactionGeneric[HexNumber], TransactionTransitionToolConverter, SignableRLPSerializable
):
    """
    Generic object that can represent all Ethereum transaction types.

    When `ty` is not given explicitly it is deduced from the populated fields in
    `model_post_init`, which also fills in type-specific defaults. The transaction
    can be signed in place (`sign`) or into a copy (`with_signature_and_sender`).
    """

    # Defaults to 21,000 and is serialized under the name "gas".
    gas_limit: HexNumber = Field(HexNumber(21_000), serialization_alias="gas")
    # `None` marks a contract creation (see `created_contract`).
    to: Address | None = Field(Address(0xAA))
    # Populated through the alias "input".
    data: Bytes = Field(Bytes(b""), alias="input")

    # Only allowed on type 4 transactions (enforced in `model_post_init`).
    authorization_list: List[AuthorizationTuple] | None = None

    # Only allowed on type 6 transactions (enforced in `model_post_init`).
    initcodes: List[Bytes] | None = None

    # Private key used for signing; mutually exclusive with an explicit `v`.
    secret_key: Hash | None = None
    # Excluded from serialization.
    error: List[TransactionException] | TransactionException | None = Field(None, exclude=True)

    # For type 0 transactions, selects EIP-155 (chain id in `v`) vs. legacy `v` encoding.
    protected: bool = Field(True, exclude=True)

    # Excluded from serialization.
    expected_receipt: TransactionReceipt | None = Field(None, exclude=True)

    # Literal 0 referenced twice by the EIP-155 signing field list.
    zero: ClassVar[Literal[0]] = 0

    # Re-validate fields whenever they are assigned after construction.
    model_config = ConfigDict(validate_assignment=True)

    class InvalidFeePaymentError(Exception):
        """Transaction described more than one fee payment type."""

        def __str__(self) -> str:
            """Return the fixed message describing this error."""
            return "only one type of fee payment field can be used in a single tx"

    class InvalidSignaturePrivateKeyError(Exception):
        """
        Transaction describes both the signature and private key of
        source account.
        """

        def __str__(self) -> str:
            """Return the fixed message describing this error."""
            return "can't define both 'signature' and 'private_key'"

    def model_post_init(self, __context) -> None:
        """
        Deduce the transaction type, fill in defaults and reject conflicting fields.

        Steps, in order:
        1. Reject `gas_price` combined with any of the fee-market/blob fee fields.
        2. If `ty` was not explicitly set, deduce it from the populated fields.
        3. Reject an explicit `v` combined with a `secret_key`; when neither is
           given, take the key from `sender` or fall back to the test account.
        4. Fill in defaults required by the type and assert that fields belonging
           to other types are unset.
        5. If `nonce` was not explicitly set, take it from the sender.

        Raises:
            Transaction.InvalidFeePaymentError: on conflicting fee fields.
            Transaction.InvalidSignaturePrivateKeyError: if both `v` and
                `secret_key` are provided.
            AssertionError: if a field not valid for the resolved type is set.

        """
        super().model_post_init(__context)

        if self.gas_price is not None and (
            self.max_fee_per_gas is not None
            or self.max_priority_fee_per_gas is not None
            or self.max_fee_per_blob_gas is not None
        ):
            raise Transaction.InvalidFeePaymentError()

        if "ty" not in self.model_fields_set:
            # Deduce the transaction type from the included fields; the first
            # matching check wins.
            if self.initcodes is not None:
                self.ty = 6
            elif self.authorization_list is not None:
                self.ty = 4
            elif self.max_fee_per_blob_gas is not None or self.blob_versioned_hashes is not None:
                self.ty = 3
            elif self.max_fee_per_gas is not None or self.max_priority_fee_per_gas is not None:
                self.ty = 2
            elif self.access_list is not None:
                self.ty = 1
            else:
                self.ty = 0

        if "v" in self.model_fields_set and self.secret_key is not None:
            raise Transaction.InvalidSignaturePrivateKeyError()

        if "v" not in self.model_fields_set and self.secret_key is None:
            if self.sender is not None:
                # Reuse the sender's key (which may itself be None).
                self.secret_key = self.sender.key
            else:
                # Neither signature, key nor sender given: use the test account.
                self.secret_key = Hash(TestPrivateKey)
                self.sender = EOA(address=TestAddress, key=self.secret_key, nonce=0)

        # Set default values for fields that are required for certain tx types
        if self.ty <= 1 and self.gas_price is None:
            self.gas_price = TransactionDefaults.gas_price
        if self.ty >= 1 and self.access_list is None:
            self.access_list = []
        if self.ty < 1:
            assert self.access_list is None, "access_list must be None"

        if self.ty >= 2 and self.max_fee_per_gas is None:
            self.max_fee_per_gas = TransactionDefaults.max_fee_per_gas
        if self.ty >= 2 and self.max_priority_fee_per_gas is None:
            self.max_priority_fee_per_gas = TransactionDefaults.max_priority_fee_per_gas
        if self.ty < 2:
            assert self.max_fee_per_gas is None, "max_fee_per_gas must be None"
            assert self.max_priority_fee_per_gas is None, "max_priority_fee_per_gas must be None"

        if self.ty == 3 and self.max_fee_per_blob_gas is None:
            self.max_fee_per_blob_gas = 1
        if self.ty != 3:
            assert self.blob_versioned_hashes is None, "blob_versioned_hashes must be None"
            assert self.max_fee_per_blob_gas is None, "max_fee_per_blob_gas must be None"

        if self.ty == 4 and self.authorization_list is None:
            self.authorization_list = []
        if self.ty != 4:
            assert self.authorization_list is None, "authorization_list must be None"

        if self.ty == 6 and self.initcodes is None:
            self.initcodes = []
        if self.ty != 6:
            assert self.initcodes is None, "initcodes must be None"

        if "nonce" not in self.model_fields_set and self.sender is not None:
            # `EOA.get_nonce` returns the current nonce and increments the EOA's counter.
            self.nonce = HexNumber(self.sender.get_nonce())

    def with_error(
        self, error: List[TransactionException] | TransactionException
    ) -> "Transaction":
        """Create a copy of the transaction with its `error` field set to `error`."""
        return self.copy(error=error)

    def with_nonce(self, nonce: int) -> "Transaction":
        """Create a copy of the transaction with its `nonce` field set to `nonce`."""
        return self.copy(nonce=nonce)

    @cached_property
    def signature_bytes(self) -> Bytes:
        """
        Return the signature serialized as 32-byte `r`, 32-byte `s`, then one byte of `v`.

        For type 0 transactions the stored `v` is first reduced by the offset that
        `sign` adds (`35 + 2 * chain_id` when `protected`, otherwise `27`).

        Raises:
            AssertionError: if `v` was never set, or `chain_id` is None for a
                protected type 0 transaction.

        """
        assert "v" in self.model_fields_set, "transaction must be signed"
        v = int(self.v)
        if self.ty == 0:
            if self.protected:
                assert self.chain_id is not None
                v -= 35 + (self.chain_id * 2)
            else:
                v -= 27
        return Bytes(
            self.r.to_bytes(32, byteorder="big")
            + self.s.to_bytes(32, byteorder="big")
            + bytes([v])
        )

    def sign(self: "Transaction") -> None:
        """
        Sign the transaction in place and make sure a sender is set.

        If none of `v`, `r`, `s` were explicitly set, the transaction is signed with
        `secret_key` (which is then cleared) or, failing that, with the sender's key,
        and `v`, `r`, `s` are stored and marked as set. Afterwards, if `sender` is
        still None, it is recovered from the signature; any failure during recovery
        is ignored and `sender` stays None.

        Raises:
            AssertionError: if a signature is needed but no signing key is available.

        """
        signature_bytes: bytes | None = None
        rlp_signing_bytes = self.rlp_signing_bytes()
        if (
            "v" not in self.model_fields_set
            and "r" not in self.model_fields_set
            and "s" not in self.model_fields_set
        ):
            signing_key: Hash | None = None
            if self.secret_key is not None:
                signing_key = self.secret_key
                # The key is consumed: it is not kept on the signed transaction.
                self.secret_key = None
            elif self.sender is not None:
                eoa = self.sender
                assert eoa is not None, "signer must be set"
                signing_key = eoa.key
            assert signing_key is not None, "secret_key or signer must be set"

            # The signing envelope is hashed by the signer via `hasher=keccak256`.
            signature_bytes = PrivateKey(secret=signing_key).sign_recoverable(
                rlp_signing_bytes, hasher=keccak256
            )
            # Signature layout used below: r = bytes 0..31, s = bytes 32..63, v = byte 64.
            v, r, s = (
                signature_bytes[64],
                int.from_bytes(signature_bytes[0:32], byteorder="big"),
                int.from_bytes(signature_bytes[32:64], byteorder="big"),
            )
            if self.ty == 0:
                if self.protected:
                    v += 35 + (self.chain_id * 2)
                else:  # not protected
                    v += 27
            self.v, self.r, self.s = (HexNumber(v), HexNumber(r), HexNumber(s))
            # Record the signature fields as explicitly set.
            self.model_fields_set.add("v")
            self.model_fields_set.add("r")
            self.model_fields_set.add("s")

        if self.sender is None:
            try:
                if not signature_bytes:
                    # Signature was supplied by the caller: rebuild the raw bytes,
                    # undoing the type 0 offset on `v` first.
                    v = self.v
                    if self.ty == 0:
                        if v > 28:
                            # Values above 28 are treated as chain-id encoded.
                            v -= 35 + (self.chain_id * 2)
                        else:  # not protected
                            v -= 27
                    signature_bytes = (
                        int(self.r).to_bytes(32, byteorder="big")
                        + int(self.s).to_bytes(32, byteorder="big")
                        + bytes([v])
                    )
                public_key = PublicKey.from_signature_and_message(
                    signature_bytes, rlp_signing_bytes.keccak256(), hasher=None
                )
                # Address = last 20 bytes of keccak256 of the uncompressed public key
                # without its leading byte.
                self.sender = EOA(
                    address=Address(keccak256(public_key.format(compressed=False)[1:])[32 - 20 :])
                )
            except Exception:
                # Deliberately best-effort: signer remains `None` in this case
                pass

    def with_signature_and_sender(self, *, keep_secret_key: bool = False) -> "Transaction":
        """
        Return a signed version of the transaction with its sender populated.

        If any of `v`, `r`, `s` was explicitly set, the transaction counts as already
        signed: `self` is returned when a sender is present, otherwise a copy with
        the sender recovered from the signature. Otherwise the transaction is signed
        with `secret_key` and a copy carrying `v`, `r`, `s` and `sender` is returned.

        Args:
            keep_secret_key: when True the returned copy retains `secret_key`;
                otherwise it is cleared on the copy.

        Raises:
            ValueError: if the transaction is unsigned and `secret_key` is None.

        """
        updated_values: Dict[str, Any] = {}

        if (
            "v" in self.model_fields_set
            or "r" in self.model_fields_set
            or "s" in self.model_fields_set
        ):
            # Transaction already signed
            if self.sender is not None:
                return self

            # Recover the sender from the existing signature.
            public_key = PublicKey.from_signature_and_message(
                self.signature_bytes, self.rlp_signing_bytes().keccak256(), hasher=None
            )
            updated_values["sender"] = Address(
                keccak256(public_key.format(compressed=False)[1:])[32 - 20 :]
            )
            return self.copy(**updated_values)

        if self.secret_key is None:
            raise ValueError("secret_key must be set to sign a transaction")

        # Hash the signing envelope up front (hence `hasher=None` below)
        signing_hash = self.rlp_signing_bytes().keccak256()

        # Sign the hash
        signature_bytes = PrivateKey(secret=self.secret_key).sign_recoverable(
            signing_hash, hasher=None
        )
        public_key = PublicKey.from_signature_and_message(
            signature_bytes, signing_hash, hasher=None
        )

        # Address = last 20 bytes of keccak256 of the uncompressed public key
        # without its leading byte.
        sender = keccak256(public_key.format(compressed=False)[1:])[32 - 20 :]
        updated_values["sender"] = Address(sender)

        # Signature layout used below: r = bytes 0..31, s = bytes 32..63, v = byte 64.
        v, r, s = (
            signature_bytes[64],
            int.from_bytes(signature_bytes[0:32], byteorder="big"),
            int.from_bytes(signature_bytes[32:64], byteorder="big"),
        )
        if self.ty == 0:
            if self.protected:
                v += 35 + (self.chain_id * 2)
            else:  # not protected
                v += 27

        updated_values["v"] = HexNumber(v)
        updated_values["r"] = HexNumber(r)
        updated_values["s"] = HexNumber(s)

        # By default the signed copy does not carry the private key.
        updated_values["secret_key"] = None

        updated_tx: "Transaction" = self.model_copy(update=updated_values)

        # Restore the secret key on the copy if the caller asked to keep it
        if keep_secret_key:
            updated_tx.secret_key = self.secret_key
        return updated_tx

    def get_rlp_signing_fields(self) -> List[str]:
        """
        Return the list of values included in the envelope used for signing depending on
        the transaction type.

        Raises:
            NotImplementedError: if `ty` is not one of 0, 1, 2, 3, 4 or 6.
            AssertionError: if any listed field other than `to` is None.

        """
        field_list: List[str]
        if self.ty == 6:
            # EIP-7873: https://eips.ethereum.org/EIPS/eip-7873
            field_list = [
                "chain_id",
                "nonce",
                "max_priority_fee_per_gas",
                "max_fee_per_gas",
                "gas_limit",
                "to",
                "value",
                "data",
                "access_list",
                "initcodes",
            ]
        elif self.ty == 4:
            # EIP-7702: https://eips.ethereum.org/EIPS/eip-7702
            field_list = [
                "chain_id",
                "nonce",
                "max_priority_fee_per_gas",
                "max_fee_per_gas",
                "gas_limit",
                "to",
                "value",
                "data",
                "access_list",
                "authorization_list",
            ]
        elif self.ty == 3:
            # EIP-4844: https://eips.ethereum.org/EIPS/eip-4844
            field_list = [
                "chain_id",
                "nonce",
                "max_priority_fee_per_gas",
                "max_fee_per_gas",
                "gas_limit",
                "to",
                "value",
                "data",
                "access_list",
                "max_fee_per_blob_gas",
                "blob_versioned_hashes",
            ]
        elif self.ty == 2:
            # EIP-1559: https://eips.ethereum.org/EIPS/eip-1559
            field_list = [
                "chain_id",
                "nonce",
                "max_priority_fee_per_gas",
                "max_fee_per_gas",
                "gas_limit",
                "to",
                "value",
                "data",
                "access_list",
            ]
        elif self.ty == 1:
            # EIP-2930: https://eips.ethereum.org/EIPS/eip-2930
            field_list = [
                "chain_id",
                "nonce",
                "gas_price",
                "gas_limit",
                "to",
                "value",
                "data",
                "access_list",
            ]
        elif self.ty == 0:
            field_list = ["nonce", "gas_price", "gas_limit", "to", "value", "data"]
            if self.protected:
                # EIP-155: https://eips.ethereum.org/EIPS/eip-155
                field_list.extend(["chain_id", "zero", "zero"])
        else:
            raise NotImplementedError(f"signing for transaction type {self.ty} not implemented")

        # `to` is the only field allowed to be None (contract creation).
        for field in field_list:
            if field != "to":
                assert getattr(self, field) is not None, (
                    f"{field} must be set for type {self.ty} tx"
                )
        return field_list

    def get_rlp_fields(self) -> List[str]:
        """
        Return the list of values included in the list used for rlp encoding depending on
        the transaction type.

        This is the signing field list followed by `v`, `r`, `s`.
        """
        fields = self.get_rlp_signing_fields()
        if self.ty == 0 and self.protected:
            # Drop the trailing "chain_id", "zero", "zero" entries added for EIP-155 signing.
            fields = fields[:-3]
        return fields + ["v", "r", "s"]

    def get_rlp_prefix(self) -> bytes:
        """
        Return the transaction type as a single byte to be prepended to the
        serialized transaction, or empty bytes if the type is 0.
        """
        if self.ty > 0:
            return bytes([self.ty])
        return b""

    def get_rlp_signing_prefix(self) -> bytes:
        """
        Return the transaction type as a single byte to be prepended to the
        serialized transaction signing envelope, or empty bytes if the type is 0.
        """
        if self.ty > 0:
            return bytes([self.ty])
        return b""

    @cached_property
    def hash(self) -> Hash:
        """Return the keccak256 hash of the RLP-serialized transaction (cached)."""
        return self.rlp().keccak256()

    @cached_property
    def serializable_list(self) -> Any:
        """
        Return the transaction as a serializable object (cached).

        Typed transactions (`ty > 0`) yield their RLP bytes; type 0 transactions
        yield the list produced by `to_list(signing=False)`.
        """
        return self.rlp() if self.ty > 0 else self.to_list(signing=False)

    @staticmethod
    def list_root(input_txs: List["Transaction"]) -> Hash:
        """
        Return transactions root of a list of transactions.

        Each transaction's RLP is stored in a fresh trie under the RLP-encoded
        index of the transaction in `input_txs`.
        """
        t = HexaryTrie(db={})
        for i, tx in enumerate(input_txs):
            t.set(eth_rlp.encode(Uint(i)), tx.rlp())
        return Hash(t.root_hash)

    @staticmethod
    def list_blob_versioned_hashes(input_txs: List["Transaction"]) -> List[Hash]:
        """
        Get list of ordered blob versioned hashes from a list of transactions.

        Transactions whose `blob_versioned_hashes` is None are skipped; the order
        of transactions and of hashes within each transaction is preserved.
        """
        return [
            blob_versioned_hash
            for tx in input_txs
            if tx.blob_versioned_hashes is not None
            for blob_versioned_hash in tx.blob_versioned_hashes
        ]

    @cached_property
    def created_contract(self) -> Address:
        """
        Return address of the contract created by the transaction (cached).

        Computed as the last 20 bytes of the keccak256 hash of the RLP encoding of
        `[sender, nonce]`.

        Raises:
            ValueError: if `to` is set (not a contract creation) or `sender` is None.

        """
        if self.to is not None:
            raise ValueError("transaction is not a contract creation")
        if self.sender is None:
            raise ValueError("sender address is None")
        hash_bytes = Bytes(eth_rlp.encode([self.sender, int_to_bytes(self.nonce)])).keccak256()
        return Address(hash_bytes[-20:])

InvalidFeePaymentError

Bases: Exception

Transaction described more than one fee payment type.

Source code in src/ethereum_test_types/transaction_types.py
257
258
259
260
261
262
class InvalidFeePaymentError(Exception):
    """Raised when a transaction mixes more than one fee payment scheme."""

    def __str__(self):
        """Return the fixed description of this error."""
        description = "only one type of fee payment field can be used in a single tx"
        return description

__str__()

Return the exception message string.

Source code in src/ethereum_test_types/transaction_types.py
260
261
262
def __str__(self) -> str:
    """Return the fixed message describing this error (nothing is printed)."""
    return "only one type of fee payment field can be used in a single tx"

InvalidSignaturePrivateKeyError

Bases: Exception

Transaction describes both the signature and private key of source account.

Source code in src/ethereum_test_types/transaction_types.py
264
265
266
267
268
269
270
271
272
class InvalidSignaturePrivateKeyError(Exception):
    """
    Raised when a transaction supplies both a signature and the private key
    of the source account.
    """

    def __str__(self):
        """Return the fixed description of this error."""
        description = "can't define both 'signature' and 'private_key'"
        return description

__str__()

Return the exception message string.

Source code in src/ethereum_test_types/transaction_types.py
270
271
272
def __str__(self) -> str:
    """Return the fixed message describing this error (nothing is printed)."""
    return "can't define both 'signature' and 'private_key'"

model_post_init(__context)

Ensure transaction has no conflicting properties.

Source code in src/ethereum_test_types/transaction_types.py
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
def model_post_init(self, __context) -> None:
    """
    Deduce the transaction type, fill in defaults and reject conflicting fields.

    Steps, in order:
    1. Reject `gas_price` combined with any of the fee-market/blob fee fields.
    2. If `ty` was not explicitly set, deduce it from the populated fields.
    3. Reject an explicit `v` combined with a `secret_key`; when neither is
       given, take the key from `sender` or fall back to the test account.
    4. Fill in defaults required by the type and assert that fields belonging
       to other types are unset.
    5. If `nonce` was not explicitly set, take it from the sender.

    Raises:
        Transaction.InvalidFeePaymentError: on conflicting fee fields.
        Transaction.InvalidSignaturePrivateKeyError: if both `v` and
            `secret_key` are provided.
        AssertionError: if a field not valid for the resolved type is set.

    """
    super().model_post_init(__context)

    if self.gas_price is not None and (
        self.max_fee_per_gas is not None
        or self.max_priority_fee_per_gas is not None
        or self.max_fee_per_blob_gas is not None
    ):
        raise Transaction.InvalidFeePaymentError()

    if "ty" not in self.model_fields_set:
        # Deduce the transaction type from the included fields; the first
        # matching check wins.
        if self.initcodes is not None:
            self.ty = 6
        elif self.authorization_list is not None:
            self.ty = 4
        elif self.max_fee_per_blob_gas is not None or self.blob_versioned_hashes is not None:
            self.ty = 3
        elif self.max_fee_per_gas is not None or self.max_priority_fee_per_gas is not None:
            self.ty = 2
        elif self.access_list is not None:
            self.ty = 1
        else:
            self.ty = 0

    if "v" in self.model_fields_set and self.secret_key is not None:
        raise Transaction.InvalidSignaturePrivateKeyError()

    if "v" not in self.model_fields_set and self.secret_key is None:
        if self.sender is not None:
            # Reuse the sender's key (which may itself be None).
            self.secret_key = self.sender.key
        else:
            # Neither signature, key nor sender given: use the test account.
            self.secret_key = Hash(TestPrivateKey)
            self.sender = EOA(address=TestAddress, key=self.secret_key, nonce=0)

    # Set default values for fields that are required for certain tx types
    if self.ty <= 1 and self.gas_price is None:
        self.gas_price = TransactionDefaults.gas_price
    if self.ty >= 1 and self.access_list is None:
        self.access_list = []
    if self.ty < 1:
        assert self.access_list is None, "access_list must be None"

    if self.ty >= 2 and self.max_fee_per_gas is None:
        self.max_fee_per_gas = TransactionDefaults.max_fee_per_gas
    if self.ty >= 2 and self.max_priority_fee_per_gas is None:
        self.max_priority_fee_per_gas = TransactionDefaults.max_priority_fee_per_gas
    if self.ty < 2:
        assert self.max_fee_per_gas is None, "max_fee_per_gas must be None"
        assert self.max_priority_fee_per_gas is None, "max_priority_fee_per_gas must be None"

    if self.ty == 3 and self.max_fee_per_blob_gas is None:
        self.max_fee_per_blob_gas = 1
    if self.ty != 3:
        assert self.blob_versioned_hashes is None, "blob_versioned_hashes must be None"
        assert self.max_fee_per_blob_gas is None, "max_fee_per_blob_gas must be None"

    if self.ty == 4 and self.authorization_list is None:
        self.authorization_list = []
    if self.ty != 4:
        assert self.authorization_list is None, "authorization_list must be None"

    if self.ty == 6 and self.initcodes is None:
        self.initcodes = []
    if self.ty != 6:
        assert self.initcodes is None, "initcodes must be None"

    if "nonce" not in self.model_fields_set and self.sender is not None:
        # `EOA.get_nonce` returns the current nonce and increments the EOA's counter.
        self.nonce = HexNumber(self.sender.get_nonce())

with_error(error)

Create a copy of the transaction with an added error.

Source code in src/ethereum_test_types/transaction_types.py
345
346
347
348
349
def with_error(
    self, error: List[TransactionException] | TransactionException
) -> "Transaction":
    """Return a copy of this transaction whose `error` field is set to `error`."""
    overrides = {"error": error}
    return self.copy(**overrides)

with_nonce(nonce)

Create a copy of the transaction with a modified nonce.

Source code in src/ethereum_test_types/transaction_types.py
351
352
353
def with_nonce(self, nonce: int) -> "Transaction":
    """Return a copy of this transaction whose `nonce` field is set to `nonce`."""
    overrides = {"nonce": nonce}
    return self.copy(**overrides)

signature_bytes: Bytes cached property

Returns the serialized bytes of the transaction signature.

sign()

Signs the transaction in place with a private key and, if no sender is set, recovers the sender from the signature.

Source code in src/ethereum_test_types/transaction_types.py
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def sign(self: "Transaction") -> None:
    """
    Sign the transaction in place and make sure a sender is set.

    If none of `v`, `r`, `s` were explicitly set, the transaction is signed with
    `secret_key` (which is then cleared) or, failing that, with the sender's key,
    and `v`, `r`, `s` are stored and marked as set. Afterwards, if `sender` is
    still None, it is recovered from the signature; any failure during recovery
    is ignored and `sender` stays None.

    Raises:
        AssertionError: if a signature is needed but no signing key is available.

    """
    signature_bytes: bytes | None = None
    rlp_signing_bytes = self.rlp_signing_bytes()
    if (
        "v" not in self.model_fields_set
        and "r" not in self.model_fields_set
        and "s" not in self.model_fields_set
    ):
        signing_key: Hash | None = None
        if self.secret_key is not None:
            signing_key = self.secret_key
            # The key is consumed: it is not kept on the signed transaction.
            self.secret_key = None
        elif self.sender is not None:
            eoa = self.sender
            assert eoa is not None, "signer must be set"
            signing_key = eoa.key
        assert signing_key is not None, "secret_key or signer must be set"

        # The signing envelope is hashed by the signer via `hasher=keccak256`.
        signature_bytes = PrivateKey(secret=signing_key).sign_recoverable(
            rlp_signing_bytes, hasher=keccak256
        )
        # Signature layout used below: r = bytes 0..31, s = bytes 32..63, v = byte 64.
        v, r, s = (
            signature_bytes[64],
            int.from_bytes(signature_bytes[0:32], byteorder="big"),
            int.from_bytes(signature_bytes[32:64], byteorder="big"),
        )
        if self.ty == 0:
            if self.protected:
                v += 35 + (self.chain_id * 2)
            else:  # not protected
                v += 27
        self.v, self.r, self.s = (HexNumber(v), HexNumber(r), HexNumber(s))
        # Record the signature fields as explicitly set.
        self.model_fields_set.add("v")
        self.model_fields_set.add("r")
        self.model_fields_set.add("s")

    if self.sender is None:
        try:
            if not signature_bytes:
                # Signature was supplied by the caller: rebuild the raw bytes,
                # undoing the type 0 offset on `v` first.
                v = self.v
                if self.ty == 0:
                    if v > 28:
                        # Values above 28 are treated as chain-id encoded.
                        v -= 35 + (self.chain_id * 2)
                    else:  # not protected
                        v -= 27
                signature_bytes = (
                    int(self.r).to_bytes(32, byteorder="big")
                    + int(self.s).to_bytes(32, byteorder="big")
                    + bytes([v])
                )
            public_key = PublicKey.from_signature_and_message(
                signature_bytes, rlp_signing_bytes.keccak256(), hasher=None
            )
            # Address = last 20 bytes of keccak256 of the uncompressed public key
            # without its leading byte.
            self.sender = EOA(
                address=Address(keccak256(public_key.format(compressed=False)[1:])[32 - 20 :])
            )
        except Exception:
            # Deliberately best-effort: signer remains `None` in this case
            pass

with_signature_and_sender(*, keep_secret_key=False)

Return signed version of the transaction using the private key.

Source code in src/ethereum_test_types/transaction_types.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
def with_signature_and_sender(self, *, keep_secret_key: bool = False) -> "Transaction":
    """
    Return a signed version of the transaction with its sender populated.

    If any of `v`, `r`, `s` was explicitly set, the transaction counts as already
    signed: `self` is returned when a sender is present, otherwise a copy with
    the sender recovered from the signature. Otherwise the transaction is signed
    with `secret_key` and a copy carrying `v`, `r`, `s` and `sender` is returned.

    Args:
        keep_secret_key: when True the returned copy retains `secret_key`;
            otherwise it is cleared on the copy.

    Raises:
        ValueError: if the transaction is unsigned and `secret_key` is None.

    """
    updated_values: Dict[str, Any] = {}

    if (
        "v" in self.model_fields_set
        or "r" in self.model_fields_set
        or "s" in self.model_fields_set
    ):
        # Transaction already signed
        if self.sender is not None:
            return self

        # Recover the sender from the existing signature.
        public_key = PublicKey.from_signature_and_message(
            self.signature_bytes, self.rlp_signing_bytes().keccak256(), hasher=None
        )
        updated_values["sender"] = Address(
            keccak256(public_key.format(compressed=False)[1:])[32 - 20 :]
        )
        return self.copy(**updated_values)

    if self.secret_key is None:
        raise ValueError("secret_key must be set to sign a transaction")

    # Hash the signing envelope up front (hence `hasher=None` below)
    signing_hash = self.rlp_signing_bytes().keccak256()

    # Sign the hash
    signature_bytes = PrivateKey(secret=self.secret_key).sign_recoverable(
        signing_hash, hasher=None
    )
    public_key = PublicKey.from_signature_and_message(
        signature_bytes, signing_hash, hasher=None
    )

    # Address = last 20 bytes of keccak256 of the uncompressed public key
    # without its leading byte.
    sender = keccak256(public_key.format(compressed=False)[1:])[32 - 20 :]
    updated_values["sender"] = Address(sender)

    # Signature layout used below: r = bytes 0..31, s = bytes 32..63, v = byte 64.
    v, r, s = (
        signature_bytes[64],
        int.from_bytes(signature_bytes[0:32], byteorder="big"),
        int.from_bytes(signature_bytes[32:64], byteorder="big"),
    )
    if self.ty == 0:
        if self.protected:
            v += 35 + (self.chain_id * 2)
        else:  # not protected
            v += 27

    updated_values["v"] = HexNumber(v)
    updated_values["r"] = HexNumber(r)
    updated_values["s"] = HexNumber(s)

    # By default the signed copy does not carry the private key.
    updated_values["secret_key"] = None

    updated_tx: "Transaction" = self.model_copy(update=updated_values)

    # Restore the secret key on the copy if the caller asked to keep it
    if keep_secret_key:
        updated_tx.secret_key = self.secret_key
    return updated_tx

get_rlp_signing_fields()

Return the list of values included in the envelope used for signing depending on the transaction type.

Source code in src/ethereum_test_types/transaction_types.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
def get_rlp_signing_fields(self) -> List[str]:
    """
    Return the names of the fields that make up the signing envelope for this
    transaction's type, in envelope order.

    Raises:
        NotImplementedError: if `ty` is not one of 0, 1, 2, 3, 4 or 6.
        AssertionError: if any listed field other than `to` is None.

    """
    # Fields shared by types 2, 3, 4 and 6; each of those types only appends
    # its own extra fields to this common prefix.
    fee_market_fields = [
        "chain_id",
        "nonce",
        "max_priority_fee_per_gas",
        "max_fee_per_gas",
        "gas_limit",
        "to",
        "value",
        "data",
        "access_list",
    ]

    signing_fields: List[str]
    if self.ty == 6:
        # EIP-7873: https://eips.ethereum.org/EIPS/eip-7873
        signing_fields = fee_market_fields + ["initcodes"]
    elif self.ty == 4:
        # EIP-7702: https://eips.ethereum.org/EIPS/eip-7702
        signing_fields = fee_market_fields + ["authorization_list"]
    elif self.ty == 3:
        # EIP-4844: https://eips.ethereum.org/EIPS/eip-4844
        signing_fields = fee_market_fields + ["max_fee_per_blob_gas", "blob_versioned_hashes"]
    elif self.ty == 2:
        # EIP-1559: https://eips.ethereum.org/EIPS/eip-1559
        signing_fields = fee_market_fields
    elif self.ty == 1:
        # EIP-2930: https://eips.ethereum.org/EIPS/eip-2930
        signing_fields = [
            "chain_id",
            "nonce",
            "gas_price",
            "gas_limit",
            "to",
            "value",
            "data",
            "access_list",
        ]
    elif self.ty == 0:
        signing_fields = ["nonce", "gas_price", "gas_limit", "to", "value", "data"]
        if self.protected:
            # EIP-155: https://eips.ethereum.org/EIPS/eip-155
            signing_fields += ["chain_id", "zero", "zero"]
    else:
        raise NotImplementedError(f"signing for transaction type {self.ty} not implemented")

    # `to` is the only field allowed to be None.
    for name in signing_fields:
        if name == "to":
            continue
        assert getattr(self, name) is not None, f"{name} must be set for type {self.ty} tx"
    return signing_fields

get_rlp_fields()

Return the list of values included in the list used for rlp encoding depending on the transaction type.

Source code in src/ethereum_test_types/transaction_types.py
584
585
586
587
588
589
590
591
592
def get_rlp_fields(self) -> List[str]:
    """
    Return the names of the fields that make up the RLP-encoded transaction for
    this transaction's type: the signing fields followed by `v`, `r`, `s`.
    """
    signing_fields = self.get_rlp_signing_fields()
    is_protected_legacy = self.ty == 0 and self.protected
    if is_protected_legacy:
        # The last three signing entries are not part of the encoded transaction.
        signing_fields = signing_fields[:-3]
    signature_fields = ["v", "r", "s"]
    return signing_fields + signature_fields

get_rlp_prefix()

Return the transaction type as a single byte to be prepended to the serialized transaction, or empty bytes if the type is 0.

Source code in src/ethereum_test_types/transaction_types.py
594
595
596
597
598
599
600
601
def get_rlp_prefix(self) -> bytes:
    """
    Return the transaction type as a single byte to be prepended to the
    serialized transaction, or empty bytes if the type is 0.
    """
    is_typed = self.ty > 0
    return bytes([self.ty]) if is_typed else b""

get_rlp_signing_prefix()

Return the transaction type as a single byte to be prepended to the serialized transaction signing envelope, or empty bytes if the type is 0.

Source code in src/ethereum_test_types/transaction_types.py
603
604
605
606
607
608
609
610
def get_rlp_signing_prefix(self) -> bytes:
    """
    Return the transaction type as a single byte to be prepended to the
    serialized signing envelope, or empty bytes if the type is 0.
    """
    if not self.ty > 0:
        return b""
    type_byte = bytes([self.ty])
    return type_byte

hash: Hash cached property

Returns hash of the transaction.

serializable_list: Any cached property

Return list of values included in the transaction as a serializable object.

list_root(input_txs) staticmethod

Return transactions root of a list of transactions.

Source code in src/ethereum_test_types/transaction_types.py
622
623
624
625
626
627
628
@staticmethod
def list_root(input_txs: List["Transaction"]) -> Hash:
    """
    Return the transactions root of a list of transactions.

    Each transaction's RLP is stored in a fresh trie under the RLP-encoded
    index of the transaction in `input_txs`.
    """
    trie = HexaryTrie(db={})
    for index, transaction in enumerate(input_txs):
        key = eth_rlp.encode(Uint(index))
        trie.set(key, transaction.rlp())
    root = trie.root_hash
    return Hash(root)

list_blob_versioned_hashes(input_txs) staticmethod

Get list of ordered blob versioned hashes from a list of transactions.

Source code in src/ethereum_test_types/transaction_types.py
630
631
632
633
634
635
636
637
638
@staticmethod
def list_blob_versioned_hashes(input_txs: List["Transaction"]) -> List[Hash]:
    """
    Get list of ordered blob versioned hashes from a list of transactions.

    Hashes are collected in transaction order, then in the order they appear
    within each transaction. Transactions whose `blob_versioned_hashes` is
    `None` contribute nothing.
    """
    collected = []
    for tx in input_txs:
        if tx.blob_versioned_hashes is None:
            continue
        collected.extend(tx.blob_versioned_hashes)
    return collected

created_contract: Address cached property

Return address of the contract created by the transaction.

TransactionDefaults dataclass

Default values for transactions.

Source code in src/ethereum_test_types/transaction_types.py
53
54
55
56
57
58
59
60
@dataclass
class TransactionDefaults:
    """
    Default values for transactions.

    Every default is an annotated dataclass field, so each value can be read
    from the class itself (e.g. `TransactionDefaults.gas_price`) and can be
    overridden per instance (e.g. `TransactionDefaults(gas_price=1)`).
    """

    chain_id: int = 1
    max_priority_fee_per_gas: int = 0
    # Kept after the two fields above so that positional construction stays
    # `(chain_id, max_priority_fee_per_gas)`. Without the `int` annotation the
    # dataclass decorator would not treat these as fields at all.
    gas_price: int = 10
    max_fee_per_gas: int = 7

TransactionType

Bases: IntEnum

Transaction types.

Source code in src/ethereum_test_types/transaction_types.py
43
44
45
46
47
48
49
50
class TransactionType(IntEnum):
    """
    Transaction types.

    Integer-valued enumeration, so members compare equal to their plain
    integer value (e.g. `TransactionType.LEGACY == 0`).
    """

    # NOTE(review): presumably these values are what a transaction's `ty`
    # holds, with 0 being the type that gets no RLP type-byte prefix - confirm.
    LEGACY = 0
    ACCESS_LIST = 1
    BASE_FEE = 2
    BLOB_TRANSACTION = 3
    SET_CODE = 4

Removable

Sentinel class to detect if a parameter should be removed. (None normally means "do not modify").

Source code in src/ethereum_test_types/utils.py
22
23
24
25
26
27
28
29
30
31
32
class Removable:
    """
    Sentinel class to detect if a parameter should be removed.
    (`None` normally means "do not modify").

    All instances compare equal to each other and share a single hash value,
    so the sentinel can be stored in sets and used as a dictionary key.
    """

    def __eq__(self, other: Any) -> bool:
        """Return True for all Removable."""
        if not isinstance(other, Removable):
            # Let Python try the reflected comparison / fall back to identity.
            return NotImplemented
        return True

    def __hash__(self) -> int:
        """Return the same hash for every Removable, consistent with `__eq__`."""
        # Defining `__eq__` alone makes Python set `__hash__` to None, which
        # would leave the sentinel unhashable.
        return hash(Removable)

__eq__(other)

Return True for all Removable.

Source code in src/ethereum_test_types/utils.py
28
29
30
31
32
def __eq__(self, other: Any) -> bool:
    """
    Return True for all Removable.

    Any operand that is not a `Removable` yields `NotImplemented`, leaving the
    comparison to Python's fallback handling.
    """
    return True if isinstance(other, Removable) else NotImplemented

keccak256(data)

Calculate keccak256 hash of the given data.

Source code in src/ethereum_test_types/utils.py
 8
 9
10
def keccak256(data: bytes) -> Hash:
    """
    Calculate keccak256 hash of the given data.

    The input is wrapped in `Bytes` and the hashing is delegated to its
    `keccak256` method.
    """
    wrapped = Bytes(data)
    return wrapped.keccak256()