pyasic

Miner RPC APIs

Each miner has a unique RPC API that is used to communicate with it. Each of these API types has commands that differ between them, and some commands have data that others do not. Each miner that is a subclass of BaseMiner may have an API linked to it as Miner.rpc.

All RPC API implementations inherit from BaseMinerRPCAPI, which implements the basic communications protocols.

BaseMinerRPCAPI should never be used unless inheriting to create a new miner API class for a new type of miner (which should be exceedingly rare). BaseMinerRPCAPI cannot be instantiated directly, it will raise a TypeError. Use these instead -

BFGMiner API

BMMiner API

BOSMiner API

BTMiner API

CGMiner API

LUXMiner API

Unknown API


BaseMinerRPCAPI

Source code in pyasic/rpc/base.py
class BaseMinerRPCAPI:
    def __init__(self, ip: str, port: int = 4028, api_ver: str = "0.0.0") -> None:
        # api port, should be 4028
        self.port = port
        # ip address of the miner
        self.ip = ipaddress.ip_address(ip)
        # api version if known
        self.api_ver = api_ver

        self.pwd = None

    def __new__(cls, *args, **kwargs):
        if cls is BaseMinerRPCAPI:
            raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
        return object.__new__(cls)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}: {str(self.ip)}"

    async def send_command(
        self,
        command: Union[str, bytes],
        parameters: Union[str, int, bool] = None,
        ignore_errors: bool = False,
        allow_warning: bool = True,
        **kwargs,
    ) -> dict:
        """Send an API command to the miner and return the result.

        Parameters:
            command: The command to sent to the miner.
            parameters: Any additional parameters to be sent with the command.
            ignore_errors: Whether to raise APIError when the command returns an error.
            allow_warning: Whether to warn if the command fails.

        Returns:
            The return data from the API command parsed from JSON into a dict.
        """
        logging.debug(
            f"{self} - (Send Privileged Command) - {command} "
            + f"with args {parameters}"
            if parameters
            else ""
        )
        # create the command
        cmd = {"command": command, **kwargs}
        if parameters:
            cmd["parameter"] = parameters

        # send the command
        data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))

        if data is None:
            raise APIError("No data returned from the API.")

        if data == b"Socket connect failed: Connection refused\n":
            if not ignore_errors:
                raise APIError(data.decode("utf-8"))
            return {}

        data = self._load_api_data(data)

        # check for if the user wants to allow errors to return
        validation = validate_command_output(data)
        if not validation[0]:
            if not ignore_errors:
                # validate the command succeeded
                raise APIError(f"{command}: {validation[1]}")
            if allow_warning:
                logging.warning(
                    f"{self.ip}: API Command Error: {command}: {validation[1]}"
                )

        logging.debug(f"{self} - (Send Command) - Received data.")
        return data

    # Privileged command handler, only used by whatsminers, defined here for consistency.
    async def send_privileged_command(self, *args, **kwargs) -> dict:
        return await self.send_command(*args, **kwargs)

    async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
        """Creates and sends multiple commands as one command to the miner.

        Parameters:
            *commands: The commands to send as a multicommand to the miner.
            allow_warning: A boolean to supress APIWarnings.

        """
        # make sure we can actually run each command, otherwise they will fail
        commands = self._check_commands(*commands)
        # standard multicommand format is "command1+command2"
        # doesn't work for S19 which uses the backup _send_split_multicommand
        command = "+".join(commands)
        try:
            data = await self.send_command(command, allow_warning=allow_warning)
        except APIError:
            data = await self._send_split_multicommand(*commands)
        data["multicommand"] = True
        return data

    async def _send_split_multicommand(
        self, *commands, allow_warning: bool = True
    ) -> dict:
        tasks = {}
        # send all commands individually
        for cmd in commands:
            tasks[cmd] = asyncio.create_task(
                self.send_command(cmd, allow_warning=allow_warning)
            )

        await asyncio.gather(*[tasks[cmd] for cmd in tasks], return_exceptions=True)

        data = {}
        for cmd in tasks:
            try:
                result = tasks[cmd].result()
                if result is None or result == {}:
                    result = {}
                data[cmd] = [result]
            except APIError:
                pass

        return data

    @property
    def commands(self) -> list:
        return self.get_commands()

    def get_commands(self) -> list:
        """Get a list of command accessible to a specific type of API on the miner.

        Returns:
            A list of all API commands that the miner supports.
        """
        return [
            func
            for func in
            # each function in self
            dir(self)
            if not func in ["commands", "open_api"]
            if callable(getattr(self, func)) and
            # no __ or _ methods
            not func.startswith("__") and not func.startswith("_") and
            # remove all functions that are in this base class
            func
            not in [
                func
                for func in dir(BaseMinerRPCAPI)
                if callable(getattr(BaseMinerRPCAPI, func))
            ]
        ]

    def _check_commands(self, *commands) -> list:
        allowed_commands = self.commands
        return_commands = []

        for command in commands:
            if command in allowed_commands:
                return_commands.append(command)
            else:
                warnings.warn(
                    f"""Removing incorrect command: {command}
If you are sure you want to use this command please use API.send_command("{command}", ignore_errors=True) instead.""",
                    APIWarning,
                )
        return return_commands

    async def _send_bytes(
        self,
        data: bytes,
        *,
        port: int = None,
        timeout: int = 100,
    ) -> bytes:
        if port is None:
            port = self.port
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Sending")
        try:
            # get reader and writer streams
            reader, writer = await asyncio.open_connection(str(self.ip), port)
        # handle OSError 121
        except OSError as e:
            if e.errno == 121:
                logging.warning(
                    f"{self} - ([Hidden] Send Bytes) - Semaphore timeout expired."
                )
            return b"{}"

        # send the command
        data_task = asyncio.create_task(self._read_bytes(reader, timeout=timeout))
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Writing")
        writer.write(data)
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Draining")
        await writer.drain()

        await data_task
        ret_data = data_task.result()

        # close the connection
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Closing")
        writer.close()
        await writer.wait_closed()

        return ret_data

    async def _read_bytes(self, reader: asyncio.StreamReader, timeout: int) -> bytes:
        ret_data = b""

        # loop to receive all the data
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Receiving")
        try:
            ret_data = await asyncio.wait_for(reader.read(), timeout=timeout)
        except (asyncio.CancelledError, asyncio.TimeoutError) as e:
            raise e
        except Exception as e:
            logging.warning(f"{self} - ([Hidden] Send Bytes) - API Command Error {e}")
        return ret_data

    @staticmethod
    def _load_api_data(data: bytes) -> dict:
        # some json from the API returns with a null byte (\x00) on the end
        if data.endswith(b"\x00"):
            # handle the null byte
            str_data = data.decode("utf-8")[:-1]
        else:
            # no null byte
            str_data = data.decode("utf-8")
        # fix an error with a btminer return having an extra comma that breaks json.loads()
        str_data = str_data.replace(",}", "}")
        # fix an error with a btminer return having a newline that breaks json.loads()
        str_data = str_data.replace("\n", "")
        # fix an error with a bmminer return not having a specific comma that breaks json.loads()
        str_data = str_data.replace("}{", "},{")
        # fix an error with a bmminer return having a specific comma that breaks json.loads()
        str_data = str_data.replace("[,{", "[{")
        # fix an error with a btminer return having a missing comma. (2023-01-06 version)
        str_data = str_data.replace('""temp0', '","temp0')
        # fix an error with Avalonminers returning inf and nan
        str_data = str_data.replace("info", "1nfo")
        str_data = str_data.replace("inf", "0")
        str_data = str_data.replace("1nfo", "info")
        str_data = str_data.replace("nan", "0")
        # fix whatever this garbage from avalonminers is `,"id":1}`
        if str_data.startswith(","):
            str_data = f"{{{str_data[1:]}"
        # try to fix an error with overflowing the receive buffer
        # this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
        if not str_data.endswith("}"):
            str_data = ",".join(str_data.split(",")[:-1]) + "}"

        # fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
        if re.search(r"\"error_code\":\[\".+\"\]", str_data):
            str_data = str_data.replace("[", "{").replace("]", "}")

        # parse the json
        try:
            parsed_data = json.loads(str_data)
        except json.decoder.JSONDecodeError as e:
            raise APIError(f"Decode Error {e}: {str_data}")
        return parsed_data

