Source code for verbs_examples.agents.uniswap_agent

"""
Agent that trades token on uniswap to follow an external market
"""

import math
from typing import List, Tuple

import numpy as np
import verbs
from scipy.optimize import root_scalar

TICK_SPACING = {100: 1, 500: 10, 3000: 60, 10000: 200}


[docs] def tick_from_price(sqrt_price_x96: int, uniswap_fee: int) -> int: """ Get tick from price and fee Parameters ---------- sqrt_price_x96: int Square root of price times 2\ :sup:`96` uniswap_fee: int Uniswap fee. Possible values [100,500,3000,10000] Returns ------- int Lower tick of input price """ price = (sqrt_price_x96 / 2**96) ** 2 tick = math.floor(math.log(price, 1.0001)) tick_lower = tick - (tick % TICK_SPACING[uniswap_fee]) return tick_lower
[docs] def price_from_tick(tick: int) -> int: """ Get price from tick Parameters ---------- tick: int Lower tick of input price Returns ------- int Square root of price times 2\ :sup:`96` """ sqrt_price_x96 = np.sqrt(1.0001**tick) * 2**96 return sqrt_price_x96
[docs] class Gbm: """ Geometric brownian motion modelling the price of two tokens Notes ----- We assume that token B is some stablecoin so its price remains constant. """
[docs] def __init__( self, mu: float, sigma: float, token_a_price: int, token_b_price: int, dt: float ): """ Parameters ---------- mu: float Drift of GBM sigma: float Volatility of GBM token_a_price: int Initial price of token A token_b_price: int Initial price of token B dt: float Time step of time discretisation for the SDE solver scheme """ self.mu = mu self.sigma = sigma self.token_a_price = token_a_price self.token_b_price = token_b_price self.token_a_price_with_impact = token_a_price self.dt = dt
[docs] def update(self, rng: np.random.Generator, price_impact: float): """ Update Gbm Update GBM price using: * :math:`P^a_{t+dt} = P^a_t * exp((\\mu-0.5*\\sigma^2)dt + \sigma * (W_{t+dt} - W_{t}))` * :math:`P^{a, impact}_{t+dt} = P^a_{t+dt} + price_impact` * :math:`P^b` is constant Notes ----- We consider an impact on the price of token A. This impact can be modelled in different ways, e.g. as a transient price impact given by the trades. Parameters ---------- rng: np.random.Generator Numpy random generator, used for any random sampling to ensure determinism of the simulation. price_impact: float Network/EVM that the simulation interacts with. """ z = rng.normal() new_price_a = self.token_a_price * np.exp( (self.mu - 0.5 * self.sigma**2) * self.dt + self.sigma * np.sqrt(self.dt) * z ) new_price_a_w_impact = new_price_a + price_impact # update price values self.token_a_price = new_price_a self.token_a_price_with_impact = new_price_a_w_impact
[docs] def get_sqrt_price_token_a_x96(self) -> float: """ Get price of token A in terms of token B Notes ----- We return the square root of the price times 2\ :sup:`96` for a fair comparison with the price values returned by the Uniswap contract. Returns ------- float Square root of the price of token A in terms of token B times 2\ :sup:`96` """ price = self.token_a_price_with_impact / self.token_b_price return np.sqrt(price) * 2**96
[docs] def get_sqrt_price_token_b_x96(self): """ Get price of token B in terms of token A Notes ----- We return the square root of the price times 2\ :sup:`96` for a fair comparison with the price values returned by the Uniswap contract. Returns ------- float Square root of the price of token B in terms of token A times 2\ :sup:`96` """ price = self.token_b_price / self.token_a_price_with_impact return np.sqrt(price) * 2**96
def get_price_token_a(self): return self.token_a_price_with_impact / self.token_b_price
[docs] class BaseUniswapAgent: """ Base agent that makes trades in Uniswap """
[docs] def __init__( self, env, i: int, swap_router_abi, uniswap_pool_abi, quoter_abi, fee: int, swap_router_address: bytes, uniswap_pool_address: bytes, quoter_address: bytes, # token A is considered to be the risky asset token_a_address: bytes, # token B is considered to be less risky / stablecoin token_b_address: bytes, ): """ Initialise the Uniswap agent and create the corresponding account in the EVM. The agent stores the ABIs of the Uniswap contracts and the token contracts that they will be interacting with. ABIs are previously loaded using the function :py:func:`verbs.abi.load_abi`. Parameters ---------- env: verbs.types.Env Simulation environment i: int Agent index in the simulation swap_router_abi: type abi of the Uniswap v3 SwapRouter contract uniswap_pool_abi: type abi of the Uniswap v3 pool contract quoter_abi: type abi of the Uniswap v3 QuoterV2 contract fee: int Fee tier of the Uniswap v3 pool for the pair (token_a, token_b) swap_router_address: bytes Address of the SwapRouter contract uniswap_pool_address: bytes Address of Uniswap v3 pool for the pair (token_a, token_b) quoter_address: bytes Address of the QuoterV2 contract token_a_address: bytes Address of token_a token_b_address: bytes Address of token_b """ self.address = verbs.utils.int_to_address(i) env.create_account(self.address, int(1e25)) self.swap_router_abi = swap_router_abi self.uniswap_pool_abi = uniswap_pool_abi self.quoter_abi = quoter_abi self.swap_router_address = swap_router_address self.uniswap_pool_address = uniswap_pool_address self.quoter_address = quoter_address self.uniswap_fee = fee self.weth_address = token_a_address self.dai_address = token_b_address self.token_b = token_b_address # stablecoin. self.token0_address = self.uniswap_pool_abi.token0.call( env, self.address, self.uniswap_pool_address, [] )[0][0] self.token1_address = self.uniswap_pool_abi.token1.call( env, self.address, self.uniswap_pool_address, [] )[0][0] self.fee = fee
[docs] def get_sqrt_price_x96_uniswap(self, env) -> int: """ Get sqrt price from uniswap pool Uniswap returns price of token0 in terms of token1 Notes ----- Uniswap sorts of token0 and token1 by their addresses. Parameters ---------- env: verbs.types.Env Simulation environment Returns ------- int Square root of the price times 2\ :sup:`96` of token0 in terms of token1 """ slot0 = self.uniswap_pool_abi.slot0.call( env, self.address, self.uniswap_pool_address, [] )[0] sqrt_price_uniswap_x96 = slot0[0] return sqrt_price_uniswap_x96
[docs] def get_swap_size_to_increase_uniswap_price( self, env, sqrt_target_price_x96: int, sqrt_price_uniswap_x96: int, liquidity: int, exact: bool = True, ) -> verbs.types.Transaction: """ Get swap parameters to match target price Gets the swap parameters so that, after the swap, the price in Uniswap is the same as the target price. We know that in Uniswap v2 (or v3 if there is not a tick range change), we have :math:`L = \\frac{\\Delta y}{\\Delta \\sqrt{P}}` where y is the numeraire (in our case the debt asset), and P is the price of the collateral in terms of the numeraire. If there is a tick range and ``exact=True``, the agent performs an iterative calculation to find the right trade. References ---------- #. https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf Parameters ---------- env: verbs.types.Env Simulation environment sqrt_target_price_x96: int Sqrt of target price times 2\ :sup:`96` sqrt_price_uniswap_x96: int Sqrt of current uniswap price times 2\ :sup:`96` liquidity: int Liquidity of Uniswap in the current tick range exact: bool Boolean indicating whether to perform the iterative calculation to find the right trade. Returns ------- verbs.types.Transaction Trade transaction """ change_sqrt_price_x96 = sqrt_target_price_x96 - sqrt_price_uniswap_x96 change_token_1 = int(liquidity * change_sqrt_price_x96 / 2**96) if change_token_1 == 0: return None def _quote_price(change_token_1): quote = self.quoter_abi.quoteExactInputSingle.call( env, self.address, self.quoter_address, [ ( self.token1_address, self.token0_address, int(change_token_1), self.fee, 0, ) ], )[0] quoted_price = quote[1] return quoted_price if exact: # calculate the exact trade to match prices # this calculation will take into account # different liquidities in different tick ranges try: sol = root_scalar( lambda x: _quote_price(x) - sqrt_target_price_x96, x0=change_token_1 // 2, method="newton", maxiter=5, ) change_token_1 = sol.root except: # noqa: E722 return None swap = self.swap_router_abi.exactInputSingle.transaction( self.address, self.swap_router_address, [ ( self.token1_address, self.token0_address, self.fee, self.address, 10**32, int(change_token_1), 0, 0, ) ], ) return swap
[docs] def get_swap_size_to_decrease_uniswap_price( self, env, sqrt_target_price_x96: int, sqrt_price_uniswap_x96: int, liquidity: int, exact: bool = True, ) -> verbs.types.Transaction: """ Get swap parameters to match target price Gets the swap parameters so that, after the swap, the price in Uniswap is the same as the target price. We know that in Uniswap v3 (or v2), we have :math:`L = \\frac{\\Delta y}{\\Delta \\sqrt{P}}` where y is the numeraire (in our case the debt asset), and P is the price of the collateral in terms of the numeraire. If there is a tick range and ``exact=True``, the agent performs an iterative calculation to find the right trade. References ---------- #. https://atiselsts.github.io/pdfs/uniswap-v3-liquidity-math.pdf Parameters ---------- env: verbs.types.Env Simulation environment sqrt_target_price_x96: int Sqrt of target price times 2\ :sup:`96` sqrt_price_uniswap_x96: int Sqrt of current uniswap price times 2\ :sup:`96` liquidity: int Liquidity of Uniswap in the current tick range exact: bool Boolean indicating whether to perform the iterative calculation to find the right trade. Returns ------- verbs.types.Transaction Trade transaction """ change_sqrt_price_x96 = sqrt_price_uniswap_x96 - sqrt_target_price_x96 change_token_1 = int(liquidity * change_sqrt_price_x96 / 2**96) if change_token_1 == 0: return None def _quote_price(change_token_1): quote = self.quoter_abi.quoteExactOutputSingle.call( env, self.address, self.quoter_address, [ ( self.token0_address, self.token1_address, int(change_token_1), self.fee, 0, ) ], )[0] quoted_price = quote[1] return quoted_price if exact: # calculate the exact trade to match prices # this calculation will take into account # different liquidities in different tick ranges try: sol = root_scalar( lambda x: _quote_price(x) - sqrt_target_price_x96, method="newton", x0=change_token_1 // 2, maxiter=5, ) change_token_1 = sol.root except: # noqa: E722 return None swap = self.swap_router_abi.exactOutputSingle.transaction( self.address, self.swap_router_address, [ ( self.token0_address, self.token1_address, self.fee, self.address, 10**32, int(change_token_1), 10**32, 0, ) ], ) return swap
[docs] class UniswapAgent(BaseUniswapAgent): """ Agent that makes trades in Uniswap and the external market in order to make arbitrage """
[docs] def __init__( self, env, i: int, swap_router_abi, uniswap_pool_abi, quoter_abi, fee: int, swap_router_address: bytes, uniswap_pool_address: bytes, quoter_address: bytes, # token A is considered to be the risky asset token_a_address: bytes, # token B is considered to be less risky / stablecoin token_b_address: bytes, mu: float, sigma: float, dt: float, ): """ Initialise the Uniswap agent and create the account The agent stores the ABIs of the Uniswap contracts and the token contracts that they will be interacting with. ABIs are previously loaded using the function :py:func:`verbs.abi.load_abi`. The agent also has access to an external market, modelled by a Gbm, that is set as an attribute of the agents. Parameters ---------- env: verbs.types.Env Simulation environment i: int Agent index in the simulation swap_router_abi: type abi of the Uniswap v3 SwapRouter contract uniswap_pool_abi: type abi of the Uniswap v3 pool contract quoter_abi: type abi of the Uniswap v3 QuoterV2 contract fee: int Fee tier of the Uniswap v3 pool for the pair (token_a, token_b) swap_router_address: bytes Address of the SwapRouter contract uniswap_pool_address: bytes Addres of Uniswap v3 pool for the pair (token_a, token_b) quoter_address: bytes Address of the QuoterV2 contract token_a_address: bytes Address of token_a token_b_address: bytes Address of token_b mu: float Drift of the Gbm sigma: float Volatility of the Gbm dt: float Time step of time discretisation for the Gbm solver. """ super().__init__( env=env, i=i, swap_router_abi=swap_router_abi, uniswap_pool_abi=uniswap_pool_abi, quoter_abi=quoter_abi, swap_router_address=swap_router_address, uniswap_pool_address=uniswap_pool_address, quoter_address=quoter_address, fee=fee, token_a_address=token_a_address, token_b_address=token_b_address, ) # External market model. # we initialise it at the same price as the Uniswap price # Uniswap returns price of token0 in terms of token1 sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) if self.token_b == self.token1_address: self.init_token_a_price = (sqrt_price_uniswap_x96 / 2**96) ** 2 token_b_price = 1 else: self.init_token_a_price = (2**96 / sqrt_price_uniswap_x96) ** 2 token_b_price = 1 self.external_market = Gbm( mu=mu, sigma=sigma, token_a_price=self.init_token_a_price, token_b_price=token_b_price, dt=dt, ) # Variables to calculate price impact of Uniswap on the external exchange self.dt = dt self.beta = 2.0 self.transient_impact = 0 # step of simulator self.step = 0
[docs] def update(self, rng: np.random.Generator, env) -> List[verbs.types.Transaction]: """ Update the state of the agent and returns list of transactions according to their policy. The Uniswap agent will * Check the price in the external market and in the Uniswap pool. * Calculate the trade to do in Uniswap in order to realize a profit. Parameters ---------- rng: np.random.Generator Numpy random generator, used for any random sampling to ensure determinism of the simulation. env: verbs.types.Env Network/EVM that the simulation interacts with. Returns ------- list List of transactions to be processed in the next block of the simulation. This can be an empty list if the agent is not submitting any transactions. """ # get sqrt price from uniswap pool. Uniswap returns price of # token0 in terms of token1 sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) # We assume that trades on Uniswap have a price impact on the external # exchange. This is accumulated with an exponential decay if self.step > 0: current_price_impact = self.get_price_impact_in_external_market(env) self.transient_impact = ( np.exp(-self.beta * self.dt) * self.transient_impact + current_price_impact ) # get liquidity from uniswap pool liquidity = self.uniswap_pool_abi.liquidity.call( env, self.address, self.uniswap_pool_address, [] )[0][0] # external market update self.external_market.update(rng, 0.1 * self.transient_impact) if self.token_b == self.token1_address: sqrt_price_external_market_x96 = ( self.external_market.get_sqrt_price_token_a_x96() ) else: sqrt_price_external_market_x96 = ( self.external_market.get_sqrt_price_token_b_x96() ) # Find encoded swap params so that price of uniswap after # swap matches the price of the external market # sqrt_price_external_market > sqrt_price_uniswap_x96, # the uniswap agent wants to buy collateral asset # (and sell debt asset) to increase the price of Uniswap # sqrt_price_external_market < sqrt_price_uniswap_x96, # the uniswap agent wants to sell collateral asset # (and buy debt asset) to decrease the price of Uniswap if sqrt_price_external_market_x96 > sqrt_price_uniswap_x96: swap_call = self.get_swap_size_to_increase_uniswap_price( env=env, sqrt_target_price_x96=sqrt_price_external_market_x96, sqrt_price_uniswap_x96=sqrt_price_uniswap_x96, liquidity=liquidity, ) else: swap_call = self.get_swap_size_to_decrease_uniswap_price( env=env, sqrt_target_price_x96=sqrt_price_external_market_x96, sqrt_price_uniswap_x96=sqrt_price_uniswap_x96, liquidity=liquidity, ) self.step += 1 if swap_call is not None: return [swap_call] else: return []
[docs] def record(self, env) -> Tuple[float, float]: """ Record the state of the agent This method is called at the end of each step for all agents. It should return any data to be recorded over the course of the simulation. Parameters ---------- env: verbs.types.Env Network/EVM that the simulation interacts with. Returns ------- tuple[float, float] Tuple containing: - Price in Uniswap of token0 in terms of token1 - Price in the external market of token0 in terms of token1 """ # Get sqrt price from uniswap pool. Uniswap returns price of # token0 in terms of token1 sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) if self.token_b == self.token1_address: sqrt_price_external_market_x96 = ( self.external_market.get_sqrt_price_token_a_x96() ) else: sqrt_price_external_market_x96 = ( self.external_market.get_sqrt_price_token_b_x96() ) sqrt_price_uniswap = sqrt_price_uniswap_x96 / 2**96 sqrt_price_external_market = sqrt_price_external_market_x96 / (2**96) return (sqrt_price_uniswap**2, sqrt_price_external_market**2)
[docs] def get_price_impact_in_external_market(self, env) -> float: """ Estimate Uniswap trade impact on the external market We assume that a trade in Uniswap has transient impact on the external exchange. Parameters ---------- env: verbs.types.Env Network/EVM that the simulation interacts with. Returns ------- float Transient impact """ sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) if self.token_b == self.token1_address: token_a_price_uniswap = (sqrt_price_uniswap_x96 / 2**96) ** 2 else: token_a_price_uniswap = (2**96 / sqrt_price_uniswap_x96) ** 2 token_a_price_external = self.external_market.get_price_token_a() return token_a_price_uniswap - token_a_price_external
[docs] class DummyUniswapAgent(BaseUniswapAgent): """ Dummy uniswap agent used for cache generation Uniswap agent that queries the EVM database for a wide range of Uniswap price ticks. Useful to initialise the cache of a simulation """
[docs] def __init__( self, env, i: int, swap_router_abi, uniswap_pool_abi, quoter_abi, fee: int, swap_router_address: bytes, uniswap_pool_address: bytes, quoter_address: bytes, # token A is considered to be the risky asset token_a_address: bytes, # token B is considered to be less risky / stablecoin token_b_address: bytes, sim_n_steps: int, **kwargs ): """ Initialise the Uniswap agent and create the corresponding account in the EVM. The agent stores the ABIs of the Uniswap contracts and the token contracts that they will be interacting with. ABIs are previously loaded using the function :py:func:`verbs.abi.load_abi`. The agent also has access to an external market, modelled by a Gbm, that is set as an attribute of the agents. Notes ----- This agent should only be used in a simulation to initialise the Cache of the EVM database. The drift and the volatility of the external market are artificially calibrated in order for the agent to explore a wide range of Uniswap price ticks and thus find out the right storage slots to be saved in the Cache. Parameters ---------- env: verbs.types.Env Simulation environment i: int Agent index in the simulation swap_router_abi: type abi of the Uniswap v3 SwapRouter contract uniswap_pool_abi: type abi of the Uniswap v3 pool contract quoter_abi: type abi of the Uniswap v3 QuoterV2 contract fee: int Fee tier of the Uniswap v3 pool for the pair (token_a, token_b) swap_router_address: bytes Address of the SwapRouter contract uniswap_pool_address: bytes Addres of Uniswap v3 pool for the pair (token_a, token_b) quoter_address: bytes Address of the QuoterV2 contract token_a_address: bytes Address of token_a token_b_address: bytes Address of token_b mu: float Drift of the Gbm sigma: float Volatility of the Gbm dt: float Time step of time discretisation for the Gbm solver. """ # Calibrate mu and sigma in order to explore Uniswap pool # storage values for simulation super().__init__( env=env, i=i, swap_router_abi=swap_router_abi, uniswap_pool_abi=uniswap_pool_abi, quoter_abi=quoter_abi, fee=fee, swap_router_address=swap_router_address, uniswap_pool_address=uniswap_pool_address, quoter_address=quoter_address, token_a_address=token_a_address, token_b_address=token_b_address, ) self.sim_n_steps = sim_n_steps slot0 = self.uniswap_pool_abi.slot0.call( env, self.address, self.uniswap_pool_address, [] )[0] init_tick = slot0[1] self.ticks_to_explore = [ init_tick + (i + 1) * TICK_SPACING[fee] for i in range(sim_n_steps // 2) ] self.ticks_to_explore.extend( [init_tick - (i + 1) * TICK_SPACING[fee] for i in range(sim_n_steps // 2)] ) self.step = 0
[docs] def update(self, rng: np.random.Generator, env) -> List[verbs.types.Transaction]: """ Update the state of the agent Makes an exploratory update by manually changing the drift of the external market. Parameters ---------- rng: np.random.Generator Numpy random generator, used for any random sampling to ensure determinism of the simulation. env: verbs.types.Env Network/EVM that the simulation interacts with. Returns ------- list List of transactions to be processed in the next block of the simulation. This can be an empty list if the agent is not submitting any transactions. """ sqrt_target_price_x96 = price_from_tick(self.ticks_to_explore[self.step]) sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) # get liquidity from uniswap pool liquidity = self.uniswap_pool_abi.liquidity.call( env, self.address, self.uniswap_pool_address, [] )[0][0] if sqrt_target_price_x96 > sqrt_price_uniswap_x96: swap_call = self.get_swap_size_to_increase_uniswap_price( env=env, sqrt_target_price_x96=sqrt_target_price_x96, sqrt_price_uniswap_x96=sqrt_price_uniswap_x96, liquidity=liquidity, ) else: swap_call = self.get_swap_size_to_decrease_uniswap_price( env=env, sqrt_target_price_x96=sqrt_target_price_x96, sqrt_price_uniswap_x96=sqrt_price_uniswap_x96, liquidity=liquidity, ) self.step += 1 if swap_call is not None: return [swap_call] else: return []
[docs] def record(self, env) -> Tuple[float, float]: """ Record the state of the agent This method is called at the end of each step for all agents. It should return any data to be recorded over the course of the simulation. Parameters ---------- env: verbs.types.Env Network/EVM that the simulation interacts with. Returns ------- tuple[float, float] Tuple containing: - Price in Uniswap of token0 in terms of token1 - Price in the external market of token0 in terms of token1 """ # Get sqrt price from uniswap pool. Uniswap returns price of # token0 in terms of token1 sqrt_target_price_x96 = price_from_tick(self.ticks_to_explore[self.step - 1]) sqrt_price_uniswap_x96 = self.get_sqrt_price_x96_uniswap(env) return sqrt_price_uniswap_x96**2, sqrt_target_price_x96**2