pyasic

Miner Factory

A factory to handle identification and selection of the proper class of miner.

Source code in pyasic/miners/miner_factory.py
 587
 588
 589
 590
 591
 592
 593
 594
 595
 596
 597
 598
 599
 600
 601
 602
 603
 604
 605
 606
 607
 608
 609
 610
 611
 612
 613
 614
 615
 616
 617
 618
 619
 620
 621
 622
 623
 624
 625
 626
 627
 628
 629
 630
 631
 632
 633
 634
 635
 636
 637
 638
 639
 640
 641
 642
 643
 644
 645
 646
 647
 648
 649
 650
 651
 652
 653
 654
 655
 656
 657
 658
 659
 660
 661
 662
 663
 664
 665
 666
 667
 668
 669
 670
 671
 672
 673
 674
 675
 676
 677
 678
 679
 680
 681
 682
 683
 684
 685
 686
 687
 688
 689
 690
 691
 692
 693
 694
 695
 696
 697
 698
 699
 700
 701
 702
 703
 704
 705
 706
 707
 708
 709
 710
 711
 712
 713
 714
 715
 716
 717
 718
 719
 720
 721
 722
 723
 724
 725
 726
 727
 728
 729
 730
 731
 732
 733
 734
 735
 736
 737
 738
 739
 740
 741
 742
 743
 744
 745
 746
 747
 748
 749
 750
 751
 752
 753
 754
 755
 756
 757
 758
 759
 760
 761
 762
 763
 764
 765
 766
 767
 768
 769
 770
 771
 772
 773
 774
 775
 776
 777
 778
 779
 780
 781
 782
 783
 784
 785
 786
 787
 788
 789
 790
 791
 792
 793
 794
 795
 796
 797
 798
 799
 800
 801
 802
 803
 804
 805
 806
 807
 808
 809
 810
 811
 812
 813
 814
 815
 816
 817
 818
 819
 820
 821
 822
 823
 824
 825
 826
 827
 828
 829
 830
 831
 832
 833
 834
 835
 836
 837
 838
 839
 840
 841
 842
 843
 844
 845
 846
 847
 848
 849
 850
 851
 852
 853
 854
 855
 856
 857
 858
 859
 860
 861
 862
 863
 864
 865
 866
 867
 868
 869
 870
 871
 872
 873
 874
 875
 876
 877
 878
 879
 880
 881
 882
 883
 884
 885
 886
 887
 888
 889
 890
 891
 892
 893
 894
 895
 896
 897
 898
 899
 900
 901
 902
 903
 904
 905
 906
 907
 908
 909
 910
 911
 912
 913
 914
 915
 916
 917
 918
 919
 920
 921
 922
 923
 924
 925
 926
 927
 928
 929
 930
 931
 932
 933
 934
 935
 936
 937
 938
 939
 940
 941
 942
 943
 944
 945
 946
 947
 948
 949
 950
 951
 952
 953
 954
 955
 956
 957
 958
 959
 960
 961
 962
 963
 964
 965
 966
 967
 968
 969
 970
 971
 972
 973
 974
 975
 976
 977
 978
 979
 980
 981
 982
 983
 984
 985
 986
 987
 988
 989
 990
 991
 992
 993
 994
 995
 996
 997
 998
 999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
