#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Device scanners working on windows systems."""
import os
import sys
# Refuse to import this module on any platform other than windows. The error is suppressed only if
# the environment variable DEVMAN_NO_IMPORT_ERROR is set to exactly "True" or "1".
_import_error_disabled = os.environ.get("DEVMAN_NO_IMPORT_ERROR") in ("True", "1")
if sys.platform != "win32" and not _import_error_disabled:
    raise ImportError("Windows-specific device scanners are only importable on windows systems")
import re
import subprocess
import typing
import win32com.client
from ._base import BaseDeviceScanner, BaseLANDeviceScanner
from ..device import USBDevice, LANDevice
__all__ = ["Win32USBDeviceScanner", "Win32LANDeviceScanner"]
####################################################################################################
class Win32USBDeviceScanner(BaseDeviceScanner):
    """A device scanner that scans for usb devices on windows systems.

    All plug-and-play entities known to the windows device manager are queried and the ones that
    are usb devices are converted into `USBDevice`-objects.

    Args:
        **kwargs: Accepted, but not evaluated by this scanner.
    """

    def __init__(self, **kwargs):
        super().__init__()
        # Connect to the local machine's "root\cimv2" namespace, which is queried in `_scan`
        self._wmi = win32com.client.Dispatch("WbemScripting.SWbemLocator")
        self._wbem = self._wmi.ConnectServer(".", "root\\cimv2")

    @staticmethod
    def _device_from_raw(raw_device) -> USBDevice:
        """Converts a raw device provided from the windows device manager into a `USBDevice`-object.

        Args:
            raw_device: A device provided from the windows device manager as result of scanning all
                        known PNP devices.

        Returns:
            USBDevice: The `raw_device` converted into a `USBDevice`-object.

        Raises:
            TypeError: If `raw_device` is not a "Win32_PnPEntity", is not of PNP class "USB", lacks
                       the id attributes or its ids do not start with "USB".
            ValueError: If the ids of the device are contradictory or an id attribute (VID, PID,
                        REV) is not a hexadecimal number.
        """
        if raw_device.CreationClassName != "Win32_PnPEntity":
            raise TypeError("Expected \"Win32_PnPEntity\", got \"{}\" instead.".format(
                raw_device.CreationClassName))
        if raw_device.PNPClass != "USB":
            # Only usb devices are accepted
            raise TypeError("Expected PNP class \"USB\", got \"{}\" instead.".format(
                raw_device.PNPClass))

        dev = USBDevice()
        try:
            dev.address = raw_device.DeviceID
            # On win32 usb devices have many ids, that can be used to connect to the device.
            # Unset id lists are reported as None instead of an empty sequence, so they are
            # replaced with an empty tuple. `dict.fromkeys` removes duplicates and keeps the order.
            dev.address_aliases = list(dict.fromkeys([*(raw_device.HardwareID or ()),
                                                      *(raw_device.CompatibleID or ())]))
        except AttributeError as exc:
            raise TypeError("Could not extract device id and hardware id from Win32_PnPEntity") \
                from exc

        device_type, attributes, instance_id = _analyse_win32_device_ids(dev.all_addresses)
        if device_type != "USB":
            # The pnp class (from the device id) also must be usb. Sometimes there are some PCI
            # devices found.
            raise TypeError("Expected PNP class \"USB\", got \"{}\" instead.".format(device_type))

        if instance_id is not None:
            # Only the part in front of the first ampersand is a candidate for the serial number.
            # If that part is a single character (or empty), the device is treated as having no
            # serial number.
            instance_id = instance_id.split("&")[0]
            if len(instance_id) > 1:
                dev.serial = instance_id

        # Readout the device attributes (hexadecimal numbers)
        for key, value in attributes.items():
            if key == "VID":
                dev.vendor_id = int(value, base=16)
            elif key == "PID":
                dev.product_id = int(value, base=16)
            elif key == "REV":
                dev.revision_id = int(value, base=16)
        return dev

    def _scan(self, rescan: bool) -> typing.Sequence[USBDevice]:
        """Scans all PNP devices from the windows device manager and filters the USB devices.

        Args:
            rescan: True, to scan again. False, if you only want to scan, if there are no
                    results from a previous scan.

        Returns:
            A tuple containing all usb devices that were found. The tuple is a snapshot; it does
            not change when the scanner is scanning again.
        """
        if self._devices and not rescan:
            # Only scan if rescan is True or no devices were found, yet
            return tuple(self._devices)

        self._devices.clear()
        # Get all plug-and-play devices from the windows device manager
        raw_devices = self._wbem.ExecQuery("SELECT * FROM Win32_PnPEntity")
        for raw_dev in raw_devices:
            try:
                dev = self._device_from_raw(raw_dev)
            except (TypeError, AttributeError, ValueError):
                # Entities that are not usb devices or that cannot be interpreted are skipped
                continue
            self._devices.append(dev)
        return tuple(self._devices)
