diff --git a/README.md b/README.md index 2108bfa..2616b94 100644 --- a/README.md +++ b/README.md @@ -220,7 +220,7 @@ python main.py query validator-set --type execution --config-path ~/config.toml - Fetch validator info ```sh -python main.py query validator --validator-id 1 --config-path ~/config.toml +python main.py query validator --secp-pubkey 028bcac2c1e57b60ae93b47de912a5a48f5ec7675e7c608baeffb2fd33ff7baaef --config-path ~/config.toml ``` - Verify all values match in the output @@ -411,7 +411,7 @@ INFO Commission successfully changed from 10.0% to 5.0% for validator 1 ### Query Validator Information ```sh -python main.py query validator --validator-id 1 --config-path ~/config.toml +python main.py query validator --secp-pubkey 028bcac2c1e57b60ae93b47de912a5a48f5ec7675e7c608baeffb2fd33ff7baaef --config-path ~/config.toml ``` ### Query Delegator Information diff --git a/staking-cli/src/helpers.py b/staking-cli/src/helpers.py index f905397..4500d4a 100644 --- a/staking-cli/src/helpers.py +++ b/staking-cli/src/helpers.py @@ -1,5 +1,6 @@ from typing import Union from web3 import Web3 +from eth_keys import keys from src.logger import init_logging from src.query import get_validator_info, validator_exists from py_ecc.optimized_bls12_381 import curve_order @@ -72,6 +73,54 @@ def is_valid_secp256k1_private_key(hex_private_key: str) -> bool: return 0 < key_int < SECP256K1_ORDER +def is_valid_secp256k1_public_key(hex_public_key: str) -> bool: + """Validates if a given string is a correctly formatted secp256k1 public key""" + if not isinstance(hex_public_key, str): + return False + + # 1. Standardize input: Remove '0x' prefix if it exists + if hex_public_key.startswith("0x"): + key_hex = hex_public_key[2:] + else: + key_hex = hex_public_key + + # 2. Check for valid hexadecimal characters + try: + integer = int(key_hex, 16) + except Exception as e: + raise ValueError(f"Invalid hex for pubkey: {e}") + + key_len_chars = len(key_hex) + prefix = key_hex[:2] + + # Must be 66 chars (33 bytes) and start with '02' or '03' + if key_len_chars == 66: + if prefix == "02" or prefix == "03": + return True + else: + return False # Wrong prefix for compressed key + + # Invalid length + return False + + +def get_eth_address_from_pubkey(hex_public_key: str) -> str: + if hex_public_key.startswith("0x"): + hex_public_key = hex_public_key[2:] + else: + hex_public_key = hex_public_key + public_key_bytes = bytes.fromhex(hex_public_key) + try: + public_key_obj = keys.PublicKey.from_compressed_bytes(public_key_bytes) + uncompressed_key_bytes_64 = public_key_obj.to_bytes() + except Exception as e: + raise ValueError(f"Invalid public key point. Error: {e}") + keccak_hash = Web3.keccak(uncompressed_key_bytes_64) + address_bytes = keccak_hash[-20:] + address = Web3.to_checksum_address(address_bytes) + return address + + def number_prompt(description: str, range: list = [], default: str = ""): if range: return Prompt.ask( @@ -103,6 +152,28 @@ def key_prompt(config: dict, key_type: str): return key +def pub_key_prompt(config: dict, key_type: str): + """Ask for public key and validate""" + colors = config["colors"] + log = init_logging(config["log_level"].upper()) + key = Prompt.ask( + f"\n[{colors['primary_text']}]Enter [{colors['main']}]{key_type.capitalize()} Public Key[/] of the validator[/]" + ) + if key_type == "secp": + try: + validation = is_valid_secp256k1_public_key(key) + except Exception as e: + log.error(f"Error while validating secp public key: {e}") + validation = False + if validation: + return str(key) + else: + log.error(f"\nEnter a valid key, instead of: {key}") + # ask for input again + key = pub_key_prompt(config, key_type) + return key + + def is_valid_address(address: str) -> bool: """Validates an Ethereum address by checking its format and EIP-55 checksum.""" try: diff --git a/staking-cli/src/parser.py b/staking-cli/src/parser.py index d7ca1a9..9de9d28 100644 --- a/staking-cli/src/parser.py +++ b/staking-cli/src/parser.py @@ -209,10 +209,10 @@ def init_parser() -> argparse.ArgumentParser: # val_info_parser val_info_parser.add_argument( - "--validator-id", - type=int, + "--secp-pubkey", + type=str, required=True, - help="Unique id representing the validator on-chain", + help="Compressed SECP public key of the validator, as generated by monad-keystore", ) val_info_parser.add_argument( "--config-path", diff --git a/staking-cli/src/query.py b/staking-cli/src/query.py index 33217c8..d9ed568 100644 --- a/staking-cli/src/query.py +++ b/staking-cli/src/query.py @@ -1,60 +1,93 @@ +import struct +import requests from web3 import Web3 from staking_sdk_py.callGetters import call_getter from src.logger import init_logging + def get_validator_info(config, val_id): # query validator information - w3 = Web3(Web3.HTTPProvider(config['rpc_url'])) - val_info = call_getter(w3, 'get_validator', config['contract_address'], val_id) + w3 = Web3(Web3.HTTPProvider(config["rpc_url"])) + val_info = call_getter(w3, "get_validator", config["contract_address"], val_id) return val_info + def validator_exists(val_info: tuple) -> bool: - if val_info[10].hex() == "000000000000000000000000000000000000000000000000000000000000000000": + if ( + val_info[10].hex() + == "000000000000000000000000000000000000000000000000000000000000000000" + ): return False return True + def get_validator_set(config: dict, type: str = "consensus") -> tuple: contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - validator_set = call_getter(w3,f'get_{type}_valset', contract_address, 0) + validator_set = call_getter(w3, f"get_{type}_valset", contract_address, 0) return validator_set[2] + def get_delegator_info(config: dict, val_id: int, delegator_address: str): contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - delegator_info = call_getter(w3,'get_delegator', contract_address, val_id, delegator_address) + delegator_info = call_getter( + w3, "get_delegator", contract_address, val_id, delegator_address + ) return delegator_info -def get_withdrawal_info(config: dict, validator_id: str, delegator_address: str, withdrawal_id: int): + +def get_withdrawal_info( + config: dict, validator_id: str, delegator_address: str, withdrawal_id: int +): contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - withdrawal_request = call_getter(w3, 'get_withdrawal_request', contract_address, validator_id, delegator_address, withdrawal_id) + withdrawal_request = call_getter( + w3, + "get_withdrawal_request", + contract_address, + validator_id, + delegator_address, + withdrawal_id, + ) return withdrawal_request + def get_delegators_list(config: dict, validator_id: int): contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - delegators = call_getter(w3, 'get_delegators', contract_address, validator_id, "0x0000000000000000000000000000000000000000") + delegators = call_getter( + w3, + "get_delegators", + contract_address, + validator_id, + "0x0000000000000000000000000000000000000000", + ) return delegators + def get_validators_list(config: dict, delegator_address: str): contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - validators_result = call_getter(w3, 'get_delegations', contract_address, delegator_address, 0) + validators_result = call_getter( + w3, "get_delegations", contract_address, delegator_address, 0 + ) return validators_result + def get_epoch_info(config: dict): contract_address = config["contract_address"] rpc_url = config["rpc_url"] w3 = Web3(Web3.HTTPProvider(rpc_url)) - epoch_info = call_getter(w3,'get_epoch', contract_address) + epoch_info = call_getter(w3, "get_epoch", contract_address) return epoch_info + def get_tx_by_hash(config: dict, tx_hash: str): rpc_url = config["rpc_url"] try: @@ -63,3 +96,33 @@ def get_tx_by_hash(config: dict, tx_hash: str): return tx except Exception as e: return e + + +def eth_get_storage_at( + config: dict, slot_key: bytes, block: str = "latest", timeout=15 +) -> bytes: + rpc_url = config["rpc_url"] + assert len(slot_key) == 32 + payload = { + "jsonrpc": "2.0", + "id": 1, + "method": "eth_getStorageAt", + "params": [ + "0x0000000000000000000000000000000000001000", + "0x" + slot_key.hex(), + block, + ], + } + r = requests.post(rpc_url, json=payload, timeout=timeout) + result = r.json()["result"].replace("0x", "") + return bytes.fromhex(result) + + +def get_val_id_from_secp_pubkey(config: dict, eth_address: str) -> int: + VAL_ID_SECP_NAMESPACE = 0x06 + slot_key = struct.pack( + ">B20s11s", VAL_ID_SECP_NAMESPACE, bytes.fromhex(eth_address[2:]), b"\x00" * 11 + ) + slot_data = eth_get_storage_at(config, slot_key) + val_id = int.from_bytes(slot_data[:8], "big") + return val_id diff --git a/staking-cli/src/query_menu.py b/staking-cli/src/query_menu.py index 1be37e4..7691c73 100644 --- a/staking-cli/src/query_menu.py +++ b/staking-cli/src/query_menu.py @@ -7,9 +7,12 @@ from src.helpers import ( address_prompt, is_valid_address, + is_valid_secp256k1_public_key, number_prompt, confirmation_prompt, val_id_prompt, + pub_key_prompt, + get_eth_address_from_pubkey, ) from src.logger import init_logging from src.query import ( @@ -22,6 +25,7 @@ get_delegators_list, get_tx_by_hash, get_epoch_info, + get_val_id_from_secp_pubkey, ) console = Console() @@ -73,9 +77,9 @@ def print_validator(val_info, val_id, verbose): ("bls Pubkey", ""), ] - table = Table(title=f"Validator Info of: [red]val-id {val_id}[/]") + table = Table(title=f"Validator Info f: [red]val-id {val_id}[/]") table.add_column("Field", style="yellow") - table.add_column("Value", style="cyan") + table.add_column(f"Values for val-id: [yellow]{val_id}[/]", style="cyan") console = Console() if verbose: for i in range(0, len(val_info)): @@ -191,10 +195,19 @@ def query(config): while True: choice = print_query_menu(config) if choice == "1": - validator_id = val_id_prompt(config) + # validator_id = val_id_prompt(config) + pub_key_hex = pub_key_prompt(config, "secp") + try: + validator_eth_address = get_eth_address_from_pubkey(pub_key_hex) + except Exception as e: + log.error(f"Error occured while deriving address from pubkey: {e}") + continue + validator_id = get_val_id_from_secp_pubkey(config, validator_eth_address) validator_info = get_validator_info(config, validator_id) - # verbose = confirmation_prompt(f"[{colors["secondary_text"]}]Validator exists! Do you want a verbose output?[/]", default=False) - print_validator(validator_info, validator_id, True) + if validator_exists(validator_info): + print_validator(validator_info, validator_id, True) + else: + log.error("Error! Validator is not registered") elif choice == "2": w3 = Web3(Web3.HTTPProvider(config["rpc_url"])) delegator_address = w3.eth.account.from_key( @@ -269,12 +282,21 @@ def query(config): def query_cli(config: dict, args: Namespace): log = init_logging(config["log_level"]) if args.query == "validator": - validator_id = args.validator_id + pub_key_hex = args.secp_pubkey + if not is_valid_secp256k1_public_key(pub_key_hex): + log.error(f"Error: Invalid SECP public key") + return + try: + validator_eth_address = get_eth_address_from_pubkey(pub_key_hex) + except Exception as e: + log.error(f"Error occured while deriving address from pubkey: {e}") + return + validator_id = get_val_id_from_secp_pubkey(config, validator_eth_address) validator_info = get_validator_info(config, validator_id) if validator_exists(validator_info): print_validator(validator_info, validator_id, True) else: - log.error("Error! Invalid Validator ID") + log.error("Error! Validator is not registered") return elif args.query == "delegator": w3 = Web3(Web3.HTTPProvider(config["rpc_url"]))