Source code for device_manager.scanner._linux
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Device scanners working on linux systems."""
import os
import sys
if sys.platform != "linux":
if "DEVMAN_NO_IMPORT_ERROR" not in os.environ or \
str(os.environ["DEVMAN_NO_IMPORT_ERROR"]) not in ["True", "1"]:
raise ImportError("Linux-specific device scanners are only importable on linux systems")
import re
import subprocess
import typing
import pyudev
from ._base import BaseDeviceScanner, BaseLANDeviceScanner
from ..device import USBDevice, LANDevice
__all__ = ["LinuxUSBDeviceScanner", "LinuxLANDeviceScanner"]
####################################################################################################
[docs]class LinuxUSBDeviceScanner(BaseDeviceScanner):
"""A device scanner that scans for usb devices on linux systems. It scans all usb ports for
devices.
"""
def __init__(self):
super().__init__()
self._context = pyudev.Context()
[docs] @staticmethod
def _device_from_raw(raw_device: pyudev.Device) -> USBDevice:
"""Converts a raw device provided from pyudev into a `USBDevice`-object.
Args:
raw_device: A device provided from pyudev as result of scanning all usb ports.
Returns:
USBDevice: The `raw_device` converted into a `USBDevice`-object.
"""
if not isinstance(raw_device, pyudev.Device):
raise TypeError("Expected \"pyudev.Device\", got \"{}\" instead.".format(
type(raw_device)))
try:
device_type = raw_device.properties["SUBSYSTEM"].upper()
except (KeyError, AttributeError):
raise TypeError("Could not specify the device's subsystem, expected: \"USB\".")
if device_type != "USB":
raise TypeError("Expected subsystem \"USB\", got \"{}\" instead.".format(device_type))
dev = USBDevice()
try:
dev.address = raw_device.device_path
except (AttributeError, TypeError) as exc:
# When the raw device does not contain a path, it makes no sense
raise TypeError("Could not get device path from pyudev.Device") from exc
# Extracting device properties
if "DEVNAME" in raw_device.properties:
dev.address_aliases = [raw_device.properties["DEVNAME"]]
if "ID_VENDOR_ID" in raw_device.properties:
dev.vendor_id = int(raw_device.properties["ID_VENDOR_ID"], base=16)
if "ID_MODEL_ID" in raw_device.properties:
dev.product_id = int(raw_device.properties["ID_MODEL_ID"], base=16)
if "ID_REVISION" in raw_device.properties:
dev.revision_id = int(raw_device.properties["ID_REVISION"], base=16)
if "ID_SERIAL_SHORT" in raw_device.properties:
dev.serial = raw_device.properties["ID_SERIAL_SHORT"]
return dev
[docs] def _scan(self, rescan: bool) -> typing.Sequence[USBDevice]:
"""Scans all usb ports for devices.
Args:
rescan: True, if the ports should be scanned again. False, if you only want to scan,
if there are no results from a previous scan.
"""
if len(self._devices) > 0 and not rescan:
return self._devices
self._devices.clear()
raw_devices = self._context.list_devices()
for raw_dev in raw_devices:
try:
dev = self._device_from_raw(raw_dev)
self._devices.append(dev)
except (TypeError, ValueError):
pass
return tuple(self._devices)
[docs]class LinuxLANDeviceScanner(BaseLANDeviceScanner):
"""A device scanner that scans the local network for ethernet devices.
Args:
**kwargs:
- nmap_search_path: One or multiple paths where to search for the nmap executable.
"""
def __init__(self, **kwargs):
super().__init__(**kwargs)
# Regular Expression: " <ip address> <hardware type> <mac address> ..."
self._arp_regex = re.compile(r"^[ \t]*((?:\d{1,3}\.){3}\d{1,3})[ \t]+(\w*[ \t]+)?"
r"([0-9A-Fa-f]{2}[.:\-]){5}([0-9A-Fa-f]{2})")
[docs] def _get_arp_cache(self) -> typing.Dict[str, LANDevice]:
"""Runs the arp command and extracts ip and mac addresses from the command's output.
Returns:
dict: A dictionary, mapping strings to `LANDevice`s. The dictionary contains all results
of the arp command, that contain a valid ip and mac address.
"""
devices = {}
try:
# Run "arp -n", to retrieve all mac addresses from the ARP-cache
process = subprocess.Popen(["arp", "-n"],
bufsize=100000,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except FileNotFoundError as exc:
raise FileNotFoundError("Command 'arp' was not found. Please, make sure \"net-tools\" "
"is installed.") from exc
raw_arp_out, raw_arp_err = process.communicate()
if process.returncode != 0:
# The arp-command failed
return devices
arp_out = bytes.decode(raw_arp_out, errors="ignore")
arp_err = bytes.decode(raw_arp_err, errors="ignore")
if len(arp_err) > 0:
# The arp-command failed
return devices
for line in arp_out.splitlines():
if self._arp_regex.match(line):
# Valid line due to regular expression
components = line.split()
ip_address = components[0]
# The mac address can be found in third or second component, depending on whether
# the <hardware type> is contained or not.
for i in [2, 1]:
try:
mac_address = LANDevice.format_mac(components[i])
break
except (IndexError, TypeError):
pass
else:
# If no mac address was found, continue with next line
continue
if mac_address in devices:
# If mac address is already known, ip address is added to the address aliases
if ip_address not in devices[mac_address].all_addresses:
devices[mac_address].address_aliases = [
*devices[mac_address].address_aliases,
ip_address]
else:
dev = LANDevice()
dev.address = ip_address
dev.mac_address = mac_address
devices[mac_address] = dev
return devices