class Win32LANDeviceScanner(BaseLANDeviceScanner):
    """A device scanner that scans the local network for ethernet devices.

    Args:
        **kwargs: Passed on to `BaseLANDeviceScanner`.
            - nmap_search_path: One or multiple paths where to search for the nmap executable.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # A valid line of the arp output looks like: "  <ip address>  <mac address> ..."
        ip_pattern = r"^[ \t]*((?:\d{1,3}\.){3}\d{1,3})[ \t]+"
        mac_pattern = r"([0-9A-Fa-f]{2}[.:\-]){5}([0-9A-Fa-f]{2})"
        self._arp_regex = re.compile(ip_pattern + mac_pattern)

    def _get_arp_cache(self) -> typing.Dict[str, LANDevice]:
        """Runs "arp -a" and collects the ip and mac addresses listed in the command's output.

        Returns:
            dict: Maps each mac address to a `LANDevice`. Further ip addresses of an already known
            mac address are stored as address aliases of that device. The dictionary is empty, if
            the command failed (exit code other than zero or any output on stderr).

        Raises:
            FileNotFoundError: If the arp command is not available.
        """
        try:
            arp_process = subprocess.Popen(
                ["arp", "-a"],
                bufsize=100000,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError as exc:
            raise FileNotFoundError("Command 'arp' was not found. Please, make sure it is "
                                    "installed.") from exc

        stdout_bytes, stderr_bytes = arp_process.communicate()
        if arp_process.returncode != 0:
            # The arp-command failed
            return {}

        stdout_text = stdout_bytes.decode(errors="ignore")
        stderr_text = stderr_bytes.decode(errors="ignore")
        if stderr_text:
            # Any decodable output on stderr is treated as a failure, too
            return {}

        found = {}
        for row in stdout_text.splitlines():
            if not self._arp_regex.match(row):
                # Headlines, empty lines, ...
                continue

            # The regular expression guarantees that the ip address is directly followed by the
            # mac address
            fields = row.split()
            ip_address = fields[0]
            try:
                mac_address = LANDevice.format_mac(fields[1])
            except (IndexError, TypeError):
                # If no mac address was found, continue with next line
                continue

            known_device = found.get(mac_address)
            if known_device is None:
                # Unknown mac address: Create a new ethernet device
                new_device = LANDevice()
                new_device.address = ip_address
                new_device.mac_address = mac_address
                found[mac_address] = new_device
            elif ip_address not in known_device.all_addresses:
                # Known mac address: The new ip address becomes an address alias
                known_device.address_aliases = [*known_device.address_aliases, ip_address]
        return found
def _analyse_win32_device_ids(ids: typing.Iterable[str]) \
-> typing.Tuple[str, typing.Dict[str, str], typing.Optional[str]]:
"""Analyses the windows devices ids of usb devices and extracts the pnp class, the device's
identifiers and the instance id.
Args:
ids: A list of device ids of the same device.
Returns:
tuple of str: device's pnp class (e.g. USB, PCI)
dict: Device attributes (e.g. VID: vendor id, PID: product id, REV: revision code)
str (optional): instance id (often the serial number)
"""
device_class = None
items = {}
instance_id = None
for device_id in ids:
components = device_id.split("\\")
if len(components) < 2:
continue # At least to components expected
if device_class is None:
# Save device's pnp class of first valid device id
device_class = components[0]
elif device_class != components[0]:
# Compare device's pnp class with first device, to ensure they are the same
raise ValueError("Different values for device class: {}, {}".format(device_class,
components[0]))
# If length is at least 3, the last component contains the instance id
if len(components) > 2:
# Use the last components as instance id
tmp_instance_id = "\\".join(components[2:])
if instance_id is None:
# Save the first valid instance id
instance_id = tmp_instance_id
elif instance_id != tmp_instance_id:
# Compare the next valid instance ids to the first one, to ensure they are all the
# same. If a different one is found, take the longest one, as soon as they are at
# least partly equal.
if instance_id.startswith(tmp_instance_id):
pass
elif tmp_instance_id.startswith(instance_id):
instance_id = tmp_instance_id
else:
# Raise an error, if another instance id was found, that is not at least partly
# equal to the other(s).
raise ValueError("Different values for instance id: {}, {}".format(
instance_id, tmp_instance_id))
# The device's attributes are separated with an ampersand
key_values = components[1].split("&")
for key_value in key_values:
try:
# Split key and value of an attribute. These are separated with an underscore
key, value = key_value.split('_')
except ValueError:
# If the attributes are not matching the expected format, continue with the next one
continue
key = key.upper() # Only use uppercase keys
if key not in items:
# Add new key-value-pair
items[key] = value
elif items[key] != value:
# Different values for the same key are not allowed
raise ValueError("Different values for \"{}\": {}, {}".format(key, items[key],
value))
return device_class, items, instance_id