class MinerFactory(metaclass=Singleton):
    """A factory to handle identification and selection of the proper class of miner."""

    def __init__(self) -> None:
        self.miners = {}

    async def get_miner_generator(
        self, ips: List[Union[ipaddress.ip_address, str]]
    ) -> AsyncIterable[AnyMiner]:
        """
        Get Miner objects from ip addresses using an async generator.

        Returns an asynchronous generator containing Miners.

        Parameters:
            ips: a list of ip addresses to get miners for.

        Returns:
            An async iterable containing miners.
        """
        # get the event loop
        loop = asyncio.get_event_loop()
        # create a list of tasks
        scan_tasks = []
        # for each miner IP that was passed in, add a task to get its class
        for miner in ips:
            scan_tasks.append(loop.create_task(self.get_miner(miner)))
        # asynchronously run the tasks and return them as they complete
        scanned = asyncio.as_completed(scan_tasks)
        # loop through and yield the miners as they complete
        for miner in scanned:
            yield await miner

    async def get_miner(self, ip: Union[ipaddress.ip_address, str]) -> AnyMiner:
        """Identify the miner at an IP address and return an object for it.

        Anything other than an `UnknownMiner` is cached per IP, so a later call for
        the same address returns the cached object without probing again.

        Parameters:
            ip: An `ipaddress.ip_address` or string of the IP to find the miner.

        Returns:
            A miner instance (`UnknownMiner` when nothing could be identified).
        """
        if isinstance(ip, str):
            ip = ipaddress.ip_address(ip)

        # serve from the cache when this address was identified before
        if ip in self.miners:
            return self.miners[ip]

        # nothing is known yet; each probe attempt may fill in some of these
        model = api = ver = api_ver = None

        attempts = PyasicSettings().miner_factory_get_version_retries
        for _ in range(attempts):
            try:
                # each probe is limited to 10 seconds
                probe = await asyncio.wait_for(self._get_miner_type(ip), timeout=10)
            except asyncio.TimeoutError:
                logging.warning(f"{ip}: Get Miner Timed Out")
                continue

            found_model, found_api, found_ver, found_api_ver = probe
            # the first truthy value seen for a field is kept across retries
            if not api and found_api:
                api = found_api
            if not model and found_model:
                model = found_model
            if not ver and found_ver:
                ver = found_ver
            if not api_ver and found_api_ver:
                api_ver = found_api_ver

            # API and model together are enough to pick a class
            if api and model:
                break

        miner = self._select_miner_from_classes(ip, model, api, ver, api_ver)

        # unknown miners are not cached, so a later call probes them again
        if not isinstance(miner, UnknownMiner):
            self.miners[ip] = miner

        return miner

    @staticmethod
    def _select_miner_from_classes(
        ip: ipaddress.ip_address,
        model: Union[str, None],
        api: Union[str, None],
        ver: Union[str, None],
        api_ver: Union[str, None] = None,
    ) -> AnyMiner:
        """Build the miner object that matches the identified model and API.

        Parameters:
            ip: Address of the miner; passed to the miner class as a string.
            model: Model name used to look up `MINER_CLASSES`, if one was found.
            api: API type used to pick the class for that model, if one was found.
            ver: Version key; takes priority over `api` when the model has an
                entry for it.
            api_ver: API version passed to the miner class.

        Returns:
            An instance of the selected miner class, or `UnknownMiner` when no
            class matches.
        """
        host = str(ip)
        unknown = UnknownMiner(host)

        if not model:
            # no model: fall back to a generic class for the API, if we have one
            if not api:
                return unknown
            if "BOSMiner+" in api:
                return BOSMiner(host, api_ver)
            if "BOSMiner" in api:
                return BOSMinerOld(host, api_ver)
            if "CGMiner" in api:
                return CGMiner(host, api_ver)
            if "BTMiner" in api:
                return BTMiner(host, api_ver)
            if "BMMiner" in api:
                return BMMiner(host, api_ver)
            return unknown

        if model not in MINER_CLASSES:
            # models missing from the table are only handled for avalon names
            if "avalon" not in model:
                return unknown
            avalon_class = (
                CGMinerAvalon1066 if model == "avalon10" else CGMinerAvalon821
            )
            return avalon_class(host, api_ver)

        variants = MINER_CLASSES[model]
        # a version-specific entry wins over the API-specific one
        if ver in variants:
            return variants[ver](host, api_ver)

        # missing or unlisted APIs use the model's "Default" entry
        api_key = api if api and api in variants else "Default"
        return variants[api_key](host, api_ver)

    def clear_cached_miners(self) -> None:
        """Clear the miner factory cache."""
        # empty out self.miners
        self.miners = {}

    async def _get_miner_type(
        self, ip: Union[ipaddress.ip_address, str]
    ) -> Tuple[Union[str, None], Union[str, None], Union[str, None], Union[str, None]]:
        """Probe a miner and work out its model, API type and versions.

        The miner API is queried first (devdetails and version). When that query
        raises `OSError`, the GraphQL endpoint, SSH and the web interfaces are
        tried instead.

        Parameters:
            ip: The address of the miner to probe.

        Returns:
            A `(model, api, ver, api_ver)` tuple; every element that could not be
            determined is None.
        """
        model, api, ver, api_ver = None, None, None, None

        try:
            devdetails, version = await self.__get_devdetails_and_version(ip)
        except APIError as e:
            # catch APIError and let the factory know we cant get data
            logging.warning(f"{ip}: API Command Error: {e}")
            return None, None, None, None
        except OSError:
            # ConnectionRefusedError is a subclass of OSError, so this one clause
            # covers a refused connection on the API port as well
            # we wont be able to get data this way, try graphql and ssh
            try:
                _model = await self.__get_model_from_graphql(ip)
                if not _model:
                    _model = await self.__get_model_from_ssh(ip)
                if _model:
                    model = _model
                    api = "BOSMiner+"
            except asyncssh.misc.PermissionDenied:
                # ssh login was rejected, fall back to the web interfaces
                try:
                    data = await self.__get_system_info_from_web(ip)
                    if not data.get("success"):
                        _model = await self.__get_dragonmint_version_from_web(ip)
                        if _model:
                            model = _model
                    if "minertype" in data:
                        model = data["minertype"].upper()
                    if "bmminer" in "\t".join(data):
                        api = "BMMiner"
                except Exception as e:
                    logging.debug(f"Unable to get miner - {e}")
            return model, api, ver, api_ver

        # `__get_devdetails_and_version` hands back the raw list-shaped sections
        # for a "Disconnected" response; dict-only lookups go through these views
        # so that case no longer raises AttributeError
        details_dict = devdetails if isinstance(devdetails, dict) else {}
        version_dict = version if isinstance(version, dict) else {}

        # if we have devdetails, we can get model data from there
        if devdetails:
            for _devdetails_key in ["Model", "Driver"]:
                try:
                    if details_dict.get("DEVDETAILS"):
                        model = details_dict["DEVDETAILS"][0][_devdetails_key].upper()
                        if "NOPIC" in model:
                            # bos, weird model
                            if model == "ANTMINER S19J88NOPIC":
                                model = "ANTMINER S19J NOPIC"
                            else:
                                logging.warning(
                                    f"{ip}: Unrecognized NOPIC model: {model}"
                                )
                        if not model == "BITMICRO":
                            break
                    elif details_dict.get("DEVS"):
                        model = details_dict["DEVS"][0][_devdetails_key].upper()
                        if "QOMO" in model:
                            model = await self.__get_goldshell_model_from_web(ip)

                except LookupError:
                    continue
            try:
                # only list-shaped devdetails can be indexed with 0 here
                if devdetails[0]["STATUS"][0]["Msg"]:
                    model = await self.__get_model_from_graphql(ip)
                    if model:
                        api = "BOSMiner+"
            except (KeyError, TypeError, ValueError, IndexError):
                pass
            try:
                if not model:
                    # braiins OS bug check just in case
                    if "s9" in devdetails["STATUS"][0]["Description"]:
                        model = "ANTMINER S9"
                    if "s17" in version["STATUS"][0]["Description"]:
                        model = "ANTMINER S17"
            except (KeyError, TypeError, ValueError, IndexError):
                pass
            try:
                if not api:
                    if "boser" in version["STATUS"][0]["Description"]:
                        api = "BOSMiner+"
            except (KeyError, TypeError, ValueError, IndexError):
                pass
        else:
            try:
                _model = await self.__get_model_from_graphql(ip)
                if _model:
                    model = _model
                    api = "BOSMiner+"
            except (KeyError, TypeError, ValueError, IndexError):
                pass

        # if we have version we can get API type from here
        if version:
            try:
                if isinstance(version_dict.get("Msg"), dict):
                    if "api_ver" in version_dict["Msg"]:
                        api_ver = (
                            version_dict["Msg"]["api_ver"]
                            .replace("whatsminer ", "")
                            .replace("v", "")
                        )
                        api = "BTMiner"

                # only list-shaped version data can be indexed with 0 here
                if version[0]["STATUS"][0]["Msg"]:
                    model = await self.__get_model_from_graphql(ip)
                    if model:
                        api = "BOSMiner+"
                        try:
                            api_ver = version[0]["VERSION"][0]["API"]
                        except (KeyError, TypeError, ValueError, IndexError):
                            pass
                        return model, api, ver, api_ver
            except (KeyError, TypeError, ValueError, IndexError):
                pass
            if "VERSION" in version_dict:
                api_ver = version_dict["VERSION"][0].get("API")
                api_types = ["BMMiner", "CGMiner", "BTMiner"]
                # check basic API types, BOSMiner needs a special check
                for api_type in api_types:
                    if any(api_type in string for string in version_dict["VERSION"][0]):
                        api = api_type

                # check if there are any BOSMiner strings in any of the dict keys
                if any("BOSminer" in string for string in version_dict["VERSION"][0]):
                    api = "BOSMiner"
                    if version_dict["VERSION"][0].get("BOSminer"):
                        if "plus" in version_dict["VERSION"][0]["BOSminer"]:
                            api = "BOSMiner+"
                    if "BOSminer+" in version_dict["VERSION"][0]:
                        api = "BOSMiner+"
                if any("BOSer" in string for string in version_dict["VERSION"][0]):
                    api = "BOSMiner+"

                # check for avalonminers
                for _version_key in ["PROD", "MODEL"]:
                    try:
                        _data = version_dict["VERSION"][0][_version_key].split("-")
                    except KeyError:
                        continue

                    model = _data[0].upper()
                    if _version_key == "MODEL":
                        model = f"AVALONMINER {_data[0]}"
                    if len(_data) > 1:
                        ver = _data[1]

            if version_dict.get("Description") and (
                "whatsminer" in version_dict.get("Description")
            ):
                api = "BTMiner"

        # if we have no model from devdetails but have version, try to get it from there
        if version and not model:
            try:
                model = version_dict["VERSION"][0]["Type"].upper()
                if "ANTMINER BHB" in model:
                    # def antminer, get from web
                    sysinfo = await self.__get_system_info_from_web(str(ip))
                    model = sysinfo["minertype"].upper()
                if "VNISH" in model:
                    api = "VNish"
                for split_point in [" BB", " XILINX", " (VNISH"]:
                    if split_point in model:
                        model = model.split(split_point)[0]

            except (KeyError, TypeError, IndexError):
                # TypeError: the web lookup gave no usable dict
                # IndexError: the VERSION list was empty
                pass

        if not model:
            # last resort: ask the API for stats and read the type from there
            stats = await self._send_api_command(str(ip), "stats")
            if stats:
                try:
                    _model = stats["STATS"][0]["Type"].upper()
                except (KeyError, IndexError, TypeError):
                    pass
                else:
                    if "VNISH" in _model:
                        api = "VNish"
                    for split_point in [" BB", " XILINX", " (VNISH"]:
                        if split_point in _model:
                            _model = _model.split(split_point)[0]
                    if "PRO" in _model and " PRO" not in _model:
                        _model = _model.replace("PRO", " PRO")
                    model = _model
            else:
                _model = await self.__get_dragonmint_version_from_web(ip)
                if _model:
                    model = _model

        if model:
            if "DRAGONMINT" in model or "A10" in model:
                _model = await self.__get_dragonmint_version_from_web(ip)
                if _model:
                    model = _model
            if " HIVEON" in model:
                # do hiveon check before whatsminer as HIVEON contains a V
                model = model.split(" HIVEON")[0]
                api = "Hiveon"
            # whatsminer have a V in their version string (M20SV41), everything after it is ver
            # NOTE(review): this splits any model containing "V", which presumably
            # also affects names such as "AVALONMINER ..." - confirm this is intended
            if "V" in model:
                _ver = model.split("V")
                if len(_ver) > 1:
                    ver = model.split("V")[1]
                    model = model.split("V")[0]
            # don't need "Bitmain", just "ANTMINER XX" as model
            if "BITMAIN " in model:
                model = model.replace("BITMAIN ", "")
        return model, api, ver, api_ver

    async def __get_devdetails_and_version(
        self, ip
    ) -> Tuple[Union[dict, None], Union[dict, None]]:
        """Fetch the devdetails and version data of a miner over its API.

        The combined `devdetails+version` command is tried first. When it is
        rejected, `devdetails`, `version` and `get_version` are tried one by one.

        Parameters:
            ip: The address of the miner to query.

        Returns:
            A `(devdetails, version)` pair. For a "Disconnected" response the raw
            sections of the combined command are returned as received.

        Raises:
            APIError: when none of the commands returns valid data.
        """
        host = str(ip)
        version = None

        async def _prefer_get_version(current):
            # swap in the `get_version` answer when the miner gives a valid one,
            # otherwise keep what we already have
            try:
                candidate = await self._send_api_command(host, "get_version")
                check = await self._validate_command(candidate)
                if check[0]:
                    return candidate
            except Exception as e:
                logging.warning(f"([Hidden] Get Devdetails and Version) - Error {e}")
            return current

        try:
            # get device details and version data in a single request
            data = await self._send_api_command(host, "devdetails+version")
            validation = await self._validate_command(data)
            if not validation[0]:
                try:
                    # a "Disconnected" version status is passed on untouched
                    if data["version"][0]["STATUS"][0]["Msg"] == "Disconnected":
                        return data["devdetails"], data["version"]
                except (KeyError, TypeError):
                    pass
                raise APIError(validation[1])

            # split the multicommand result into its two parts
            devdetails = data["devdetails"][0]
            version = data["version"][0]

            reports_btminer = (
                "STATUS" in version
                and len(version["STATUS"]) > 0
                and "Description" in version["STATUS"][0]
                and version["STATUS"][0]["Description"] == "btminer"
            )
            if reports_btminer:
                version = await _prefer_get_version(version)

            uses_bitmicro_driver = (
                "DEVDETAILS" in devdetails
                and len(devdetails["DEVDETAILS"]) > 0
                and devdetails["DEVDETAILS"][0].get("Driver") == "bitmicro"
            )
            if uses_bitmicro_driver:
                version = await _prefer_get_version(version)

            return devdetails, version
        except APIError:
            # try devdetails and version separately (X19s mainly require this)
            devdetails = await self._send_api_command(host, "devdetails")
            validation = await self._validate_command(devdetails)
            if not validation[0]:
                # devdetails is unusable, fall back to version
                devdetails = None

                version = await self._send_api_command(host, "version")
                validation = await self._validate_command(version)
                if not validation[0]:
                    # finally try get_version (Whatsminers)
                    version = await self._send_api_command(host, "get_version")
                    validation = await self._validate_command(version)

                    # nothing worked, let the caller deal with it
                    if not validation[0]:
                        raise APIError(validation[1])
        return devdetails, version

    @staticmethod
    async def __get_model_from_ssh(ip: ipaddress.ip_address) -> Union[str, None]:
        """Read the board name of a miner over SSH and map it to a model name.

        Logs in as `root` with the password `admin` and reads
        `/tmp/sysinfo/board_name`.

        Parameters:
            ip: The address of the miner to connect to.

        Returns:
            The model name for a recognised board, otherwise None. None is also
            returned when the connection is refused.
        """
        models_by_board = {
            "am1-s9": "ANTMINER S9",
            "am2-s17": "ANTMINER S17",
        }
        try:
            async with asyncssh.connect(
                str(ip),
                known_hosts=None,
                username="root",
                password="admin",
                server_host_key_algs=["ssh-rsa"],
            ) as conn:
                result = await conn.run("cat /tmp/sysinfo/board_name")
                board_name = result.stdout.strip() if result else None
        except ConnectionRefusedError:
            return None
        # unrecognised (or missing) board names map to None
        return models_by_board.get(board_name)

    @staticmethod
    async def __get_model_from_graphql(ip: ipaddress.ip_address) -> Union[str, None]:
        """Ask the miner's GraphQL endpoint for its model name.

        Parameters:
            ip: The address of the miner to query.

        Returns:
            The upper-cased `modelName`, or None when the request fails, the
            status is not 200, or the response lacks the expected fields.
        """
        url = f"http://{ip}/graphql"
        try:
            async with httpx.AsyncClient() as client:
                d = await client.post(
                    url, json={"query": "{bosminer {info{modelName}}}"}
                )
            if d.status_code != 200:
                return None
            return d.json()["data"]["bosminer"]["info"]["modelName"].upper()
        except httpx.HTTPError:
            return None
        except (KeyError, TypeError, ValueError, AttributeError) as e:
            # a 200 answer that is not JSON or lacks the model fields means the
            # model cannot be read this way; report "not found" rather than raise
            logging.debug(f"{ip}: Unexpected GraphQL response - {e!r}")
            return None

    @staticmethod
    async def __get_system_info_from_web(ip) -> Union[dict, None]:
        """Fetch `get_system_info.cgi` from the miner's web interface.

        The request uses digest authentication with `root` / `root`.

        Parameters:
            ip: The address of the miner to query.

        Returns:
            The decoded JSON object, or None when the request fails, the status is
            not 200, or the body is not a JSON object.
        """
        url = f"http://{ip}/cgi-bin/get_system_info.cgi"
        auth = httpx.DigestAuth("root", "root")
        try:
            async with httpx.AsyncClient() as client:
                response = await client.get(url, auth=auth)
            if response.status_code != 200:
                # never hand the raw response object to callers expecting a dict
                return None
            info = response.json()
        except httpx.HTTPError:
            return None
        except ValueError as e:
            # the body was not valid JSON
            logging.debug(f"{ip}: Unexpected system info response - {e!r}")
            return None
        return info if isinstance(info, dict) else None

    @staticmethod
    async def __get_goldshell_model_from_web(ip):
        """Fetch the model name from the `/mcb/status` page of the miner.

        Parameters:
            ip: The address of the miner to query.

        Returns:
            The model with dashes replaced by spaces and upper-cased, or None when
            the request fails or the response carries no usable `model` value.
        """
        response = None
        try:
            async with httpx.AsyncClient() as client:
                response = (
                    await client.get(
                        f"http://{ip}/mcb/status",
                    )
                ).json()
        except (httpx.HTTPError, ValueError) as e:
            # ValueError: the body was not valid JSON
            logging.info(e)
        if isinstance(response, dict):
            model = response.get("model")
            if model and isinstance(model, str):
                return model.replace("-", " ").upper()
        return None

    @staticmethod
    async def __get_dragonmint_version_from_web(
        ip: ipaddress.ip_address,
    ) -> Union[str, None]:
        """Log in to the miner's web API and read its `type` value.

        Authenticates against `/api/auth` with `admin` / `admin`, then posts to
        `/api/type` using the returned token.

        Parameters:
            ip: The address of the miner to query.

        Returns:
            The `type` value of the response, or None when a request fails or an
            answer does not have the expected shape.
        """
        response = None
        try:
            async with httpx.AsyncClient() as client:
                auth = (
                    await client.post(
                        f"http://{ip}/api/auth",
                        data={"username": "admin", "password": "admin"},
                    )
                ).json()["jwt"]
                response = (
                    await client.post(
                        f"http://{ip}/api/type",
                        headers={"Authorization": "Bearer " + auth},
                        data={},
                    )
                ).json()
        except httpx.HTTPError as e:
            logging.info(e)
        except (KeyError, TypeError, ValueError) as e:
            # no "jwt" in the login answer, or an answer that is not valid JSON:
            # treat as "not found" instead of raising into the caller
            logging.info(f"{ip}: Unexpected web API response - {e!r}")
        if isinstance(response, dict):
            return response.get("type")
        return None

    @staticmethod
    async def _validate_command(data: dict) -> Tuple[bool, Union[str, None]]:
        """Check if the returned command output is correctly formatted."""
        # check if the data returned is correct or an error
        if not data or data == {}:
            return False, "No API data."
        # if status isn't a key, it is a multicommand
        if "STATUS" not in data.keys():
            for key in data.keys():
                # make sure not to try to turn id into a dict
                if not key == "id":
                    # make sure they succeeded
                    if "STATUS" in data[key][0].keys():
                        if data[key][0]["STATUS"][0]["STATUS"] not in ["S", "I"]:
                            # this is an error
                            return False, f"{key}: " + data[key][0]["STATUS"][0]["Msg"]
        elif "id" not in data.keys():
            if data["STATUS"] not in ["S", "I"]:
                return False, data["Msg"]
        else:
            # make sure the command succeeded
            if data["STATUS"][0]["STATUS"] not in ("S", "I"):
                return False, data["STATUS"][0]["Msg"]
        return True, None

    @staticmethod
    async def _send_api_command(
        ip: Union[ipaddress.ip_address, str], command: str
    ) -> Union[dict, None]:
        """Send one command to the miner API on port 4028 and decode the reply.

        Parameters:
            ip: The address of the miner to send the command to.
            command: The API command to send.

        Returns:
            The decoded JSON reply, an empty dict when the connection could not
            be opened, or None when the reply could not be decoded.

        Raises:
            OSError: when opening the connection fails with errno 10061 or 22.
        """
        try:
            # get reader and writer streams
            reader, writer = await asyncio.open_connection(str(ip), 4028)
        except OSError as e:
            # NOTE(review): 10061 is presumably the Windows "connection refused"
            # code; confirm whether other platforms' codes should be re-raised too
            if e.errno in [10061, 22]:
                raise e
            logging.warning(f"{str(ip)} - Command {command}: {e}")
            return {}

        chunks = []
        try:
            # send the command
            writer.write(json.dumps({"command": command}).encode("utf-8"))
            await writer.drain()

            # loop to receive all the data
            try:
                while True:
                    d = await reader.read(4096)
                    if not d:
                        break
                    chunks.append(d)
            except Exception as e:
                logging.debug(f"{str(ip)}: {e}")
        finally:
            # always close the connection, including when sending fails or the
            # surrounding task is cancelled (e.g. by a timeout)
            writer.close()
            await writer.wait_closed()

        data = b"".join(chunks)
        try:
            # some json from the API returns with a null byte (\x00) on the end
            if data.endswith(b"\x00"):
                # handle the null byte
                str_data = data.decode("utf-8")[:-1]
            else:
                # no null byte
                str_data = data.decode("utf-8")
            # fix an error with a btminer return having an extra comma that breaks json.loads()
            str_data = str_data.replace(",}", "}")
            # fix an error with a btminer return having a newline that breaks json.loads()
            str_data = str_data.replace("\n", "")
            # fix an error with a bmminer return missing a specific comma that breaks json.loads()
            str_data = str_data.replace("}{", "},{")
            # parse the json
            return json.loads(str_data)
        except (json.decoder.JSONDecodeError, UnicodeDecodeError):
            # bad json or bytes that are not valid UTF-8: treat as "no data"
            return None

