printer_massconfig.py

 avatar
unknown
python
11 days ago
11 kB
3
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys
import ipaddress
import argparse
import concurrent.futures
import logging
import os
from datetime import datetime

# Konfiguracja loggera
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"zebra_config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Stałe
ZEBRA_PORT = 9100  # Standardowy port drukarek Zebra
TIMEOUT = 2  # Timeout w sekundach
MAX_WORKERS = 50  # Maksymalna liczba wątków

# Komendy ZPL do konfiguracji drukarek
ZPL_COMMANDS = {
    "disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n",
    "disable_snmp_v1v2": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n",
    "set_admin_password": "! U1 setvar \"weblink.ip.conn1.authentication.password\" \"NoweSilneHaslo123!\"\r\n",
    # Dla starszych modeli drukarek można użyć innych komend
    "disable_ftp_legacy": "! U1 do \"device.reset\" \"now\"\r\n",
    # Dodaj więcej komend wedle potrzeb
}

# Dodatkowe komendy dla poszczególnych modeli drukarek
MODEL_COMMANDS = {
    "GK420d": {
        "disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n",
    },
    "ZT230": {
        "disable_snmp": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n",
    }
    # Dodaj więcej modeli i komend
}

def is_zebra_printer(ip, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Sprawdza, czy pod danym adresem IP i portem znajduje się drukarka Zebra."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            
            # Metoda 1: Wysyłamy zapytanie ~HQES, które zwraca informacje o drukarce
            s.sendall(b"~HQES\r\n")
            try:
                response = s.recv(1024)
                if b"ZEBRA" in response or b"ZPL" in response:
                    return True
            except socket.timeout:
                # Niektóre drukarki mogą nie odpowiadać na to zapytanie
                pass
                
            # Metoda 2: Sprawdź, czy port 9100 jest otwarty i przyjmuje połączenia
            # To mniej dokładne, ale może wykryć drukarki, które nie odpowiadają na komendy ZPL
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
                    test_socket.settimeout(timeout)
                    test_socket.connect((ip, port))
                    # Jeśli udało się połączyć, to prawdopodobnie jest to drukarka
                    logger.info(f"Wykryto urządzenie z otwartym portem {port} pod adresem {ip}")
                    return True
            except:
                pass
                
            return False
    except (socket.timeout, ConnectionRefusedError, OSError) as e:
        logger.debug(f"Nie można połączyć się z {ip}:{port} - {str(e)}")
        return False

def send_zpl_commands(ip, commands, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Wysyła komendy ZPL do drukarki."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            logger.debug(f"Nawiązano połączenie z {ip}:{port}")
            
            for cmd_name, cmd in commands.items():
                try:
                    s.sendall(cmd.encode('utf-8'))
                    logger.info(f"Wysłano komendę {cmd_name} do {ip}")
                    # Spróbuj odczytać odpowiedź (niektóre drukarki mogą odpowiadać)
                    try:
                        s.settimeout(1)  # Krótki timeout na odpowiedź
                        response = s.recv(1024)
                        logger.debug(f"Odpowiedź od {ip} na komendę {cmd_name}: {response}")
                    except socket.timeout:
                        pass  # Brak odpowiedzi to normalne zachowanie
                except Exception as e:
                    logger.warning(f"Błąd podczas wysyłania komendy {cmd_name} do {ip}: {e}")
            
            # Wyślij komendę resetującą, aby zastosować ustawienia
            try:
                reset_cmd = "! U1 do \"device.reset\" \"now\"\r\n"
                s.sendall(reset_cmd.encode('utf-8'))
                logger.info(f"Wysłano komendę resetującą do {ip}")
            except Exception as e:
                logger.warning(f"Błąd podczas wysyłania komendy resetującej do {ip}: {e}")
                
            return True
    except Exception as e:
        logger.error(f"Błąd podczas nawiązywania połączenia z {ip}:{port} - {e}")
        return False

def scan_network(network, port=ZEBRA_PORT):
    """Skanuje podaną sieć w poszukiwaniu drukarek Zebra."""
    printers = []
    network = ipaddress.IPv4Network(network)
    total_ips = network.num_addresses
    
    logger.info(f"Rozpoczęto skanowanie sieci {network}, liczba adresów: {total_ips}")
    
    if total_ips > 1000:
        logger.warning(f"Skanowanie dużej sieci ({total_ips} adresów) może zająć dużo czasu!")
    
    # Użycie puli wątków do równoległego skanowania
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(is_zebra_printer, str(ip), port): str(ip) for ip in network.hosts()}
        
        for i, future in enumerate(concurrent.futures.as_completed(future_to_ip)):
            ip = future_to_ip[future]
            if i % 50 == 0:
                logger.info(f"Zeskanowano {i}/{total_ips} adresów...")
            try:
                if future.result():
                    logger.info(f"Znaleziono drukarkę Zebra pod adresem {ip}")
                    printers.append(ip)
            except Exception as e:
                logger.error(f"Błąd podczas skanowania {ip}: {e}")
    
    logger.info(f"Zakończono skanowanie. Znaleziono {len(printers)} drukarek Zebra.")
    return printers

def read_ip_list(file_path):
    """Wczytuje listę adresów IP z pliku."""
    try:
        with open(file_path, 'r') as f:
            ips = [line.strip() for line in f.readlines() if line.strip()]
        logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}")
        return ips
    except Exception as e:
        logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}")
        sys.exit(1)

