pyasic

BTMiner Backend

Bases: BaseMiner

Base handler for BTMiner based miners.

Source code in pyasic/miners/backends/btminer.py
class BTMiner(BaseMiner):
    """Base handler for BTMiner based miners."""

    _rpc_cls = BTMinerRPCAPI
    rpc: BTMinerRPCAPI

    data_locations = BTMINER_DATA_LOC

    supports_shutdown = True
    supports_power_modes = True

    async def _reset_rpc_pwd_to_admin(self, pwd: str):
        try:
            data = await self.rpc.update_pwd(pwd, "admin")
        except APIError:
            return False
        if data:
            if "Code" in data.keys():
                if data["Code"] == 131:
                    return True
        return False

    async def fault_light_off(self) -> bool:
        try:
            data = await self.rpc.set_led(auto=True)
        except APIError:
            return False
        if data:
            if "Code" in data.keys():
                if data["Code"] == 131:
                    self.light = False
                    return True
        return False

    async def fault_light_on(self) -> bool:
        try:
            data = await self.rpc.set_led(auto=False)
            await self.rpc.set_led(
                auto=False, color="green", start=0, period=1, duration=0
            )
        except APIError:
            return False
        if data:
            if "Code" in data.keys():
                if data["Code"] == 131:
                    self.light = True
                    return True
        return False

    async def reboot(self) -> bool:
        try:
            data = await self.rpc.reboot()
        except APIError:
            return False
        if data.get("Msg"):
            if data["Msg"] == "API command OK":
                return True
        return False

    async def restart_backend(self) -> bool:
        try:
            data = await self.rpc.restart()
        except APIError:
            return False
        if data.get("Msg"):
            if data["Msg"] == "API command OK":
                return True
        return False

    async def stop_mining(self) -> bool:
        try:
            data = await self.rpc.power_off(respbefore=True)
        except APIError:
            return False
        if data.get("Msg"):
            if data["Msg"] == "API command OK":
                return True
        return False

    async def resume_mining(self) -> bool:
        try:
            data = await self.rpc.power_on()
        except APIError:
            return False
        if data.get("Msg"):
            if data["Msg"] == "API command OK":
                return True
        return False

    async def send_config(self, config: MinerConfig, user_suffix: str = None) -> None:
        self.config = config

        conf = config.as_wm(user_suffix=user_suffix)
        pools_conf = conf["pools"]

        try:
            await self.rpc.update_pools(**pools_conf)

            if conf["mode"] == "normal":
                await self.rpc.set_normal_power()
            elif conf["mode"] == "high":
                await self.rpc.set_high_power()
            elif conf["mode"] == "low":
                await self.rpc.set_low_power()
            elif conf["mode"] == "power_tuning":
                await self.rpc.adjust_power_limit(conf["power_tuning"]["wattage"])
        except APIError:
            # cannot update, no API access usually
            pass

    async def get_config(self) -> MinerConfig:
        pools = None
        summary = None
        status = None
        try:
            data = await self.rpc.multicommand("pools", "summary", "status")
            pools = data["pools"][0]
            summary = data["summary"][0]
            status = data["status"][0]
        except APIError as e:
            logging.warning(e)
        except LookupError:
            pass

        if pools is not None:
            cfg = MinerConfig.from_api(pools)
        else:
            cfg = MinerConfig()

        is_mining = await self._is_mining(status)
        if not is_mining:
            cfg.mining_mode = MiningModeConfig.sleep()
            return cfg

        if summary is not None:
            mining_mode = None
            try:
                mining_mode = summary["SUMMARY"][0]["Power Mode"]
            except LookupError:
                pass

            if mining_mode == "High":
                cfg.mining_mode = MiningModeConfig.high()
                return cfg
            elif mining_mode == "Low":
                cfg.mining_mode = MiningModeConfig.low()
                return cfg
            try:
                power_lim = summary["SUMMARY"][0]["Power Limit"]
            except LookupError:
                power_lim = None

            if power_lim is None:
                cfg.mining_mode = MiningModeConfig.normal()
                return cfg

            cfg.mining_mode = MiningModeConfig.power_tuning(power_lim)
            self.config = cfg
            return self.config

    async def set_power_limit(self, wattage: int) -> bool:
        try:
            await self.rpc.adjust_power_limit(wattage)
        except Exception as e:
            logging.warning(f"{self} set_power_limit: {e}")
            return False
        else:
            return True

    ##################################################
    ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
    ##################################################

    async def _get_mac(
        self, rpc_summary: dict = None, rpc_get_miner_info: dict = None
    ) -> Optional[str]:
        if rpc_get_miner_info is None:
            try:
                rpc_get_miner_info = await self.rpc.get_miner_info()
            except APIError:
                pass

        if rpc_get_miner_info is not None:
            try:
                mac = rpc_get_miner_info["Msg"]["mac"]
                return str(mac).upper()
            except KeyError:
                pass

        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                mac = rpc_summary["SUMMARY"][0]["MAC"]
                return str(mac).upper()
            except LookupError:
                pass

    async def _get_api_ver(self, rpc_get_version: dict = None) -> Optional[str]:
        if rpc_get_version is None:
            try:
                rpc_get_version = await self.rpc.get_version()
            except APIError:
                pass

        if rpc_get_version is not None:
            if "Code" in rpc_get_version.keys():
                if rpc_get_version["Code"] == 131:
                    try:
                        rpc_ver = rpc_get_version["Msg"]
                        if not isinstance(rpc_ver, str):
                            rpc_ver = rpc_ver["rpc_ver"]
                        self.api_ver = rpc_ver.replace("whatsminer v", "")
                    except (KeyError, TypeError):
                        pass
                    else:
                        self.rpc.rpc_ver = self.api_ver
                        return self.api_ver

        return self.api_ver

    async def _get_fw_ver(
        self, rpc_get_version: dict = None, rpc_summary: dict = None
    ) -> Optional[str]:
        if rpc_get_version is None:
            try:
                rpc_get_version = await self.rpc.get_version()
            except APIError:
                pass

        if rpc_get_version is not None:
            if "Code" in rpc_get_version.keys():
                if rpc_get_version["Code"] == 131:
                    try:
                        self.fw_ver = rpc_get_version["Msg"]["fw_ver"]
                    except (KeyError, TypeError):
                        pass
                    else:
                        return self.fw_ver

        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary:
            try:
                self.fw_ver = rpc_summary["SUMMARY"][0]["Firmware Version"].replace(
                    "'", ""
                )
            except LookupError:
                pass

        return self.fw_ver

    async def _get_hostname(self, rpc_get_miner_info: dict = None) -> Optional[str]:
        hostname = None
        if rpc_get_miner_info is None:
            try:
                rpc_get_miner_info = await self.rpc.get_miner_info()
            except APIError:
                return None  # only one way to get this

        if rpc_get_miner_info is not None:
            try:
                hostname = rpc_get_miner_info["Msg"]["hostname"]
            except KeyError:
                return None

        return hostname

    async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return round(float(rpc_summary["SUMMARY"][0]["MHS 1m"] / 1000000), 2)
            except LookupError:
                pass

    async def _get_hashboards(self, rpc_devs: dict = None) -> List[HashBoard]:
        hashboards = [
            HashBoard(slot=i, expected_chips=self.expected_chips)
            for i in range(self.expected_hashboards)
        ]

        if rpc_devs is None:
            try:
                rpc_devs = await self.rpc.devs()
            except APIError:
                pass

        if rpc_devs is not None:
            try:
                for board in rpc_devs["DEVS"]:
                    if len(hashboards) < board["ASC"] + 1:
                        hashboards.append(
                            HashBoard(
                                slot=board["ASC"], expected_chips=self.expected_chips
                            )
                        )
                        self.expected_hashboards += 1
                    hashboards[board["ASC"]].chip_temp = round(board["Chip Temp Avg"])
                    hashboards[board["ASC"]].temp = round(board["Temperature"])
                    hashboards[board["ASC"]].hashrate = round(
                        float(board["MHS 1m"] / 1000000), 2
                    )
                    hashboards[board["ASC"]].chips = board["Effective Chips"]
                    hashboards[board["ASC"]].serial_number = board["PCB SN"]
                    hashboards[board["ASC"]].missing = False
            except LookupError:
                pass

        return hashboards

    async def _get_env_temp(self, rpc_summary: dict = None) -> Optional[float]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return rpc_summary["SUMMARY"][0]["Env Temp"]
            except LookupError:
                pass

    async def _get_wattage(self, rpc_summary: dict = None) -> Optional[int]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                wattage = rpc_summary["SUMMARY"][0]["Power"]
                return wattage if not wattage == -1 else None
            except LookupError:
                pass

    async def _get_wattage_limit(self, rpc_summary: dict = None) -> Optional[int]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return rpc_summary["SUMMARY"][0]["Power Limit"]
            except LookupError:
                pass

    async def _get_fans(
        self, rpc_summary: dict = None, rpc_get_psu: dict = None
    ) -> List[Fan]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        fans = [Fan() for _ in range(self.expected_fans)]
        if rpc_summary is not None:
            try:
                if self.expected_fans > 0:
                    fans = [
                        Fan(rpc_summary["SUMMARY"][0].get("Fan Speed In", 0)),
                        Fan(rpc_summary["SUMMARY"][0].get("Fan Speed Out", 0)),
                    ]
            except LookupError:
                pass

        return fans

    async def _get_fan_psu(
        self, rpc_summary: dict = None, rpc_get_psu: dict = None
    ) -> Optional[int]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return int(rpc_summary["SUMMARY"][0]["Power Fanspeed"])
            except LookupError:
                pass

        if rpc_get_psu is None:
            try:
                rpc_get_psu = await self.rpc.get_psu()
            except APIError:
                pass

        if rpc_get_psu is not None:
            try:
                return int(rpc_get_psu["Msg"]["fan_speed"])
            except (KeyError, TypeError):
                pass

    async def _get_errors(
        self, rpc_summary: dict = None, rpc_get_error_code: dict = None
    ) -> List[MinerErrorData]:
        errors = []
        if rpc_get_error_code is None and rpc_summary is None:
            try:
                rpc_get_error_code = await self.rpc.get_error_code()
            except APIError:
                pass

        if rpc_get_error_code is not None:
            try:
                for err in rpc_get_error_code["Msg"]["error_code"]:
                    if isinstance(err, dict):
                        for code in err:
                            errors.append(WhatsminerError(error_code=int(code)))
                    else:
                        errors.append(WhatsminerError(error_code=int(err)))
            except KeyError:
                pass

        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                for i in range(rpc_summary["SUMMARY"][0]["Error Code Count"]):
                    err = rpc_summary["SUMMARY"][0].get(f"Error Code {i}")
                    if err:
                        errors.append(WhatsminerError(error_code=err))
            except (LookupError, ValueError, TypeError):
                pass
        return errors

    async def _get_expected_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                expected_hashrate = rpc_summary["SUMMARY"][0]["Factory GHS"]
                if expected_hashrate:
                    return round(expected_hashrate / 1000, 2)
            except LookupError:
                pass

    async def _get_fault_light(self, rpc_get_miner_info: dict = None) -> Optional[bool]:
        if rpc_get_miner_info is None:
            try:
                rpc_get_miner_info = await self.rpc.get_miner_info()
            except APIError:
                if not self.light:
                    self.light = False

        if rpc_get_miner_info is not None:
            try:
                self.light = not (rpc_get_miner_info["Msg"]["ledstat"] == "auto")
            except KeyError:
                pass

        return self.light if self.light else False

    async def set_static_ip(
        self,
        ip: str,
        dns: str,
        gateway: str,
        subnet_mask: str = "255.255.255.0",
        hostname: str = None,
    ):
        if not hostname:
            hostname = await self.get_hostname()
        await self.rpc.net_config(
            ip=ip, mask=subnet_mask, dns=dns, gate=gateway, host=hostname, dhcp=False
        )

    async def set_dhcp(self, hostname: str = None):
        if hostname:
            await self.set_hostname(hostname)
        await self.rpc.net_config()

    async def set_hostname(self, hostname: str):
        await self.rpc.set_hostname(hostname)

    async def _is_mining(self, rpc_status: dict = None) -> Optional[bool]:
        if rpc_status is None:
            try:
                rpc_status = await self.rpc.status()
            except APIError:
                pass

        if rpc_status is not None:
            try:
                if rpc_status["Msg"].get("btmineroff"):
                    try:
                        await self.rpc.devdetails()
                    except APIError:
                        return False
                    return True
                return True if rpc_status["Msg"]["mineroff"] == "false" else False
            except LookupError:
                pass

    async def _get_uptime(self, rpc_summary: dict = None) -> Optional[int]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return int(rpc_summary["SUMMARY"][0]["Elapsed"])
            except LookupError:
                pass