Source code for nqs_sdk.bindings.spots.wgn_spot

import numpy as np

from nqs_sdk.bindings.spots.spot_generator import SpotGenerator


class WGNSpotGenerator(SpotGenerator):
    """Spot-price generator driven by white Gaussian noise.

    Each price is drawn independently around the initial price ``s0``: the
    random shock of one timestamp is not accumulated into the next one, so
    the output is *not* a cumulative random walk (i.e. not a geometric
    Brownian motion path).
    """

    def __init__(self, token_pairs: list[tuple[str, str]], s0: float, mean: float, vol: float) -> None:
        """Store the per-pair model parameters.

        Args:
            token_pairs: Token pairs to generate prices for; forwarded to
                ``SpotGenerator.__init__``.
            s0: Initial spot price. Either a scalar applied to every pair or
                a list/tuple/array holding one value per pair.
            mean: Drift coefficient, scalar or one value per pair.
            vol: Volatility coefficient, scalar or one value per pair.

        Raises:
            ValueError: If a parameter is neither a scalar nor broadcastable
                to one value per token pair.
        """
        super().__init__(token_pairs)
        n_pairs = len(token_pairs)
        self.s0 = self._per_pair(s0, n_pairs, "s0")
        self.mean = self._per_pair(mean, n_pairs, "mean")
        self.vol = self._per_pair(vol, n_pairs, "vol")

    @staticmethod
    def _per_pair(value, n_pairs: int, name: str) -> np.ndarray:
        """Return ``value`` as a writable 1-D float array of length ``n_pairs``."""
        arr = np.asarray(value, dtype=float)
        try:
            # broadcast_to accepts scalars, 0-d arrays, length-1 and length-n
            # inputs alike; it returns a read-only view, hence the copy.
            return np.broadcast_to(arr, (n_pairs,)).copy()
        except ValueError as err:
            raise ValueError(
                f"{name} must be a scalar or contain one value per token pair "
                f"(expected {n_pairs}, got shape {arr.shape})"
            ) from err

    def generate_spot_timestamps(self, ts: list[int]) -> list[tuple[list[int], list[float]]]:
        """Generate one spot-price series per token pair.

        For token pair ``i`` and time step ``k``::

            dt[k]      = ts[k] - ts[k - 1]            (dt[0] = 0)
            noise[k,i] ~ Normal(0, sqrt(dt[k]))
            spot[k,i]  = s0[i] * exp((mean[i] - 0.5 * vol[i]**2) * dt[k]
                                     + vol[i] * noise[k,i])

        Because ``dt[0]`` is zero, the first price of every series equals
        ``s0`` exactly. Noise is drawn from NumPy's global random state
        (``np.random.normal``), so results are reproducible only if the
        caller seeds it.

        NOTE(review): ``dt`` is used in raw timestamp units; ``mean`` and
        ``vol`` are presumably expressed per timestamp unit -- confirm with
        callers.

        Args:
            ts: Timestamps to generate prices for, in non-decreasing order.

        Returns:
            One ``(ts, prices)`` tuple per token pair, in ``token_pairs``
            order, where ``prices`` holds one float per timestamp. The same
            ``ts`` object is placed in every tuple.

        Raises:
            ValueError: If ``ts`` is not in non-decreasing order (a negative
                step would otherwise yield NaN prices).
        """
        n_pairs = len(self.token_pairs)
        timestamps = np.asarray(ts)

        # Nothing to simulate: return an empty series per pair instead of
        # failing on timestamps[0].
        if timestamps.size == 0:
            return [(ts, []) for _ in range(n_pairs)]

        # Step lengths; prepending the first timestamp makes dt[0] == 0.
        dt = np.diff(timestamps, prepend=timestamps[0])
        if np.any(dt < 0):
            raise ValueError("timestamps must be in non-decreasing order")

        # (T, n) noise matrix whose standard deviation at step k is sqrt(dt[k]);
        # the (T, 1) scale broadcasts across token pairs.
        step_std = np.sqrt(dt)[:, None]
        noise = np.random.normal(loc=0.0, scale=step_std, size=(timestamps.size, n_pairs))

        # Vectorized over time (rows) and token pairs (columns).
        drift = self.mean[None, :] - 0.5 * self.vol[None, :] ** 2
        spots = self.s0[None, :] * np.exp(drift * dt[:, None] + self.vol[None, :] * noise)

        # Transpose so each row is the full price series of one token pair.
        return [(ts, pair_prices.tolist()) for pair_prices in spots.T]