Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -220,7 +220,7 @@ python main.py query validator-set --type execution --config-path ~/config.toml
- Fetch validator info

```sh
python main.py query validator --validator-id 1 --config-path ~/config.toml
python main.py query validator --secp-pubkey 028bcac2c1e57b60ae93b47de912a5a48f5ec7675e7c608baeffb2fd33ff7baaef --config-path ~/config.toml
```

- Verify all values match in the output
Expand Down Expand Up @@ -411,7 +411,7 @@ INFO Commission successfully changed from 10.0% to 5.0% for validator 1
### Query Validator Information

```sh
python main.py query validator --validator-id 1 --config-path ~/config.toml
python main.py query validator --secp-pubkey 028bcac2c1e57b60ae93b47de912a5a48f5ec7675e7c608baeffb2fd33ff7baaef --config-path ~/config.toml
```

### Query Delegator Information
Expand Down
71 changes: 71 additions & 0 deletions staking-cli/src/helpers.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from typing import Union
from web3 import Web3
from eth_keys import keys
from src.logger import init_logging
from src.query import get_validator_info, validator_exists
from py_ecc.optimized_bls12_381 import curve_order
Expand Down Expand Up @@ -72,6 +73,54 @@ def is_valid_secp256k1_private_key(hex_private_key: str) -> bool:
return 0 < key_int < SECP256K1_ORDER


def is_valid_secp256k1_public_key(hex_public_key: str) -> bool:
"""Validates if a given string is a correctly formatted secp256k1 public key"""
if not isinstance(hex_public_key, str):
return False

# 1. Standardize input: Remove '0x' prefix if it exists
if hex_public_key.startswith("0x"):
key_hex = hex_public_key[2:]
else:
key_hex = hex_public_key

# 2. Check for valid hexadecimal characters
try:
integer = int(key_hex, 16)
except Exception as e:
raise ValueError(f"Invalid hex for pubkey: {e}")

key_len_chars = len(key_hex)
prefix = key_hex[:2]

# Must be 66 chars (33 bytes) and start with '02' or '03'
if key_len_chars == 66:
if prefix == "02" or prefix == "03":
return True
else:
return False # Wrong prefix for compressed key

# Invalid length
return False


def get_eth_address_from_pubkey(hex_public_key: str) -> str:
if hex_public_key.startswith("0x"):
hex_public_key = hex_public_key[2:]
else:
hex_public_key = hex_public_key
public_key_bytes = bytes.fromhex(hex_public_key)
try:
public_key_obj = keys.PublicKey.from_compressed_bytes(public_key_bytes)
uncompressed_key_bytes_64 = public_key_obj.to_bytes()
except Exception as e:
raise ValueError(f"Invalid public key point. Error: {e}")
keccak_hash = Web3.keccak(uncompressed_key_bytes_64)
address_bytes = keccak_hash[-20:]
address = Web3.to_checksum_address(address_bytes)
return address


