Source code for nqs_sdk.bindings.protocols.uniswap_v3.spots.historical_uniswap_pool
import os
from nqs_sdk import BlockNumberOrTimestamp, quantlib
from nqs_sdk.bindings.protocols.uniswap_v3.uniswap_v3_pool import UniswapV3Pool
from nqs_sdk.bindings.spots.spot_generator import SpotGenerator
DATA_SOURCE = quantlib.QuantlibDataProvider(os.getenv("QUANTLIB_CONFIG"))
[docs]
class HistoricalSpotGenerator(SpotGenerator):
[docs]
def __init__(self, pools: list[UniswapV3Pool]) -> None:
super().__init__([(pool.token0, pool.token1) for pool in pools])
self.pools = pools
[docs]
def generate_spot_timestamps(self, timestamps: list[int]) -> list[tuple[list[int], list[float]]]:
spots = []
for pool in self.pools:
data = DATA_SOURCE.uniswap_v3_pool_exchange_rate(
pool.address,
BlockNumberOrTimestamp.timestamp(timestamps[0]),
BlockNumberOrTimestamp.timestamp(timestamps[-1]),
).move_as_dict()
timestamp = data["timestamp"]
spot = data["exchange_rate"]
spots.append((timestamp, spot))
return spots