pyasic

Miner APIs

Each miner has a unique API that is used to communicate with it. Each of these API types has commands that differ between them, and some commands have data that others do not. Each miner that is a subclass of BaseMiner should have an API linked to it as Miner.api.

All API implementations inherit from BaseMinerAPI, which implements the basic communications protocols.

BaseMinerAPI should never be used unless inheriting to create a new miner API class for a new type of miner (which should be exceedingly rare). BaseMinerAPI cannot be instantiated directly, it will raise a TypeError. Use these instead -

BFGMiner API

BMMiner API

BOSMiner API

BTMiner API

CGMiner API

Unknown API


BaseMinerAPI

Source code in pyasic/API/__init__.py
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
class BaseMinerAPI:
    def __init__(self, ip: str, port: int = 4028) -> None:
        # api port, should be 4028
        self.port = port
        # ip address of the miner
        self.ip = ipaddress.ip_address(ip)

    def __new__(cls, *args, **kwargs):
        if cls is BaseMinerAPI:
            raise TypeError(f"Only children of '{cls.__name__}' may be instantiated")
        return object.__new__(cls)

    def __repr__(self):
        return f"{self.__class__.__name__}: {str(self.ip)}"

    async def send_command(
        self,
        command: Union[str, bytes],
        parameters: Union[str, int, bool] = None,
        ignore_errors: bool = False,
        allow_warning: bool = True,
        **kwargs,
    ) -> dict:
        """Send an API command to the miner and return the result.

        Parameters:
            command: The command to sent to the miner.
            parameters: Any additional parameters to be sent with the command.
            ignore_errors: Whether to raise APIError when the command returns an error.
            allow_warning: Whether to warn if the command fails.

        Returns:
            The return data from the API command parsed from JSON into a dict.
        """
        logging.debug(
            f"{self} - (Send Privileged Command) - {command} "
            + f"with args {parameters}"
            if parameters
            else ""
        )
        # create the command
        cmd = {"command": command, **kwargs}
        if parameters:
            cmd["parameter"] = parameters

        # send the command
        data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))

        data = self._load_api_data(data)

        # check for if the user wants to allow errors to return
        if not ignore_errors:
            # validate the command succeeded
            validation = self._validate_command_output(data)
            if not validation[0]:
                if allow_warning:
                    logging.warning(
                        f"{self.ip}: API Command Error: {command}: {validation[1]}"
                    )
                raise APIError(validation[1])

        logging.debug(f"{self} - (Send Command) - Received data.")
        return data

    # Privileged command handler, only used by whatsminers, defined here for consistency.
    async def send_privileged_command(self, *args, **kwargs) -> dict:
        return await self.send_command(*args, **kwargs)

    async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
        """Creates and sends multiple commands as one command to the miner.

        Parameters:
            *commands: The commands to send as a multicommand to the miner.
            allow_warning: A boolean to supress APIWarnings.

        """
        while True:
            # make sure we can actually run each command, otherwise they will fail
            commands = self._check_commands(*commands)
            # standard multicommand format is "command1+command2"
            # standard format doesn't work for X19
            command = "+".join(commands)
            try:
                data = await self.send_command(command, allow_warning=allow_warning)
            except APIError as e:
                # try to identify the error
                if ":" in e.message:
                    err_command = e.message.split(":")[0]
                    if err_command in commands:
                        commands.remove(err_command)
                        continue
                return {command: [{}] for command in commands}
            logging.debug(f"{self} - (Multicommand) - Received data")
            data["multicommand"] = True
            return data

    @property
    def commands(self) -> list:
        return self.get_commands()

    def get_commands(self) -> list:
        """Get a list of command accessible to a specific type of API on the miner.

        Returns:
            A list of all API commands that the miner supports.
        """
        return [
            func
            for func in
            # each function in self
            dir(self)
            if not func == "commands"
            if callable(getattr(self, func)) and
            # no __ or _ methods
            not func.startswith("__") and not func.startswith("_") and
            # remove all functions that are in this base class
            func
            not in [
                func
                for func in dir(BaseMinerAPI)
                if callable(getattr(BaseMinerAPI, func))
            ]
        ]

    def _check_commands(self, *commands):
        allowed_commands = self.commands
        return_commands = []

        for command in commands:
            if command in allowed_commands:
                return_commands.append(command)
            else:
                warnings.warn(
                    f"""Removing incorrect command: {command}
If you are sure you want to use this command please use API.send_command("{command}", ignore_errors=True) instead.""",
                    APIWarning,
                )
        return return_commands

    async def _send_bytes(self, data: bytes, timeout: int = 100) -> bytes:
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Sending")
        try:
            # get reader and writer streams
            reader, writer = await asyncio.open_connection(str(self.ip), self.port)
        # handle OSError 121
        except OSError as e:
            if e.errno == 121:
                logging.warning(
                    f"{self} - ([Hidden] Send Bytes) - Semaphore timeout expired."
                )
            return b"{}"

        # send the command
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Writing")
        writer.write(data)
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Draining")
        await writer.drain()
        try:
            ret_data = await asyncio.wait_for(reader.read(4096), timeout=timeout)
        except ConnectionAbortedError:
            return b"{}"
        try:
            # Fix for stupid whatsminer bug, reboot/restart seem to not load properly in the loop
            # have to receive, save the data, check if there is more data by reading with a short timeout
            # append that data if there is more, and then onto the main loop.
            ret_data += await asyncio.wait_for(reader.read(1), timeout=1)
        except asyncio.TimeoutError:
            return ret_data

        # loop to receive all the data
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Receiving")
        try:
            while True:
                try:
                    d = await asyncio.wait_for(reader.read(4096), timeout=timeout)
                    if not d:
                        break
                    ret_data += d
                except (asyncio.CancelledError, asyncio.TimeoutError) as e:
                    raise e
        except (asyncio.CancelledError, asyncio.TimeoutError) as e:
            raise e
        except Exception as e:
            logging.warning(f"{self} - ([Hidden] Send Bytes) - API Command Error {e}")

        # close the connection
        logging.debug(f"{self} - ([Hidden] Send Bytes) - Closing")
        writer.close()
        await writer.wait_closed()

        return ret_data

    @staticmethod
    def _validate_command_output(data: dict) -> tuple:
        # check if the data returned is correct or an error
        # if status isn't a key, it is a multicommand
        if "STATUS" not in data.keys():
            for key in data.keys():
                # make sure not to try to turn id into a dict
                if not key == "id":
                    # make sure they succeeded
                    if "STATUS" in data[key][0].keys():
                        if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
                            # this is an error
                            return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
        elif "id" not in data.keys():
            if data["STATUS"] not in ["S", "I"]:
                return False, data["Msg"]
        else:
            # make sure the command succeeded
            if type(data["STATUS"]) == str:
                if data["STATUS"] in ["RESTART"]:
                    return True, None
            elif data["STATUS"][0]["STATUS"] not in ("S", "I"):
                # this is an error
                if data["STATUS"][0]["STATUS"] not in ("S", "I"):
                    return False, data["STATUS"][0]["Msg"]
        return True, None

    @staticmethod
    def _load_api_data(data: bytes) -> dict:
        # some json from the API returns with a null byte (\x00) on the end
        if data.endswith(b"\x00"):
            # handle the null byte
            str_data = data.decode("utf-8")[:-1]
        else:
            # no null byte
            str_data = data.decode("utf-8")
        # fix an error with a btminer return having an extra comma that breaks json.loads()
        str_data = str_data.replace(",}", "}")
        # fix an error with a btminer return having a newline that breaks json.loads()
        str_data = str_data.replace("\n", "")
        # fix an error with a bmminer return not having a specific comma that breaks json.loads()
        str_data = str_data.replace("}{", "},{")
        # fix an error with a bmminer return having a specific comma that breaks json.loads()
        str_data = str_data.replace("[,{", "[{")
        # fix an error with a btminer return having a missing comma. (2023-01-06 version)
        str_data = str_data.replace('""temp0', '","temp0')
        # fix an error with Avalonminers returning inf and nan
        str_data = str_data.replace("info", "1nfo")
        str_data = str_data.replace("inf", "0")
        str_data = str_data.replace("1nfo", "info")
        str_data = str_data.replace("nan", "0")
        # fix whatever this garbage from avalonminers is `,"id":1}`
        if str_data.startswith(","):
            str_data = f"{{{str_data[1:]}"
        # try to fix an error with overflowing the receive buffer
        # this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
        if not str_data.endswith("}"):
            str_data = ",".join(str_data.split(",")[:-1]) + "}"

        # fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
        if re.search(r"\"error_code\":\[\".+\"\]", str_data):
            str_data = str_data.replace("[", "{").replace("]", "}")

        # parse the json
        try:
            parsed_data = json.loads(str_data)
        except json.decoder.JSONDecodeError as e:
            raise APIError(f"Decode Error {e}: {str_data}")
        return parsed_data

