# Untitled
# (paste-site header: avatar, unknown author, plain_text, 4 months ago, 20 kB)
# The lines above are web-page residue and are not part of the script.
#!/usr/bin/env python3
"""
Thermal Resistance Test - Single File with Auto-Resume
Measures resistance vs temperature during heating and cooling cycles.

FEATURES:
- Ctrl+C triggers emergency shutdown (turns everything OFF)
- Auto-resume - Just run script again to continue
- 20 Hz sampling during 3-minute holds
- Printer control with standard G-code
"""

import csv
import time
import json
import sys
import shutil
from pathlib import Path
from datetime import datetime
import serial
from pymeasure.instruments.keithley import Keithley2400

# ============================================================================
# TEST PARAMETERS - EDIT THESE TO CONFIGURE YOUR TEST
# ============================================================================

# Temperature profile (°C)
START_TEMP = 25                 # first heating setpoint
MAX_TEMP = 60                   # last heating setpoint (top of the profile)
STEP_SIZE = 5                   # spacing between setpoints
HOLD_MINUTES = 3                # measurement hold at every setpoint
COOLDOWN_TEMP = 20              # last cooling setpoint

# Keithley settings
SOURCE_VOLTAGE = 1.0            # volts
COMPLIANCE_CURRENT = 0.01       # amps (10 mA)
WIRING = 2                      # 2- or 4-wire sensing (integer 2 or 4)
STABILIZATION_TIME = 2          # seconds to wait after reaching a setpoint

# Measurement settings
SAMPLING_RATE = 20              # measurements per second
# Derived from HOLD_MINUTES so the banner, the docs and the real hold agree.
# (A 1-second debug override used to live here; for a quick smoke test lower
# HOLD_MINUTES instead of hard-coding a different value.)
HOLD_SECONDS = HOLD_MINUTES * 60

# Hardware connections
KEITHLEY_ADDR = "USB0::1510::9296::04564746::0::INSTR"
PRINTER_PORT = "/dev/tty.usbmodem11301"
PRINTER_BAUD = 115200

# Safety
MAX_SAFE_TEMP = 70              # °C; readings above this trigger emergency shutdown

# ============================================================================
# GLOBALS (don't edit these)
# ============================================================================
# Run directory (a Path); assigned in run_test() from get_experiment_folder().
folder = None
# Keithley2400 instance; assigned in connect_instruments().
keithley = None
# Open serial connection to the printer; assigned in connect_instruments().
ser = None
# time.time() captured in run_test() right after the instruments connect;
# save_measurement() uses it to compute elapsed_seconds.
test_start_time = None
# "heating" or "cooling" while a run is active; read by emergency_shutdown().
current_phase = None
# Setpoint of the most recent step whose measurement finished.
last_completed_temp = None
# Setpoint currently being worked on; stored as next_target_temp on shutdown.
target_temp = None

# ============================================================================
# FILE MANAGEMENT
# ============================================================================
import time
import serial
from serial.tools import list_ports

def printer_autoconnect(baudrate=115200, timeout=0.1, ping_cmd="M105", wait_ok=("ok",)):
    """Probe the available serial ports and return the first one that answers.

    Ports are tried in descending order of ``_score_printer_port``. Each port
    is opened, sent ``ping_cmd`` and watched for up to two seconds for a reply
    line containing any of the ``wait_ok`` words (case-insensitive).

    Args:
        baudrate: Baud rate used to open every candidate port.
        timeout: Read timeout in seconds passed to ``serial.Serial``.
        ping_cmd: Command sent (newline-terminated) to provoke a reply.
        wait_ok: Iterable of substrings; any one of them marks success.

    Returns:
        ``(connection, device_name)`` for the first port that answered. The
        connection is left open for the caller.

    Raises:
        RuntimeError: If no serial ports exist or none of them answered.
    """
    ports = list_ports.comports()
    if not ports:
        raise RuntimeError("No serial ports found!")

    # Try the most printer-like ports first.
    candidates = sorted(ports, key=_score_printer_port, reverse=True)
    # Replies are lower-cased before matching, so normalise the needles too.
    ok_words = tuple(word.lower() for word in wait_ok)

    for candidate in candidates:
        conn = None
        try:
            conn = serial.Serial(candidate.device, baudrate, timeout=timeout)
            time.sleep(2)  # wait for board to boot/reset

            conn.reset_input_buffer()
            conn.write(f"{ping_cmd}\n".encode())
            conn.flush()

            deadline = time.time() + 2.0
            while time.time() < deadline:
                if not conn.in_waiting:
                    time.sleep(0.01)  # yield instead of spinning at 100% CPU
                    continue
                reply = conn.readline().decode(errors="ignore").strip().lower()
                if any(word in reply for word in ok_words):
                    return conn, candidate.device  # success: caller owns conn
        except Exception:
            # Best-effort probing: any failure just means "not this port".
            pass

        # Only reached when this port did not answer; never leave it open.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

    raise RuntimeError("Printer autoconnect failed")