get_commands()

Get a list of command accessible to a specific type of API on the miner.

Returns:

Type Description
list

A list of all API commands that the miner supports.

Source code in pyasic/rpc/base.py
def get_commands(self) -> list:
    """Get a list of command accessible to a specific type of API on the miner.

    Returns:
        A list of all API commands that the miner supports.
    """
    return [
        func
        for func in
        # each function in self
        dir(self)
        if not func in ["commands", "open_api"]
        if callable(getattr(self, func)) and
        # no __ or _ methods
        not func.startswith("__") and not func.startswith("_") and
        # remove all functions that are in this base class
        func
        not in [
            func
            for func in dir(BaseMinerRPCAPI)
            if callable(getattr(BaseMinerRPCAPI, func))
        ]
    ]

multicommand(*commands, allow_warning=True) async

Creates and sends multiple commands as one command to the miner.

Parameters:

Name Type Description Default
*commands str

The commands to send as a multicommand to the miner.

()
allow_warning bool

A boolean to supress APIWarnings.

True
Source code in pyasic/rpc/base.py
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
    """Creates and sends multiple commands as one command to the miner.

    Parameters:
        *commands: The commands to send as a multicommand to the miner.
        allow_warning: A boolean to supress APIWarnings.

    """
    # make sure we can actually run each command, otherwise they will fail
    commands = self._check_commands(*commands)
    # standard multicommand format is "command1+command2"
    # doesn't work for S19 which uses the backup _send_split_multicommand
    command = "+".join(commands)
    try:
        data = await self.send_command(command, allow_warning=allow_warning)
    except APIError:
        data = await self._send_split_multicommand(*commands)
    data["multicommand"] = True
    return data