get_commands()

Get a list of command accessible to a specific type of API on the miner.

Returns:

Type Description
list

A list of all API commands that the miner supports.

Source code in pyasic/API/__init__.py
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def get_commands(self) -> list:
    """Get a list of command accessible to a specific type of API on the miner.

    Returns:
        A list of all API commands that the miner supports.
    """
    return [
        func
        for func in
        # each function in self
        dir(self)
        if not func == "commands"
        if callable(getattr(self, func)) and
        # no __ or _ methods
        not func.startswith("__") and not func.startswith("_") and
        # remove all functions that are in this base class
        func
        not in [
            func
            for func in dir(BaseMinerAPI)
            if callable(getattr(BaseMinerAPI, func))
        ]
    ]

multicommand(*commands, allow_warning=True) async

Creates and sends multiple commands as one command to the miner.

Parameters:

Name Type Description Default
*commands str

The commands to send as a multicommand to the miner.

()
allow_warning bool

A boolean to supress APIWarnings.

True
Source code in pyasic/API/__init__.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
async def multicommand(self, *commands: str, allow_warning: bool = True) -> dict:
    """Creates and sends multiple commands as one command to the miner.

    Parameters:
        *commands: The commands to send as a multicommand to the miner.
        allow_warning: A boolean to supress APIWarnings.

    """
    while True:
        # make sure we can actually run each command, otherwise they will fail
        commands = self._check_commands(*commands)
        # standard multicommand format is "command1+command2"
        # standard format doesn't work for X19
        command = "+".join(commands)
        try:
            data = await self.send_command(command, allow_warning=allow_warning)
        except APIError as e:
            # try to identify the error
            if ":" in e.message:
                err_command = e.message.split(":")[0]
                if err_command in commands:
                    commands.remove(err_command)
                    continue
            return {command: [{}] for command in commands}
        logging.debug(f"{self} - (Multicommand) - Received data")
        data["multicommand"] = True
        return data

