pyasic

Miner Factory

MinerFactory is how miner instances are created in pyasic. Its most important method is get_miner(), which is exposed as pyasic.get_miner() and should be called from there.

The instance used for pyasic.get_miner() is pyasic.miner_factory.

MinerFactory also keeps a cache, which can be cleared if needed with pyasic.miner_factory.clear_cached_miners().

Finally, there is functionality to get multiple miners without using asyncio.gather() explicitly. Use pyasic.miner_factory.get_multiple_miners() with a list of IPs as strings to get a list of miner instances. You can also get multiple miners with an AsyncGenerator by using pyasic.miner_factory.get_miner_generator().

Source code in pyasic/miners/factory.py
class MinerFactory:
    async def get_multiple_miners(
        self, ips: list[str], limit: int = 200
    ) -> list[AnyMiner]:
        results = []

        async for miner in self.get_miner_generator(ips, limit):
            results.append(miner)

        return results

    async def get_miner_generator(
        self, ips: list, limit: int = 200
    ) -> AsyncGenerator[AnyMiner]:
        tasks = []
        semaphore = asyncio.Semaphore(limit)

        for ip in ips:
            tasks.append(asyncio.create_task(self.get_miner(ip)))

        for task in tasks:
            async with semaphore:
                result = await task
                if result is not None:
                    yield result

    async def get_miner(self, ip: str | ipaddress.ip_address) -> AnyMiner | None:
        """Identify the miner at `ip` and build the matching miner instance.

        The miner type is probed up to `factory_get_retries` times, each probe
        bounded by `factory_get_timeout` seconds. When a type is found, the
        type-specific model getter (if one exists) is run under the same
        timeout, and the resulting type/model pair is handed to
        `_select_miner_from_classes`.

        Parameters:
            ip: The address of the device, as a string or an ipaddress object.

        Returns:
            The selected miner instance, or `None` when no miner type could be
            determined.
        """
        ip = str(ip)

        detected_type = None
        for _attempt in range(settings.get("factory_get_retries", 1)):
            type_task = asyncio.create_task(self._get_miner_type(ip))
            try:
                detected_type = await asyncio.wait_for(
                    type_task, timeout=settings.get("factory_get_timeout", 3)
                )
            except asyncio.TimeoutError:
                continue
            if detected_type is not None:
                break

        if detected_type is None:
            return None

        model_getters = {
            MinerTypes.ANTMINER: self.get_miner_model_antminer,
            MinerTypes.WHATSMINER: self.get_miner_model_whatsminer,
            MinerTypes.AVALONMINER: self.get_miner_model_avalonminer,
            MinerTypes.INNOSILICON: self.get_miner_model_innosilicon,
            MinerTypes.GOLDSHELL: self.get_miner_model_goldshell,
            MinerTypes.BRAIINS_OS: self.get_miner_model_braiins_os,
            MinerTypes.VNISH: self.get_miner_model_vnish,
            MinerTypes.EPIC: self.get_miner_model_epic,
            MinerTypes.HIVEON: self.get_miner_model_hiveon,
            MinerTypes.LUX_OS: self.get_miner_model_luxos,
            MinerTypes.AURADINE: self.get_miner_model_auradine,
            MinerTypes.MARATHON: self.get_miner_model_marathon,
        }

        detected_model = None
        model_getter = model_getters.get(detected_type)
        if model_getter is not None:
            # noinspection PyArgumentList
            model_task = asyncio.create_task(model_getter(ip))
            try:
                detected_model = await asyncio.wait_for(
                    model_task, timeout=settings.get("factory_get_timeout", 3)
                )
            except asyncio.TimeoutError:
                # A model lookup that times out still yields a miner; the
                # selection step is simply given no model.
                pass

        return self._select_miner_from_classes(
            ip,
            miner_type=detected_type,
            miner_model=detected_model,
        )

    async def _get_miner_type(self, ip: str) -> MinerTypes | None:
        """Probe `ip` over the web and the API socket at the same time.

        Parameters:
            ip: The address of the device.

        Returns:
            Whatever `concurrent_get_first_result` selects from the two probes
            using a "result is not None" check (presumably the first probe to
            identify a miner type — confirm against that helper).
        """
        web_probe = asyncio.create_task(self._get_miner_web(ip))
        socket_probe = asyncio.create_task(self._get_miner_socket(ip))

        def _is_identified(result):
            return result is not None

        return await concurrent_get_first_result(
            [web_probe, socket_probe], _is_identified
        )

    async def _get_miner_web(self, ip: str) -> MinerTypes | None:
        """Try to identify the miner type from its HTTP and HTTPS root pages.

        Both URLs are requested concurrently through `_web_ping`, and the
        responses are classified with `_parse_web_type`. If this coroutine is
        cancelled, the outstanding ping tasks are cancelled and awaited before
        it returns.

        Parameters:
            ip: The address of the device.

        Returns:
            The detected miner type, or `None` if nothing was recognized or
            the probe was cancelled.
        """
        ping_tasks = []
        candidate_urls = [f"http://{ip}/", f"https://{ip}/"]

        def _is_recognized(result):
            return (
                result[0] is not None
                and self._parse_web_type(result[0], result[1]) is not None
            )

        try:
            async with httpx.AsyncClient(
                transport=settings.transport(verify=False)
            ) as session:
                ping_tasks = [
                    asyncio.create_task(self._web_ping(session, url))
                    for url in candidate_urls
                ]

                body, response = await concurrent_get_first_result(
                    ping_tasks, _is_recognized
                )
                if body is None:
                    return None
                return self._parse_web_type(body, response)
        except asyncio.CancelledError:
            # Make sure no ping task outlives this probe.
            for pending in ping_tasks:
                pending.cancel()
                try:
                    await pending
                except asyncio.CancelledError:
                    pass

    @staticmethod
    async def _web_ping(
        session: httpx.AsyncClient, url: str
    ) -> tuple[str | None, httpx.Response | None]:
        try:
            resp = await session.get(url, follow_redirects=True)
            return resp.text, resp
        except (
            httpx.HTTPError,
            asyncio.TimeoutError,
            anyio.EndOfStream,
            anyio.ClosedResourceError,
        ):
            pass
        return None, None

    @staticmethod
    def _parse_web_type(web_text: str, web_resp: httpx.Response) -> MinerTypes | None:
        if web_resp.status_code == 401 and 'realm="antMiner' in web_resp.headers.get(
            "www-authenticate", ""
        ):
            return MinerTypes.ANTMINER
        if len(web_resp.history) > 0:
            history_resp = web_resp.history[0]
            if (
                "/cgi-bin/luci" in web_text
                and history_resp.status_code == 307
                and "https://" in history_resp.headers.get("location", "")
            ):
                return MinerTypes.WHATSMINER
        if "Braiins OS" in web_text:
            return MinerTypes.BRAIINS_OS
        if "cloud-box" in web_text:
            return MinerTypes.GOLDSHELL
        if "AnthillOS" in web_text:
            return MinerTypes.VNISH
        if "Miner Web Dashboard" in web_text:
            return MinerTypes.EPIC
        if "Avalon" in web_text:
            return MinerTypes.AVALONMINER
        if "DragonMint" in web_text:
            return MinerTypes.INNOSILICON
        if "Miner UI" in web_text:
            return MinerTypes.AURADINE

    async def _get_miner_socket(self, ip: str) -> MinerTypes | None:
        """Try to identify the miner type from its API socket replies.

        The `version` and `devdetails` commands are sent concurrently through
        `_socket_ping`, and the replies are classified with
        `_parse_socket_type`. If this coroutine is cancelled, the outstanding
        ping tasks are cancelled and awaited before it returns.

        Parameters:
            ip: The address of the device.

        Returns:
            The detected miner type, or `None` if nothing was recognized or
            the probe was cancelled.
        """
        ping_tasks = []

        def _is_recognized(reply):
            return reply is not None and self._parse_socket_type(reply) is not None

        try:
            ping_tasks = [
                asyncio.create_task(self._socket_ping(ip, probe_cmd))
                for probe_cmd in ("version", "devdetails")
            ]

            reply = await concurrent_get_first_result(ping_tasks, _is_recognized)
            if reply is None:
                return None
            return self._parse_socket_type(reply)
        except asyncio.CancelledError:
            # Make sure no ping task outlives this probe.
            for pending in ping_tasks:
                pending.cancel()
                try:
                    await pending
                except asyncio.CancelledError:
                    pass

    @staticmethod
    async def _socket_ping(ip: str, cmd: str) -> str | None:
        """Send one API command to TCP port 4028 and return the raw reply text.

        Connecting is bounded by the `factory_get_timeout` setting. Reading
        continues until the peer closes the connection; individual reads that
        time out after one second are retried, so callers are expected to
        bound the overall call themselves.

        Parameters:
            ip: The address of the device.
            cmd: The API command name, sent as `{"command": cmd}`.

        Returns:
            The decoded reply, or `None` when the connection fails, is reset,
            or no data is received. Bytes that are not valid UTF-8 are
            replaced rather than raising, since the reply is only used for
            substring matching.

        Raises:
            asyncio.CancelledError: If the task is cancelled while sending or
                receiving.
        """
        try:
            reader, writer = await asyncio.wait_for(
                asyncio.open_connection(str(ip), 4028),
                timeout=settings.get("factory_get_timeout", 3),
            )
        except (ConnectionError, OSError, asyncio.TimeoutError):
            return None

        chunks = []
        try:
            # send the command
            writer.write(json.dumps({"command": cmd}).encode("utf-8"))
            await writer.drain()

            # loop to receive all the data
            while True:
                try:
                    chunk = await asyncio.wait_for(reader.read(4096), timeout=1)
                except asyncio.TimeoutError:
                    continue
                except ConnectionResetError:
                    return None
                if not chunk:
                    break
                chunks.append(chunk)
        except asyncio.CancelledError:
            # Handle cancellation explicitly: clean up below, then propagate.
            raise
        except (ConnectionError, OSError):
            return None
        finally:
            if writer.transport.is_closing():
                writer.transport.close()
            else:
                writer.close()
            try:
                await writer.wait_closed()
            except (ConnectionError, OSError):
                # Never `return` from a finally block: that would swallow an
                # in-flight CancelledError and throw away data already read.
                pass

        data = b"".join(chunks)
        if data:
            return data.decode("utf-8", errors="replace")
        return None

    @staticmethod
    def _parse_socket_type(data: str) -> MinerTypes | None:
        upper_data = data.upper()
        if "BOSMINER" in upper_data or "BOSER" in upper_data:
            return MinerTypes.BRAIINS_OS
        if "BTMINER" in upper_data or "BITMICRO" in upper_data:
            return MinerTypes.WHATSMINER
        if "VNISH" in upper_data:
            return MinerTypes.VNISH
        if "HIVEON" in upper_data:
            return MinerTypes.HIVEON
        if "LUXMINER" in upper_data:
            return MinerTypes.LUX_OS
        if "KAONSU" in upper_data:
            return MinerTypes.MARATHON
        if "ANTMINER" in upper_data and "DEVDETAILS" not in upper_data:
            return MinerTypes.ANTMINER
        if (
            "INTCHAINS_QOMO" in upper_data
            or "KDAMINER" in upper_data
            or "BFGMINER" in upper_data
        ):
            return MinerTypes.GOLDSHELL
        if "AVALON" in upper_data:
            return MinerTypes.AVALONMINER
        if "GCMINER" in upper_data or "FLUXOS" in upper_data:
            return MinerTypes.AURADINE

    async def send_web_command(
        self,
        ip: str,
        location: str,
        auth: httpx.DigestAuth = None,
    ) -> dict | None:
        """GET `http://{ip}{location}` and return the decoded JSON body.

        Parameters:
            ip: The address of the device.
            location: Text appended directly after the IP in the URL (a path,
                optionally preceded by a port).
            auth: Optional authentication passed to the request.

        Returns:
            The parsed JSON, or `None` when the request fails or the body
            cannot be parsed as JSON.
        """
        url = f"http://{ip}{location}"
        async with httpx.AsyncClient(transport=settings.transport()) as session:
            try:
                response = await session.get(
                    url,
                    auth=auth,
                    timeout=settings.get("factory_get_timeout", 3),
                )
            except (httpx.HTTPError, asyncio.TimeoutError):
                logger.info(f"{ip}: Web command timeout.")
                return None

        if response is None:
            return None

        try:
            return response.json()
        except (json.JSONDecodeError, asyncio.TimeoutError):
            pass

        # Second attempt: parse the decoded text instead.
        try:
            return json.loads(response.text)
        except (json.JSONDecodeError, httpx.HTTPError):
            return None

    async def send_api_command(self, ip: str, command: str) -> dict | None:
        """Send one API command to TCP port 4028 and return the parsed reply.

        The reply is read until the peer closes the connection, repaired with
        `_fix_api_data`, and parsed as JSON. The socket is always closed
        before returning, including when sending or receiving fails.

        Parameters:
            ip: The address of the device.
            command: The API command name, sent as `{"command": command}`.

        Returns:
            The parsed reply; `None` when the connection fails, the call is
            cancelled, or the device reports a refused socket connection; `{}`
            when the reply cannot be decoded or parsed.
        """
        try:
            reader, writer = await asyncio.open_connection(ip, 4028)
        except (ConnectionError, OSError):
            return None

        chunks = []
        try:
            # send the command
            writer.write(json.dumps({"command": command}).encode("utf-8"))
            await writer.drain()

            # loop to receive all the data
            while True:
                chunk = await reader.read(4096)
                if not chunk:
                    break
                chunks.append(chunk)
        except asyncio.CancelledError:
            # Existing contract: a cancelled command reports "no data" instead
            # of propagating the cancellation.
            return None
        except (ConnectionError, OSError):
            return None
        finally:
            # Close on every path so a failed exchange does not leak the
            # socket. A failure while closing must not discard a reply that
            # was already received in full.
            writer.close()
            try:
                await writer.wait_closed()
            except (ConnectionError, OSError):
                pass

        data = b"".join(chunks)
        if data == b"Socket connect failed: Connection refused\n":
            return None

        try:
            return json.loads(await self._fix_api_data(data))
        except (json.JSONDecodeError, UnicodeDecodeError):
            return {}

    @staticmethod
    async def _fix_api_data(data: bytes) -> str:
        if data.endswith(b"\x00"):
            str_data = data.decode("utf-8")[:-1]
        else:
            str_data = data.decode("utf-8")
        # fix an error with a btminer return having an extra comma that breaks json.loads()
        str_data = str_data.replace(",}", "}")
        # fix an error with a btminer return having a newline that breaks json.loads()
        str_data = str_data.replace("\n", "")
        # fix an error with a bmminer return not having a specific comma that breaks json.loads()
        str_data = str_data.replace("}{", "},{")
        # fix an error with a bmminer return having a specific comma that breaks json.loads()
        str_data = str_data.replace("[,{", "[{")
        # fix an error with a btminer return having a missing comma. (2023-01-06 version)
        str_data = str_data.replace('""temp0', '","temp0')
        # fix an error with Avalonminers returning inf and nan
        str_data = str_data.replace("info", "1nfo")
        str_data = str_data.replace("inf", "0")
        str_data = str_data.replace("1nfo", "info")
        str_data = str_data.replace("nan", "0")
        # fix whatever this garbage from avalonminers is `,"id":1}`
        if str_data.startswith(","):
            str_data = f"{{{str_data[1:]}"
        # try to fix an error with overflowing the recieve buffer
        # this can happen in cases such as bugged btminers returning arbitrary length error info with 100s of errors.
        if not str_data.endswith("}"):
            str_data = ",".join(str_data.split(",")[:-1]) + "}"

        # fix a really nasty bug with whatsminer API v2.0.4 where they return a list structured like a dict
        if re.search(r"\"error_code\":\[\".+\"]", str_data):
            str_data = str_data.replace("[", "{").replace("]", "}")

        return str_data

    @staticmethod
    def _select_miner_from_classes(
        ip: ipaddress.ip_address,
        miner_model: str | None,
        miner_type: MinerTypes | None,
    ) -> AnyMiner | None:
        """Instantiate the miner class registered for a type/model pair.

        Parameters:
            ip: The address handed to the miner class constructor.
            miner_model: The detected model; looked up upper-cased.
            miner_type: The detected miner type.

        Returns:
            An instance of the class registered for the type and model. If
            that lookup fails, an instance of the type's `None` entry, or an
            `UnknownMiner` when the type itself is not registered.
        """
        try:
            classes_for_type = MINER_CLASSES[miner_type]
            return classes_for_type[str(miner_model).upper()](ip)
        except LookupError:
            pass

        if miner_type not in MINER_CLASSES:
            return UnknownMiner(str(ip))
        return MINER_CLASSES[miner_type][None](ip)

    async def get_miner_model_antminer(self, ip: str) -> str | None:
        """Look up an Antminer's model over the web and the API socket at once.

        Parameters:
            ip: The address of the device.

        Returns:
            Whatever `concurrent_get_first_result` selects from the two
            lookups using a "result is not None" check (presumably the first
            model found — confirm against that helper).
        """
        web_lookup = asyncio.create_task(self._get_model_antminer_web(ip))
        sock_lookup = asyncio.create_task(self._get_model_antminer_sock(ip))

        def _has_model(result):
            return result is not None

        return await concurrent_get_first_result(
            [web_lookup, sock_lookup], _has_model
        )

    async def _get_model_antminer_web(self, ip: str) -> str | None:
        """Read the Antminer model from `/cgi-bin/get_system_info.cgi`.

        The request uses digest authentication with the `root`/`root`
        credentials. The original author marks this path as a slow last
        resort.

        Parameters:
            ip: The address of the device.

        Returns:
            The `minertype` field of the reply, or `None` if it is missing.
        """
        # last resort, this is slow
        system_info = await self.send_web_command(
            ip,
            "/cgi-bin/get_system_info.cgi",
            auth=httpx.DigestAuth("root", "root"),
        )

        try:
            return system_info["minertype"]
        except (TypeError, LookupError):
            return None

    async def _get_model_antminer_sock(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "version")
        try:
            miner_model = sock_json_data["VERSION"][0]["Type"]

            if " (" in miner_model:
                split_miner_model = miner_model.split(" (")
                miner_model = split_miner_model[0]

            return miner_model
        except (TypeError, LookupError):
            pass

        sock_json_data = await self.send_api_command(ip, "stats")
        try:
            miner_model = sock_json_data["STATS"][0]["Type"]

            if " (" in miner_model:
                split_miner_model = miner_model.split(" (")
                miner_model = split_miner_model[0]

            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_goldshell(self, ip: str) -> str | None:
        json_data = await self.send_web_command(ip, "/mcb/status")

        try:
            miner_model = json_data["model"].replace("-", " ")

            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_whatsminer(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "devdetails")
        try:
            miner_model = sock_json_data["DEVDETAILS"][0]["Model"].replace("_", "")
            miner_model = miner_model[:-1] + "0"

            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_avalonminer(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "version")
        try:
            miner_model = sock_json_data["VERSION"][0]["PROD"]
            if "-" in miner_model:
                miner_model = miner_model.split("-")[0]

            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_innosilicon(self, ip: str) -> str | None:
        """Read the Innosilicon model from its web API.

        Logs in at `/api/auth` with the `admin`/`admin` credentials, then
        requests `/api/type` using the returned JWT as a bearer token.

        Parameters:
            ip: The address of the device.

        Returns:
            The `type` field of the reply, or `None` when a request fails or
            a reply is not JSON of the expected shape.
        """
        try:
            async with httpx.AsyncClient(transport=settings.transport()) as session:
                auth_req = await session.post(
                    f"http://{ip}/api/auth",
                    data={"username": "admin", "password": "admin"},
                )
                auth = auth_req.json()["jwt"]

                web_data = (
                    await session.post(
                        f"http://{ip}/api/type",
                        headers={"Authorization": "Bearer " + auth},
                        data={},
                    )
                ).json()
                return web_data["type"]
        except (httpx.HTTPError, json.JSONDecodeError, TypeError, LookupError):
            # Non-JSON bodies and unexpectedly shaped JSON are treated like any
            # other failed lookup, matching the other model getters.
            return None

    async def get_miner_model_braiins_os(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "devdetails")
        try:
            miner_model = (
                sock_json_data["DEVDETAILS"][0]["Model"]
                .replace("Bitmain ", "")
                .replace("S19XP", "S19 XP")
            )
            return miner_model
        except (TypeError, LookupError):
            pass

        try:
            async with httpx.AsyncClient(transport=settings.transport()) as session:
                d = await session.post(
                    f"http://{ip}/graphql",
                    json={"query": "{bosminer {info{modelName}}}"},
                )
            if d.status_code == 200:
                json_data = d.json()
                miner_model = json_data["data"]["bosminer"]["info"][
                    "modelName"
                ].replace("S19XP", "S19 XP")
                return miner_model
        except (httpx.HTTPError, LookupError):
            pass

    async def get_miner_model_vnish(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "stats")
        try:
            miner_model = sock_json_data["STATS"][0]["Type"]
            if " (" in miner_model:
                split_miner_model = miner_model.split(" (")
                miner_model = split_miner_model[0]

            if "(88)" in miner_model:
                miner_model = miner_model.replace("(88)", "NOPIC")

            if " AML" in miner_model:
                miner_model = miner_model.replace(" AML", "")

            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_epic(self, ip: str) -> str | None:
        """Read the ePIC model from the `:4028/capabilities` web endpoint.

        The request is attempted up to `get_data_retries` times, stopping at
        the first reply that contains a `Model` field.

        Parameters:
            ip: The address of the device.

        Returns:
            The `Model` field, or `None` if no attempt provides one.
        """
        for _attempt in range(settings.get("get_data_retries", 1)):
            capabilities = await self.send_web_command(ip, ":4028/capabilities")
            try:
                return capabilities["Model"]
            except (TypeError, LookupError):
                # try again until the attempts run out
                continue
        return None

    async def get_miner_model_hiveon(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "version")
        try:
            miner_type = sock_json_data["VERSION"][0]["Type"]

            return miner_type.replace(" HIVEON", "")
        except (TypeError, LookupError):
            pass

    async def get_miner_model_luxos(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "version")
        try:
            miner_model = sock_json_data["VERSION"][0]["Type"]

            if " (" in miner_model:
                split_miner_model = miner_model.split(" (")
                miner_model = split_miner_model[0]
            return miner_model
        except (TypeError, LookupError):
            pass

    async def get_miner_model_auradine(self, ip: str) -> str | None:
        sock_json_data = await self.send_api_command(ip, "devdetails")
        try:
            return sock_json_data["DEVDETAILS"][0]["Model"]
        except LookupError:
            pass

    async def get_miner_model_marathon(self, ip: str) -> str | None:
        """Read the Marathon model from the `/kaonsu/v1/overview` endpoint.

        The request uses digest authentication with the `root`/`root`
        credentials.

        Parameters:
            ip: The address of the device.

        Returns:
            The `model` field, or `None` if it is missing or an empty string.
        """
        overview = await self.send_web_command(
            ip,
            "/kaonsu/v1/overview",
            auth=httpx.DigestAuth("root", "root"),
        )

        try:
            model = overview["model"]
            return None if model == "" else model
        except (TypeError, LookupError):
            return None


Get Miner

Source code in pyasic/miners/factory.py
async def get_miner(ip: ipaddress.ip_address | str) -> AnyMiner | None:
    """Identify the miner at `ip` using the shared `miner_factory` instance.

    Parameters:
        ip: The address of the device, as a string or an ipaddress object.

    Returns:
        The result of `miner_factory.get_miner(ip)`: a miner instance, or
        `None` when no miner type could be determined.
    """
    return await miner_factory.get_miner(ip)


AnyMiner

AnyMiner is a placeholder type variable used to annotate function return values. A function annotated as returning AnyMiner always returns an instance of some subclass of BaseMiner; the annotation indicates that the concrete miner class is not known in advance.