send_command(command, parameters=None, ignore_errors=False, allow_warning=True, **kwargs) async

Send an API command to the miner and return the result.

Parameters:

Name Type Description Default
command Union[str, bytes]

The command to sent to the miner.

required
parameters Union[str, int, bool]

Any additional parameters to be sent with the command.

None
ignore_errors bool

Whether to raise APIError when the command returns an error.

False
allow_warning bool

Whether to warn if the command fails.

True

Returns:

Type Description
dict

The return data from the API command parsed from JSON into a dict.

Source code in pyasic/rpc/base.py
async def send_command(
    self,
    command: Union[str, bytes],
    parameters: Union[str, int, bool] = None,
    ignore_errors: bool = False,
    allow_warning: bool = True,
    **kwargs,
) -> dict:
    """Send an API command to the miner and return the result.

    Parameters:
        command: The command to sent to the miner.
        parameters: Any additional parameters to be sent with the command.
        ignore_errors: Whether to raise APIError when the command returns an error.
        allow_warning: Whether to warn if the command fails.

    Returns:
        The return data from the API command parsed from JSON into a dict.
    """
    logging.debug(
        f"{self} - (Send Privileged Command) - {command} "
        + f"with args {parameters}"
        if parameters
        else ""
    )
    # create the command
    cmd = {"command": command, **kwargs}
    if parameters:
        cmd["parameter"] = parameters

    # send the command
    data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))

    if data is None:
        raise APIError("No data returned from the API.")

    if data == b"Socket connect failed: Connection refused\n":
        if not ignore_errors:
            raise APIError(data.decode("utf-8"))
        return {}

    data = self._load_api_data(data)

    # check for if the user wants to allow errors to return
    validation = validate_command_output(data)
    if not validation[0]:
        if not ignore_errors:
            # validate the command succeeded
            raise APIError(f"{command}: {validation[1]}")
        if allow_warning:
            logging.warning(
                f"{self.ip}: API Command Error: {command}: {validation[1]}"
            )

    logging.debug(f"{self} - (Send Command) - Received data.")
    return data