Untitled
unknown
plain_text
4 months ago
20 kB
7
Indexable
#!/usr/bin/env python3
"""
Thermal Resistance Test - Single File with Auto-Resume
Measures resistance vs temperature during heating and cooling cycles.
FEATURES:
- Ctrl+C triggers emergency shutdown (turns everything OFF)
- Auto-resume - Just run script again to continue
- 20 Hz sampling during 3-minute holds
- Printer control with standard G-code
"""
import csv
import time
import json
import sys
import shutil
from pathlib import Path
from datetime import datetime
import serial
from pymeasure.instruments.keithley import Keithley2400
# ============================================================================
# TEST PARAMETERS - EDIT THESE TO CONFIGURE YOUR TEST
# ============================================================================
# Temperature profile (°C)
START_TEMP = 25
MAX_TEMP = 60
STEP_SIZE = 5
HOLD_MINUTES = 3
COOLDOWN_TEMP = 20
# Keithley settings
SOURCE_VOLTAGE = 1.0 # volts
COMPLIANCE_CURRENT = 0.01 # amps (10 mA)
WIRING = 2 # "2" or "4"
STABILIZATION_TIME = 2
# Measurement settings
SAMPLING_RATE = 20 # measurements per second
#HOLD_SECONDS = HOLD_MINUTES * 60
HOLD_SECONDS = 1
# Hardware connections
KEITHLEY_ADDR = "USB0::1510::9296::04564746::0::INSTR"
PRINTER_PORT = "/dev/tty.usbmodem11301"
PRINTER_BAUD = 115200
# Safety
MAX_SAFE_TEMP = 70
# ============================================================================
# GLOBALS (don't edit these)
# ============================================================================
folder = None
keithley = None
ser = None
test_start_time = None
current_phase = None
last_completed_temp = None
target_temp = None
# ============================================================================
# FILE MANAGEMENT
# ============================================================================
import time
import serial
from serial.tools import list_ports
def printer_autoconnect(baudrate=115200, timeout=0.1, ping_cmd="M105", wait_ok=["ok"]):
"""Scan available serial ports and connect to printer automatically"""
ports = list_ports.comports()
if not ports:
raise RuntimeError("No serial ports found!")
# Score likely ports first
ports = sorted(ports, key=lambda p: _score_printer_port(p), reverse=True)
for p in ports:
try:
ser = serial.Serial(p.device, baudrate, timeout=timeout)
time.sleep(2) # wait for board to boot/reset
# Ping loop
ser.reset_input_buffer()
ser.write(f"{ping_cmd}\n".encode())
ser.flush()
deadline = time.time() + 2.0
while time.time() < deadline:
if ser.in_waiting:
line = ser.readline().decode(errors="ignore").strip().lower()
if any(ok_word in line for ok_word in wait_ok):
return ser, p.device # success
ser.close()
except Exception:
continue
raise RuntimeError("Printer autoconnect failed")
def _score_printer_port(port):
"""Score likely printer ports for prioritization"""
score = 0
dev = port.device.lower()
desc = (port.description or "").lower()
import sys
if sys.platform.startswith("win"):
if "arduino" in desc:
score += 3
if "ch340" in desc or "usb serial" in desc:
score += 2
elif sys.platform.startswith("darwin"):
if "usb" in dev:
score += 3
if "modem" in dev or "serial" in dev:
score += 2
else: # linux
if "ttyusb" in dev or "ttyacm" in dev:
score += 3
return score
def get_experiment_folder():
"""Returns folder for this run - creates or resumes as needed"""
base_dir = Path("last_run")
if base_dir.exists() and base_dir.is_dir():
checkpoint = base_dir / "checkpoint.json"
if checkpoint.exists():
with open(checkpoint) as f:
data = json.load(f)
if data["status"] not in ["COMPLETE", "ABORTED"]:
print(f"\nFound incomplete test from {data['timestamp']}")
print(f"Last state: {data['phase']} at {data['last_completed_temp']}°C")
response = input("Resume this test? (Y/n): ").strip().lower()
if response in ['', 'y', 'yes']:
return base_dir
# Start fresh
if base_dir.exists():
archive_old_run(base_dir)
base_dir.mkdir(exist_ok=True)
return base_dir
def archive_old_run(folder):
"""Move old run to archive with timestamp"""
archive_dir = Path("archive")
archive_dir.mkdir(exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
for f in folder.glob("*"):
if f.is_file():
shutil.move(str(f), archive_dir / f"{timestamp}_{f.name}")
def save_checkpoint(status, phase, last_temp, next_temp, measurement_count):
"""Save minimal checkpoint"""
checkpoint = {
"status": status,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"phase": phase,
"last_completed_temp": last_temp,
"next_target_temp": next_temp,
"measurement_count": measurement_count
}
with open(folder / "checkpoint.json", "w") as f:
json.dump(checkpoint, f, indent=2)
log_event(f"Checkpoint saved after {last_temp}°C")
def log_event(message, level="INFO"):
"""Simple event logging"""
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
with open(folder / "events.log", "a") as f:
f.write(f"{timestamp} | {level:7} | {message}\n")
print(f"[{level}] {message}")
def init_csv():
"""Create CSV with headers if it doesn't exist"""
if not (folder / "data.csv").exists():
with open(folder / "data.csv", "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(["timestamp", "elapsed_seconds", "phase",
"setpoint_c", "actual_c", "resistance_ohms", "current_a"])
def save_measurement(setpoint, actual, resistance, current, phase):
"""Save single measurement to CSV"""
with open(folder / "data.csv", "a", newline="") as f:
writer = csv.writer(f)
writer.writerow([
datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3],
time.time() - test_start_time,
phase,
setpoint,
actual,
resistance,
current
])
def get_measurement_count():
"""Return number of rows in CSV (approx)"""
try:
with open(folder / "data.csv", "r") as f:
return sum(1 for _ in f) - 1 # subtract header
except:
return 0
def clean_partial_data(phase, next_target_temp):
"""Remove partial measurements for interrupted step only"""
csv_file = folder / "data.csv"
if not csv_file.exists():
return
log_event(f"Cleaning partial data for interrupted step {next_target_temp}°C ({phase})")
with open(csv_file, "r", newline="") as f:
reader = csv.reader(f)
header = next(reader)
rows = list(reader)
cleaned_rows = []
removed_count = 0
for row in rows:
try:
row_phase = row[2]
setpoint = float(row[3])
# Remove only rows belonging to interrupted step
if row_phase == phase and setpoint == next_target_temp:
removed_count += 1
else:
cleaned_rows.append(row)
except:
cleaned_rows.append(row)
with open(csv_file, "w", newline="") as f:
writer = csv.writer(f)
writer.writerow(header)
writer.writerows(cleaned_rows)
log_event(f"Removed {removed_count} partial measurements")
# ============================================================================
# HARDWARE CONTROL
# ============================================================================
def connect_instruments():
"""Connect to Keithley and printer"""
global keithley, ser
log_event("Connecting to Keithley...")
try:
keithley = Keithley2400(KEITHLEY_ADDR)
keithley.apply_voltage(voltage_range=2, compliance_current=COMPLIANCE_CURRENT)
keithley.source_voltage = SOURCE_VOLTAGE
keithley.wires = WIRING
keithley.enable_source()
log_event("Keithley connected")
except Exception as e:
log_event(f"Keithley connection failed: {e}", "ERROR")
sys.exit(1)
log_event("Connecting to printer...")
try:
ser, _ = printer_autoconnect()
log_event("Waiting for printer to boot...")
time.sleep(10)
except Exception as e:
log_event(f"Printer connection failed: {e}", "ERROR")
sys.exit(1)
def set_bed_temp(temp):
"""Set printer bed temperature"""
ser.reset_input_buffer()
ser.write(f"M140 S{temp}\n".encode())
log_event(f"Bed set to {temp}°C")
def read_bed_temp():
"""Read current bed temperature from printer"""
ser.reset_input_buffer()
ser.write(b"M105\n")
start_time = time.time()
while time.time() - start_time < 5:
if not ser.in_waiting:
time.sleep(0.1)
continue
line = ser.readline().decode().strip()
if "B:" in line:
try:
# Parse B: value
b_index = line.find("B:")
temp_str = line[b_index+2:].split()[0]
return float(temp_str)
except:
continue
# Safety timer hit - no response from printer
log_event("Printer not responding to temperature query (safety timer?)", "CRITICAL")
emergency_shutdown()
return None
def wait_for_target_temp(target, phase):
"""Wait until bed reaches target temperature"""
log_event(f"Waiting for {target}°C...")
while True:
temp = read_bed_temp()
if temp is None:
log_event("Failed to read temperature - triggering emergency shutdown", "CRITICAL")
emergency_shutdown()
sys.exit(1)
# Safety check
if temp > MAX_SAFE_TEMP:
log_event(f"SAFETY: Temperature {temp}°C exceeds limit!", "CRITICAL")
emergency_shutdown()
sys.exit(1)
# Check if reached target
if phase == "heating" and temp >= target - 0.3:
log_event(f"Target {target}°C reached (actual: {temp:.1f}°C)")
return True
elif phase == "cooling" and temp <= target + 0.3:
log_event(f"Target {target}°C reached (actual: {temp:.1f}°C)")
return True
print(f" Current: {temp:.1f}°C / {target}°C", end='\r')
time.sleep(2)
def measure_at_temperature(setpoint, phase):
"""Take high-speed measurements for the hold duration"""
total_samples = int(SAMPLING_RATE * HOLD_SECONDS)
sample_interval = 1.0 / SAMPLING_RATE
log_event(f"Measuring at {setpoint}°C for {HOLD_SECONDS}s ({total_samples} samples @ {SAMPLING_RATE}Hz)")
samples_taken = 0
next_sample_time = time.perf_counter()
consecutive_errors = 0
while samples_taken < total_samples:
# Precise timing
now = time.perf_counter()
if now < next_sample_time:
time.sleep(next_sample_time - now)
sample_start = time.perf_counter()
try:
# Take measurements
resistance = keithley.resistance
current = keithley.current
actual_temp = read_bed_temp()
if actual_temp is None:
log_event("Temperature read failed during measurement - triggering emergency shutdown", "CRITICAL")
emergency_shutdown()
sys.exit(1)
# Save to V
save_measurement(setpoint, actual_temp, resistance, current, phase)
samples_taken += 1
consecutive_errors = 0
# Progress update every 10%
if samples_taken % (total_samples // 10) == 0:
pct = (samples_taken / total_samples) * 100
log_event(f" {pct:.0f}% ({samples_taken}/{total_samples})")
except Exception as e:
log_event(f"Measurement error: {e}", "ERROR")
consecutive_errors += 1
if consecutive_errors > 10:
log_event("Too many errors, aborting", "CRITICAL")
emergency_shutdown()
sys.exit(1)
# Schedule next sample
next_sample_time = sample_start + sample_interval
log_event(f"Completed {setpoint}°C measurement ({samples_taken} samples)")
return samples_taken
# ============================================================================
# SHUTDOWN FUNCTIONS
# ============================================================================
def emergency_shutdown():
# Save checkpoint only if test actually started
try:
set_bed_temp(0)
if folder and folder.exists() and current_phase is not None and target_temp is not None:
measurement_count = get_measurement_count()
save_checkpoint(
"PAUSED",
current_phase,
last_completed_temp if last_completed_temp is not None else -999,
target_temp,
measurement_count
)
log_event(f"Checkpoint saved - {measurement_count} measurements recorded")
else:
log_event("Shutdown occurred before test fully initialized - no checkpoint saved", "WARNING")
except Exception as e:
log_event(f"Failed to save checkpoint: {e}", "ERROR")
log_event(f"!!!VERIFY WITH OWN EYES THAT BED TEMPERATURE IS SET TO 0!!!!")
# ============================================================================
# MAIN TEST LOOP
# ============================================================================
def run_test():
global folder, test_start_time, current_phase, last_completed_temp, target_temp
print("="*60)
print("tét của huyền :)")
print("="*60)
print(f"Profile: {START_TEMP}°C → {MAX_TEMP}°C → {COOLDOWN_TEMP}°C ({STEP_SIZE}°C steps)")
print(f"Hold time: {HOLD_MINUTES} minutes @ {SAMPLING_RATE} Hz ({HOLD_MINUTES*60*SAMPLING_RATE} samples/step)")
print("="*60)
# Setup folders
folder = get_experiment_folder()
init_csv()
# Initialize state BEFORE connecting instruments
current_phase = None
last_completed_temp = None
target_temp = None
# Check for checkpoint to resume
checkpoint_file = folder / "checkpoint.json"
resume_phase = "heating"
resume_temp = START_TEMP
measurement_offset = 0
if checkpoint_file.exists():
with open(checkpoint_file) as f:
cp = json.load(f)
if cp["status"] in ["IN_PROGRESS", "PAUSED"]:
resume_phase = cp["phase"]
resume_temp = cp["next_target_temp"]
log_event(f"RESUMING test from {cp['last_completed_temp']}°C, next: {resume_temp}°C")
clean_partial_data(
resume_phase,
resume_temp
)
# Connect to instruments
connect_instruments()
test_start_time = time.time()
# ========== HEATING PHASE ==========
if resume_phase == "heating":
current_phase = "heating"
temps = list(range(START_TEMP, MAX_TEMP + 1, STEP_SIZE))
start_index = temps.index(resume_temp) if resume_temp in temps else 0
for temp in temps[start_index:]:
last_completed_temp = temp - STEP_SIZE if temp > START_TEMP else START_TEMP
target_temp = temp
print(f"\n--- HEATING to {temp}°C ---")
log_event(f"--- HEATING to {temp}°C ---")
set_bed_temp(temp)
if not wait_for_target_temp(temp, "heating"):
emergency_shutdown()
sys.exit(1)
# Stabilize briefly
log_event(f"Stabilizing for {STABILIZATION_TIME} seconds...")
time.sleep(STABILIZATION_TIME)
# Measure
samples = measure_at_temperature(temp, "heating")
# Update last completed temp after successful measurement
last_completed_temp = temp
# Checkpoint after each step
measurement_count = get_measurement_count()
save_checkpoint("IN_PROGRESS", "heating", temp,
temp + STEP_SIZE if temp < MAX_TEMP else temp,
measurement_count)
# ========== COOLING PHASE ==========
if resume_phase in ["heating", "cooling"]:
current_phase = "cooling"
# Determine starting point
if resume_phase == "heating":
start_cool = MAX_TEMP - STEP_SIZE
else:
start_cool = resume_temp
temps = list(range(MAX_TEMP - STEP_SIZE, COOLDOWN_TEMP - 1, -STEP_SIZE))
start_index = temps.index(start_cool) if start_cool in temps else 0
for temp in temps[start_index:]:
target_temp = temp
print(f"\n--- COOLING to {temp}°C ---")
log_event(f"--- COOLING to {temp}°C ---")
set_bed_temp(temp)
if not wait_for_target_temp(temp, "cooling"):
emergency_shutdown()
sys.exit(1)
# Stabilize briefly
log_event(f"Stabilizing for {STABILIZATION_TIME} seconds...")
time.sleep(10)
# Measure
samples = measure_at_temperature(temp, "cooling")
# Update last completed temp after successful measurement
last_completed_temp = temp
# Checkpoint after each step
measurement_count = get_measurement_count()
save_checkpoint("IN_PROGRESS", "cooling", temp,
temp - STEP_SIZE if temp > COOLDOWN_TEMP else temp,
measurement_count)
# ========== TEST COMPLETE ==========
log_event("TEST COMPLETE - Shutting down equipment")
measurement_count = get_measurement_count()
# Turn off equipment at end of successful test
try:
ser.write(b"M140 S0\n")
log_event("Bed heater turned OFF")
except:
pass
try:
keithley.disable_source()
log_event("Keithley output disabled")
except:
pass
save_checkpoint("COMPLETE", "done", COOLDOWN_TEMP, COOLDOWN_TEMP, measurement_count)
print("\n" + "="*60)
print(f"TEST COMPLETE - Data saved to {folder}/data.csv")
print(f"Total measurements: {measurement_count}")
print("="*60)
# ============================================================================
# ENTRY POINT
# ============================================================================
if __name__ == "__main__":
try:
run_test()
except KeyboardInterrupt:
print("\n\nCtrl+C detected - initiating emergency shutdown...")
emergency_shutdown()
sys.exit(1)
except Exception as e:
log_event(f"Unexpected error: {e}", "CRITICAL")
emergency_shutdown()
sys.exit(1)Editor is loading...
Leave a Comment