237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422 | @pytest.mark.valid_from("Prague")
@pytest.mark.parametrize("num_contracts", [1, 5, 10, 20, 100])
def test_sstore_erc20_approve(
blockchain_test: BlockchainTestFiller,
pre: Alloc,
fork: Fork,
gas_benchmark_value: int,
address_stubs: AddressStubs,
num_contracts: int,
request: pytest.FixtureRequest,
) -> None:
"""
BloatNet SSTORE benchmark using ERC20 approve to write to storage.
This test:
1. Filters stubs matching test name prefix
(e.g., test_sstore_erc20_approve_*)
2. Uses first N contracts based on num_contracts parameter
3. Splits gas budget evenly across the selected contracts
4. Calls approve(spender, amount) incrementally (counter as spender)
5. Forces SSTOREs to allowance mapping storage slots
"""
# Extract test function name for stub filtering
test_name = request.node.name.split("[")[0] # Remove parametrization suffix
# Filter stubs that match the test name prefix
matching_stubs = [
stub_name for stub_name in address_stubs.root.keys() if stub_name.startswith(test_name)
]
# Validate we have enough stubs
if len(matching_stubs) < num_contracts:
pytest.fail(
f"Not enough matching stubs for test '{test_name}'. "
f"Required: {num_contracts}, Found: {len(matching_stubs)}. "
f"Matching stubs: {matching_stubs}"
)
# Select first N stubs
selected_stubs = matching_stubs[:num_contracts]
gas_costs = fork.gas_costs()
# Calculate gas costs
intrinsic_gas = fork.transaction_intrinsic_cost_calculator()(calldata=b"")
# Per-contract fixed overhead (setup + teardown)
memory_expansion_cost = 15 # Memory expansion to 160 bytes (5 words)
overhead_per_contract = (
gas_costs.G_VERY_LOW # MSTORE to initialize counter (3)
+ memory_expansion_cost # Memory expansion (15)
+ gas_costs.G_JUMPDEST # JUMPDEST at loop start (1)
+ gas_costs.G_LOW # MLOAD for While condition check (5)
+ gas_costs.G_BASE # ISZERO (2)
+ gas_costs.G_BASE # ISZERO (2)
+ gas_costs.G_MID # JUMPI (8)
+ gas_costs.G_BASE # POP to clean up counter at end (2)
) # = 38
# Fixed overhead per iteration (loop mechanics, independent of warm/cold)
loop_overhead = (
# Attack contract loop body operations
gas_costs.G_VERY_LOW # MSTORE selector at memory[32] (3)
+ gas_costs.G_LOW # MLOAD counter (5)
+ gas_costs.G_VERY_LOW # MSTORE spender at memory[64] (3)
+ gas_costs.G_BASE # POP call result (2)
# Counter decrement: MSTORE(0, SUB(MLOAD(0), 1))
+ gas_costs.G_LOW # MLOAD counter (5)
+ gas_costs.G_VERY_LOW # PUSH1 1 (3)
+ gas_costs.G_VERY_LOW # SUB (3)
+ gas_costs.G_VERY_LOW # MSTORE counter back (3)
# While loop condition check
+ gas_costs.G_LOW # MLOAD counter (5)
+ gas_costs.G_BASE # ISZERO (2)
+ gas_costs.G_BASE # ISZERO (2)
+ gas_costs.G_MID # JUMPI back to loop start (8)
)
# ERC20 internal gas (same for all calls)
# Note: SSTORE cost is 22100 for cold slot, zero-to-non-zero
# (20000 base + 2100 cold access)
erc20_internal_gas = (
gas_costs.G_VERY_LOW # PUSH4 selector (3)
+ gas_costs.G_BASE # EQ selector match (2)
+ gas_costs.G_MID # JUMPI to function (8)
+ gas_costs.G_JUMPDEST # JUMPDEST at function start (1)
+ gas_costs.G_VERY_LOW # CALLDATALOAD spender (3)
+ gas_costs.G_VERY_LOW # CALLDATALOAD amount (3)
+ gas_costs.G_KECCAK_256 # keccak256 static (30)
+ gas_costs.G_KECCAK_256_WORD * 2 # keccak256 dynamic for 64 bytes (12)
+ gas_costs.G_COLD_SLOAD # Cold SLOAD for allowance check (2100)
+ gas_costs.G_STORAGE_SET # SSTORE base cost (20000)
+ gas_costs.G_COLD_SLOAD # Additional cold storage access (2100)
+ gas_costs.G_VERY_LOW # PUSH1 1 for return value (3)
+ gas_costs.G_VERY_LOW # MSTORE return value (3)
+ gas_costs.G_VERY_LOW # PUSH1 32 for return size (3)
+ gas_costs.G_VERY_LOW # PUSH1 0 for return offset (3)
# RETURN costs 0 gas
)
# Calculate total gas needed
total_overhead = intrinsic_gas + (overhead_per_contract * num_contracts)
available_gas_for_iterations = gas_benchmark_value - total_overhead
# For each contract: first call is COLD (2600), subsequent are WARM (100)
# Solve for calls per contract accounting for cold/warm transition
warm_call_cost = loop_overhead + gas_costs.G_WARM_ACCOUNT_ACCESS + erc20_internal_gas
cold_warm_diff = gas_costs.G_COLD_ACCOUNT_ACCESS - gas_costs.G_WARM_ACCOUNT_ACCESS
# Per contract: gas_available = cold_warm_diff + calls * warm_call_cost
gas_per_contract = available_gas_for_iterations // num_contracts
calls_per_contract = int((gas_per_contract - cold_warm_diff) // warm_call_cost)
# Deploy selected ERC20 contracts using stubs
erc20_addresses = []
for stub_name in selected_stubs:
addr = pre.deploy_contract(
code=Bytecode(),
stub=stub_name,
)
erc20_addresses.append(addr)
# Log test requirements
print(
f"Total gas budget: {gas_benchmark_value / 1_000_000:.1f}M gas. "
f"Intrinsic: {intrinsic_gas}, Overhead per contract: {overhead_per_contract}, "
f"Warm call cost: {warm_call_cost}. "
f"{calls_per_contract} approve calls per contract ({num_contracts} contracts)."
)
# Build attack code that loops through each contract
attack_code: Bytecode = (
Op.JUMPDEST # Entry point
+ Op.MSTORE(offset=0, value=APPROVE_SELECTOR) # Store selector once for all contracts
)
for erc20_address in erc20_addresses:
# For each contract, initialize counter and loop
attack_code += (
# Initialize counter in memory[32] = number of calls
Op.MSTORE(offset=32, value=calls_per_contract)
# Loop for this specific contract
+ While(
condition=Op.MLOAD(32) + Op.ISZERO + Op.ISZERO, # Continue while counter > 0
body=(
# Store spender at memory[64] (counter as spender/amount)
Op.MSTORE(offset=64, value=Op.MLOAD(32))
# Call approve(spender, amount) on ERC20 contract
# args_offset=28 reads: selector from MEM[28:32] +
# spender from MEM[32:64] + amount from MEM[64:96]
# Note: counter at MEM[32:64] is reused as spender,
# and value at MEM[64:96] serves as the amount
+ Op.CALL(
address=erc20_address,
value=0,
args_offset=28,
args_size=68, # 4 bytes selector + 32 bytes spender + 32 bytes amount
ret_offset=0,
ret_size=0,
)
+ Op.POP # Discard CALL success status
# Decrement counter: counter - 1
+ Op.MSTORE(offset=32, value=Op.SUB(Op.MLOAD(32), 1))
),
)
)
# Deploy attack contract
attack_address = pre.deploy_contract(code=attack_code)
# Run the attack
attack_tx = Transaction(
to=attack_address,
gas_limit=gas_benchmark_value,
sender=pre.fund_eoa(),
)
# Post-state
post = {
attack_address: Account(storage={}),
}
blockchain_test(
pre=pre,
blocks=[Block(txs=[attack_tx])],
post=post,
)
|