def _score_printer_port(port):
    """Return a heuristic priority for *port*; a higher value is tried first.

    The rules depend on ``sys.platform``: on Windows the port description is
    inspected, on macOS and everything else the device name is inspected.
    """
    import sys

    device = port.device.lower()
    description = (port.description or "").lower()
    platform = sys.platform

    # Each rule is (points, matched); the score is the sum of matched points.
    if platform.startswith("win"):
        rules = (
            (3, "arduino" in description),
            (2, "ch340" in description or "usb serial" in description),
        )
    elif platform.startswith("darwin"):
        rules = (
            (3, "usb" in device),
            (2, "modem" in device or "serial" in device),
        )
    else:  # linux
        rules = (
            (3, "ttyusb" in device or "ttyacm" in device),
        )

    return sum(points for points, matched in rules if matched)

def get_experiment_folder():
    """Return the folder for this run, resuming an unfinished run on request.

    If ``last_run/checkpoint.json`` exists with a status other than
    ``COMPLETE``/``ABORTED``, the user is asked whether to resume; answering
    yes (or just Enter) returns the existing folder untouched. In every other
    case the files of the previous run are archived and ``last_run`` is
    (re)created.

    A checkpoint that cannot be read or parsed is treated as "nothing to
    resume" rather than aborting startup.

    Returns:
        ``Path("last_run")``.
    """
    base_dir = Path("last_run")
    checkpoint = base_dir / "checkpoint.json"

    if base_dir.is_dir() and checkpoint.exists():
        try:
            with open(checkpoint) as f:
                data = json.load(f)
            status = data["status"]
        except (OSError, ValueError, KeyError, TypeError) as e:
            # The run folder is not configured yet, so log_event() cannot be
            # used here; report on the console only.
            print(f"[WARNING] Ignoring unreadable checkpoint {checkpoint}: {e}")
        else:
            if status not in ("COMPLETE", "ABORTED"):
                print(f"\nFound incomplete test from {data.get('timestamp')}")
                print(f"Last state: {data.get('phase')} at {data.get('last_completed_temp')}°C")
                response = input("Resume this test? (Y/n): ").strip().lower()
                if response in ("", "y", "yes"):
                    return base_dir

    # Start fresh
    if base_dir.exists():
        archive_old_run(base_dir)
    base_dir.mkdir(exist_ok=True)
    return base_dir

def archive_old_run(folder):
    """Move every regular file in *folder* into ``archive/``.

    Each file is renamed to ``<YYYYmmdd_HHMMSS>_<original name>`` using one
    timestamp taken at call time. Sub-directories are left where they are.
    """
    destination = Path("archive")
    destination.mkdir(exist_ok=True)
    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    for entry in folder.glob("*"):
        if not entry.is_file():
            continue
        shutil.move(str(entry), destination / f"{stamp}_{entry.name}")