def verify_printers(ip_list, port=ZEBRA_PORT):
    """Weryfikuje, które z podanych adresów IP są drukarkami Zebra."""
    verified_printers = []
    logger.info(f"Weryfikowanie {len(ip_list)} adresów IP...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(is_zebra_printer, ip, port): ip for ip in ip_list}
        
        for future in concurrent.futures.as_completed(future_to_ip):
            ip = future_to_ip[future]
            try:
                if future.result():
                    logger.info(f"Zweryfikowano drukarkę Zebra pod adresem {ip}")
                    verified_printers.append(ip)
                else:
                    logger.warning(f"Adres {ip} nie jest drukarką Zebra lub jest niedostępny")
            except Exception as e:
                logger.error(f"Błąd podczas weryfikacji {ip}: {e}")
    
    logger.info(f"Zweryfikowano {len(verified_printers)} drukarek Zebra.")
    return verified_printers

def configure_printers(printers, commands=None):
    """Konfiguruje drukarki Zebra wysyłając komendy ZPL."""
    if commands is None:
        commands = ZPL_COMMANDS
    
    success_count = 0
    logger.info(f"Rozpoczęto konfigurację {len(printers)} drukarek...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(send_zpl_commands, ip, commands): ip for ip in printers}
        
        for future in concurrent.futures.as_completed(future_to_ip):
            ip = future_to_ip[future]
            try:
                if future.result():
                    logger.info(f"Konfiguracja drukarki {ip} zakończona powodzeniem")
                    success_count += 1
                else:
                    logger.warning(f"Konfiguracja drukarki {ip} nie powiodła się")
            except Exception as e:
                logger.error(f"Błąd podczas konfiguracji {ip}: {e}")
    
    logger.info(f"Konfiguracja zakończona. Pomyślnie skonfigurowano {success_count}/{len(printers)} drukarek.")
    return success_count

def main():
    parser = argparse.ArgumentParser(description='Skrypt do masowej konfiguracji drukarek Zebra')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--scan', metavar='NETWORK', help='Skanuj podaną sieć (np. 192.168.1.0/24)')
    group.add_argument('--file', metavar='FILE', help='Plik z listą adresów IP drukarek')
    parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})')
    parser.add_argument('--verify-only', action='store_true', help='Tylko weryfikuj drukarki bez konfiguracji')
    parser.add_argument('--output', metavar='FILE', help='Zapisz znalezione drukarki do pliku')
    parser.add_argument('--force', action='store_true', help='Pomiń weryfikację i traktuj wszystkie adresy jako drukarki Zebra')
    parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie do debugowania')
    
    args = parser.parse_args()
    
    # Ustawienie poziomu logowania
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Włączono tryb debugowania")
    
    # Znajdź drukarki
    if args.scan:
        printers = scan_network(args.scan, args.port)
    else:  # args.file
        ip_list = read_ip_list(args.file)
        if args.force:
            logger.info(f"Tryb wymuszony - traktowanie wszystkich {len(ip_list)} adresów jako drukarki Zebra")
            printers = ip_list
        else:
            printers = verify_printers(ip_list, args.port)
    
    # Zapisz listę drukarek do pliku, jeśli podano argument --output
    if args.output and printers:
        try:
            with open(args.output, 'w') as f:
                for ip in printers:
                    f.write(f"{ip}\n")
            logger.info(f"Zapisano listę {len(printers)} drukarek do pliku {args.output}")
        except Exception as e:
            logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}")
    
    # Konfiguruj drukarki, jeśli nie wybrano tylko weryfikacji
    if not args.verify_only and printers:
        configure_printers(printers)
    
    if not printers:
        logger.warning("Nie znaleziono żadnych drukarek Zebra.")
    
    return 0

if __name__ == "__main__":
    sys.exit(main())
Editor is loading...
Leave a Comment