mass_check

 avatar
unknown
python
5 days ago
11 kB
2
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

import socket
import sys
import argparse
import concurrent.futures
import logging
import os
import json
from datetime import datetime

# Konfiguracja loggera
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(f"zebra_verify_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

# Stałe
ZEBRA_PORT = 9100  # Standardowy port drukarek Zebra
TIMEOUT = 5  # Większy timeout dla zapytań weryfikacyjnych
MAX_WORKERS = 20  # Liczba równoległych zadań

# Parametry do sprawdzenia
VERIFY_PARAMS = {
    "ftp_status": {
        "command": "! U1 getvar \"ip.ftp.enable\"\r\n",
        "expected": "off",
        "description": "Status FTP"
    },
    "snmp_status": {
        "command": "! U1 getvar \"ip.snmp.enable\"\r\n", 
        "expected": "off",
        "description": "Status SNMP"
    }
    # Parametry obsługi haseł zostały usunięte
}

# Alternatywne polecenia dla różnych modeli drukarek
MODEL_COMMANDS = {
    "GK420d": {
        "ftp_status": {
            "command": "! U1 getvar \"ip.ftp\"\r\n"
        }
    },
    "ZT230": {
        "snmp_status": {
            "command": "! U1 getvar \"ip.snmp\"\r\n"
        }
    }
    # Dodaj więcej modeli i komend
}

def send_command_and_get_response(ip, command, port=ZEBRA_PORT, timeout=TIMEOUT):
    """Wysyła polecenie ZPL do drukarki i zwraca odpowiedź."""
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            s.connect((ip, port))
            s.sendall(command.encode('utf-8'))
            
            # Odczytuj odpowiedź dopóki nie otrzymasz wszystkich danych
            response = b""
            while True:
                try:
                    s.settimeout(2)  # Krótki timeout na odpowiedź
                    chunk = s.recv(1024)
                    if not chunk:
                        break
                    response += chunk
                except socket.timeout:
                    break  # Koniec danych
            
            return response.decode('utf-8', errors='ignore').strip()
    except Exception as e:
        logger.error(f"Błąd podczas komunikacji z {ip}: {e}")
        return None

def get_printer_model(ip, port=ZEBRA_PORT):
    """Pobiera model drukarki."""
    model_cmd = "! U1 getvar \"device.product_name\"\r\n"
    response = send_command_and_get_response(ip, model_cmd, port)
    
    if response and len(response) > 0:
        return response
    
    # Spróbuj alternatywnego polecenia
    alt_model_cmd = "~HI\r\n"
    response = send_command_and_get_response(ip, alt_model_cmd, port)
    
    if response:
        # Parsuj odpowiedź HI, która zawiera model i inne informacje
        lines = response.splitlines()
        for line in lines:
            if "MODEL" in line or "PRINTER" in line:
                return line.split(":", 1)[1].strip() if ":" in line else line.strip()
    
    return "Unknown"

def verify_printer_settings(ip, port=ZEBRA_PORT):
    """Weryfikuje ustawienia drukarki."""
    results = {"ip": ip, "status": "ERROR", "details": {}}
    
    try:
        # Sprawdź, czy drukarka odpowiada
        connection_test = send_command_and_get_response(ip, "! U1 getvar \"device.host_status\"\r\n", port)
        if connection_test is None:
            results["status"] = "BRAK ODPOWIEDZI"
            return results
        
        # Pobierz model drukarki
        model = get_printer_model(ip, port)
        results["model"] = model
        logger.info(f"Drukarka {ip}: Model {model}")
        
        # Wybierz odpowiednie komendy dla danego modelu
        commands = {}
        for param, details in VERIFY_PARAMS.items():
            commands[param] = details["command"]
            
            # Zastąp komendą specyficzną dla modelu, jeśli istnieje
            if model in MODEL_COMMANDS and param in MODEL_COMMANDS[model]:
                commands[param] = MODEL_COMMANDS[model][param]["command"]
        
        # Sprawdź każdy parametr
        all_good = True
        for param, command in commands.items():
            response = send_command_and_get_response(ip, command, port)
            expected = VERIFY_PARAMS[param]["expected"]
            description = VERIFY_PARAMS[param]["description"]
            
            # Jeśli nie ma odpowiedzi, spróbuj alternatywnego polecenia
            if response is None or response == "":
                if param == "ftp_status":
                    # Alternatywne sprawdzenie FTP
                    response = send_command_and_get_response(ip, "! U1 getvar \"ip.port\"\r\n", port)
                
                if response is None or response == "":
                    response = "BRAK ODPOWIEDZI"
            
            # Usuń znaki nowej linii i spacje z odpowiedzi
            if response:
                response = response.strip()
            
            # Sprawdź, czy odpowiedź jest oczekiwana
            status = "OK" if response and expected in response.lower() else "BŁĄD"
            if status == "BŁĄD":
                all_good = False
            
            results["details"][param] = {
                "description": description,
                "value": response,
                "expected": expected,
                "status": status
            }
            
            logger.info(f"Drukarka {ip}, {description}: {status} (otrzymano: '{response}', oczekiwano: '{expected}')")
        
        results["status"] = "OK" if all_good else "CZĘŚCIOWE BŁĘDY"
        
    except Exception as e:
        logger.error(f"Błąd podczas weryfikacji drukarki {ip}: {e}")
        results["error"] = str(e)
        results["status"] = "BŁĄD"
    
    return results

def read_ip_list(file_path):
    """Wczytuje listę adresów IP z pliku."""
    try:
        with open(file_path, 'r') as f:
            ips = [line.strip() for line in f.readlines() if line.strip()]
        logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}")
        return ips
    except Exception as e:
        logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}")
        sys.exit(1)