def save_checkpoint(status, phase, last_temp, next_temp, measurement_count):
    """Write the resume checkpoint to ``checkpoint.json`` in the run folder.

    The JSON is written to a sibling temporary file first and then renamed
    over the real checkpoint, so an interruption in the middle of the write
    cannot leave a truncated file for the next start-up to choke on.

    Args:
        status: Run status string (e.g. "IN_PROGRESS", "PAUSED", "COMPLETE").
        phase: Phase the run is in.
        last_temp: Setpoint of the last fully measured step.
        next_temp: Setpoint to resume at.
        measurement_count: Number of data rows recorded so far.
    """
    checkpoint = {
        "status": status,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "phase": phase,
        "last_completed_temp": last_temp,
        "next_target_temp": next_temp,
        "measurement_count": measurement_count
    }
    target = folder / "checkpoint.json"
    scratch = folder / "checkpoint.json.tmp"
    with open(scratch, "w") as f:
        json.dump(checkpoint, f, indent=2)
    scratch.replace(target)  # swap in the finished file in one step
    log_event(f"Checkpoint saved after {last_temp}°C")

def log_event(message, level="INFO"):
    """Print *message* and append it to ``events.log`` in the run folder.

    Logging is used on the error and shutdown paths, so it must not raise:
    when the run folder has not been set yet, or the log file cannot be
    written, the message is still printed to the console.

    Args:
        message: Text to record.
        level: Severity label written next to the message.
    """
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    if folder is not None:
        try:
            with open(folder / "events.log", "a") as f:
                f.write(f"{timestamp} | {level:7} | {message}\n")
        except OSError as e:
            print(f"[WARNING] Could not write events.log: {e}")
    print(f"[{level}] {message}")

def init_csv():
    """Create ``data.csv`` with its header row unless the file already exists."""
    data_file = folder / "data.csv"
    if data_file.exists():
        # Resuming: keep whatever has been recorded so far.
        return

    columns = [
        "timestamp",
        "elapsed_seconds",
        "phase",
        "setpoint_c",
        "actual_c",
        "resistance_ohms",
        "current_a",
    ]
    with open(data_file, "w", newline="") as handle:
        csv.writer(handle).writerow(columns)

def save_measurement(setpoint, actual, resistance, current, phase):
    """Append one sample as a row of ``data.csv``.

    The row holds, in order: wall-clock timestamp (millisecond precision),
    seconds since ``test_start_time``, phase, setpoint, actual temperature,
    resistance and current.
    """
    with open(folder / "data.csv", "a", newline="") as handle:
        wall_clock = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
        elapsed = time.time() - test_start_time
        record = [wall_clock, elapsed, phase, setpoint, actual, resistance, current]
        csv.writer(handle).writerow(record)

def get_measurement_count():
    """Return the approximate number of data rows in ``data.csv``.

    Lines are counted rather than parsed CSV records, and one is subtracted
    for the header. The result is never negative; 0 is returned when the run
    folder is not set or the file is missing, unreadable or empty.
    """
    if folder is None:
        return 0
    try:
        with open(folder / "data.csv", "r") as f:
            line_total = sum(1 for _ in f)
    except (OSError, UnicodeError):
        return 0
    # An empty file has no header to subtract; clamp instead of returning -1.
    return max(0, line_total - 1)