clear_cached_miners()

Clear the miner factory cache.

Source code in pyasic/miners/miner_factory.py
713
714
715
716
def clear_cached_miners(self) -> None:
    """Clear the miner factory cache."""
    # rebind to a fresh dict; the previous dict object is left untouched
    self.miners = dict()

get_miner(ip) async

Decide a miner type using the IP address of the miner.

Parameters:

Name Type Description Default
ip Union[ipaddress.ip_address, str]

An ipaddress.ip_address or string of the IP to find the miner.

required

Returns:

Type Description
AnyMiner

A miner class.

Source code in pyasic/miners/miner_factory.py
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
async def get_miner(self, ip: Union[ipaddress.ip_address, str]) -> AnyMiner:
    """Identify the miner at an IP address and return an object for it.

    Anything other than an `UnknownMiner` is cached per IP, so a later call for
    the same address returns the cached object without probing again.

    Parameters:
        ip: An `ipaddress.ip_address` or string of the IP to find the miner.

    Returns:
        A miner instance (`UnknownMiner` when nothing could be identified).
    """
    if isinstance(ip, str):
        ip = ipaddress.ip_address(ip)

    # serve from the cache when this address was identified before
    if ip in self.miners:
        return self.miners[ip]

    # nothing is known yet; each probe attempt may fill in some of these
    model = api = ver = api_ver = None

    attempts = PyasicSettings().miner_factory_get_version_retries
    for _ in range(attempts):
        try:
            # each probe is limited to 10 seconds
            probe = await asyncio.wait_for(self._get_miner_type(ip), timeout=10)
        except asyncio.TimeoutError:
            logging.warning(f"{ip}: Get Miner Timed Out")
            continue

        found_model, found_api, found_ver, found_api_ver = probe
        # the first truthy value seen for a field is kept across retries
        if not api and found_api:
            api = found_api
        if not model and found_model:
            model = found_model
        if not ver and found_ver:
            ver = found_ver
        if not api_ver and found_api_ver:
            api_ver = found_api_ver

        # API and model together are enough to pick a class
        if api and model:
            break

    miner = self._select_miner_from_classes(ip, model, api, ver, api_ver)

    # unknown miners are not cached, so a later call probes them again
    if not isinstance(miner, UnknownMiner):
        self.miners[ip] = miner

    return miner

