Source code for nqs_sdk.coding_envs.protocols.uniswap_v3.uniswap_v3_coding_env

# mypy: disable-error-code="return-value"

from datetime import timedelta
from decimal import Decimal
from typing import List, Optional

import pandas as pd

from nqs_sdk.bindings.protocols.uniswap_v3.tx_generators.uniswap_v3_historical import UniswapV3HistoricalTxGenerator
from nqs_sdk.bindings.protocols.uniswap_v3.uniswap_v3_arbitrager import UniswapV3Arbitrager
from nqs_sdk.bindings.protocols.uniswap_v3.uniswap_v3_factory import UniswapV3Factory
from nqs_sdk.bindings.protocols.uniswap_v3.uniswap_v3_pool import UniswapV3Pool
from nqs_sdk.bindings.protocols.uniswap_v3.uniswap_v3_transactions import (
    BurnTransaction,
    CollectTransaction,
    MintTransaction,
    SwapTransaction,
)
from nqs_sdk.coding_envs.protocols.coding_protocol import CodingProtocol
from nqs_sdk.coding_envs.protocols.uniswap_v3.uniswap_v3_abstract import UniswapV3Protocol
from nqs_sdk.interfaces.protocol_metafactory import ProtocolMetaFactory
from nqs_sdk.interfaces.tx_generator import TxGenerator
from nqs_sdk.utils.logging import local_logger


logger = local_logger(__name__)


