pyasic

LUXMiner Backend

Bases: BaseMiner

Handler for LuxOS miners

Source code in pyasic/miners/backends/luxminer.py
class LUXMiner(BaseMiner):
    """Handler for LuxOS miners"""

    _rpc_cls = LUXMinerRPCAPI
    rpc: LUXMinerRPCAPI

    firmware = "LuxOS"

    data_locations = LUXMINER_DATA_LOC

    async def _get_session(self) -> Optional[str]:
        try:
            data = await self.rpc.session()
            if not data["SESSION"][0]["SessionID"] == "":
                return data["SESSION"][0]["SessionID"]
        except APIError:
            pass

        try:
            data = await self.rpc.logon()
            return data["SESSION"][0]["SessionID"]
        except (LookupError, APIError):
            return

    async def fault_light_on(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.ledset(session_id, "red", "blink")
            return True
        except (APIError, LookupError):
            pass
        return False

    async def fault_light_off(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.ledset(session_id, "red", "off")
            return True
        except (APIError, LookupError):
            pass
        return False

    async def restart_backend(self) -> bool:
        return await self.restart_luxminer()

    async def restart_luxminer(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.resetminer(session_id)
            return True
        except (APIError, LookupError):
            pass
        return False

    async def stop_mining(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.curtail(session_id)
            return True
        except (APIError, LookupError):
            pass
        return False

    async def resume_mining(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.wakeup(session_id)
            return True
        except (APIError, LookupError):
            pass

    async def reboot(self) -> bool:
        try:
            session_id = await self._get_session()
            if session_id:
                await self.rpc.rebootdevice(session_id)
            return True
        except (APIError, LookupError):
            pass
        return False

    async def get_config(self) -> MinerConfig:
        return self.config

    ##################################################
    ### DATA GATHERING FUNCTIONS (get_{some_data}) ###
    ##################################################

    async def _get_mac(self, rpc_config: dict = None) -> Optional[str]:
        mac = None
        if rpc_config is None:
            try:
                rpc_config = await self.rpc.config()
            except APIError:
                return None

        if rpc_config is not None:
            try:
                mac = rpc_config["CONFIG"][0]["MACAddr"]
            except KeyError:
                return None

        return mac

    async def _get_hashrate(self, rpc_summary: dict = None) -> Optional[float]:
        if rpc_summary is None:
            try:
                rpc_summary = await self.rpc.summary()
            except APIError:
                pass

        if rpc_summary is not None:
            try:
                return round(float(rpc_summary["SUMMARY"][0]["GHS 5s"] / 1000), 2)
            except (LookupError, ValueError, TypeError):
                pass

    async def _get_hashboards(self, rpc_stats: dict = None) -> List[HashBoard]:
        hashboards = []

        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass

        if rpc_stats is not None:
            try:
                board_offset = -1
                boards = rpc_stats["STATS"]
                if len(boards) > 1:
                    for board_num in range(1, 16, 5):
                        for _b_num in range(5):
                            b = boards[1].get(f"chain_acn{board_num + _b_num}")

                            if b and not b == 0 and board_offset == -1:
                                board_offset = board_num
                    if board_offset == -1:
                        board_offset = 1

                    for i in range(
                        board_offset, board_offset + self.expected_hashboards
                    ):
                        hashboard = HashBoard(
                            slot=i - board_offset, expected_chips=self.expected_chips
                        )

                        chip_temp = boards[1].get(f"temp{i}")
                        if chip_temp:
                            hashboard.chip_temp = round(chip_temp)

                        temp = boards[1].get(f"temp2_{i}")
                        if temp:
                            hashboard.temp = round(temp)

                        hashrate = boards[1].get(f"chain_rate{i}")
                        if hashrate:
                            hashboard.hashrate = round(float(hashrate) / 1000, 2)

                        chips = boards[1].get(f"chain_acn{i}")
                        if chips:
                            hashboard.chips = chips
                            hashboard.missing = False
                        if (not chips) or (not chips > 0):
                            hashboard.missing = True
                        hashboards.append(hashboard)
            except (LookupError, ValueError, TypeError):
                pass

        return hashboards

    async def _get_wattage(self, rpc_power: dict = None) -> Optional[int]:
        if rpc_power is None:
            try:
                rpc_power = await self.rpc.power()
            except APIError:
                pass

        if rpc_power is not None:
            try:
                return rpc_power["POWER"][0]["Watts"]
            except (LookupError, ValueError, TypeError):
                pass

    async def _get_fans(self, rpc_fans: dict = None) -> List[Fan]:
        if rpc_fans is None:
            try:
                rpc_fans = await self.rpc.fans()
            except APIError:
                pass

        fans = []

        if rpc_fans is not None:
            for fan in range(self.expected_fans):
                try:
                    fans.append(Fan(rpc_fans["FANS"][fan]["RPM"]))
                except (LookupError, ValueError, TypeError):
                    fans.append(Fan())
        return fans

    async def _get_expected_hashrate(self, rpc_stats: dict = None) -> Optional[float]:
        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass

        if rpc_stats is not None:
            try:
                expected_rate = rpc_stats["STATS"][1]["total_rateideal"]
                try:
                    rate_unit = rpc_stats["STATS"][1]["rate_unit"]
                except KeyError:
                    rate_unit = "GH"
                if rate_unit == "GH":
                    return round(expected_rate / 1000, 2)
                if rate_unit == "MH":
                    return round(expected_rate / 1000000, 2)
                else:
                    return round(expected_rate, 2)
            except LookupError:
                pass

    async def _get_uptime(self, rpc_stats: dict = None) -> Optional[int]:
        if rpc_stats is None:
            try:
                rpc_stats = await self.rpc.stats()
            except APIError:
                pass

        if rpc_stats is not None:
            try:
                return int(rpc_stats["STATS"][1]["Elapsed"])
            except LookupError:
                pass