mass_config
unknown
python
8 months ago
13 kB
21
Indexable
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import socket
import sys
import ipaddress
import argparse
import concurrent.futures
import logging
import os
import time
from datetime import datetime
# Konfiguracja loggera
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f"zebra_config_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
# Stałe
ZEBRA_PORT = 9100 # Standardowy port drukarek Zebra
TIMEOUT = 2 # Timeout w sekundach
MAX_WORKERS = 50 # Maksymalna liczba wątków
# Komendy ZPL do konfiguracji drukarek
ZPL_COMMANDS = {
# Podstawowe zabezpieczenia
"disable_ftp": "! U1 setvar \"ip.ftp.enable\" \"off\"\r\n",
"disable_snmp": "! U1 setvar \"ip.snmp.enable\" \"off\"\r\n",
"disable_telnet": "! U1 setvar \"ip.telnet.enable\" \"off\"\r\n",
}
# Dodatkowe komendy dla poszczególnych modeli drukarek
MODEL_COMMANDS = {
"GK": {},
"ZT": {},
"ZD": {},
"ZQ": {}
}
def is_zebra_printer(ip, port=ZEBRA_PORT, timeout=TIMEOUT):
"""Sprawdza, czy pod danym adresem IP i portem znajduje się drukarka Zebra."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
# Metoda 1: Wysyłamy zapytanie ~HQES, które zwraca informacje o drukarce
s.sendall(b"~HQES\r\n")
try:
response = s.recv(1024)
if b"ZEBRA" in response or b"ZPL" in response:
return True
except socket.timeout:
# Niektóre drukarki mogą nie odpowiadać na to zapytanie
pass
# Metoda 2: Sprawdź, czy port 9100 jest otwarty i przyjmuje połączenia
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
test_socket.settimeout(timeout)
test_socket.connect((ip, port))
# Jeśli udało się połączyć, to prawdopodobnie jest to drukarka
logger.info(f"Wykryto urządzenie z otwartym portem {port} pod adresem {ip}")
return True
except:
pass
return False
except (socket.timeout, ConnectionRefusedError, OSError) as e:
logger.debug(f"Nie można połączyć się z {ip}:{port} - {str(e)}")
return False
def detect_printer_model(ip, port=ZEBRA_PORT, timeout=TIMEOUT):
"""Wykrywa model drukarki Zebra."""
try:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
# Próba uzyskania informacji o drukarce
commands = [
b"~HI\r\n", # Host Information
b"! U1 getvar \"device.host_status\"\r\n", # SGD komenda do uzyskania statusu
b"! U1 getvar \"device.product_name\"\r\n" # SGD komenda do uzyskania nazwy produktu
]
model = "Unknown"
for cmd in commands:
try:
s.sendall(cmd)
response = s.recv(1024)
# Próba identyfikacji modelu na podstawie odpowiedzi
response_str = response.decode('utf-8', errors='ignore')
logger.debug(f"Odpowiedź od {ip} na komendę {cmd}: {response_str}")
for prefix in ["ZT", "GK", "ZD", "ZQ", "GX"]:
if prefix in response_str:
model = prefix
return model
except:
continue
return model
except Exception as e:
logger.error(f"Błąd podczas wykrywania modelu drukarki {ip}: {e}")
return "Unknown"
def send_zpl_commands(ip, commands, port=ZEBRA_PORT, timeout=TIMEOUT):
"""Wysyła komendy ZPL do drukarki."""
try:
# Najpierw sprawdź model drukarki
model_prefix = detect_printer_model(ip, port, timeout)
logger.info(f"Wykryto drukarkę {model_prefix} pod adresem {ip}")
# Wybierz komendy specyficzne dla modelu (jeśli istnieją)
model_specific_commands = {}
for model_key in MODEL_COMMANDS:
if model_prefix.startswith(model_key):
model_specific_commands = MODEL_COMMANDS[model_key]
logger.info(f"Używam komend specyficznych dla modelu {model_key}")
break
# Łączymy wszystkie komendy - najpierw ogólne, potem specyficzne dla modelu
all_commands = {**commands, **model_specific_commands}
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.settimeout(timeout)
s.connect((ip, port))
logger.debug(f"Nawiązano połączenie z {ip}:{port}")
for cmd_name, cmd in all_commands.items():
try:
s.sendall(cmd.encode('utf-8'))
logger.info(f"Wysłano komendę {cmd_name} do {ip}")
time.sleep(0.5) # Krótka pauza między komendami
# Próba odczytu odpowiedzi
try:
s.settimeout(1)
response = s.recv(1024)
logger.debug(f"Odpowiedź od {ip} na komendę {cmd_name}: {response}")
except socket.timeout:
pass # Brak odpowiedzi to normalne zachowanie
except Exception as e:
logger.warning(f"Błąd podczas wysyłania komendy {cmd_name} do {ip}: {e}")
# Wyślij komendę resetującą na koniec
try:
reset_cmd = "! U1 do \"device.reset\" \"now\"\r\n"
s.sendall(reset_cmd.encode('utf-8'))
logger.info(f"Wysłano komendę resetującą do {ip}")
except Exception as e:
logger.warning(f"Błąd podczas wysyłania komendy resetującej do {ip}: {e}")
return True
except Exception as e:
logger.error(f"Błąd podczas nawiązywania połączenia z {ip}:{port} - {e}")
return False
def scan_network(network, port=ZEBRA_PORT):
"""Skanuje podaną sieć w poszukiwaniu drukarek Zebra."""
printers = []
network = ipaddress.IPv4Network(network)
total_ips = network.num_addresses
logger.info(f"Rozpoczęto skanowanie sieci {network}, liczba adresów: {total_ips}")
if total_ips > 1000:
logger.warning(f"Skanowanie dużej sieci ({total_ips} adresów) może zająć dużo czasu!")
# Użycie puli wątków do równoległego skanowania
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(is_zebra_printer, str(ip), port): str(ip) for ip in network.hosts()}
for i, future in enumerate(concurrent.futures.as_completed(future_to_ip)):
ip = future_to_ip[future]
if i % 50 == 0:
logger.info(f"Zeskanowano {i}/{total_ips} adresów...")
try:
if future.result():
logger.info(f"Znaleziono drukarkę Zebra pod adresem {ip}")
printers.append(ip)
except Exception as e:
logger.error(f"Błąd podczas skanowania {ip}: {e}")
logger.info(f"Zakończono skanowanie. Znaleziono {len(printers)} drukarek Zebra.")
return printers
def read_ip_list(file_path):
"""Wczytuje listę adresów IP z pliku."""
try:
with open(file_path, 'r') as f:
ips = [line.strip() for line in f.readlines() if line.strip()]
logger.info(f"Wczytano {len(ips)} adresów IP z pliku {file_path}")
return ips
except Exception as e:
logger.error(f"Błąd podczas wczytywania pliku {file_path}: {e}")
sys.exit(1)
def verify_printers(ip_list, port=ZEBRA_PORT):
"""Weryfikuje, które z podanych adresów IP są drukarkami Zebra."""
verified_printers = []
logger.info(f"Weryfikowanie {len(ip_list)} adresów IP...")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(is_zebra_printer, ip, port): ip for ip in ip_list}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
if future.result():
logger.info(f"Zweryfikowano drukarkę Zebra pod adresem {ip}")
verified_printers.append(ip)
else:
logger.warning(f"Adres {ip} nie jest drukarką Zebra lub jest niedostępny")
except Exception as e:
logger.error(f"Błąd podczas weryfikacji {ip}: {e}")
logger.info(f"Zweryfikowano {len(verified_printers)} drukarek Zebra.")
return verified_printers
def configure_printers(printers, commands=None):
"""Konfiguruje drukarki Zebra wysyłając komendy ZPL."""
if commands is None:
commands = ZPL_COMMANDS
success_count = 0
logger.info(f"Rozpoczęto konfigurację {len(printers)} drukarek...")
with concurrent.futures.ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
future_to_ip = {executor.submit(send_zpl_commands, ip, commands): ip for ip in printers}
for future in concurrent.futures.as_completed(future_to_ip):
ip = future_to_ip[future]
try:
if future.result():
logger.info(f"Konfiguracja drukarki {ip} zakończona powodzeniem")
success_count += 1
else:
logger.warning(f"Konfiguracja drukarki {ip} nie powiodła się")
except Exception as e:
logger.error(f"Błąd podczas konfiguracji {ip}: {e}")
logger.info(f"Konfiguracja zakończona. Pomyślnie skonfigurowano {success_count}/{len(printers)} drukarek.")
return success_count
def main():
parser = argparse.ArgumentParser(description='Skrypt do masowej konfiguracji drukarek Zebra')
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('--scan', metavar='NETWORK', help='Skanuj podaną sieć (np. 192.168.1.0/24)')
group.add_argument('--file', metavar='FILE', help='Plik z listą adresów IP drukarek')
parser.add_argument('--port', type=int, default=ZEBRA_PORT, help=f'Port drukarki (domyślnie {ZEBRA_PORT})')
parser.add_argument('--verify-only', action='store_true', help='Tylko weryfikuj drukarki bez konfiguracji')
parser.add_argument('--output', metavar='FILE', help='Zapisz znalezione drukarki do pliku')
parser.add_argument('--force', action='store_true', help='Pomiń weryfikację i traktuj wszystkie adresy jako drukarki Zebra')
parser.add_argument('--debug', action='store_true', help='Włącz szczegółowe logowanie do debugowania')
args = parser.parse_args()
# Ustawienie poziomu logowania
if args.debug:
logger.setLevel(logging.DEBUG)
logger.debug("Włączono tryb debugowania")
# Znajdź drukarki
if args.scan:
printers = scan_network(args.scan, args.port)
else: # args.file
ip_list = read_ip_list(args.file)
if args.force:
logger.info(f"Tryb wymuszony - traktowanie wszystkich {len(ip_list)} adresów jako drukarki Zebra")
printers = ip_list
else:
printers = verify_printers(ip_list, args.port)
# Zapisz listę drukarek do pliku, jeśli podano argument --output
if args.output and printers:
try:
with open(args.output, 'w') as f:
for ip in printers:
f.write(f"{ip}\n")
logger.info(f"Zapisano listę {len(printers)} drukarek do pliku {args.output}")
except Exception as e:
logger.error(f"Błąd podczas zapisywania do pliku {args.output}: {e}")
# Konfiguruj drukarki, jeśli nie wybrano tylko weryfikacji
if not args.verify_only and printers:
configure_printers(printers)
if not printers:
logger.warning("Nie znaleziono żadnych drukarek Zebra.")
return 0
if __name__ == "__main__":
sys.exit(main())Editor is loading...
Leave a Comment