Source code for bittensor.mock.subtensor_mock

# The MIT License (MIT)
# Copyright © 2022-2023 Opentensor Foundation

# Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
# documentation files (the “Software”), to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software,
# and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

# The above copyright notice and this permission notice shall be included in all copies or substantial portions of
# the Software.


from random import randint
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Tuple, TypedDict, Union
from unittest.mock import MagicMock
from dataclasses import dataclass
from abc import abstractclassmethod
from import Mapping

from hashlib import sha256
from ..wallet import wallet

from ..chain_data import (
from ..errors import ChainQueryError
from ..subtensor import Subtensor
from ..utils import RAOPERTAO, U16_NORMALIZED_FLOAT
from ..utils.balance import Balance
from ..utils.registration import POWSolution

from typing import TypedDict

# Mock Testing Constant

class AxonServeCallParams(TypedDict):
    """Typed dictionary describing the arguments of an axon serve chain call."""

    version: int
    ip: int
    port: int
    ip_type: int
    netuid: int
class PrometheusServeCallParams(TypedDict):
    """Typed dictionary describing the arguments of a prometheus serve chain call."""

    version: int
    ip: int
    port: int
    ip_type: int
    netuid: int
# Alias used in the state type hints below to mark dictionary keys that are block numbers.
BlockNumber = int
class InfoDict(Mapping):
    """
    Mapping facade over an object's instance attributes.

    Item access is forwarded to attribute access, while iteration and length
    are taken from the instance ``__dict__``. Subclasses must provide a
    ``default`` classmethod.
    """

    @abstractclassmethod
    def default(cls):
        """Return an instance filled with default values (subclass responsibility)."""
        raise NotImplementedError

    def __getitem__(self, key):
        return getattr(self, key)

    def __setitem__(self, key, value):
        setattr(self, key, value)

    def __iter__(self):
        return iter(vars(self))

    def __len__(self):
        return len(vars(self))
@dataclass
class AxonInfoDict(InfoDict):
    """Dataclass record of the axon fields stored per hotkey on the mock chain."""

    block: int
    version: int
    ip: int  # IP address encoded as an integer
    port: int
    ip_type: int
    protocol: int
    placeholder1: int  # reserved for future use
    placeholder2: int

    @classmethod
    def default(cls):
        """Return an instance with every field set to zero."""
        field_names = (
            "block",
            "version",
            "ip",
            "port",
            "ip_type",
            "protocol",
            "placeholder1",
            "placeholder2",
        )
        return cls(**dict.fromkeys(field_names, 0))
@dataclass
class PrometheusInfoDict(InfoDict):
    """Dataclass record of the prometheus fields stored per hotkey on the mock chain."""

    block: int
    version: int
    ip: int  # IP address encoded as an integer
    port: int
    ip_type: int

    @classmethod
    def default(cls):
        """Return an instance with every field set to zero."""
        field_names = ("block", "version", "ip", "port", "ip_type")
        return cls(**dict.fromkeys(field_names, 0))
@dataclass
class MockSubtensorValue:
    """Minimal wrapper exposing a stored item through a ``value`` attribute."""

    value: Optional[Any]
class MockMapResult:
    """
    Iterable container of ``(key, value)`` records returned by mock map queries.

    Every element of a record is exposed as a ``MockSubtensorValue`` so that
    callers can read ``.value`` on both the key and the value.

    Args:
        records: Sequence of two-item records. Items that are not already
            ``MockSubtensorValue`` instances are wrapped. ``None`` (the
            default) yields an empty result.
    """

    records: Optional[List[Tuple["MockSubtensorValue", "MockSubtensorValue"]]]

    def __init__(
        self,
        records: Optional[
            List[
                Tuple[
                    Union[Any, "MockSubtensorValue"], Union[Any, "MockSubtensorValue"]
                ]
            ]
        ] = None,
    ):
        # The original iterated over `records` directly, so the documented
        # default of None raised TypeError; treat None as "no records".
        self.records = [
            (self._wrap(record[0]), self._wrap(record[1])) for record in (records or [])
        ]

    @staticmethod
    def _wrap(item: Any) -> "MockSubtensorValue":
        """Wrap `item` unless it already is a MockSubtensorValue."""
        # The original tested `isinstance(item, dict)`, which never matches a
        # MockSubtensorValue, so pre-wrapped items ended up wrapped twice.
        if isinstance(item, MockSubtensorValue):
            return item
        return MockSubtensorValue(value=item)

    def __iter__(self):
        return iter(self.records)
class MockSystemState(TypedDict):
    """Type hint for the ``System`` section of the mock chain state."""

    # address -> block -> balance
    Account: Dict[str, Dict[int, int]]
class MockSubtensorState(TypedDict):
    """Type hint for (a subset of) the ``SubtensorModule`` section of the mock chain state."""

    # netuid -> block -> rho
    Rho: Dict[int, Dict[BlockNumber, int]]
    # netuid -> block -> kappa
    Kappa: Dict[int, Dict[BlockNumber, int]]
    # netuid -> block -> difficulty
    Difficulty: Dict[int, Dict[BlockNumber, int]]
    # netuid -> block -> immunity_period
    ImmunityPeriod: Dict[int, Dict[BlockNumber, int]]
    # netuid -> block -> validator_batch_size
    ValidatorBatchSize: Dict[int, Dict[BlockNumber, int]]
    # (netuid, uid), block -> active
    Active: Dict[int, Dict[BlockNumber, bool]]
    # (hotkey, coldkey) -> block -> stake
    Stake: Dict[str, Dict[str, Dict[int, int]]]
    # address -> block -> delegate_take
    Delegates: Dict[str, Dict[int, float]]
    # netuid -> block -> added
    NetworksAdded: Dict[int, Dict[BlockNumber, bool]]
class MockChainState(TypedDict):
    """Type hint for the top-level mock chain state, keyed by module name."""

    System: MockSystemState
    SubtensorModule: MockSubtensorState
[docs] class MockSubtensor(Subtensor): """ A Mock Subtensor class for running tests. This should mock only methods that make queries to the chain. e.g. We mock `Subtensor.query_subtensor` instead of all query methods. This class will also store a local (mock) state of the chain. """ chain_state: MockChainState block_number: int
@classmethod
def reset(cls) -> None:
    """Empty the shared global mock state, then instantiate the class once."""
    __GLOBAL_MOCK_STATE__.clear()
    # The instance itself is discarded; constructing it is what matters here.
    cls()
[docs] def setup(self) -> None: if not hasattr(self, "chain_state") or getattr(self, "chain_state") is None: self.chain_state = { "System": {"Account": {}}, "Balances": {"ExistentialDeposit": {0: 500}}, "SubtensorModule": { "NetworksAdded": {}, "Rho": {}, "Kappa": {}, "Difficulty": {}, "ImmunityPeriod": {}, "ValidatorBatchSize": {}, "ValidatorSequenceLength": {}, "ValidatorEpochsPerReset": {}, "ValidatorEpochLength": {}, "MaxAllowedValidators": {}, "MinAllowedWeights": {}, "MaxWeightLimit": {}, "SynergyScalingLawPower": {}, "ScalingLawPower": {}, "SubnetworkN": {}, "MaxAllowedUids": {}, "NetworkModality": {}, "BlocksSinceLastStep": {}, "Tempo": {}, "NetworkConnect": {}, "EmissionValues": {}, "Burn": {}, "Active": {}, "Uids": {}, "Keys": {}, "Owner": {}, "IsNetworkMember": {}, "LastUpdate": {}, "Rank": {}, "Emission": {}, "Incentive": {}, "Consensus": {}, "Trust": {}, "ValidatorTrust": {}, "Dividends": {}, "PruningScores": {}, "ValidatorPermit": {}, "Weights": {}, "Bonds": {}, "Stake": {}, "TotalStake": {0: 0}, "TotalIssuance": {0: 0}, "TotalHotkeyStake": {}, "TotalColdkeyStake": {}, "TxRateLimit": {0: 0}, # No limit "Delegates": {}, "Axons": {}, "Prometheus": {}, "SubnetOwner": {}, "Commits": {}, "AdjustmentAlpha": {}, "BondsMovingAverage": {}, }, } self.block_number = 0 = "mock" self.chain_endpoint = "mock_endpoint" self.substrate = MagicMock()
def __init__(self, *args, **kwargs) -> None:
    """
    Bind this instance to the shared global mock state.

    All instances use the same ``__dict__`` object, so attributes set on one
    are visible on every other. ``setup`` runs only when no chain state is
    present yet. Positional and keyword arguments are accepted but ignored.
    """
    self.__dict__ = __GLOBAL_MOCK_STATE__

    if getattr(self, "chain_state", None) is None:
        self.setup()
def get_block_hash(self, block_id: int) -> str:
    """Return a deterministic fake block hash: ``0x`` plus the SHA-256 hex digest of ``str(block_id)``."""
    digest = sha256(str(block_id).encode()).hexdigest()
    return f"0x{digest[:64]}"
def create_subnet(self, netuid: int) -> None:
    """
    Create subnet `netuid` in the mock state and seed its storages.

    Hyperparameter storages receive a block-0 value; per-UID and per-hotkey
    storages start out empty.

    Raises:
        Exception: If the subnet already exists.
    """
    state = self.chain_state["SubtensorModule"]
    if netuid in state["NetworksAdded"]:
        raise Exception("Subnet already exists")

    # Per-subnet values stored at block 0.
    # "NetworkConnect" is deliberately not initialised here.
    seeded_at_genesis = (
        ("Rho", 10),
        ("Kappa", 32_767),
        ("Difficulty", 10_000_000),
        ("ImmunityPeriod", 4096),
        ("ValidatorBatchSize", 32),
        ("ValidatorSequenceLength", 256),
        ("ValidatorEpochsPerReset", 60),
        ("ValidatorEpochLength", 100),
        ("MaxAllowedValidators", 128),
        ("MinAllowedWeights", 1024),
        ("MaxWeightLimit", 1_000),
        ("SynergyScalingLawPower", 50),
        ("ScalingLawPower", 50),
        ("SubnetworkN", 0),
        ("MaxAllowedUids", 4096),
        ("NetworkModality", 0),
        ("BlocksSinceLastStep", 0),
        ("Tempo", 99),
        ("EmissionValues", 0),
        ("Burn", 0),
    )
    for storage_name, initial in seeded_at_genesis:
        state[storage_name][netuid] = {0: initial}

    state["Commits"][netuid] = {}

    # Per-UID / per-hotkey storages start empty.
    for storage_name in (
        "Uids",
        "Keys",
        "Owner",
        "LastUpdate",
        "Active",
        "Rank",
        "Emission",
        "Incentive",
        "Consensus",
        "Trust",
        "ValidatorTrust",
        "Dividends",
    ):
        state[storage_name][netuid] = {}

    # PruningScores is the one per-UID storage that gets a block-0 entry.
    state["PruningScores"][netuid] = {0: {}}

    for storage_name in ("ValidatorPermit", "Weights", "Bonds", "Axons", "Prometheus"):
        state[storage_name][netuid] = {}

    for storage_name, initial in (
        ("NetworksAdded", True),
        ("AdjustmentAlpha", 1000),
        ("BondsMovingAverage", 1000),
    ):
        state[storage_name][netuid] = {0: initial}
def set_difficulty(self, netuid: int, difficulty: int) -> None:
    """
    Record `difficulty` for subnet `netuid` at the current block.

    Raises:
        Exception: If the subnet does not exist.
    """
    state = self.chain_state["SubtensorModule"]
    if netuid not in state["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    difficulty_history = state["Difficulty"][netuid]
    difficulty_history[self.block_number] = difficulty
def _register_neuron(self, netuid: int, hotkey: str, coldkey: str) -> int:
    """
    Register `hotkey` on subnet `netuid` and return the UID it received.

    A new UID is appended while the subnet is below ``MaxAllowedUids``;
    otherwise a random existing UID is overwritten. All per-UID and
    per-hotkey storages for the neuron are reset at the current block.

    Raises:
        Exception: If the subnet does not exist or the hotkey is already
            registered on it.
    """
    state = self.chain_state["SubtensorModule"]
    if netuid not in state["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    latest = self._get_most_recent_storage
    now = self.block_number
    subnetwork_n = latest(state["SubnetworkN"][netuid])

    # Reject a hotkey that already owns a UID on this subnet.
    for existing_uid in range(subnetwork_n):
        if latest(state["Keys"][netuid][existing_uid]) == hotkey:
            raise Exception("Hotkey already registered")

    if subnetwork_n >= latest(state["MaxAllowedUids"][netuid]):
        # Subnet full, replace neuron randomly
        uid = randint(0, subnetwork_n - 1)
    else:
        # Subnet not full: take the next uid and grow the subnet by one.
        uid = subnetwork_n
        state["SubnetworkN"][netuid][now] = subnetwork_n + 1

    state["Stake"][hotkey] = {coldkey: {now: 0}}
    state["Uids"][netuid][hotkey] = {now: uid}
    state["Keys"][netuid][uid] = {now: hotkey}
    state["Owner"][hotkey] = {now: coldkey}

    # Per-UID storages and the value each one starts with.
    initial_by_storage = (
        ("Active", True),
        ("LastUpdate", now),
        ("Rank", 0.0),
        ("Emission", 0.0),
        ("Incentive", 0.0),
        ("Consensus", 0.0),
        ("Trust", 0.0),
        ("ValidatorTrust", 0.0),
        ("Dividends", 0.0),
        ("PruningScores", 0.0),
        ("ValidatorPermit", False),
    )
    for storage_name, initial in initial_by_storage:
        state[storage_name][netuid][uid] = {now: initial}

    state["Weights"][netuid][uid] = {now: []}
    state["Bonds"][netuid][uid] = {now: []}

    state["Axons"][netuid][hotkey] = {now: {}}
    state["Prometheus"][netuid][hotkey] = {now: {}}

    membership = state["IsNetworkMember"].setdefault(hotkey, {})
    membership[netuid] = {now: True}

    return uid
@staticmethod
def _convert_to_balance(balance: Union["Balance", float, int]) -> "Balance":
    """
    Normalise `balance` to a ``Balance`` object.

    Floats are converted with ``Balance.from_tao`` and ints with
    ``Balance.from_rao``; any other value is returned unchanged.
    """
    if isinstance(balance, float):
        return Balance.from_tao(balance)
    if isinstance(balance, int):
        return Balance.from_rao(balance)
    return balance
def force_register_neuron(
    self,
    netuid: int,
    hotkey: str,
    coldkey: str,
    stake: Union["Balance", float, int] = Balance(0),
    balance: Union["Balance", float, int] = Balance(0),
) -> int:
    """
    Force register a neuron on the mock chain, returning the UID.

    Args:
        netuid: Subnet to register on.
        hotkey: Hotkey address of the neuron.
        coldkey: Coldkey address owning the neuron.
        stake: Stake recorded for (hotkey, coldkey); floats and ints are
            converted by ``_convert_to_balance``.
        balance: Free balance to set for the coldkey; converted the same way.

    Raises:
        Exception: If the subnet does not exist.
    """
    stake = self._convert_to_balance(stake)
    balance = self._convert_to_balance(balance)

    subtensor_state = self.chain_state["SubtensorModule"]
    if netuid not in subtensor_state["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    uid = self._register_neuron(netuid=netuid, hotkey=hotkey, coldkey=coldkey)

    subtensor_state["TotalStake"][self.block_number] = (
        self._get_most_recent_storage(subtensor_state["TotalStake"]) + stake.rao
    )
    subtensor_state["Stake"][hotkey][coldkey][self.block_number] = stake.rao

    # The original called force_set_balance inside `if balance.rao > 0` and
    # then again unconditionally; one unconditional call gives the same state.
    self.force_set_balance(coldkey, balance)

    return uid
def force_set_balance(
    self, ss58_address: str, balance: Union["Balance", float, int] = Balance(0)
) -> Tuple[bool, Optional[str]]:
    """
    Set the free balance of `ss58_address` at the current block.

    ``TotalIssuance`` is adjusted by the difference between the new balance
    and the balance the account currently holds.

    Args:
        ss58_address: Account whose balance is set.
        balance: New balance; floats and ints are converted by
            ``_convert_to_balance``.

    Returns:
        Tuple[bool, Optional[str]]: (success, err_msg)
    """
    balance = self._convert_to_balance(balance)

    accounts = self.chain_state["System"]["Account"]
    if ss58_address not in accounts:
        accounts[ss58_address] = {"data": {"free": {0: 0}}}

    old_balance = self.get_balance(ss58_address, self.block_number)
    diff = balance.rao - old_balance.rao

    # Update total issuance
    issuance = self.chain_state["SubtensorModule"]["TotalIssuance"]
    issuance[self.block_number] = self._get_most_recent_storage(issuance) + diff

    # Write only the current block's entry. The original replaced the whole
    # account record, which erased the balance history of earlier blocks.
    accounts[ss58_address]["data"]["free"][self.block_number] = balance.rao

    return True, None

# Alias for force_set_balance
sudo_force_set_balance = force_set_balance
def do_block_step(self) -> None:
    """
    Advance the mock chain by one block.

    Increments ``block_number`` and, for every existing subnet, records
    ``BlocksSinceLastStep`` as its latest value plus one. No epoch is run.
    """
    self.block_number += 1

    state = self.chain_state["SubtensorModule"]
    for subnet in state["NetworksAdded"]:
        step_counter = state["BlocksSinceLastStep"][subnet]
        elapsed = self._get_most_recent_storage(step_counter)
        step_counter[self.block_number] = elapsed + 1
[docs] def _handle_type_default(self, name: str, params: List[object]) -> object: defaults_mapping = { "TotalStake": 0, "TotalHotkeyStake": 0, "TotalColdkeyStake": 0, "Stake": 0, } return defaults_mapping.get(name, None)
def commit(self, wallet: "wallet", netuid: int, data: str) -> None:
    """
    Store `data` as the commitment of the wallet's hotkey at the current block.

    Raises:
        Exception: If the hotkey has no UID on subnet `netuid`.
    """
    uid = self.get_uid_for_hotkey_on_subnet(
        hotkey_ss58=wallet.hotkey.ss58_address,
        netuid=netuid,
    )
    if uid is None:
        raise Exception("Neuron not found")

    subnet_commits = self.chain_state["SubtensorModule"]["Commits"][netuid]
    if self.block_number not in subnet_commits:
        subnet_commits[self.block_number] = {}
    subnet_commits[self.block_number][uid] = data
def get_commitment(self, netuid: int, uid: int, block: Optional[int] = None) -> str:
    """
    Return the most recent commitment of `uid` on subnet `netuid`.

    Args:
        netuid: Subnet to look in.
        uid: UID whose commitment is requested.
        block: Block to query; defaults to the current block. The latest
            commitment stored at or before this block is returned.

    Raises:
        Exception: If `block` is in the future.
        KeyError: If the subnet has no commit storage, or `uid` has no
            commitment at or before `block`.
    """
    # `block or self.block_number` treated an explicit block 0 as "not given".
    if block is None:
        block = self.block_number
    elif self.block_number < block:
        raise Exception("Cannot query block in the future")

    subnet_commits = self.chain_state["SubtensorModule"]["Commits"][netuid]

    # Commits are keyed by the block they were written at, so walk backwards
    # instead of requiring a commit at exactly `block`.
    for candidate in range(block, -1, -1):
        commits_at_block = subnet_commits.get(candidate)
        if commits_at_block is not None and uid in commits_at_block:
            return commits_at_block[uid]
    raise KeyError(uid)
def query_subtensor(
    self,
    name: str,
    block: Optional[int] = None,
    params: Optional[List[object]] = None,
) -> "MockSubtensorValue":
    """
    Look up storage `name` in the mock ``SubtensorModule`` state.

    Args:
        name: Storage name.
        block: Block to query. A falsy value (None or 0) means the current
            block; callers in this class rely on that.
        params: Keys used to descend into nested storage before the block
            lookup. The list is not modified.

    Returns:
        An object whose ``value`` is the entry stored at `block` or the
        nearest earlier block, or the default from ``_handle_type_default``
        when nothing is found.

    Raises:
        Exception: If `block` is in the future.
    """
    if block:
        if self.block_number < block:
            raise Exception("Cannot query block in the future")
    else:
        block = self.block_number

    # Work on a copy: the original popped keys off the caller's list (and
    # used a shared mutable default argument).
    pending = list(params) if params else []

    state = self.chain_state["SubtensorModule"][name]
    if state is None:
        return SimpleNamespace(value=self._handle_type_default(name, pending))

    # Use prefix
    while pending:
        state = state.get(pending.pop(0), None)
        if state is None:
            return SimpleNamespace(value=self._handle_type_default(name, pending))

    # Use block: fall back to the nearest earlier block that has an entry.
    state_at_block = state.get(block, None)
    while state_at_block is None and block > 0:
        block -= 1
        state_at_block = state.get(block, None)

    if state_at_block is not None:
        return SimpleNamespace(value=state_at_block)
    return SimpleNamespace(value=self._handle_type_default(name, pending))
def query_map_subtensor(
    self,
    name: str,
    block: Optional[int] = None,
    params: Optional[List[object]] = None,
) -> Optional[MockMapResult]:
    """
    Return all ``(key, value)`` records of map storage `name`.

    Note: Double map requires one param

    Args:
        name: Storage name.
        block: Block to query. A falsy value (None or 0) means the current block.
        params: Keys used to descend into nested storage first. The list is
            not modified.

    Returns:
        A ``MockMapResult`` with one record per key that has a value at
        `block` or earlier; empty when nothing matches.

    Raises:
        Exception: If `block` is in the future, or the first inner storage
            is empty ("Invalid state").
        ChainQueryError: If the storage is a double map and no prefix
            param was supplied.
    """
    if block:
        if self.block_number < block:
            raise Exception("Cannot query block in the future")
    else:
        block = self.block_number

    state = self.chain_state["SubtensorModule"][name]
    if state is None:
        return MockMapResult([])

    # Use prefix. Iterate over a copy: the original popped keys off the
    # caller's list (and used a shared mutable default argument).
    for prefix_key in list(params) if params else []:
        state = state.get(prefix_key, None)
        if state is None:
            return MockMapResult([])

    # Check if single map or double map
    if len(state.keys()) == 0:
        return MockMapResult([])

    inner = next(iter(state.values()))
    # Should have at least one key
    if len(inner.keys()) == 0:
        raise Exception("Invalid state")

    # Check if double map
    if isinstance(next(iter(inner.values())), dict):
        # is double map
        raise ChainQueryError("Double map requires one param")

    # Iterate over each key and add value to list, max at block
    records = []
    for key in state:
        result = self._get_most_recent_storage(state[key], block)
        if result is None:
            continue  # Skip if no result for this key at `block` or earlier
        records.append((key, result))

    return MockMapResult(records)
def query_constant(
    self, module_name: str, constant_name: str, block: Optional[int] = None
) -> Optional[object]:
    """
    Look up constant `constant_name` of module `module_name` in the mock state.

    Args:
        module_name: Top-level key of the chain state.
        constant_name: Key inside that module.
        block: Block to query. A falsy value (None or 0) means the current block.

    Returns:
        An object whose ``value`` is the entry stored at `block` or the
        nearest earlier block, or None when the module, the constant, or a
        stored value is missing.

    Raises:
        Exception: If `block` is in the future.
    """
    if block:
        if self.block_number < block:
            raise Exception("Cannot query block in the future")
    else:
        block = self.block_number

    module_state = self.chain_state.get(module_name, None)
    if module_state is None or constant_name not in module_state:
        return None

    # Use block
    state_at_block = self._get_most_recent_storage(module_state[constant_name], block)
    if state_at_block is None:
        # The original fell through to `state_at_block["data"]["free"]` here,
        # which raises TypeError on None; its comment says None was intended.
        return None
    return SimpleNamespace(value=state_at_block)
def get_current_block(self) -> int:
    """Return the mock chain's current block number."""
    current = self.block_number
    return current
# ==== Balance RPC methods ====
def get_balance(self, address: str, block: int = None) -> "Balance":
    """
    Return the free balance of `address` at `block`.

    A falsy `block` (None or 0) means the current block. The value stored at
    `block` or the nearest earlier block is used; unknown accounts and
    accounts without a stored value yield ``Balance(0)``.

    Raises:
        Exception: If `block` is in the future.
    """
    if not block:
        block = self.block_number
    elif self.block_number < block:
        raise Exception("Cannot query block in the future")

    accounts = self.chain_state["System"]["Account"]
    if accounts is None or address not in accounts:
        return Balance(0)

    # Use block
    free_history = accounts[address]["data"]["free"]
    rao_at_block = self._get_most_recent_storage(free_history, block)  # Can be None
    if rao_at_block is None:
        return Balance(0)
    return Balance.from_rao(rao_at_block)
def get_balances(self, block: int = None) -> Dict[str, "Balance"]:
    """Return a mapping of every known account address to its balance at `block`."""
    accounts = self.chain_state["System"]["Account"]
    return {address: self.get_balance(address, block) for address in accounts}
# ==== Neuron RPC methods ====
def neuron_for_uid(
    self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfo]:
    """
    Return the neuron with `uid` on subnet `netuid` at `block`.

    A `uid` of None yields the null neuron. A falsy `block` (None or 0)
    means the current block. None is returned when the subnet does not exist
    or ``_neuron_subnet_exists`` finds no neuron.

    Raises:
        Exception: If `block` is in the future.
    """
    if uid is None:
        return NeuronInfo.get_null_neuron()

    if not block:
        block = self.block_number
    elif self.block_number < block:
        raise Exception("Cannot query block in the future")

    if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
        return None

    return self._neuron_subnet_exists(uid, netuid, block)
def neurons(self, netuid: int, block: Optional[int] = None) -> List[NeuronInfo]:
    """
    Return every neuron of subnet `netuid` at `block`.

    UIDs from 0 up to the ``SubnetworkN`` value at `block` are looked up;
    UIDs for which no neuron is found are left out.

    Raises:
        Exception: If the subnet does not exist.
    """
    state = self.chain_state["SubtensorModule"]
    if netuid not in state["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    subnet_n = self._get_most_recent_storage(state["SubnetworkN"][netuid], block)
    looked_up = (self.neuron_for_uid(uid, netuid, block) for uid in range(subnet_n))
    return [neuron for neuron in looked_up if neuron is not None]
[docs] @staticmethod def _get_most_recent_storage( storage: Dict[BlockNumber, Any], block_number: Optional[int] = None ) -> Any: if block_number is None: items = list(storage.items()) items.sort(key=lambda x: x[0], reverse=True) if len(items) == 0: return None return items[0][1] else: while block_number >= 0: if block_number in storage: return storage[block_number] block_number -= 1 return None
def _get_axon_info(
    self, netuid: int, hotkey: str, block: Optional[int] = None
) -> AxonInfoDict:
    """
    Return the axon info stored for `hotkey` on subnet `netuid` at `block`.

    Falls back to ``AxonInfoDict.default()`` when the subnet or hotkey is
    unknown, or when the stored value is missing or empty.
    """
    # Axons [netuid][hotkey][block_number]
    axons = self.chain_state["SubtensorModule"]["Axons"]
    if netuid not in axons or hotkey not in axons[netuid]:
        return AxonInfoDict.default()

    stored = self._get_most_recent_storage(axons[netuid][hotkey], block)
    return stored if stored else AxonInfoDict.default()
def _get_prometheus_info(
    self, netuid: int, hotkey: str, block: Optional[int] = None
) -> PrometheusInfoDict:
    """
    Return the prometheus info stored for `hotkey` on subnet `netuid` at `block`.

    Falls back to ``PrometheusInfoDict.default()`` when the subnet or hotkey
    is unknown, or when the stored value is missing or empty.
    """
    prometheus = self.chain_state["SubtensorModule"]["Prometheus"]
    if netuid not in prometheus or hotkey not in prometheus[netuid]:
        return PrometheusInfoDict.default()

    stored = self._get_most_recent_storage(prometheus[netuid][hotkey], block)
    return stored if stored else PrometheusInfoDict.default()
def _neuron_subnet_exists(
    self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfo]:
    """
    Assemble the ``NeuronInfo`` for `uid` on subnet `netuid` from mock storage.

    Returns None when the subnet does not exist, `uid` is not below the
    latest ``SubnetworkN``, or no hotkey is stored for the UID. Subnet size
    and hotkey use the newest stored values; all other fields are read at
    `block`.
    """
    state = self.chain_state["SubtensorModule"]
    latest = self._get_most_recent_storage

    if netuid not in state["NetworksAdded"]:
        return None
    if latest(state["SubnetworkN"][netuid]) <= uid:
        return None

    hotkey = latest(state["Keys"][netuid][uid])
    if hotkey is None:
        return None

    axon_info_ = self._get_axon_info(netuid, hotkey, block)
    prometheus_info = self._get_prometheus_info(netuid, hotkey, block)
    coldkey = latest(state["Owner"][hotkey], block)

    def at_uid(storage_name):
        # Value of a per-UID storage for this neuron at `block` or earlier.
        return latest(state[storage_name][netuid][uid], block)

    active = at_uid("Active")
    raw_rank = at_uid("Rank")
    raw_emission = at_uid("Emission")
    raw_incentive = at_uid("Incentive")
    raw_consensus = at_uid("Consensus")
    raw_trust = at_uid("Trust")
    raw_validator_trust = at_uid("ValidatorTrust")
    raw_dividends = at_uid("Dividends")
    pruning_score = at_uid("PruningScores")
    last_update = at_uid("LastUpdate")
    validator_permit = at_uid("ValidatorPermit")
    raw_weights = at_uid("Weights")
    raw_bonds = at_uid("Bonds")

    # Stake held on this hotkey, keyed by the staking coldkey.
    stake_dict = {
        staker: Balance.from_rao(latest(state["Stake"][hotkey][staker], block))
        for staker in state["Stake"][hotkey]
    }
    stake = sum(stake_dict.values())

    weights = [[int(pair[0]), int(pair[1])] for pair in raw_weights]
    bonds = [[int(pair[0]), int(pair[1])] for pair in raw_bonds]

    rank = U16_NORMALIZED_FLOAT(raw_rank)
    emission = raw_emission / RAOPERTAO
    incentive = U16_NORMALIZED_FLOAT(raw_incentive)
    consensus = U16_NORMALIZED_FLOAT(raw_consensus)
    trust = U16_NORMALIZED_FLOAT(raw_trust)
    validator_trust = U16_NORMALIZED_FLOAT(raw_validator_trust)
    dividends = U16_NORMALIZED_FLOAT(raw_dividends)

    prometheus_info = PrometheusInfo.fix_decoded_values(prometheus_info)
    axon_info_ = AxonInfo.from_neuron_info(
        {"hotkey": hotkey, "coldkey": coldkey, "axon_info": axon_info_}
    )

    return NeuronInfo(
        hotkey=hotkey,
        coldkey=coldkey,
        uid=uid,
        netuid=netuid,
        active=active,
        rank=rank,
        emission=emission,
        incentive=incentive,
        consensus=consensus,
        trust=trust,
        validator_trust=validator_trust,
        dividends=dividends,
        pruning_score=pruning_score,
        last_update=last_update,
        validator_permit=validator_permit,
        stake=stake,
        stake_dict=stake_dict,
        total_stake=stake,
        prometheus_info=prometheus_info,
        axon_info=axon_info_,
        weights=weights,
        bonds=bonds,
        is_null=False,
    )
def neuron_for_uid_lite(
    self, uid: int, netuid: int, block: Optional[int] = None
) -> Optional[NeuronInfoLite]:
    """
    Return the lite neuron info (no weights or bonds) for `uid` on `netuid`.

    A falsy `block` (None or 0) means the current block. None is returned
    when ``_neuron_subnet_exists`` finds no neuron.

    Raises:
        Exception: If `block` is in the future or the subnet does not exist.
    """
    if not block:
        block = self.block_number
    elif self.block_number < block:
        raise Exception("Cannot query block in the future")

    if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    neuron_info = self._neuron_subnet_exists(uid, netuid, block)
    if neuron_info is None:
        return None

    # Reuse the full neuron's attribute dict, minus the fields the lite type omits.
    lite_fields = vars(neuron_info)
    del neuron_info
    for dropped in ("weights", "bonds"):
        lite_fields.pop(dropped)
    return NeuronInfoLite(**lite_fields)
def neurons_lite(
    self, netuid: int, block: Optional[int] = None
) -> List[NeuronInfoLite]:
    """
    Return the lite neuron info of every neuron on subnet `netuid` at `block`.

    UIDs from 0 up to the ``SubnetworkN`` value at `block` are looked up;
    UIDs for which no neuron is found are left out.

    Raises:
        Exception: If the subnet does not exist.
    """
    if netuid not in self.chain_state["SubtensorModule"]["NetworksAdded"]:
        raise Exception("Subnet does not exist")

    # Read the subnet size at `block`, as `neurons` does. The original always
    # used the newest size, so a historical query also visited UIDs that had
    # no stored values yet at `block`.
    subnet_n = self._get_most_recent_storage(
        self.chain_state["SubtensorModule"]["SubnetworkN"][netuid], block
    )

    neurons = []
    for uid in range(subnet_n):
        neuron_info = self.neuron_for_uid_lite(uid, netuid, block)
        if neuron_info is not None:
            neurons.append(neuron_info)
    return neurons
# Extrinsics
def _do_delegation(
    self,
    wallet: "wallet",
    delegate_ss58: str,
    amount: "Balance",
    wait_for_inclusion: bool = True,
    wait_for_finalization: bool = False,
) -> bool:
    """
    Stake `amount` from `wallet` to the delegate hotkey `delegate_ss58`.

    Returns the result of ``_do_stake``.

    Raises:
        Exception: If `delegate_ss58` is not a delegate.
    """
    # Check if delegate
    is_delegate = self.is_hotkey_delegate(hotkey_ss58=delegate_ss58)
    if not is_delegate:
        raise Exception("Not a delegate")

    # do stake
    return self._do_stake(
        wallet=wallet,
        hotkey_ss58=delegate_ss58,
        amount=amount,
        wait_for_inclusion=wait_for_inclusion,
        wait_for_finalization=wait_for_finalization,
    )
def _do_undelegation(
    self,
    wallet: "wallet",
    delegate_ss58: str,
    amount: "Balance",
    wait_for_inclusion: bool = True,
    wait_for_finalization: bool = False,
) -> bool:
    """
    Unstake `amount` of `wallet`'s stake from the delegate hotkey `delegate_ss58`.

    Returns the result of ``_do_unstake``.

    Raises:
        Exception: If `delegate_ss58` is not a delegate.
    """
    # Check if delegate
    if not self.is_hotkey_delegate(hotkey_ss58=delegate_ss58):
        raise Exception("Not a delegate")

    # do unstake. The original dropped this result and returned None despite
    # the `-> bool` annotation; pass it on, as `_do_delegation` does.
    return self._do_unstake(
        wallet=wallet,
        hotkey_ss58=delegate_ss58,
        amount=amount,
        wait_for_inclusion=wait_for_inclusion,
        wait_for_finalization=wait_for_finalization,
    )
def _do_nominate(
    self,
    wallet: "wallet",
    wait_for_inclusion: bool = True,
    wait_for_finalization: bool = False,
) -> bool:
    """
    Make the wallet's hotkey a delegate with a fixed take of 0.18.

    Nothing is written when the hotkey is already a delegate. Always
    returns True.
    """
    hotkey_ss58 = wallet.hotkey.ss58_address
    # Read as in the original implementation; the value is not used.
    _coldkey_ss58 = wallet.coldkeypub.ss58_address

    subtensor_state = self.chain_state["SubtensorModule"]
    if not self.is_hotkey_delegate(hotkey_ss58=hotkey_ss58):
        # Constant for now
        subtensor_state["Delegates"][hotkey_ss58] = {self.block_number: 0.18}
    return True
def get_transfer_fee(
    self, wallet: "wallet", dest: str, value: Union["Balance", float, int]
) -> "Balance":
    """Return the mock transfer fee, a constant ``Balance(700)`` regardless of the arguments."""
    fixed_fee = Balance(700)
    return fixed_fee
def _do_transfer(
    self,
    wallet: "wallet",
    dest: str,
    transfer_balance: "Balance",
    wait_for_inclusion: bool = True,
    wait_for_finalization: bool = False,
) -> Tuple[bool, Optional[str], Optional[str]]:
    """
    Move `transfer_balance` from the wallet's coldkey to `dest` at the current block.

    The sender is also charged the transfer fee and must keep at least the
    existential deposit. Returns ``(True, None, None)`` on success.

    Raises:
        Exception: If the sender cannot cover amount, fee and existential deposit.
    """
    sender = wallet.coldkeypub.ss58_address
    accounts = self.chain_state["System"]["Account"]

    bal = self.get_balance(sender)
    dest_bal = self.get_balance(dest)
    transfer_fee = self.get_transfer_fee(wallet, dest, transfer_balance)

    existential_deposit = self.get_existential_deposit()
    required = transfer_balance + existential_deposit + transfer_fee
    if bal < required:
        raise Exception("Insufficient balance")

    # Remove from the free balance
    remaining = (bal - transfer_balance - transfer_fee).rao
    accounts[sender]["data"]["free"][self.block_number] = remaining

    # Add to the free balance
    if dest not in accounts:
        accounts[dest] = {"data": {"free": {}}}
    received = (dest_bal + transfer_balance).rao
    accounts[dest]["data"]["free"][self.block_number] = received

    return True, None, None
def _do_pow_register(
    self,
    netuid: int,
    wallet: "wallet",
    pow_result: "POWSolution",
    wait_for_inclusion: bool = False,
    wait_for_finalization: bool = True,
) -> Tuple[bool, Optional[str]]:
    """
    Register the wallet's hotkey on subnet `netuid`; `pow_result` is not checked.

    Returns ``(True, None)`` on success.

    Raises:
        Exception: If the subnet does not exist (or registration fails).
    """
    # Assume pow result is valid
    known_subnets = self.chain_state["SubtensorModule"]["NetworksAdded"]
    if netuid not in known_subnets:
        raise Exception("Subnet does not exist")

    hotkey_ss58 = wallet.hotkey.ss58_address
    coldkey_ss58 = wallet.coldkeypub.ss58_address
    self._register_neuron(netuid=netuid, hotkey=hotkey_ss58, coldkey=coldkey_ss58)

    return True, None
def _do_burned_register(
    self,
    netuid: int,
    wallet: "wallet",
    wait_for_inclusion: bool = False,
    wait_for_finalization: bool = True,
) -> Tuple[bool, Optional[str]]:
    """
    Register the wallet's hotkey on subnet `netuid`, paying the burn cost.

    The cost from ``self.recycle`` is deducted from the coldkey's free
    balance after registration. Returns ``(True, None)`` on success.

    Raises:
        Exception: If the subnet does not exist, or the balance cannot
            cover the burn plus the existential deposit.
    """
    known_subnets = self.chain_state["SubtensorModule"]["NetworksAdded"]
    if netuid not in known_subnets:
        raise Exception("Subnet does not exist")

    coldkey_ss58 = wallet.coldkeypub.ss58_address
    bal = self.get_balance(coldkey_ss58)
    burn = self.recycle(netuid=netuid)
    existential_deposit = self.get_existential_deposit()

    if bal < burn + existential_deposit:
        raise Exception("Insufficient funds")

    self._register_neuron(
        netuid=netuid,
        hotkey=wallet.hotkey.ss58_address,
        coldkey=coldkey_ss58,
    )

    # Burn the funds
    free_history = self.chain_state["System"]["Account"][coldkey_ss58]["data"]["free"]
    free_history[self.block_number] = (bal - burn).rao

    return True, None
def _do_stake(
    self,
    wallet: "wallet",
    hotkey_ss58: str,
    amount: "Balance",
    wait_for_inclusion: bool = True,
    wait_for_finalization: bool = False,
) -> bool:
    """
    Stake `amount` from the wallet's coldkey onto `hotkey_ss58` at the current block.

    Increases the (hotkey, coldkey) stake, ``TotalStake``, and the hotkey and
    coldkey running totals by `amount`, and removes it from the coldkey's
    free balance. Returns True on success.

    Raises:
        Exception: If the free balance cannot cover `amount` plus the
            existential deposit.
    """
    subtensor_state = self.chain_state["SubtensorModule"]
    coldkey_ss58 = wallet.coldkeypub.ss58_address

    bal = self.get_balance(coldkey_ss58)
    curr_stake = self.get_stake_for_coldkey_and_hotkey(
        hotkey_ss58=hotkey_ss58, coldkey_ss58=coldkey_ss58
    )
    if curr_stake is None:
        curr_stake = Balance(0)
    existential_deposit = self.get_existential_deposit()

    if bal < amount + existential_deposit:
        raise Exception("Insufficient funds")

    # Stake the funds. The original stored `amount.rao` alone, overwriting
    # any existing stake while the totals below were still incremented; add
    # to the current stake, mirroring the subtraction in `_do_unstake`.
    stake_history = (
        subtensor_state["Stake"].setdefault(hotkey_ss58, {}).setdefault(coldkey_ss58, {})
    )
    stake_history[self.block_number] = (curr_stake + amount).rao

    # Add to total_stake storage
    subtensor_state["TotalStake"][self.block_number] = (
        self._get_most_recent_storage(subtensor_state["TotalStake"]) + amount.rao
    )

    hotkey_totals = subtensor_state["TotalHotkeyStake"].setdefault(hotkey_ss58, {})
    coldkey_totals = subtensor_state["TotalColdkeyStake"].setdefault(coldkey_ss58, {})

    # Read the running totals as of the current block. The original passed
    # `block=min(self.block_number - 1, 0)`, which at block 0 became -1 and
    # made the totals read as 0, discarding earlier stake.
    curr_total_hotkey_stake = self.query_subtensor(
        name="TotalHotkeyStake",
        params=[hotkey_ss58],
    )
    curr_total_coldkey_stake = self.query_subtensor(
        name="TotalColdkeyStake",
        params=[coldkey_ss58],
    )

    hotkey_totals[self.block_number] = curr_total_hotkey_stake.value + amount.rao
    coldkey_totals[self.block_number] = curr_total_coldkey_stake.value + amount.rao

    # Remove from free balance
    self.chain_state["System"]["Account"][coldkey_ss58]["data"]["free"][
        self.block_number
    ] = (bal - amount).rao

    return True
[docs] def _do_unstake( self, wallet: "wallet", hotkey_ss58: str, amount: "Balance", wait_for_inclusion: bool = True, wait_for_finalization: bool = False, ) -> bool: subtensor_state = self.chain_state["SubtensorModule"] bal = self.get_balance(wallet.coldkeypub.ss58_address) curr_stake = self.get_stake_for_coldkey_and_hotkey( hotkey_ss58=hotkey_ss58, coldkey_ss58=wallet.coldkeypub.ss58_address ) if curr_stake is None: curr_stake = Balance(0) if curr_stake < amount: raise Exception("Insufficient funds") stake_state = subtensor_state["Stake"] if curr_stake.rao == 0: return True # Unstake the funds # We know that the hotkey has stake, so we can just remove it stake_state[hotkey_ss58][wallet.coldkeypub.ss58_address][self.block_number] = ( curr_stake - amount ).rao # Add to the free balance if wallet.coldkeypub.ss58_address not in self.chain_state["System"]["Account"]: self.chain_state["System"]["Account"][wallet.coldkeypub.ss58_address] = { "data": {"free": {}} } # Remove from total stake storage subtensor_state["TotalStake"][self.block_number] = ( self._get_most_recent_storage(subtensor_state["TotalStake"]) - amount.rao ) total_hotkey_stake_state = subtensor_state["TotalHotkeyStake"] if not hotkey_ss58 in total_hotkey_stake_state: total_hotkey_stake_state[hotkey_ss58] = {} total_hotkey_stake_state[hotkey_ss58][self.block_number] = ( 0 # Shouldn't happen ) total_coldkey_stake_state = subtensor_state["TotalColdkeyStake"] if not wallet.coldkeypub.ss58_address in total_coldkey_stake_state: total_coldkey_stake_state[wallet.coldkeypub.ss58_address] = {} total_coldkey_stake_state[wallet.coldkeypub.ss58_address][ self.block_number ] = amount.rao # Shouldn't happen total_hotkey_stake_state[hotkey_ss58][self.block_number] = ( self._get_most_recent_storage( subtensor_state["TotalHotkeyStake"][hotkey_ss58] ) - amount.rao ) total_coldkey_stake_state[wallet.coldkeypub.ss58_address][self.block_number] = ( self._get_most_recent_storage( 
subtensor_state["TotalColdkeyStake"][wallet.coldkeypub.ss58_address] ) - amount.rao ) self.chain_state["System"]["Account"][wallet.coldkeypub.ss58_address]["data"][ "free" ][self.block_number] = (bal + amount).rao return True
[docs] @staticmethod def min_required_stake(): """ As the minimum required stake may change, this method allows us to dynamically update the amount in the mock without updating the tests """ # valid minimum threshold as of 2024/05/01 return 100_000_000 # RAO
def get_minimum_required_stake(self):
    """Return the mock's minimum required stake as a ``Balance`` built from RAO."""
    minimum_rao = self.min_required_stake()
    return Balance.from_rao(minimum_rao)
def get_delegate_by_hotkey(
    self, hotkey_ss58: str, block: Optional[int] = None
) -> Optional["DelegateInfo"]:
    """
    Build the ``DelegateInfo`` for a hotkey from the mock chain state.

    Args:
        hotkey_ss58: Hotkey address of the delegate.
        block: Block to read the state at; passed through to the underlying
            lookups.

    Returns:
        The delegate's info, or ``None`` if the hotkey is not in ``Delegates``
        or has no delegate state stored for the requested block.
    """
    subtensor_state = self.chain_state["SubtensorModule"]

    if hotkey_ss58 not in subtensor_state["Delegates"]:
        return None

    newest_state = self._get_most_recent_storage(
        subtensor_state["Delegates"][hotkey_ss58], block
    )
    if newest_state is None:
        return None

    # A delegate that nobody has staked to may have no "Stake" entry at all;
    # treat that as "no nominators" instead of raising KeyError.
    nominators = subtensor_state["Stake"].get(hotkey_ss58, {})

    # Collect (coldkey, stake) pairs for every nominator with non-zero stake.
    nom_result = []
    for nominator in nominators:
        nom_amount = self.get_stake_for_coldkey_and_hotkey(
            hotkey_ss58=hotkey_ss58, coldkey_ss58=nominator, block=block
        )
        if nom_amount is not None and nom_amount.rao > 0:
            nom_result.append((nominator, nom_amount))

    # Collect (netuid, uid) pairs for every subnet the hotkey is registered on.
    registered_subnets = []
    for subnet in self.get_all_subnet_netuids(block=block):
        uid = self.get_uid_for_hotkey_on_subnet(
            hotkey_ss58=hotkey_ss58, netuid=subnet, block=block
        )
        if uid is not None:
            registered_subnets.append((subnet, uid))

    info = DelegateInfo(
        hotkey_ss58=hotkey_ss58,
        # NOTE(review): unlike the other lookups, this one is not given `block`.
        total_stake=self.get_total_stake_for_hotkey(ss58_address=hotkey_ss58)
        or Balance(0),
        nominators=nom_result,
        owner_ss58=self.get_hotkey_owner(hotkey_ss58=hotkey_ss58, block=block),
        take=0.18,
        validator_permits=[
            subnet
            for subnet, uid in registered_subnets
            if self.neuron_has_validator_permit(uid=uid, netuid=subnet, block=block)
        ],
        registrations=[subnet for subnet, _ in registered_subnets],
        return_per_1000=Balance.from_tao(1234567),  # Doesn't matter for mock?
        total_daily_return=Balance.from_tao(1234567),  # Doesn't matter for mock?
    )

    return info
def get_delegates(self, block: Optional[int] = None) -> List["DelegateInfo"]:
    """
    Return the info of every hotkey listed in the ``Delegates`` storage.

    Hotkeys for which ``get_delegate_by_hotkey`` returns ``None`` at the
    requested block are left out.
    """
    delegate_hotkeys = self.chain_state["SubtensorModule"]["Delegates"]
    looked_up = (
        self.get_delegate_by_hotkey(hotkey_ss58=delegate_hotkey, block=block)
        for delegate_hotkey in delegate_hotkeys
    )
    return [delegate for delegate in looked_up if delegate is not None]
def get_delegated(
    self, coldkey_ss58: str, block: Optional[int] = None
) -> List[Tuple["DelegateInfo", "Balance"]]:
    """
    Returns the list of delegates that a given coldkey is staked to.

    Args:
        coldkey_ss58: Coldkey address to look for among each delegate's
            nominators.
        block: Block to read the delegates at.

    Returns:
        One ``(delegate, stake)`` pair per delegate that lists the coldkey as
        a nominator, where ``stake`` is the amount recorded for that coldkey.
    """
    result = []
    for delegate in self.get_delegates(block=block):
        # `nominators` is a list of (coldkey, stake) pairs, so a plain
        # `coldkey in nominators` test never matches and indexing it by
        # coldkey is invalid. dict() accepts such pairs (and also an existing
        # mapping), which gives a proper lookup by coldkey.
        stake_by_nominator = dict(delegate.nominators)
        if coldkey_ss58 in stake_by_nominator:
            result.append((delegate, stake_by_nominator[coldkey_ss58]))

    return result
def get_all_subnets_info(self, block: Optional[int] = None) -> List[SubnetInfo]:
    """
    Return the info of every subnet listed in the ``NetworksAdded`` storage.

    Subnets for which ``get_subnet_info`` returns ``None`` at the requested
    block are left out.
    """
    known_netuids = self.chain_state["SubtensorModule"]["NetworksAdded"]
    looked_up = (
        self.get_subnet_info(netuid=known_netuid, block=block)
        for known_netuid in known_netuids
    )
    return [subnet_info for subnet_info in looked_up if subnet_info is not None]
def get_subnet_info(
    self, netuid: int, block: Optional[int] = None
) -> Optional[SubnetInfo]:
    """
    Assemble the ``SubnetInfo`` of one subnet from the mock chain state.

    Args:
        netuid: Subnet to describe.
        block: Block to read the state at.

    Returns:
        The subnet's info, or ``None`` if the subnet does not exist at ``block``.
    """
    if not self.subnet_exists(netuid=netuid, block=block):
        return None

    def read_storage(storage_name: str) -> Optional[object]:
        # A fresh params list is built for every query.
        queried = self.query_subtensor(
            name=storage_name, block=block, params=[netuid]
        )
        return queried.value

    # (SubnetInfo field, storage name) pairs, split around
    # connection_requirements so the queries run in the same order as the
    # fields are listed.
    leading_fields = (
        ("rho", "Rho"),
        ("kappa", "Kappa"),
        ("difficulty", "Difficulty"),
        ("immunity_period", "ImmunityPeriod"),
        ("max_allowed_validators", "MaxAllowedValidators"),
        ("min_allowed_weights", "MinAllowedWeights"),
        ("max_weight_limit", "MaxWeightLimit"),
        ("scaling_law_power", "ScalingLawPower"),
        ("subnetwork_n", "SubnetworkN"),
        ("max_n", "MaxAllowedUids"),
        ("blocks_since_epoch", "BlocksSinceLastStep"),
        ("tempo", "Tempo"),
        ("modality", "NetworkModality"),
    )
    trailing_fields = (
        ("emission_value", "EmissionValues"),
        ("burn", "Burn"),
        ("owner_ss58", "SubnetOwner"),
    )

    subnet_fields = {"netuid": netuid}
    for field_name, storage_name in leading_fields:
        subnet_fields[field_name] = read_storage(storage_name)

    connection_records = self.query_map_subtensor(
        name="NetworkConnect", block=block, params=[netuid]
    ).records
    subnet_fields["connection_requirements"] = {
        str(other_netuid.value): percentile.value
        for other_netuid, percentile in connection_records
    }

    for field_name, storage_name in trailing_fields:
        subnet_fields[field_name] = read_storage(storage_name)

    return SubnetInfo(**subnet_fields)
[docs] def _do_serve_prometheus( self, wallet: "wallet", call_params: "PrometheusServeCallParams", wait_for_inclusion: bool = False, wait_for_finalization: bool = True, ) -> Tuple[bool, Optional[str]]: return True, None
[docs] def _do_set_weights( self, wallet: "wallet", netuid: int, uids: int, vals: List[int], version_key: int, wait_for_inclusion: bool = False, wait_for_finalization: bool = True, ) -> Tuple[bool, Optional[str]]: return True, None
[docs] def _do_serve_axon( self, wallet: "wallet", call_params: "AxonServeCallParams", wait_for_inclusion: bool = False, wait_for_finalization: bool = True, ) -> Tuple[bool, Optional[str]]: return True, None