printer_massconfig.py
unknown
python
11 days ago
11 kB
3
Indexable
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket import sys import ipaddress import argparse import concurrent.futures import logging import os from datetime import datetime # Konfiguracja loggera logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(f"zebra_config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Stałe ZEBRA_PORT = 9100 # Standardowy port drukarek Zebra TIMEOUT = 2 # Timeout w sekundach MAX_WORKERS = 50 # Maksymalna liczba wątków # Komendy ZPL do konfiguracji drukarek ZPL_COMMANDS = { "disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n", "disable_snmp_v1v2": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n", "set_admin_password": "! U1 setvar \"weblink.ip.conn1.authentication.password\" \"NoweSilneHaslo123!\"\r\n", # Dla starszych modeli drukarek można użyć innych komend "disable_ftp_legacy": "! U1 do \"device.reset\" \"now\"\r\n", # Dodaj więcej komend wedle potrzeb } # Dodatkowe komendy dla poszczególnych modeli drukarek MODEL_COMMANDS = { "GK420d": { "disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n", }, "ZT230": { "disable_snmp": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n", } # Dodaj więcej modeli i komend } def is_zebra_printer(ip, port=ZEBRA_PORT, timeout=TIMEOUT): """Sprawdza, czy pod danym adresem IP i portem znajduje się drukarka Zebra.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) s.connect((ip, port)) # Metoda 1: Wysyłamy zapytanie ~HQES, które zwraca informacje o drukarce s.sendall(b"~HQES\r\n") try: response = s.recv(1024) if b"ZEBRA" in response or b"ZPL" in response: return True except socket.timeout: # Niektóre drukarki mogą nie odpowiadać na to zapytanie pass # Metoda 2: Sprawdź, czy port 9100 jest otwarty i przyjmuje połączenia # To mniej dokładne, ale może wykryć drukarki, które nie odpowiadają na komendy ZPL try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket: test_socket.settimeout(timeout) test_socket.connect((ip, port)) # Jeśli udało się połączyć, to prawdopodobnie jest to drukarka logger.info(f"Wykryto urządzenie z otwartym portem {port} pod adresem {ip}") return True except: pass return False except (socket.timeout, ConnectionRefusedError, OSError) as e: logger.debug(f"Nie można połączyć się z {ip}:{port} - {str(e)}") return False def send_zpl_commands(ip, commands, port=ZEBRA_PORT, timeout=TIMEOUT): """Wysyła komendy ZPL do drukarki.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) s.connect((ip, port)) logger.debug(f"Nawiązano połączenie z {ip}:{port}") for cmd_name, cmd in commands.items(): try: s.sendall(cmd.encode('utf-8')) logger.info(f"Wysłano komendę {cmd_name} do {ip}") # Spróbuj odczytać odpowiedź (niektóre drukarki mogą odpowiadać) try: s.settimeout(1) # Krótki timeout na odpowiedź response = s.recv(1024) logger.debug(f"Odpowiedź od {ip} na komendę {cmd_name}: {response}") except socket.timeout: pass # Brak odpowiedzi to normalne zachowanie except Exception as e: logger.warning(f"Błąd podczas wysyłania komendy {cmd_name} do {ip}: {e}") # Wyślij komendę resetującą, aby zastosować ustawienia try: reset_cmd = "! U1 do \"device.reset\" \"now\"\r\n" s.sendall(reset_cmd.encode('utf-8')) logger.info(f"Wysłano komendę resetującą do {ip}") except Exception as e: logger.warning(f"Błąd podczas wysyłania komendy resetującej do {ip}: {e}") return True except Exception as e: logger.error(f"Błąd podczas nawiązywania połączenia z {ip}:{port} - {e}") return False def scan_network(network, port=ZEBRA_PORT): """Skanuje podaną sieć w poszukiwaniu drukarek Zebra.""" printers = [] network = ipaddress.IPv4Network(network) total_ips = network.num_addresses logger.info(f"Rozpoczęto skanowanie sieci {network}, liczba adresów: {total_ips}") if total_ips > 1000: logger.warning(f"Skanowanie dużej sieci ({total_ips} adresów) może zająć dużo czasu!") # Użycie puli wątków do równoległego skanowania with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_ip = {executor.submit(is_zebra_printer, str(ip), port): str(ip) for ip in network.hosts()} for i, future in enumerate(concurrent.futures.as_completed(future_to_ip)): ip = future_to_ip[future] if i % 50 == 0: logger.info(f"Zeskanowano {i}/{total_ips} adresów...") try: if future.result(): logger.info(f"Znaleziono drukarkę Zebra pod adresem {ip}") printers.append(ip) except Exception as e: logger.error(f"Błąd podczas skanowania {ip}: {e}") logger.info(f"Zakończono skanowanie. Znaleziono {len(printers)} drukarek Zebra.") return printers def read_ip_list(file_path): """Wczytuje listę adresów IP z pliku.""" try: with open(file_path, 'r') as f: ips = [line.strip() for line in f.readlines() if line.strip()] logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}") return ips except Exception as e: logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}") sys.exit(1) def verify_printers(ip_list, port=ZEBRA_PORT): """Weryfikuje, które z podanych adresów IP są drukarkami Zebra.""" verified_printers = [] logger.info(f"Weryfikowanie {len(ip_list)} adresów IP...") with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_ip = {executor.submit(is_zebra_printer, ip, port): ip for ip in ip_list} for future in concurrent.futures.as_completed(future_to_ip): ip = future_to_ip[future] try: if future.result(): logger.info(f"Zweryfikowano drukarkę Zebra pod adresem {ip}") verified_printers.append(ip) else: logger.warning(f"Adres {ip} nie jest drukarką Zebra lub jest niedostępny") except Exception as e: logger.error(f"Błąd podczas weryfikacji {ip}: {e}") logger.info(f"Zweryfikowano {len(verified_printers)} drukarek Zebra.") return verified_printers def configure_printers(printers, commands=None): """Konfiguruje drukarki Zebra wysyłając komendy ZPL.""" if commands is None: commands = ZPL_COMMANDS success_count = 0 logger.info(f"Rozpoczęto konfigurację {len(printers)} drukarek...") with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_ip = {executor.submit(send_zpl_commands, ip, commands): ip for ip in printers} for future in concurrent.futures.as_completed(future_to_ip): ip = future_to_ip[future] try: if future.result(): logger.info(f"Konfiguracja drukarki {ip} zakończona powodzeniem") success_count += 1 else: logger.warning(f"Konfiguracja drukarki {ip} nie powiodła się") except Exception as e: logger.error(f"Błąd podczas konfiguracji {ip}: {e}") logger.info(f"Konfiguracja zakończona. Pomyślnie skonfigurowano {success_count}/{len(printers)} drukarek.") return success_count def main(): parser = argparse.ArgumentParser(description='Skrypt do masowej konfiguracji drukarek Zebra') group = parser.add_mutually_exclusive_group(required=True) group.add_argument('--scan', metavar='NETWORK', help='Skanuj podaną sieć (np. 192.168.1.0/24)') group.add_argument('--file', metavar='FILE', help='Plik z listą adresów IP drukarek') parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})') parser.add_argument('--verify-only', action='store_true', help='Tylko weryfikuj drukarki bez konfiguracji') parser.add_argument('--output', metavar='FILE', help='Zapisz znalezione drukarki do pliku') parser.add_argument('--force', action='store_true', help='Pomiń weryfikację i traktuj wszystkie adresy jako drukarki Zebra') parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie do debugowania') args = parser.parse_args() # Ustawienie poziomu logowania if args.debug: logger.setLevel(logging.DEBUG) logger.debug("Włączono tryb debugowania") # Znajdź drukarki if args.scan: printers = scan_network(args.scan, args.port) else: # args.file ip_list = read_ip_list(args.file) if args.force: logger.info(f"Tryb wymuszony - traktowanie wszystkich {len(ip_list)} adresów jako drukarki Zebra") printers = ip_list else: printers = verify_printers(ip_list, args.port) # Zapisz listę drukarek do pliku, jeśli podano argument --output if args.output and printers: try: with open(args.output, 'w') as f: for ip in printers: f.write(f"{ip}\n") logger.info(f"Zapisano listę {len(printers)} drukarek do pliku {args.output}") except Exception as e: logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}") # Konfiguruj drukarki, jeśli nie wybrano tylko weryfikacji if not args.verify_only and printers: configure_printers(printers) if not printers: logger.warning("Nie znaleziono żadnych drukarek Zebra.") return 0 if __name__ == "__main__": sys.exit(main())
Editor is loading...
Leave a Comment