mass_check
unknown
python
5 days ago
11 kB
2
Indexable
#!/usr/bin/env python3 # -*- coding: utf-8 -*- import socket import sys import argparse import concurrent.futures import logging import os import json from datetime import datetime # Konfiguracja loggera logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler(f"zebra_verify_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) # Stałe ZEBRA_PORT = 9100 # Standardowy port drukarek Zebra TIMEOUT = 5 # Większy timeout dla zapytań weryfikacyjnych MAX_WORKERS = 20 # Liczba równoległych zadań # Parametry do sprawdzenia VERIFY_PARAMS = { "ftp_status": { "command": "! U1 getvar \"ip.ftp.enable\"\r\n", "expected": "off", "description": "Status FTP" }, "snmp_status": { "command": "! U1 getvar \"ip.snmp.enable\"\r\n", "expected": "off", "description": "Status SNMP" } # Parametry obsługi haseł zostały usunięte } # Alternatywne polecenia dla różnych modeli drukarek MODEL_COMMANDS = { "GK420d": { "ftp_status": { "command": "! U1 getvar \"ip.ftp\"\r\n" } }, "ZT230": { "snmp_status": { "command": "! U1 getvar \"ip.snmp\"\r\n" } } # Dodaj więcej modeli i komend } def send_command_and_get_response(ip, command, port=ZEBRA_PORT, timeout=TIMEOUT): """Wysyła polecenie ZPL do drukarki i zwraca odpowiedź.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.settimeout(timeout) s.connect((ip, port)) s.sendall(command.encode('utf-8')) # Odczytuj odpowiedź dopóki nie otrzymasz wszystkich danych response = b"" while True: try: s.settimeout(2) # Krótki timeout na odpowiedź chunk = s.recv(1024) if not chunk: break response += chunk except socket.timeout: break # Koniec danych return response.decode('utf-8', errors='ignore').strip() except Exception as e: logger.error(f"Błąd podczas komunikacji z {ip}: {e}") return None def get_printer_model(ip, port=ZEBRA_PORT): """Pobiera model drukarki.""" model_cmd = "! U1 getvar \"device.product_name\"\r\n" response = send_command_and_get_response(ip, model_cmd, port) if response and len(response) > 0: return response # Spróbuj alternatywnego polecenia alt_model_cmd = "~HI\r\n" response = send_command_and_get_response(ip, alt_model_cmd, port) if response: # Parsuj odpowiedź HI, która zawiera model i inne informacje lines = response.splitlines() for line in lines: if "MODEL" in line or "PRINTER" in line: return line.split(":", 1)[1].strip() if ":" in line else line.strip() return "Unknown" def verify_printer_settings(ip, port=ZEBRA_PORT): """Weryfikuje ustawienia drukarki.""" results = {"ip": ip, "status": "ERROR", "details": {}} try: # Sprawdź, czy drukarka odpowiada connection_test = send_command_and_get_response(ip, "! U1 getvar \"device.host_status\"\r\n", port) if connection_test is None: results["status"] = "BRAK ODPOWIEDZI" return results # Pobierz model drukarki model = get_printer_model(ip, port) results["model"] = model logger.info(f"Drukarka {ip}: Model {model}") # Wybierz odpowiednie komendy dla danego modelu commands = {} for param, details in VERIFY_PARAMS.items(): commands[param] = details["command"] # Zastąp komendą specyficzną dla modelu, jeśli istnieje if model in MODEL_COMMANDS and param in MODEL_COMMANDS[model]: commands[param] = MODEL_COMMANDS[model][param]["command"] # Sprawdź każdy parametr all_good = True for param, command in commands.items(): response = send_command_and_get_response(ip, command, port) expected = VERIFY_PARAMS[param]["expected"] description = VERIFY_PARAMS[param]["description"] # Jeśli nie ma odpowiedzi, spróbuj alternatywnego polecenia if response is None or response == "": if param == "ftp_status": # Alternatywne sprawdzenie FTP response = send_command_and_get_response(ip, "! U1 getvar \"ip.port\"\r\n", port) if response is None or response == "": response = "BRAK ODPOWIEDZI" # Usuń znaki nowej linii i spacje z odpowiedzi if response: response = response.strip() # Sprawdź, czy odpowiedź jest oczekiwana status = "OK" if response and expected in response.lower() else "BŁĄD" if status == "BŁĄD": all_good = False results["details"][param] = { "description": description, "value": response, "expected": expected, "status": status } logger.info(f"Drukarka {ip}, {description}: {status} (otrzymano: '{response}', oczekiwano: '{expected}')") results["status"] = "OK" if all_good else "CZĘŚCIOWE BŁĘDY" except Exception as e: logger.error(f"Błąd podczas weryfikacji drukarki {ip}: {e}") results["error"] = str(e) results["status"] = "BŁĄD" return results def read_ip_list(file_path): """Wczytuje listę adresów IP z pliku.""" try: with open(file_path, 'r') as f: ips = [line.strip() for line in f.readlines() if line.strip()] logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}") return ips except Exception as e: logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}") sys.exit(1) def format_results_table(results): """Formatuje wyniki weryfikacji w czytelną tabelę.""" headers = ["IP", "Model", "Status", "FTP", "SNMP"] # Określamy szerokość kolumn col_widths = [15, 15, 15, 10, 10] # Tworzymy nagłówek tabeli header_row = " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers)) divider = "-" * len(header_row) table = [divider, header_row, divider] # Dodajemy wiersze danych for r in results: row_data = [ r["ip"].ljust(col_widths[0]), r.get("model", "Unknown").ljust(col_widths[1]), r["status"].ljust(col_widths[2]), r["details"].get("ftp_status", {}).get("status", "N/A").ljust(col_widths[3]), r["details"].get("snmp_status", {}).get("status", "N/A").ljust(col_widths[4]) ] table.append(" | ".join(row_data)) table.append(divider) return "\n".join(table) def main(): parser = argparse.ArgumentParser(description='Weryfikacja ustawień bezpieczeństwa drukarek Zebra') parser.add_argument('--file', metavar='FILE', required=True, help='Plik z listą adresów IP drukarek') parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})') parser.add_argument('--output', metavar='FILE', help='Zapisz wyniki do pliku JSON') parser.add_argument('--summary', metavar='FILE', help='Zapisz podsumowanie w formacie tekstowym') parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie') args = parser.parse_args() # Ustaw poziom logowania if args.debug: logger.setLevel(logging.DEBUG) # Wczytaj listę IP ip_list = read_ip_list(args.file) # Weryfikuj ustawienia drukarek all_results = [] logger.info(f"Rozpoczynam weryfikację {len(ip_list)} drukarek...") with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: future_to_ip = {executor.submit(verify_printer_settings, ip, args.port): ip for ip in ip_list} for future in concurrent.futures.as_completed(future_to_ip): ip = future_to_ip[future] try: result = future.result() all_results.append(result) except Exception as e: logger.error(f"Nieoczekiwany błąd dla {ip}: {e}") all_results.append({"ip": ip, "status": "BŁĄD KRYTYCZNY", "error": str(e)}) # Sortuj wyniki według IP all_results.sort(key=lambda x: x["ip"]) # Wyświetl podsumowanie num_ok = sum(1 for r in all_results if r["status"] == "OK") num_partial = sum(1 for r in all_results if r["status"] == "CZĘŚCIOWE BŁĘDY") num_error = sum(1 for r in all_results if r["status"] in ["BŁĄD", "BŁĄD KRYTYCZNY", "BRAK ODPOWIEDZI"]) logger.info(f"Weryfikacja zakończona: {num_ok} OK, {num_partial} częściowe błędy, {num_error} błędy/brak odpowiedzi") # Generuj tabelę wyników table = format_results_table(all_results) print("\nWyniki weryfikacji:") print(table) # Zapisz wyniki do pliku JSON if args.output: try: with open(args.output, 'w', encoding='utf-8') as f: json.dump(all_results, f, ensure_ascii=False, indent=2) logger.info(f"Zapisano szczegółowe wyniki do pliku {args.output}") except Exception as e: logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}") # Zapisz podsumowanie w formacie tekstowym if args.summary: try: with open(args.summary, 'w', encoding='utf-8') as f: f.write(f"Podsumowanie weryfikacji drukarek Zebra - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n") f.write(f"Liczba drukarek: {len(all_results)}\n") f.write(f"Poprawnie skonfigurowane: {num_ok}\n") f.write(f"Częściowe błędy: {num_partial}\n") f.write(f"Błędy/brak odpowiedzi: {num_error}\n\n") f.write(table) logger.info(f"Zapisano podsumowanie do pliku {args.summary}") except Exception as e: logger.error(f"Błąd podczas zapisywania do pliku {args.summary}: {e}") return 0 if __name__ == "__main__": sys.exit(main())
Editor is loading...
Leave a Comment