def clean_partial_data(phase, next_target_temp):
    """Drop the rows of the interrupted step from ``data.csv``.

    A row belongs to the interrupted step when its phase column equals
    *phase* and its setpoint column equals *next_target_temp*. Rows that
    cannot be interpreted are kept. The cleaned file is written next to the
    original and then renamed over it, so an interruption during the rewrite
    cannot destroy the recorded data.

    Args:
        phase: Phase of the interrupted step.
        next_target_temp: Setpoint of the interrupted step.
    """
    csv_file = folder / "data.csv"
    if not csv_file.exists():
        return

    log_event(f"Cleaning partial data for interrupted step {next_target_temp}°C ({phase})")

    with open(csv_file, "r", newline="") as f:
        reader = csv.reader(f)
        header = next(reader, None)  # None for an empty file
        rows = list(reader)

    if header is None:
        # Nothing was ever written, so there is nothing to clean.
        log_event("Removed 0 partial measurements")
        return

    def belongs_to_interrupted_step(row):
        try:
            return row[2] == phase and float(row[3]) == next_target_temp
        except (IndexError, ValueError):
            # Short or non-numeric row: keep it rather than guess.
            return False

    cleaned_rows = [row for row in rows if not belongs_to_interrupted_step(row)]
    removed_count = len(rows) - len(cleaned_rows)

    scratch = folder / "data.csv.tmp"
    with open(scratch, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(header)
        writer.writerows(cleaned_rows)
    scratch.replace(csv_file)

    log_event(f"Removed {removed_count} partial measurements")
# ============================================================================
# HARDWARE CONTROL
# ============================================================================

def connect_instruments():
    """Connect to the Keithley and the printer, storing both in module globals.

    The Keithley is configured as a voltage source and its output is enabled
    before the printer is probed. If the printer cannot be reached afterwards,
    the Keithley output is switched off again before exiting so the sample is
    not left energised by a script that is no longer running.

    Exits the process with status 1 if either instrument cannot be set up.
    """
    global keithley, ser

    log_event("Connecting to Keithley...")
    try:
        keithley = Keithley2400(KEITHLEY_ADDR)
        keithley.apply_voltage(voltage_range=2, compliance_current=COMPLIANCE_CURRENT)
        keithley.source_voltage = SOURCE_VOLTAGE
        keithley.wires = WIRING
        keithley.enable_source()
        log_event("Keithley connected")
    except Exception as e:
        log_event(f"Keithley connection failed: {e}", "ERROR")
        sys.exit(1)

    log_event("Connecting to printer...")
    try:
        ser, _ = printer_autoconnect()
        log_event("Waiting for printer to boot...")
        time.sleep(10)

    except Exception as e:
        log_event(f"Printer connection failed: {e}", "ERROR")
        # The source was enabled above; turn it off before bailing out.
        try:
            keithley.disable_source()
            log_event("Keithley output disabled")
        except Exception as off_error:
            log_event(f"Could not disable Keithley output: {off_error}", "ERROR")
        sys.exit(1)

def set_bed_temp(temp):
    """Send an ``M140`` bed-temperature command for *temp* and log it."""
    command = f"M140 S{temp}\n".encode()
    # Discard unread replies before sending the new command.
    ser.reset_input_buffer()
    ser.write(command)
    log_event(f"Bed set to {temp}°C")

def read_bed_temp():
    """Query the printer with ``M105`` and return the bed temperature.

    Reply lines are scanned for up to five seconds for a ``B:`` field; the
    number that follows it is returned as a float. Undecodable bytes and
    lines whose ``B:`` field is not a number are skipped.

    Returns:
        The bed temperature, or ``None`` if no usable reply arrived in time.
        In the ``None`` case ``emergency_shutdown()`` has already been called.
    """
    ser.reset_input_buffer()
    ser.write(b"M105\n")

    deadline = time.time() + 5
    while time.time() < deadline:
        if not ser.in_waiting:
            # Short poll: a 0.1 s nap here would cap the whole measurement
            # loop at 10 samples per second.
            time.sleep(0.01)
            continue

        # Serial noise must not raise; drop bytes that are not valid text.
        line = ser.readline().decode(errors="ignore").strip()
        marker = line.find("B:")
        if marker == -1:
            continue
        try:
            return float(line[marker + 2:].split()[0])
        except (IndexError, ValueError):
            continue

    # Safety timer hit - no response from printer
    log_event("Printer not responding to temperature query (safety timer?)", "CRITICAL")
    emergency_shutdown()
    return None

def wait_for_target_temp(target, phase):
    """Block until the bed is within 0.3 °C of *target* for the given phase.

    The bed is polled every two seconds. While heating the target counts as
    reached once the reading is at or above ``target - 0.3``; while cooling,
    once it is at or below ``target + 0.3``. A failed reading or a reading
    above ``MAX_SAFE_TEMP`` triggers emergency shutdown and exits.

    Returns:
        ``True`` once the target is reached.
    """
    log_event(f"Waiting for {target}°C...")

    def abort(reason):
        log_event(reason, "CRITICAL")
        emergency_shutdown()
        sys.exit(1)

    while True:
        temp = read_bed_temp()
        if temp is None:
            abort("Failed to read temperature - triggering emergency shutdown")

        # Safety check
        if temp > MAX_SAFE_TEMP:
            abort(f"SAFETY: Temperature {temp}°C exceeds limit!")

        heated_enough = phase == "heating" and temp >= target - 0.3
        cooled_enough = phase == "cooling" and temp <= target + 0.3
        if heated_enough or cooled_enough:
            log_event(f"Target {target}°C reached (actual: {temp:.1f}°C)")
            return True

        print(f"  Current: {temp:.1f}°C / {target}°C", end='\r')
        time.sleep(2)

def measure_at_temperature(setpoint, phase):
    """Record samples at *setpoint* for the configured hold duration.

    ``SAMPLING_RATE * HOLD_SECONDS`` samples are taken. Each sample reads the
    Keithley resistance and current plus the bed temperature and appends one
    row to the CSV. Samples are paced from the start of the previous sample,
    so a slow read delays the schedule instead of causing a burst afterwards.

    More than ten consecutive failed samples, or a failed temperature read,
    trigger emergency shutdown and exit.

    Args:
        setpoint: Target temperature recorded with every row.
        phase: Phase label recorded with every row.

    Returns:
        The number of samples recorded.
    """
    total_samples = int(SAMPLING_RATE * HOLD_SECONDS)
    sample_interval = 1.0 / SAMPLING_RATE
    # Report roughly every 10%. Clamp to 1: with fewer than 10 samples the
    # integer division yields 0 and the modulo below would raise on every
    # sample.
    progress_every = max(1, total_samples // 10)

    log_event(f"Measuring at {setpoint}°C for {HOLD_SECONDS}s ({total_samples} samples @ {SAMPLING_RATE}Hz)")

    samples_taken = 0
    next_sample_time = time.perf_counter()
    consecutive_errors = 0

    while samples_taken < total_samples:
        # Precise timing
        now = time.perf_counter()
        if now < next_sample_time:
            time.sleep(next_sample_time - now)

        sample_start = time.perf_counter()

        try:
            # Take measurements
            resistance = keithley.resistance
            current = keithley.current
            actual_temp = read_bed_temp()

            if actual_temp is None:
                log_event("Temperature read failed during measurement - triggering emergency shutdown", "CRITICAL")
                emergency_shutdown()
                sys.exit(1)

            # Save to CSV
            save_measurement(setpoint, actual_temp, resistance, current, phase)

            samples_taken += 1
            consecutive_errors = 0

            # Progress update every 10%
            if samples_taken % progress_every == 0:
                pct = (samples_taken / total_samples) * 100
                log_event(f"  {pct:.0f}% ({samples_taken}/{total_samples})")

        except Exception as e:
            log_event(f"Measurement error: {e}", "ERROR")
            consecutive_errors += 1
            if consecutive_errors > 10:
                log_event("Too many errors, aborting", "CRITICAL")
                emergency_shutdown()
                sys.exit(1)

        # Schedule next sample
        next_sample_time = sample_start + sample_interval

    log_event(f"Completed {setpoint}°C measurement ({samples_taken} samples)")
    return samples_taken

# ============================================================================
# SHUTDOWN FUNCTIONS
# ============================================================================

def _log_best_effort(message, level="INFO"):
    """Log *message*, falling back to a plain print if logging itself fails."""
    try:
        log_event(message, level)
    except Exception:
        print(f"[{level}] {message}")


def emergency_shutdown():
    """Switch the hardware off and save a resume checkpoint.

    The three steps are independent and each is attempted even if an earlier
    one fails:

    1. set the bed target to 0;
    2. disable the Keithley output, if a Keithley was connected;
    3. write a ``PAUSED`` checkpoint, if the test had actually started.

    This function never raises for an ordinary error; failures are reported
    through the log or the console.
    """
    # 1. Heater off
    try:
        set_bed_temp(0)
    except Exception as e:
        _log_best_effort(f"Failed to turn bed heater off: {e}", "ERROR")
        _log_best_effort("!!!VERIFY WITH OWN EYES THAT BED TEMPERATURE IS SET TO 0!!!!")

    # 2. Source off
    if keithley is not None:
        try:
            keithley.disable_source()
            _log_best_effort("Keithley output disabled")
        except Exception as e:
            _log_best_effort(f"Failed to disable Keithley output: {e}", "ERROR")

    # 3. Save checkpoint only if test actually started
    try:
        if folder and folder.exists() and current_phase is not None and target_temp is not None:
            measurement_count = get_measurement_count()
            save_checkpoint(
                "PAUSED",
                current_phase,
                last_completed_temp if last_completed_temp is not None else -999,
                target_temp,
                measurement_count
            )
            _log_best_effort(f"Checkpoint saved - {measurement_count} measurements recorded")
        else:
            _log_best_effort("Shutdown occurred before test fully initialized - no checkpoint saved", "WARNING")
    except Exception as e:
        _log_best_effort(f"Failed to save checkpoint: {e}", "ERROR")
# ============================================================================
# MAIN TEST LOOP
# ============================================================================

def run_test():
    """Run, or resume, the complete heating and cooling measurement cycle.

    Steps:
        1. Print the banner and pick the run folder (new or resumed).
        2. If a resumable checkpoint exists, drop the rows of the interrupted
           step and continue from that step.
        3. Connect the instruments.
        4. Heating: step from START_TEMP up to MAX_TEMP, measuring at each
           setpoint and checkpointing after each one.
        5. Cooling: step from MAX_TEMP - STEP_SIZE down to COOLDOWN_TEMP in
           the same way.
        6. Switch the heater and the Keithley output off and mark the run
           ``COMPLETE``.

    The module globals ``current_phase``, ``last_completed_temp`` and
    ``target_temp`` are kept up to date so that ``emergency_shutdown()`` can
    write an accurate checkpoint at any moment.
    """
    global folder, test_start_time, current_phase, last_completed_temp, target_temp

    # Report the hold that will really be used (HOLD_SECONDS), not a value
    # derived separately from HOLD_MINUTES.
    samples_per_step = int(SAMPLING_RATE * HOLD_SECONDS)

    print("="*60)
    print("tét của huyền :)")
    print("="*60)
    print(f"Profile: {START_TEMP}°C → {MAX_TEMP}°C → {COOLDOWN_TEMP}°C ({STEP_SIZE}°C steps)")
    print(f"Hold time: {HOLD_SECONDS} s @ {SAMPLING_RATE} Hz ({samples_per_step} samples/step)")
    print("="*60)

    # Setup folders
    folder = get_experiment_folder()
    init_csv()
    # Initialize state BEFORE connecting instruments
    current_phase = None
    last_completed_temp = None
    target_temp = None

    # Check for checkpoint to resume
    checkpoint_file = folder / "checkpoint.json"
    resume_phase = "heating"
    resume_temp = START_TEMP

    if checkpoint_file.exists():
        with open(checkpoint_file) as f:
            cp = json.load(f)

        if cp["status"] in ["IN_PROGRESS", "PAUSED"]:
            resume_phase = cp["phase"]
            resume_temp = cp["next_target_temp"]

            log_event(f"RESUMING test from {cp['last_completed_temp']}°C, next: {resume_temp}°C")

            clean_partial_data(
                resume_phase,
                resume_temp
            )

    # Connect to instruments
    connect_instruments()
    test_start_time = time.time()

    # ========== HEATING PHASE ==========
    if resume_phase == "heating":
        current_phase = "heating"
        temps = list(range(START_TEMP, MAX_TEMP + 1, STEP_SIZE))
        start_index = temps.index(resume_temp) if resume_temp in temps else 0

        for temp in temps[start_index:]:
            last_completed_temp = temp - STEP_SIZE if temp > START_TEMP else START_TEMP
            target_temp = temp

            print(f"\n--- HEATING to {temp}°C ---")
            log_event(f"--- HEATING to {temp}°C ---")
            set_bed_temp(temp)

            if not wait_for_target_temp(temp, "heating"):
                emergency_shutdown()
                sys.exit(1)

            # Stabilize briefly
            log_event(f"Stabilizing for {STABILIZATION_TIME} seconds...")
            time.sleep(STABILIZATION_TIME)

            # Measure
            measure_at_temperature(temp, "heating")

            # Update last completed temp after successful measurement
            last_completed_temp = temp

            # Checkpoint after each step
            measurement_count = get_measurement_count()
            if temp == temps[-1]:
                # Heating is finished. Point the checkpoint at the first
                # cooling step; pointing it back at this setpoint would make
                # a later resume delete and repeat this completed step.
                save_checkpoint("IN_PROGRESS", "cooling", temp,
                              MAX_TEMP - STEP_SIZE,
                              measurement_count)
            else:
                save_checkpoint("IN_PROGRESS", "heating", temp,
                              temp + STEP_SIZE,
                              measurement_count)

    # ========== COOLING PHASE ==========
    if resume_phase in ["heating", "cooling"]:
        current_phase = "cooling"

        # Determine starting point
        if resume_phase == "heating":
            start_cool = MAX_TEMP - STEP_SIZE
        else:
            start_cool = resume_temp

        temps = list(range(MAX_TEMP - STEP_SIZE, COOLDOWN_TEMP - 1, -STEP_SIZE))
        start_index = temps.index(start_cool) if start_cool in temps else 0

        for temp in temps[start_index:]:
            target_temp = temp

            print(f"\n--- COOLING to {temp}°C ---")
            log_event(f"--- COOLING to {temp}°C ---")
            set_bed_temp(temp)

            if not wait_for_target_temp(temp, "cooling"):
                emergency_shutdown()
                sys.exit(1)

            # Stabilize briefly. Sleep for the configured time that the log
            # line reports, as the heating phase does (this used to sleep a
            # hard-coded 10 s while logging STABILIZATION_TIME).
            log_event(f"Stabilizing for {STABILIZATION_TIME} seconds...")
            time.sleep(STABILIZATION_TIME)

            # Measure
            measure_at_temperature(temp, "cooling")

            # Update last completed temp after successful measurement
            last_completed_temp = temp

            # Checkpoint after each step
            measurement_count = get_measurement_count()
            save_checkpoint("IN_PROGRESS", "cooling", temp,
                          temp - STEP_SIZE if temp > COOLDOWN_TEMP else temp,
                          measurement_count)

    # ========== TEST COMPLETE ==========
    log_event("TEST COMPLETE - Shutting down equipment")
    measurement_count = get_measurement_count()

    # Turn off equipment at end of successful test. Both steps are
    # best-effort, but a failure is reported instead of silently ignored.
    try:
        ser.write(b"M140 S0\n")
        log_event("Bed heater turned OFF")
    except Exception as e:
        log_event(f"Could not turn bed heater off: {e}", "ERROR")

    try:
        keithley.disable_source()
        log_event("Keithley output disabled")
    except Exception as e:
        log_event(f"Could not disable Keithley output: {e}", "ERROR")

    save_checkpoint("COMPLETE", "done", COOLDOWN_TEMP, COOLDOWN_TEMP, measurement_count)

    print("\n" + "="*60)
    print(f"TEST COMPLETE - Data saved to {folder}/data.csv")
    print(f"Total measurements: {measurement_count}")
    print("="*60)

# ============================================================================
# ENTRY POINT
# ============================================================================

if __name__ == "__main__":
    try:
        run_test()
    except KeyboardInterrupt:
        print("\n\nCtrl+C detected - initiating emergency shutdown...")
        emergency_shutdown()
        sys.exit(1)
    except Exception as e:
        # Logging depends on the run folder. If the failure happened before
        # the folder existed, logging itself can raise, and that must not
        # stop the hardware from being switched off.
        try:
            log_event(f"Unexpected error: {e}", "CRITICAL")
        except Exception:
            print(f"[CRITICAL] Unexpected error: {e}")
        emergency_shutdown()
        sys.exit(1)
# (paste-site footer removed: "Editor is loading..." / "Leave a Comment")