Source code for nqs_sdk.bindings.tx_generators.historical_tx_generator

from abc import ABC, abstractmethod
from typing import List, Optional, Tuple

from nqs_sdk import Metrics, RefSharedState, SimulationClock, TxRequest
from nqs_sdk.bindings.tx_generators.abstract_transaction import Transaction
from nqs_sdk.interfaces.tx_generator import TxGenerator


[docs] class HistoricalTxGenerator(TxGenerator, ABC):
[docs] def __init__(self) -> None: self.run_cache: bool = False self.previous_block: Optional[int] = None
[docs] def id(self) -> str: return self.protocol_id
@property @abstractmethod def protocol_id(self) -> str: pass
[docs] @abstractmethod def get_transactions(self, start_block: int, end_block: int) -> list[Transaction]: pass
[docs] @abstractmethod def cache_transactions(self, start_block: int, end_block: int) -> None: pass
[docs] @abstractmethod def get_next_block(self, current_block: int) -> Optional[int]: pass
[docs] def next( self, clock: SimulationClock, state: RefSharedState, metrics: Metrics ) -> Tuple[List[TxRequest], Optional[int]]: current_block = clock.current_block() if self.previous_block is None: self.previous_block = current_block if not self.run_cache: simulation_time = clock.simulation_time() self.cache_transactions(start_block=simulation_time.start_block(), end_block=simulation_time.stop_block()) self.run_cache = True txns = [] for tx in self.get_transactions(start_block=self.previous_block, end_block=current_block): txns.append( tx.to_tx_request( protocol=self.protocol_id, source="Masao", sender="0x999999cf1046e68e36E1aA2E0E07105eDDD1f08E", order=2.0, # order is 0.0 for arbitrage, 1.0 for agents and 2.0 for random ) ) self.previous_block = current_block return ( txns, self.get_next_block(current_block), )
def __str__(self) -> str: return f"{self.protocol_id}_historical_tx_generator"