def format_results_table(results):
    """Formatuje wyniki weryfikacji w czytelną tabelę."""
    headers = ["IP", "Model", "Status", "FTP", "SNMP"]
    
    # Określamy szerokość kolumn
    col_widths = [15, 15, 15, 10, 10]
    
    # Tworzymy nagłówek tabeli
    header_row = " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers))
    divider = "-" * len(header_row)
    
    table = [divider, header_row, divider]
    
    # Dodajemy wiersze danych
    for r in results:
        row_data = [
            r["ip"].ljust(col_widths[0]),
            r.get("model", "Unknown").ljust(col_widths[1]),
            r["status"].ljust(col_widths[2]),
            r["details"].get("ftp_status", {}).get("status", "N/A").ljust(col_widths[3]),
            r["details"].get("snmp_status", {}).get("status", "N/A").ljust(col_widths[4])
        ]
        table.append(" | ".join(row_data))
    
    table.append(divider)
    return "\n".join(table)

def main():
    parser = argparse.ArgumentParser(description='Weryfikacja ustawień bezpieczeństwa drukarek Zebra')
    parser.add_argument('--file', metavar='FILE', required=True, help='Plik z listą adresów IP drukarek')
    parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})')
    parser.add_argument('--output', metavar='FILE', help='Zapisz wyniki do pliku JSON')
    parser.add_argument('--summary', metavar='FILE', help='Zapisz podsumowanie w formacie tekstowym')
    parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie')
    
    args = parser.parse_args()
    
    # Ustaw poziom logowania
    if args.debug:
        logger.setLevel(logging.DEBUG)
    
    # Wczytaj listę IP
    ip_list = read_ip_list(args.file)
    
    # Weryfikuj ustawienia drukarek
    all_results = []
    logger.info(f"Rozpoczynam weryfikację {len(ip_list)} drukarek...")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
        future_to_ip = {executor.submit(verify_printer_settings, ip, args.port): ip for ip in ip_list}
        
        for future in concurrent.futures.as_completed(future_to_ip):
            ip = future_to_ip[future]
            try:
                result = future.result()
                all_results.append(result)
            except Exception as e:
                logger.error(f"Nieoczekiwany błąd dla {ip}: {e}")
                all_results.append({"ip": ip, "status": "BŁĄD KRYTYCZNY", "error": str(e)})
    
    # Sortuj wyniki według IP
    all_results.sort(key=lambda x: x["ip"])
    
    # Wyświetl podsumowanie
    num_ok = sum(1 for r in all_results if r["status"] == "OK")
    num_partial = sum(1 for r in all_results if r["status"] == "CZĘŚCIOWE BŁĘDY")
    num_error = sum(1 for r in all_results if r["status"] in ["BŁĄD", "BŁĄD KRYTYCZNY", "BRAK ODPOWIEDZI"])
    
    logger.info(f"Weryfikacja zakończona: {num_ok} OK, {num_partial} częściowe błędy, {num_error} błędy/brak odpowiedzi")
    
    # Generuj tabelę wyników
    table = format_results_table(all_results)
    print("\nWyniki weryfikacji:")
    print(table)
    
    # Zapisz wyniki do pliku JSON
    if args.output:
        try:
            with open(args.output, 'w', encoding='utf-8') as f:
                json.dump(all_results, f, ensure_ascii=False, indent=2)
            logger.info(f"Zapisano szczegółowe wyniki do pliku {args.output}")
        except Exception as e:
            logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}")
    
    # Zapisz podsumowanie w formacie tekstowym
    if args.summary:
        try:
            with open(args.summary, 'w', encoding='utf-8') as f:
                f.write(f"Podsumowanie weryfikacji drukarek Zebra - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
                f.write(f"Liczba drukarek: {len(all_results)}\n")
                f.write(f"Poprawnie skonfigurowane: {num_ok}\n")
                f.write(f"Częściowe błędy: {num_partial}\n")
                f.write(f"Błędy/brak odpowiedzi: {num_error}\n\n")
                f.write(table)
            logger.info(f"Zapisano podsumowanie do pliku {args.summary}")
        except Exception as e:
            logger.error(f"Błąd podczas zapisywania do pliku {args.summary}: {e}")
    
    return 0

if __name__ == "__main__":
    sys.exit(main())
Editor is loading...
Leave a Comment