get_miner_generator(ips) async

Get Miner objects from ip addresses using an async generator.

Returns an asynchronous generator containing Miners.

Parameters:

Name Type Description Default
ips List[Union[ipaddress.ip_address, str]]

a list of ip addresses to get miners for.

required

Returns:

Type Description
AsyncIterable[AnyMiner]

An async iterable containing miners.

Source code in pyasic/miners/miner_factory.py
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
async def get_miner_generator(
    self, ips: List[Union[ipaddress.ip_address, str]]
) -> AsyncIterable[AnyMiner]:
    """
    Identify several miners concurrently, yielding each one as soon as it is ready.

    One `get_miner` task is started per address; results are yielded in
    completion order, which is not necessarily the order of `ips`.

    Parameters:
        ips: a list of ip addresses to get miners for.

    Returns:
        An async iterable containing miners.
    """
    # start every lookup up front so they all run concurrently
    lookups = [asyncio.create_task(self.get_miner(address)) for address in ips]
    # hand results back in whatever order the lookups finish
    for finished in asyncio.as_completed(lookups):
        yield await finished


AnyMiner

AnyMiner is a placeholder type variable used for typing the return values of functions. A function annotated as returning AnyMiner always returns an instance of some subclass of BaseMiner; the annotation is used when the function can return an arbitrary type of miner class instance.