Source code for device_manager.scanner._base

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Base classes for device scanners."""

import abc
import typing

from .nmap import NMAPWrapper
from ..device import Device, LANDevice

__all__ = ["BaseDeviceScanner", "BaseLANDeviceScanner"]

####################################################################################################


[docs]class BaseDeviceScanner(abc.ABC): """Base class for device scanners. Device scanners are used to scan specific protocols (like usb or ip). You can get a list of all connected devices or search with a user-defined filter. """ def __init__(self): super().__init__() self._devices = []
[docs] def list_devices(self, rescan: bool = False) -> typing.Sequence[Device]: """Lists all connected devices. Args: rescan: True, if the protocol should be scanned again. False, if you only want to scan, if there are no results from a previous scan. Returns: tuple: A sequence of all connected devices. """ return tuple(self._scan(rescan))
[docs] def find_devices(self, rescan: bool = False, **filters) -> typing.Sequence[Device]: """Lists all connected devices that match the filter. Args: rescan: True, if the protocol should be scanned again. False, if you only want to scan, if there are no results from a previous scan. **filters: User-defined filters. Only devices that match these filters will be returned. Returns: tuple: A sequence of all connected devices that match the filter. """ devices = self._scan(rescan) return tuple(device for device in devices if self._match_filters(device, **filters))
[docs] @abc.abstractmethod def _scan(self, rescan: bool) -> typing.Sequence[Device]: """Scans the specific protocol for devices. Subclasses must override this function and fill the `_devices` attribute. Args: rescan: True, if the protocol should be scanned again. False, if you only want to scan, if there are no results from a previous scan. """ raise NotImplementedError()
[docs] @staticmethod def _match_filters(device: Device, **filters) -> bool: """Checks if the device matches the user-defined filters. Args: device: The device object, that is about to be checked against the filters. **filters: User-defined filters. Only devices that match these filters will be returned. Returns: bool: True, if the device matches the filters. """ for attr, mask_value in filters.items(): if attr == "address": # If an address filter is passed, check if it is contained in all_addresses. So, the # aliases are checked two if mask_value not in device.all_addresses: return False else: try: if attr == "mac_address": mask_value = LANDevice.format_mac(mask_value) if mask_value != getattr(device, attr): return False except AttributeError: # If the device does not contain the filter, it does not match the filter return False # If no conflict happened, the device matches the filter return True
[docs]class BaseLANDeviceScanner(BaseDeviceScanner, abc.ABC): """A device scanner that scans the local network for ethernet devices. Args: **kwargs: - nmap_search_path: One or multiple paths where to search for the nmap executable. """ def __init__(self, **kwargs): super().__init__() self._nmap = NMAPWrapper(notify_parent_done=lambda b: self._scan(True), **kwargs) @property def nmap(self) -> typing.Optional["NMAPWrapper"]: """Wrapper for a nmap port scanner. May be None, if nmap could not be imported.""" return self._nmap
[docs] def _scan(self, rescan: bool) -> typing.Sequence[LANDevice]: """Scans the arp cache for ip and mac addresses. Args: rescan: True, to scan again. False, if you only want to scan, if there are no results from a previous scan. """ if len(self._devices) > 0 and not rescan: return self._devices devices = self._get_arp_cache() if self.nmap.valid: # pragma: no cover for dev in self.nmap.devices: if dev.mac_address in devices: address_aliases = [ip_address for ip_address in dev.all_addresses if ip_address not in devices[dev.mac_address].all_addresses] if len(address_aliases) > 0: devices[dev.mac_address].address_aliases = \ [*devices[dev.mac_address].address_aliases, address_aliases] else: devices[dev.mac_address] = dev self._devices = list(devices.values()) return tuple(self._devices)
[docs] @abc.abstractmethod def _get_arp_cache(self) -> typing.Dict[str, LANDevice]: """Runs the arp command and extracts ip and mac addresses from the command's output. A subclass must override this function because this function is platform dependent. Returns: dict: A dictionary, mapping strings to `LANDevice`s. The dictionary contains all results of the arp command, that contain a valid ip and mac address. """ raise NotImplementedError()