[docs] class UniswapV3CodingProtocol(CodingProtocol, UniswapV3Protocol):
[docs] def __init__(self, pool: UniswapV3Pool, generators: list[str | TxGenerator] = []) -> None: super().__init__(pool) self.agents_positions: dict[str, dict[str, dict[str, int]]] = {} self.pool: UniswapV3Pool = pool self.generators: list[str | TxGenerator] = generators
[docs] def id(self) -> str: return self.pool.name
[docs] def get_protocol_factory(self) -> Optional[ProtocolMetaFactory]: return UniswapV3Factory
[docs] def get_protocol_description(self) -> tuple[str, str, list[str]]: import inspect return ( ( "Uniswap V3 is a decentralized exchange protocol that allows users to swap tokens " "or provide/remove liquidity to the pool. It features concentrated liquidity, " "allowing liquidity providers to specify price ranges for their capital. " "Users can earn fees from trades that occur within their specified price ranges. " "The pool has two tokens, token0 and token1, and a fee tier. " "The fee tier is the fee that is charged for each trade. " "Common fee tiers are 0.05%, 0.3%, or 1%." ), ( f"{self.pool.name} is the pool with the following parameters: " f"token0: {self.pool.token0}, token1: {self.pool.token1}, fee tier: {self.pool.fee_tier}." ), [inspect.getsource(UniswapV3Protocol)], )
[docs] def get_tx_generators(self) -> List[TxGenerator]: tx_generators: List[TxGenerator] = [] for gen in self.generators: if isinstance(gen, str): if gen == "historical": tx_generators.append(UniswapV3HistoricalTxGenerator(self.pool)) elif gen == "arbitrager": tx_generators.append(UniswapV3Arbitrager([self.pool])) else: raise ValueError(f"Unknown data generator: {gen}") elif isinstance(gen, TxGenerator): tx_generators.append(gen) else: raise ValueError(f"Unknown data generator: {gen}") return tx_generators
[docs] def get_observables_names(self) -> list[str]: metrics_str = [] # protocol metrics metrics_str.append(f"{self.pool.name}.dex_spot") metrics_str.append(f"{self.pool.name}.liquidity") metrics_str.append(f"{self.pool.name}.total_fees") metrics_str.append(f"{self.pool.name}.total_volume_numeraire") metrics_str.append(f'{self.pool.name}.total_volume:{{token="{self.pool.token0}"}}') metrics_str.append(f'{self.pool.name}.total_volume:{{token="{self.pool.token1}"}}') metrics_str.append(f"{self.pool.name}.current_tick") metrics_str.append(f'{self.pool.name}.total_holdings:{{token="{self.pool.token0}"}}') metrics_str.append(f'{self.pool.name}.total_holdings:{{token="{self.pool.token1}"}}') metrics_str.append(f'{self.pool.name}.total_fees:{{token="{self.pool.token0}"}}') metrics_str.append(f'{self.pool.name}.total_fees:{{token="{self.pool.token1}"}}') metrics_str.append(f"{self.protocol.name}.total_value_locked") # agent metrics for agent in self.all_agents: # agent-level metrics (without position) metrics_str.append(f"{agent}.{self.pool.name}.active_liquidity") metrics_str.append(f'{agent}.{self.pool.name}.token_amount:{{token="{self.pool.token0}"}}') metrics_str.append(f'{agent}.{self.pool.name}.token_amount:{{token="{self.pool.token1}"}}') metrics_str.append(f"{agent}.{self.pool.name}.fees_collected") metrics_str.append(f'{agent}.{self.pool.name}.fees_collected:{{token="{self.pool.token0}"}}') metrics_str.append(f'{agent}.{self.pool.name}.fees_collected:{{token="{self.pool.token1}"}}') metrics_str.append(f"{agent}.{self.pool.name}.fees_not_collected") metrics_str.append(f'{agent}.{self.pool.name}.fees_not_collected:{{token="{self.pool.token0}"}}') metrics_str.append(f'{agent}.{self.pool.name}.fees_not_collected:{{token="{self.pool.token1}"}}') metrics_str.append(f"{agent}.{self.pool.name}.abs_impermanent_loss") metrics_str.append(f"{agent}.{self.pool.name}.perc_impermanent_loss") metrics_str.append(f"{agent}.{self.pool.name}.static_ptf_value") metrics_str.append(f"{agent}.{self.pool.name}.permanent_loss") metrics_str.append(f"{agent}.{self.pool.name}.loss_versus_rebalancing") metrics_str.append(f"{agent}.{self.pool.name}.total_fees_relative_to_lvr") # position related metrics if agent in self.agents_positions: for token_id in self.agents_positions[agent]: position = token_id + "_" + str(self.agents_positions[agent][token_id]["token_id_index"]) metrics_str.append(f'{agent}.{self.pool.name}.active_liquidity:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.liquidity:{{position="{position}"}}') metrics_str.append( f'{agent}.{self.pool.name}.token_amount:{{position="{position}", token="{self.pool.token0}"}}' ) metrics_str.append( f'{agent}.{self.pool.name}.token_amount:{{position="{position}", token="{self.pool.token1}"}}' ) metrics_str.append(f'{agent}.{self.pool.name}.net_position:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.fees_collected:{{position="{position}"}}') metrics_str.append( f'{agent}.{self.pool.name}.fees_collected:{{position="{position}", token="{self.pool.token0}"}}' ) metrics_str.append( f'{agent}.{self.pool.name}.fees_collected:{{position="{position}", token="{self.pool.token1}"}}' ) metrics_str.append(f'{agent}.{self.pool.name}.fees_not_collected:{{position="{position}"}}') metrics_str.append( f'{agent}.{self.pool.name}.fees_not_collected:{{position="{position}", token="{self.pool.token0}"}}' # noqa: E501 ) metrics_str.append( f'{agent}.{self.pool.name}.fees_not_collected:{{position="{position}", token="{self.pool.token1}"}}' # noqa: E501 ) metrics_str.append(f'{agent}.{self.pool.name}.abs_impermanent_loss:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.perc_impermanent_loss:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.static_ptf_value:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.permanent_loss:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.loss_versus_rebalancing:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.total_fees_relative_to_lvr:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.upper_bound_price:{{position="{position}"}}') metrics_str.append(f'{agent}.{self.pool.name}.lower_bound_price:{{position="{position}"}}') return metrics_str
@property def token0(self) -> str: return self.pool.token0 @property def token1(self) -> str: return self.pool.token1 @property def fee_tier(self) -> float: return self.pool.fee_tier
[docs] def is_position_exists(self, token_id: str) -> bool: if self.current_agent not in self.agents_positions or token_id not in self.agents_positions[self.current_agent]: return False liquidity = self.position_liquidity(token_id) return liquidity is not None and liquidity > 0
[docs] def dex_spot(self, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.dex_spot" return self._get_obs_timeserie(metric, lookback)
[docs] def liquidity(self, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.liquidity" return self._get_obs_timeserie(metric, lookback)
[docs] def total_volume_numeraire(self, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.total_volume_numeraire" return self._get_obs_timeserie(metric, lookback)
[docs] def total_volume(self, token: bool, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.total_volume" token_str = self.pool.token1 if token else self.pool.token0 metric += f':{{token="{token_str}"}}' return self._get_obs_timeserie(metric, lookback)
[docs] def total_holdings(self, token: bool, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.total_holdings" token_str = self.pool.token1 if token else self.pool.token0 metric += f':{{token="{token_str}"}}' return self._get_obs_timeserie(metric, lookback)
[docs] def total_fees(self, token: Optional[bool] = None, lookback: Optional[timedelta] = None) -> pd.Series: metric = f"{self.protocol.name}.total_fees" if token is not None: token_str = self.pool.token1 if token else self.pool.token0 metric += f':{{token="{token_str}"}}' return self._get_obs_timeserie(metric, lookback)
[docs] def total_value_locked(self, lookback: Optional[timedelta] = None) -> pd.Series: metric: str = f"{self.protocol.name}.total_value_locked" return self._get_obs_timeserie(metric, lookback)
[docs] def get_tick(self, lookback: Optional[timedelta] = None) -> pd.Series: metric: str = f"{self.protocol.name}.current_tick" return self._get_obs_timeserie(metric, lookback)
[docs] def swap(self, amount: float, token: bool, price_limit: Optional[float] = None) -> None: price_limit_decimal = Decimal(price_limit) if price_limit is not None else None swap_txn = SwapTransaction( amount=amount, zero_for_one=not token, price_limit=price_limit_decimal, pool=self.pool, ) self.register_transaction(self.current_agent, swap_txn)
[docs] def mint( self, price_lower: float | Decimal, price_upper: float | Decimal, max_token0: float | Decimal, max_token1: float | Decimal, token_id: str, ) -> Optional[dict[str, Decimal]]: if self.current_agent not in self.agents_positions: self.agents_positions[self.current_agent] = {} if token_id not in self.agents_positions[self.current_agent]: self.agents_positions[self.current_agent][token_id] = {"token_id_index": 0} else: self.agents_positions[self.current_agent][token_id]["token_id_index"] += 1 current_price = self.dex_spot().iloc[-1] mint_txn = MintTransaction( pool=self.pool, price_lower=price_lower, price_upper=price_upper, current_price=current_price, max_token0=max_token0, max_token1=max_token1, token_id=self._position_id(token_id), ) self.register_transaction(self.current_agent, mint_txn)
[docs] def burn(self, amount_ratio: float, token_id: str) -> None: position_bounds = self.position_bounds(token_id) # If the position is not initialized, do not burn if not self.is_position_exists(token_id): logger.warning(f"Position {token_id} is not initialized, skipping burn") return fee_not_collected_0 = self.fees_not_collected(token=self.pool.token0, token_id=token_id) fee_not_collected_1 = self.fees_not_collected(token=self.pool.token1, token_id=token_id) token0_amount = self.token_amount(self.pool.token0, token_id) token1_amount = self.token_amount(self.pool.token1, token_id) burn_txn = BurnTransaction( price_lower=position_bounds[0], price_upper=position_bounds[1], amount_ratio=Decimal(amount_ratio), pool=self.pool, ) collect_txn = CollectTransaction( price_lower=position_bounds[0], price_upper=position_bounds[1], amount_0=token0_amount + fee_not_collected_0, amount_1=token1_amount + fee_not_collected_1, pool=self.pool, ) self.register_transaction(self.current_agent, burn_txn) self.register_transaction(self.current_agent, collect_txn)
def _position_id(self, token_id: str) -> str: token_id_index = self.agents_positions[self.current_agent][token_id]["token_id_index"] position = token_id + "_" + str(token_id_index) return position
[docs] def active_liquidity(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = f"{self.current_agent}.{self.protocol.name}.active_liquidity" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if series is not None and not series.empty else None
[docs] def position_liquidity(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = f"{self.current_agent}.{self.protocol.name}.liquidity" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if series is not None and not series.empty else None
[docs] def token_amount(self, token: str, token_id: Optional[str] = None) -> Optional[Decimal]: if token_id is None: metric = f'{self.current_agent}.{self.protocol.name}.token_amount:{{token="{token}"}}' else: position = self._position_id(token_id) # FIXME you should not build MetricName by hand!! metric = ( f'{self.current_agent}.{self.protocol.name}.token_amount:{{position="{position}", token="{token}"}}' ) series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def net_position(self, token_id: str) -> Optional[Decimal]: position = self._position_id(token_id) metric = f'{self.current_agent}.{self.protocol.name}.net_position:{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def fees_collected(self, token: Optional[str] = None, token_id: Optional[str] = None) -> Optional[Decimal]: metric = f"{self.current_agent}.{self.protocol.name}.fees_collected" if token_id is not None and token is None: position = self._position_id(token_id) metric += f':{{position="{position}"}}' elif token_id is None and token is not None: metric += f':{{token="{token}"}}' elif token_id is not None and token is not None: position = self._position_id(token_id) metric += f':{{position="{position}", token="{token}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def fees_not_collected(self, token: Optional[str] = None, token_id: Optional[str] = None) -> Optional[Decimal]: metric = f"{self.current_agent}.{self.protocol.name}.fees_not_collected" if token_id is not None and token is None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' elif token_id is None and token is not None: metric += f':{{token="{token}"}}' elif token_id is not None and token is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}", token="{token}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def abs_impermanent_loss(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".abs_impermanent_loss" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def perc_impermanent_loss(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".perc_impermanent_loss" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def static_ptf_value(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".static_ptf_value" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def permanent_loss(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".permanent_loss" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def loss_versus_rebalancing(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".loss_versus_rebalancing" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def total_fees_relative_to_lvr(self, token_id: Optional[str] = None) -> Optional[Decimal]: metric = self.current_agent + "." + self.protocol.name + ".total_fees_relative_to_lvr" if token_id is not None and token_id in self.agents_positions[self.current_agent]: position = self._position_id(token_id) metric += f':{{position="{position}"}}' series = self._get_obs_timeserie(metric, None) return series.iloc[-1] if not series.empty else None
[docs] def position_bounds(self, token_id: str) -> tuple[Optional[Decimal], Optional[Decimal]]: position = self._position_id(token_id) upper_bound = self._get_obs_timeserie( f'{self.current_agent}.{self.protocol.name}.upper_bound_price:{{position="{position}"}}', None ).iloc[-1] lower_bound = self._get_obs_timeserie( f'{self.current_agent}.{self.protocol.name}.lower_bound_price:{{position="{position}"}}', None ).iloc[-1] return lower_bound, upper_bound