def number_prompt(description: str, range: list = [], default: str = ""):
if range:
return Prompt.ask(
Expand Down Expand Up @@ -103,6 +152,28 @@ def key_prompt(config: dict, key_type: str):
return key


def pub_key_prompt(config: dict, key_type: str):
"""Ask for public key and validate"""
colors = config["colors"]
log = init_logging(config["log_level"].upper())
key = Prompt.ask(
f"\n[{colors['primary_text']}]Enter [{colors['main']}]{key_type.capitalize()} Public Key[/] of the validator[/]"
)
if key_type == "secp":
try:
validation = is_valid_secp256k1_public_key(key)
except Exception as e:
log.error(f"Error while validating secp public key: {e}")
validation = False
if validation:
return str(key)
else:
log.error(f"\nEnter a valid key, instead of: {key}")
# ask for input again
key = pub_key_prompt(config, key_type)
return key


def is_valid_address(address: str) -> bool:
"""Validates an Ethereum address by checking its format and EIP-55 checksum."""
try:
Expand Down
6 changes: 3 additions & 3 deletions staking-cli/src/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,10 +209,10 @@ def init_parser() -> argparse.ArgumentParser:

# val_info_parser
val_info_parser.add_argument(
"--validator-id",
type=int,
"--secp-pubkey",
type=str,
required=True,
help="Unique id representing the validator on-chain",
help="Compressed SECP public key of the validator, as generated by monad-keystore",
Comment on lines -212 to +215
Copy link
Contributor

@johnmarcou johnmarcou Oct 23, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

question: Does this means we won't be able to query by validator ID after this change?

I think this feature is actually useful.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, however if querying with val-id is useful we can keep both options available, the point here is to abstract val-id from the end user, eventually this eth_storage call wont be required and a sc call will be baked in to the staking contract.

)
val_info_parser.add_argument(
"--config-path",
Expand Down
83 changes: 73 additions & 10 deletions staking-cli/src/query.py
Original file line number Diff line number Diff line change
@@ -1,60 +1,93 @@
import struct
import requests
from web3 import Web3
from staking_sdk_py.callGetters import call_getter
from src.logger import init_logging


def get_validator_info(config, val_id):
# query validator information
w3 = Web3(Web3.HTTPProvider(config['rpc_url']))
val_info = call_getter(w3, 'get_validator', config['contract_address'], val_id)
w3 = Web3(Web3.HTTPProvider(config["rpc_url"]))
val_info = call_getter(w3, "get_validator", config["contract_address"], val_id)
return val_info


def validator_exists(val_info: tuple) -> bool:
if val_info[10].hex() == "000000000000000000000000000000000000000000000000000000000000000000":
if (
val_info[10].hex()
== "000000000000000000000000000000000000000000000000000000000000000000"
):
return False
return True


def get_validator_set(config: dict, type: str = "consensus") -> tuple:
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
validator_set = call_getter(w3,f'get_{type}_valset', contract_address, 0)
validator_set = call_getter(w3, f"get_{type}_valset", contract_address, 0)
return validator_set[2]


def get_delegator_info(config: dict, val_id: int, delegator_address: str):
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
delegator_info = call_getter(w3,'get_delegator', contract_address, val_id, delegator_address)
delegator_info = call_getter(
w3, "get_delegator", contract_address, val_id, delegator_address
)
return delegator_info

def get_withdrawal_info(config: dict, validator_id: str, delegator_address: str, withdrawal_id: int):

def get_withdrawal_info(
config: dict, validator_id: str, delegator_address: str, withdrawal_id: int
):
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
withdrawal_request = call_getter(w3, 'get_withdrawal_request', contract_address, validator_id, delegator_address, withdrawal_id)
withdrawal_request = call_getter(
w3,
"get_withdrawal_request",
contract_address,
validator_id,
delegator_address,
withdrawal_id,
)
return withdrawal_request


def get_delegators_list(config: dict, validator_id: int):
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
delegators = call_getter(w3, 'get_delegators', contract_address, validator_id, "0x0000000000000000000000000000000000000000")
delegators = call_getter(
w3,
"get_delegators",
contract_address,
validator_id,
"0x0000000000000000000000000000000000000000",
)
return delegators


def get_validators_list(config: dict, delegator_address: str):
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
validators_result = call_getter(w3, 'get_delegations', contract_address, delegator_address, 0)
validators_result = call_getter(
w3, "get_delegations", contract_address, delegator_address, 0
)
return validators_result


def get_epoch_info(config: dict):
contract_address = config["contract_address"]
rpc_url = config["rpc_url"]
w3 = Web3(Web3.HTTPProvider(rpc_url))
epoch_info = call_getter(w3,'get_epoch', contract_address)
epoch_info = call_getter(w3, "get_epoch", contract_address)
return epoch_info


def get_tx_by_hash(config: dict, tx_hash: str):
rpc_url = config["rpc_url"]
try:
Expand All @@ -63,3 +96,33 @@ def get_tx_by_hash(config: dict, tx_hash: str):
return tx
except Exception as e:
return e


def eth_get_storage_at(
config: dict, slot_key: bytes, block: str = "latest", timeout=15
) -> bytes:
rpc_url = config["rpc_url"]
assert len(slot_key) == 32
payload = {
"jsonrpc": "2.0",
"id": 1,
"method": "eth_getStorageAt",
"params": [
"0x0000000000000000000000000000000000001000",
"0x" + slot_key.hex(),
block,
],
}
r = requests.post(rpc_url, json=payload, timeout=timeout)
result = r.json()["result"].replace("0x", "")
return bytes.fromhex(result)


def get_val_id_from_secp_pubkey(config: dict, eth_address: str) -> int:
VAL_ID_SECP_NAMESPACE = 0x06
slot_key = struct.pack(
">B20s11s", VAL_ID_SECP_NAMESPACE, bytes.fromhex(eth_address[2:]), b"\x00" * 11
)
slot_data = eth_get_storage_at(config, slot_key)
val_id = int.from_bytes(slot_data[:8], "big")
return val_id
36 changes: 29 additions & 7 deletions staking-cli/src/query_menu.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@
from src.helpers import (
address_prompt,
is_valid_address,
is_valid_secp256k1_public_key,
number_prompt,
confirmation_prompt,
val_id_prompt,
pub_key_prompt,
get_eth_address_from_pubkey,
)
from src.logger import init_logging
from src.query import (
Expand All @@ -22,6 +25,7 @@
get_delegators_list,
get_tx_by_hash,
get_epoch_info,
get_val_id_from_secp_pubkey,
)

console = Console()
Expand Down Expand Up @@ -73,9 +77,9 @@ def print_validator(val_info, val_id, verbose):
("bls Pubkey", ""),
]

table = Table(title=f"Validator Info of: [red]val-id {val_id}[/]")
table = Table(title=f"Validator Info f: [red]val-id {val_id}[/]")
table.add_column("Field", style="yellow")
table.add_column("Value", style="cyan")
table.add_column(f"Values for val-id: [yellow]{val_id}[/]", style="cyan")
console = Console()
if verbose:
for i in range(0, len(val_info)):
Expand Down Expand Up @@ -191,10 +195,19 @@ def query(config):
while True:
choice = print_query_menu(config)
if choice == "1":
validator_id = val_id_prompt(config)
# validator_id = val_id_prompt(config)
pub_key_hex = pub_key_prompt(config, "secp")
try:
validator_eth_address = get_eth_address_from_pubkey(pub_key_hex)
except Exception as e:
log.error(f"Error occured while deriving address from pubkey: {e}")
continue
validator_id = get_val_id_from_secp_pubkey(config, validator_eth_address)
validator_info = get_validator_info(config, validator_id)
# verbose = confirmation_prompt(f"[{colors["secondary_text"]}]Validator exists! Do you want a verbose output?[/]", default=False)
print_validator(validator_info, validator_id, True)
if validator_exists(validator_info):
print_validator(validator_info, validator_id, True)
else:
log.error("Error! Validator is not registered")
elif choice == "2":
w3 = Web3(Web3.HTTPProvider(config["rpc_url"]))
delegator_address = w3.eth.account.from_key(
Expand Down Expand Up @@ -269,12 +282,21 @@ def query(config):
def query_cli(config: dict, args: Namespace):
log = init_logging(config["log_level"])
if args.query == "validator":
validator_id = args.validator_id
pub_key_hex = args.secp_pubkey
if not is_valid_secp256k1_public_key(pub_key_hex):
log.error(f"Error: Invalid SECP public key")
return
try:
validator_eth_address = get_eth_address_from_pubkey(pub_key_hex)
except Exception as e:
log.error(f"Error occured while deriving address from pubkey: {e}")
return
validator_id = get_val_id_from_secp_pubkey(config, validator_eth_address)
validator_info = get_validator_info(config, validator_id)
if validator_exists(validator_info):
print_validator(validator_info, validator_id, True)
else:
log.error("Error! Invalid Validator ID")
log.error("Error! Validator is not registered")
return
elif args.query == "delegator":
w3 = Web3(Web3.HTTPProvider(config["rpc_url"]))
Expand Down