send_command(command, parameters=None, ignore_errors=False, allow_warning=True, **kwargs) async

Send an API command to the miner and return the result.

Parameters:

Name Type Description Default
command Union[str, bytes]

The command to sent to the miner.

required
parameters Union[str, int, bool]

Any additional parameters to be sent with the command.

None
ignore_errors bool

Whether to raise APIError when the command returns an error.

False
allow_warning bool

Whether to warn if the command fails.

True

Returns:

Type Description
dict

The return data from the API command parsed from JSON into a dict.

Source code in pyasic/API/__init__.py
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
async def send_command(
    self,
    command: Union[str, bytes],
    parameters: Union[str, int, bool] = None,
    ignore_errors: bool = False,
    allow_warning: bool = True,
    **kwargs,
) -> dict:
    """Send an API command to the miner and return the result.

    Parameters:
        command: The command to sent to the miner.
        parameters: Any additional parameters to be sent with the command.
        ignore_errors: Whether to raise APIError when the command returns an error.
        allow_warning: Whether to warn if the command fails.

    Returns:
        The return data from the API command parsed from JSON into a dict.
    """
    logging.debug(
        f"{self} - (Send Privileged Command) - {command} "
        + f"with args {parameters}"
        if parameters
        else ""
    )
    # create the command
    cmd = {"command": command, **kwargs}
    if parameters:
        cmd["parameter"] = parameters

    # send the command
    data = await self._send_bytes(json.dumps(cmd).encode("utf-8"))

    data = self._load_api_data(data)

    # check for if the user wants to allow errors to return
    if not ignore_errors:
        # validate the command succeeded
        validation = self._validate_command_output(data)
        if not validation[0]:
            if allow_warning:
                logging.warning(
                    f"{self.ip}: API Command Error: {command}: {validation[1]}"
                )
            raise APIError(validation[1])

    logging.debug(f"{self} - (Send Command) - Received data.")
    return data