mass_check
unknown
python
9 months ago
11 kB
29
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import sys
import argparse
import concurrent.futures
import logging
import os
import json
from datetime import datetime
# Konfiguracja loggera
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f"zebra_verify_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Stałe
ZEBRA_PORT = 9100 # Standardowy port drukarek Zebra
TIMEOUT = 5 # Większy timeout dla zapytań weryfikacyjnych
MAX_WORKERS = 20 # Liczba równoległych zadań
# Parametry do sprawdzenia
VERIFY_PARAMS = {
"ftp_status": {
"command": "! U1 getvar \"ip.ftp.enable\"\r\n",
"expected": "off",
"description": "Status FTP"
},
"snmp_status": {
"command": "! U1 getvar \"ip.snmp.enable\"\r\n",
"expected": "off",
"description": "Status SNMP"
}
# Parametry obsługi haseł zostały usunięte
}
# Alternatywne polecenia dla różnych modeli drukarek
MODEL_COMMANDS = {
"GK420d": {
"ftp_status": {
"command": "! U1 getvar \"ip.ftp\"\r\n"
}
},
"ZT230": {
"snmp_status": {
"command": "! U1 getvar \"ip.snmp\"\r\n"
}
}
# Dodaj więcej modeli i komend
}
def send_command_and_get_response(ip, command, port=ZEBRA_PORT, timeout=TIMEOUT):
"""Wysyła polecenie ZPL do drukarki i zwraca odpowiedź."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
s.sendall(command.encode('utf-8'))
# Odczytuj odpowiedź dopóki nie otrzymasz wszystkich danych
response = b""
while True:
try:
s.settimeout(2) # Krótki timeout na odpowiedź
chunk = s.recv(1024)
if not chunk:
break
response += chunk
except socket.timeout:
break # Koniec danych
return response.decode('utf-8', errors='ignore').strip()
except Exception as e:
logger.error(f"Błąd podczas komunikacji z {ip}: {e}")
return None
def get_printer_model(ip, port=ZEBRA_PORT):
"""Pobiera model drukarki."""
model_cmd = "! U1 getvar \"device.product_name\"\r\n"
response = send_command_and_get_response(ip, model_cmd, port)
if response and len(response) > 0:
return response
# Spróbuj alternatywnego polecenia
alt_model_cmd = "~HI\r\n"
response = send_command_and_get_response(ip, alt_model_cmd, port)
if response:
# Parsuj odpowiedź HI, która zawiera model i inne informacje
lines = response.splitlines()
for line in lines:
if "MODEL" in line or "PRINTER" in line:
return line.split(":", 1)[1].strip() if ":" in line else line.strip()
return "Unknown"
def verify_printer_settings(ip, port=ZEBRA_PORT):
"""Weryfikuje ustawienia drukarki."""
results = {"ip": ip, "status": "ERROR", "details": {}}
try:
# Sprawdź, czy drukarka odpowiada
connection_test = send_command_and_get_response(ip, "! U1 getvar \"device.host_status\"\r\n", port)
if connection_test is None:
results["status"] = "BRAK ODPOWIEDZI"
return results
# Pobierz model drukarki
model = get_printer_model(ip, port)
results["model"] = model
logger.info(f"Drukarka {ip}: Model {model}")
# Wybierz odpowiednie komendy dla danego modelu
commands = {}
for param, details in VERIFY_PARAMS.items():
commands[param] = details["command"]
# Zastąp komendą specyficzną dla modelu, jeśli istnieje
if model in MODEL_COMMANDS and param in MODEL_COMMANDS[model]:
commands[param] = MODEL_COMMANDS[model][param]["command"]
# Sprawdź każdy parametr
all_good = True
for param, command in commands.items():
response = send_command_and_get_response(ip, command, port)
expected = VERIFY_PARAMS[param]["expected"]
description = VERIFY_PARAMS[param]["description"]
# Jeśli nie ma odpowiedzi, spróbuj alternatywnego polecenia
if response is None or response == "":
if param == "ftp_status":
# Alternatywne sprawdzenie FTP
response = send_command_and_get_response(ip, "! U1 getvar \"ip.port\"\r\n", port)
if response is None or response == "":
response = "BRAK ODPOWIEDZI"
# Usuń znaki nowej linii i spacje z odpowiedzi
if response:
response = response.strip()
# Sprawdź, czy odpowiedź jest oczekiwana
status = "OK" if response and expected in response.lower() else "BŁĄD"
if status == "BŁĄD":
all_good = False
results["details"][param] = {
"description": description,
"value": response,
"expected": expected,
"status": status
}
logger.info(f"Drukarka {ip}, {description}: {status} (otrzymano: '{response}', oczekiwano: '{expected}')")
results["status"] = "OK" if all_good else "CZĘŚCIOWE BŁĘDY"
except Exception as e:
logger.error(f"Błąd podczas weryfikacji drukarki {ip}: {e}")
results["error"] = str(e)
results["status"] = "BŁĄD"
return results
def read_ip_list(file_path):
"""Wczytuje listę adresów IP z pliku."""
try:
with open(file_path, 'r') as f:
ips = [line.strip() for line in f.readlines() if line.strip()]
logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}")
return ips
except Exception as e:
logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}")
sys.exit(1)
def format_results_table(results):
"""Formatuje wyniki weryfikacji w czytelną tabelę."""
headers = ["IP", "Model", "Status", "FTP", "SNMP"]
# Określamy szerokość kolumn
col_widths = [15, 15, 15, 10, 10]
# Tworzymy nagłówek tabeli
header_row = " | ".join(h.ljust(col_widths[i]) for i, h in enumerate(headers))
divider = "-" * len(header_row)
table = [divider, header_row, divider]
# Dodajemy wiersze danych
for r in results:
row_data = [
r["ip"].ljust(col_widths[0]),
r.get("model", "Unknown").ljust(col_widths[1]),
r["status"].ljust(col_widths[2]),
r["details"].get("ftp_status", {}).get("status", "N/A").ljust(col_widths[3]),
r["details"].get("snmp_status", {}).get("status", "N/A").ljust(col_widths[4])
]
table.append(" | ".join(row_data))
table.append(divider)
return "\n".join(table)
def main():
parser = argparse.ArgumentParser(description='Weryfikacja ustawień bezpieczeństwa drukarek Zebra')
parser.add_argument('--file', metavar='FILE', required=True, help='Plik z listą adresów IP drukarek')
parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})')
parser.add_argument('--output', metavar='FILE', help='Zapisz wyniki do pliku JSON')
parser.add_argument('--summary', metavar='FILE', help='Zapisz podsumowanie w formacie tekstowym')
parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie')
args = parser.parse_args()
# Ustaw poziom logowania
if args.debug:
logger.setLevel(logging.DEBUG)
# Wczytaj listę IP
ip_list = read_ip_list(args.file)
# Weryfikuj ustawienia drukarek
all_results = []
logger.info(f"Rozpoczynam weryfikację {len(ip_list)} drukarek...")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(verify_printer_settings, ip, args.port): ip for ip in ip_list}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
result = future.result()
all_results.append(result)
except Exception as e:
logger.error(f"Nieoczekiwany błąd dla {ip}: {e}")
all_results.append({"ip": ip, "status": "BŁĄD KRYTYCZNY", "error": str(e)})
# Sortuj wyniki według IP
all_results.sort(key=lambda x: x["ip"])
# Wyświetl podsumowanie
num_ok = sum(1 for r in all_results if r["status"] == "OK")
num_partial = sum(1 for r in all_results if r["status"] == "CZĘŚCIOWE BŁĘDY")
num_error = sum(1 for r in all_results if r["status"] in ["BŁĄD", "BŁĄD KRYTYCZNY", "BRAK ODPOWIEDZI"])
logger.info(f"Weryfikacja zakończona: {num_ok} OK, {num_partial} częściowe błędy, {num_error} błędy/brak odpowiedzi")
# Generuj tabelę wyników
table = format_results_table(all_results)
print("\nWyniki weryfikacji:")
print(table)
# Zapisz wyniki do pliku JSON
if args.output:
try:
with open(args.output, 'w', encoding='utf-8') as f:
json.dump(all_results, f, ensure_ascii=False, indent=2)
logger.info(f"Zapisano szczegółowe wyniki do pliku {args.output}")
except Exception as e:
logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}")
# Zapisz podsumowanie w formacie tekstowym
if args.summary:
try:
with open(args.summary, 'w', encoding='utf-8') as f:
f.write(f"Podsumowanie weryfikacji drukarek Zebra - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write(f"Liczba drukarek: {len(all_results)}\n")
f.write(f"Poprawnie skonfigurowane: {num_ok}\n")
f.write(f"Częściowe błędy: {num_partial}\n")
f.write(f"Błędy/brak odpowiedzi: {num_error}\n\n")
f.write(table)
logger.info(f"Zapisano podsumowanie do pliku {args.summary}")
except Exception as e:
logger.error(f"Błąd podczas zapisywania do pliku {args.summary}: {e}")
return 0
if __name__ == "__main__":
sys.exit(main())Editor is loading...
Leave a Comment