mass_config

 avatar
unknown
python
4 months ago
13 kB
11
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys
import ipaddress
import argparse
import concurrent.futures
import logging
import os
import time
from datetime import datetime

# Konfiguracja loggera
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"zebra_config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Stałe
ZEBRA_PORT = 9100  # Standardowy port drukarek Zebra
TIMEOUT = 2  # Timeout w sekundach
MAX_WORKERS = 50  # Maksymalna liczba wątków

# Komendy ZPL do konfiguracji drukarek
ZPL_COMMANDS = {
    # Podstawowe zabezpieczenia
    "disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n",
    "disable_snmp": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n",
    "disable_telnet": "! U1 setvar \"ip.telnet.enable\" \"off\"\r\n",
}

# Dodatkowe komendy dla poszczególnych modeli drukarek
MODEL_COMMANDS = {
    "GK": {},
    "ZT": {},
    "ZD": {},
    "ZQ": {}
}

def is_zebra_printer(ip, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Sprawdza, czy pod danym adresem IP i portem znajduje się drukarka Zebra."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            
            # Metoda 1: Wysyłamy zapytanie ~HQES, które zwraca informacje o drukarce
            s.sendall(b"~HQES\r\n")
            try:
                response = s.recv(1024)
                if b"ZEBRA" in response or b"ZPL" in response:
                    return True
            except socket.timeout:
                # Niektóre drukarki mogą nie odpowiadać na to zapytanie
                pass
            
            # Metoda 2: Sprawdź, czy port 9100 jest otwarty i przyjmuje połączenia
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
                    test_socket.settimeout(timeout)
                    test_socket.connect((ip, port))
                    # Jeśli udało się połączyć, to prawdopodobnie jest to drukarka
                    logger.info(f"Wykryto urządzenie z otwartym portem {port} pod adresem {ip}")
                    return True
            except:
                pass
                
            return False
    except (socket.timeout, ConnectionRefusedError, OSError) as e:
        logger.debug(f"Nie można połączyć się z {ip}:{port} - {str(e)}")
        return False

def detect_printer_model(ip, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Wykrywa model drukarki Zebra."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            
            # Próba uzyskania informacji o drukarce
            commands = [
                b"~HI\r\n",  # Host Information
                b"! U1 getvar \"device.host_status\"\r\n",  # SGD komenda do uzyskania statusu
                b"! U1 getvar \"device.product_name\"\r\n"  # SGD komenda do uzyskania nazwy produktu
            ]
            
            model = "Unknown"
            
            for cmd in commands:
                try:
                    s.sendall(cmd)
                    response = s.recv(1024)
                    
                    # Próba identyfikacji modelu na podstawie odpowiedzi
                    response_str = response.decode('utf-8', errors='ignore')
                    logger.debug(f"Odpowiedź od {ip} na komendę {cmd}: {response_str}")
                    
                    for prefix in ["ZT", "GK", "ZD", "ZQ", "GX"]:
                        if prefix in response_str:
                            model = prefix
                            return model
                except:
                    continue
            
            return model
    except Exception as e:
        logger.error(f"Błąd podczas wykrywania modelu drukarki {ip}: {e}")
        return "Unknown"

def send_zpl_commands(ip, commands, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Wysyła komendy ZPL do drukarki."""
    try:
        # Najpierw sprawdź model drukarki
        model_prefix = detect_printer_model(ip, port, timeout)
        logger.info(f"Wykryto drukarkę {model_prefix} pod adresem {ip}")
        
        # Wybierz komendy specyficzne dla modelu (jeśli istnieją)
        model_specific_commands = {}
        for model_key in MODEL_COMMANDS:
            if model_prefix.startswith(model_key):
                model_specific_commands = MODEL_COMMANDS[model_key]
                logger.info(f"Używam komend specyficznych dla modelu {model_key}")
                break
        
        # Łączymy wszystkie komendy - najpierw ogólne, potem specyficzne dla modelu
        all_commands = {**commands, **model_specific_commands}
        
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            logger.debug(f"Nawiązano połączenie z {ip}:{port}")
            
            for cmd_name, cmd in all_commands.items():
                try:
                    s.sendall(cmd.encode('utf-8'))
                    logger.info(f"Wysłano komendę {cmd_name} do {ip}")
                    time.sleep(0.5)  # Krótka pauza między komendami
                    
                    # Próba odczytu odpowiedzi
                    try:
                        s.settimeout(1)
                        response = s.recv(1024)
                        logger.debug(f"Odpowiedź od {ip} na komendę {cmd_name}: {response}")
                    except socket.timeout:
                        pass  # Brak odpowiedzi to normalne zachowanie
                except Exception as e:
                    logger.warning(f"Błąd podczas wysyłania komendy {cmd_name} do {ip}: {e}")
            
            # Wyślij komendę resetującą na koniec
            try:
                reset_cmd = "! U1 do \"device.reset\" \"now\"\r\n"
                s.sendall(reset_cmd.encode('utf-8'))
                logger.info(f"Wysłano komendę resetującą do {ip}")
            except Exception as e:
                logger.warning(f"Błąd podczas wysyłania komendy resetującej do {ip}: {e}")
                
            return True
    except Exception as e:
        logger.error(f"Błąd podczas nawiązywania połączenia z {ip}:{port} - {e}")
        return False

def scan_network(network, port=ZEBRA_PORT):
    """Skanuje podaną sieć w poszukiwaniu drukarek Zebra."""
    printers = []
    network = ipaddress.IPv4Network(network)
    total_ips = network.num_addresses
    
    logger.info(f"Rozpoczęto skanowanie sieci {network}, liczba adresów: {total_ips}")
    
    if total_ips > 1000:
        logger.warning(f"Skanowanie dużej sieci ({total_ips} adresów) może zająć dużo czasu!")
    
    # Użycie puli wątków do równoległego skanowania
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(is_zebra_printer, str(ip), port): str(ip) for ip in network.hosts()}
        
        for i, future in enumerate(concurrent.futures.as_completed(future_to_ip)):
            ip = future_to_ip[future]
            if i % 50 == 0:
                logger.info(f"Zeskanowano {i}/{total_ips} adresów...")
            try:
                if future.result():
                    logger.info(f"Znaleziono drukarkę Zebra pod adresem {ip}")
                    printers.append(ip)
            except Exception as e:
                logger.error(f"Błąd podczas skanowania {ip}: {e}")
    
    logger.info(f"Zakończono skanowanie. Znaleziono {len(printers)} drukarek Zebra.")
    return printers

def read_ip_list(file_path):
    """Wczytuje listę adresów IP z pliku."""
    try:
        with open(file_path, 'r') as f:
            ips = [line.strip() for line in f.readlines() if line.strip()]
        logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}")
        return ips
    except Exception as e:
        logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}")
        sys.exit(1)

def verify_printers(ip_list, port=ZEBRA_PORT):
    """Weryfikuje, które z podanych adresów IP są drukarkami Zebra."""
    verified_printers = []
    logger.info(f"Weryfikowanie {len(ip_list)} adresów IP...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(is_zebra_printer, ip, port): ip for ip in ip_list}
        
        for future in concurrent.futures.as_completed(future_to_ip):
            ip = future_to_ip[future]
            try:
                if future.result():
                    logger.info(f"Zweryfikowano drukarkę Zebra pod adresem {ip}")
                    verified_printers.append(ip)
                else:
                    logger.warning(f"Adres {ip} nie jest drukarką Zebra lub jest niedostępny")
            except Exception as e:
                logger.error(f"Błąd podczas weryfikacji {ip}: {e}")
    
    logger.info(f"Zweryfikowano {len(verified_printers)} drukarek Zebra.")
    return verified_printers

def configure_printers(printers, commands=None):
    """Konfiguruje drukarki Zebra wysyłając komendy ZPL."""
    if commands is None:
        commands = ZPL_COMMANDS
    
    success_count = 0
    logger.info(f"Rozpoczęto konfigurację {len(printers)} drukarek...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(send_zpl_commands, ip, commands): ip for ip in printers}
        
        for future in concurrent.futures.as_completed(future_to_ip):
            ip = future_to_ip[future]
            try:
                if future.result():
                    logger.info(f"Konfiguracja drukarki {ip} zakończona powodzeniem")
                    success_count += 1
                else:
                    logger.warning(f"Konfiguracja drukarki {ip} nie powiodła się")
            except Exception as e:
                logger.error(f"Błąd podczas konfiguracji {ip}: {e}")
    
    logger.info(f"Konfiguracja zakończona. Pomyślnie skonfigurowano {success_count}/{len(printers)} drukarek.")
    return success_count

def main():
    parser = argparse.ArgumentParser(description='Skrypt do masowej konfiguracji drukarek Zebra')
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument('--scan', metavar='NETWORK', help='Skanuj podaną sieć (np. 192.168.1.0/24)')
    group.add_argument('--file', metavar='FILE', help='Plik z listą adresów IP drukarek')
    parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})')
    parser.add_argument('--verify-only', action='store_true', help='Tylko weryfikuj drukarki bez konfiguracji')
    parser.add_argument('--output', metavar='FILE', help='Zapisz znalezione drukarki do pliku')
    parser.add_argument('--force', action='store_true', help='Pomiń weryfikację i traktuj wszystkie adresy jako drukarki Zebra')
    parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie do debugowania')
    
    args = parser.parse_args()
    
    # Ustawienie poziomu logowania
    if args.debug:
        logger.setLevel(logging.DEBUG)
        logger.debug("Włączono tryb debugowania")
    
    # Znajdź drukarki
    if args.scan:
        printers = scan_network(args.scan, args.port)
    else:  # args.file
        ip_list = read_ip_list(args.file)
        if args.force:
            logger.info(f"Tryb wymuszony - traktowanie wszystkich {len(ip_list)} adresów jako drukarki Zebra")
            printers = ip_list
        else:
            printers = verify_printers(ip_list, args.port)
    
    # Zapisz listę drukarek do pliku, jeśli podano argument --output
    if args.output and printers:
        try:
            with open(args.output, 'w') as f:
                for ip in printers:
                    f.write(f"{ip}\n")
            logger.info(f"Zapisano listę {len(printers)} drukarek do pliku {args.output}")
        except Exception as e:
            logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}")
    
    # Konfiguruj drukarki, jeśli nie wybrano tylko weryfikacji
    if not args.verify_only and printers:
        configure_printers(printers)
    
    if not printers:
        logger.warning("Nie znaleziono żadnych drukarek Zebra.")
    
    return 0

if __name__ == "__main__":
    sys.exit(main())
Editor is loading...
Leave a Comment