Untitled
unknown
plain_text
5 days ago
17 kB
5
Indexable
"""
LOINC Mapper Module

This module provides utilities for mapping lab test names to LOINC codes
using fuzzy matching. It includes functions for creating and using an
enhanced index for fast and accurate matching.
"""

import concurrent.futures
import json
import multiprocessing
import os
import time
from concurrent.futures import ProcessPoolExecutor
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

import pandas as pd
from fuzzywuzzy import process

from app.utils.timing import timing_decorator
from app.utils.logger import logger
from app.config.constants import LOINC_CSV_PATH, LOINC_INDEX_PATH

# Shape of the on-disk / in-memory index: lookup string -> entry details.
LoincIndex = Dict[str, Dict[str, Any]]

# Shape of a batch result: a match dict on success, otherwise one of the
# strings "No match found" or "Error: <message>".
BatchResults = Dict[str, Union[Dict[str, Any], str]]

# Columns read from the LOINC CSV when building the index.
_INDEX_COLUMNS = [
    "LOINC_NUM",
    "COMPONENT",
    "LONG_COMMON_NAME",
    "SHORTNAME",
    "CLASS",
    "SYSTEM",
]

# Score bonus added for index entries whose CLASS is one of these values.
_PRIORITY_CLASS_BONUS = {
    "HEM/BC": 10,   # Hematology/Blood Count
    "CHEM": 8,      # Chemistry
    "COAG": 8,      # Coagulation
    "ALLERGY": 5,   # Allergy
    "SERO": 5,      # Serology
    "DRUG/TOX": 3,  # Drug/Toxicology
}

# Substrings looked for (case-insensitively) in an entry's SYSTEM value.
_BLOOD_SYSTEM_TERMS = ("blood", "bld", "serum", "ser", "plasma")

# Index held by each pool worker; set once per worker by _init_worker().
_worker_loinc_index: Optional[LoincIndex] = None


@timing_decorator
def find_loinc_code(
    test_name: str, loinc_path: str, threshold: int = 80
) -> Optional[Dict[str, Any]]:
    """
    Find the closest LOINC match for a given test name using fuzzy matching.

    The CSV is read on every call; use the index-based functions below when
    looking up more than a handful of names.

    Args:
        test_name: Name of the test to find
        loinc_path: Path to the LOINC CSV file
        threshold: Minimum match score threshold

    Returns:
        Dictionary with LOINC code details if a match is found, None otherwise
    """
    loinc_data = pd.read_csv(
        loinc_path, usecols=["LOINC_NUM", "COMPONENT"], low_memory=False
    )

    # Missing components are NaN floats, which the fuzzy matcher's string
    # processor cannot handle; duplicates only add work (the first occurrence
    # wins on ties either way because unique() keeps first-appearance order).
    candidates = loinc_data["COMPONENT"].dropna().unique().tolist()

    # extractOne returns None when there is nothing to choose from.
    best = process.extractOne(test_name, candidates)

    result = None
    if best is not None:
        match, score = best
        if score >= threshold:
            loinc_row = loinc_data[loinc_data["COMPONENT"] == match].iloc[0]
            result = {
                "LOINC_CODE": loinc_row["LOINC_NUM"],
                "TEST_NAME": loinc_row["COMPONENT"],
                "MATCH_SCORE": score,
            }

    logger.info(f"LOINC code lookup for '{test_name}' completed")
    return result


def _collect_results(futures: Dict[Any, str]) -> BatchResults:
    """Gather results from a {future: test_key} mapping as they complete."""
    results: BatchResults = {}
    for future in concurrent.futures.as_completed(futures):
        test_key = futures[future]
        try:
            result = future.result()
        except Exception as e:
            # One failed lookup must not abort the rest of the batch.
            logger.error(f"Error processing {test_key}: {e}")
            results[test_key] = f"Error: {e}"
        else:
            results[test_key] = result if result else "No match found"
    return results


def process_test_names(
    test_cases: Dict[str, str], loinc_path: str, threshold: int = 80
) -> BatchResults:
    """
    Process multiple test names in parallel, reading the LOINC CSV directly.

    Args:
        test_cases: Dictionary mapping a caller-chosen key to a test name
        loinc_path: Path to the LOINC CSV file
        threshold: Minimum match score threshold

    Returns:
        Dictionary keyed like test_cases. Each value is the match dictionary,
        the string "No match found", or the string "Error: <message>" if the
        lookup raised.
    """
    if not test_cases:
        return {}

    with ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
        future_to_test = {
            executor.submit(find_loinc_code, test_name, loinc_path, threshold): test_key
            for test_key, test_name in test_cases.items()
        }
        return _collect_results(future_to_test)


def _add_index_entries(
    loinc_index: LoincIndex, rows: pd.DataFrame, key_column: str
) -> None:
    """Add one index entry per row of rows, keyed by the row's key_column value."""
    for row in rows.to_dict("records"):
        # Later rows with the same key overwrite earlier ones.
        loinc_index[row[key_column]] = {
            "LOINC_CODE": row["LOINC_NUM"],
            "TEST_NAME": row["COMPONENT"],
            "LONG_NAME": row["LONG_COMMON_NAME"],
            "SHORT_NAME": row["SHORTNAME"],
            "CLASS": row["CLASS"],
            "SYSTEM": row["SYSTEM"],
            "FIELD": key_column,
        }


@timing_decorator
def save_loinc_code(loinc_path: str, index_path: str) -> None:
    """
    Create an enhanced index of LOINC codes and save it to a file.

    The index includes multiple fields (COMPONENT, LONG_COMMON_NAME,
    SHORTNAME) and additional metadata (CLASS, SYSTEM) to improve matching
    quality. Entries are added in the order COMPONENT, LONG_COMMON_NAME,
    SHORTNAME, so a later field overwrites an earlier one on a key collision.

    Args:
        loinc_path: Path to the LOINC CSV file
        index_path: Path to save the index file
    """
    logger.info(f"Creating enhanced LOINC index from {loinc_path}")

    # Read only necessary columns to reduce memory usage
    loinc_data = pd.read_csv(loinc_path, usecols=_INDEX_COLUMNS, low_memory=False)

    # Fill NaN values to avoid checks later
    loinc_data = loinc_data.fillna({
        "COMPONENT": "",
        "LONG_COMMON_NAME": "",
        "SHORTNAME": "",
        "CLASS": "",
        "SYSTEM": "",
    })

    component = loinc_data["COMPONENT"]
    long_name = loinc_data["LONG_COMMON_NAME"]
    short_name = loinc_data["SHORTNAME"]

    loinc_index: LoincIndex = {}

    # Component entries
    _add_index_entries(loinc_index, loinc_data[component != ""], "COMPONENT")

    # Long name entries (only if significantly more informative)
    long_name_mask = (long_name != "") & (
        (component == "") | (long_name.str.len() > component.str.len() * 1.5)
    )
    _add_index_entries(loinc_index, loinc_data[long_name_mask], "LONG_COMMON_NAME")

    # Short name entries (only if different from component)
    short_name_mask = (
        (short_name != "") & (short_name != "deprecated") & (short_name != component)
    )
    _add_index_entries(loinc_index, loinc_data[short_name_mask], "SHORTNAME")

    # Log top classes (value_counts() is already sorted by descending count)
    logger.info("Top 20 CLASS values:")
    for class_name, count in loinc_data["CLASS"].value_counts().head(20).items():
        logger.info(f" {class_name}: {count} entries")

    # Write index to file
    with open(index_path, "w", encoding="utf-8") as f:
        json.dump(loinc_index, f, indent=2)

    logger.info(f"LOINC index created and saved to {index_path}")
    logger.info(f"Index contains {len(loinc_index)} entries")


def load_loinc_index(index_path: str) -> LoincIndex:
    """
    Load the LOINC index from a file.

    Args:
        index_path: Path to the index file

    Returns:
        Dictionary containing the LOINC index
    """
    with open(index_path, "r", encoding="utf-8") as f:
        loinc_index = json.load(f)

    logger.info(
        f"Loaded LOINC index from file with {len(loinc_index)} entries from {index_path}"
    )
    return loinc_index


def _adjusted_score(entry: Dict[str, Any], base_score: int) -> int:
    """Return base_score plus the class, field and system bonuses for entry."""
    class_bonus = _PRIORITY_CLASS_BONUS.get(entry["CLASS"], 0)
    field_bonus = 5 if entry["FIELD"] == "COMPONENT" else 0
    system = entry["SYSTEM"].lower()
    system_bonus = 3 if any(term in system for term in _BLOOD_SYSTEM_TERMS) else 0
    return base_score + class_bonus + field_bonus + system_bonus


@timing_decorator
def find_loinc_code_from_index(
    test_name: str, loinc_index: Dict[str, Dict[str, Any]], threshold: int = 80
) -> Optional[Dict[str, Any]]:
    """
    Find the closest LOINC match for a given test name using fuzzy matching
    from a pre-loaded index with improved relevance scoring.

    The ten best fuzzy matches are taken, those below threshold are dropped,
    and the survivor with the highest adjusted score (fuzzy score plus class,
    field and system bonuses) is returned.

    Args:
        test_name: Name of the test to find
        loinc_index: Dictionary containing the LOINC index
        threshold: Minimum match score threshold (applied to the raw fuzzy
            score, before bonuses)

    Returns:
        A copy of the matching index entry with MATCH_SCORE and
        ADJUSTED_SCORE added, or None if nothing reaches the threshold
    """
    # Pass a list of keys: given a mapping, the matcher would score the
    # values instead and return 3-tuples.
    matches = process.extract(test_name, list(loinc_index), limit=10)

    # (adjusted score, index key, raw score) for each match above threshold
    scored = [
        (_adjusted_score(loinc_index[match_name], score), match_name, score)
        for match_name, score in matches
        if score >= threshold
    ]

    if not scored:
        logger.info(f"No matches found for '{test_name}' above threshold {threshold}")
        return None

    # max() keeps the first of several equally scored candidates.
    best_adjusted_score, best_match, best_score = max(scored, key=lambda item: item[0])

    result = loinc_index[best_match].copy()
    result["MATCH_SCORE"] = best_score
    result["ADJUSTED_SCORE"] = best_adjusted_score

    logger.info(f"Enhanced LOINC code lookup for '{test_name}' completed.")
    return result


def _init_worker(loinc_index: LoincIndex) -> None:
    """Pool initializer: store the index in this worker process."""
    global _worker_loinc_index
    _worker_loinc_index = loinc_index


def _find_in_worker(test_name: str, threshold: int) -> Optional[Dict[str, Any]]:
    """Look up test_name against the index stored by _init_worker()."""
    if _worker_loinc_index is None:
        raise RuntimeError("LOINC index is not initialized")
    return find_loinc_code_from_index(test_name, _worker_loinc_index, threshold)


def process_test_names_with_index(
    test_cases: Dict[str, str],
    loinc_index: Dict[str, Dict[str, Any]],
    threshold: int = 80,
    max_workers: Optional[int] = None
) -> BatchResults:
    """
    Process multiple test names in parallel using the pre-loaded index.

    Args:
        test_cases: Dictionary mapping a caller-chosen key to a test name
        loinc_index: Dictionary containing the LOINC index
        threshold: Minimum match score threshold
        max_workers: Maximum number of worker processes (defaults to CPU count)

    Returns:
        Dictionary keyed like test_cases. Each value is the match dictionary,
        the string "No match found", or the string "Error: <message>" if the
        lookup raised.
    """
    if not test_cases:
        return {}

    # Use CPU count if max_workers not specified
    if max_workers is None:
        max_workers = multiprocessing.cpu_count()

    # For small batches, limit workers to avoid overhead
    max_workers = min(max_workers, len(test_cases))

    # The index is handed to each worker once through the initializer rather
    # than as a submit() argument, which would serialise it again per task.
    with ProcessPoolExecutor(
        max_workers=max_workers,
        initializer=_init_worker,
        initargs=(loinc_index,),
    ) as executor:
        futures = {
            executor.submit(_find_in_worker, test_name, threshold): test_key
            for test_key, test_name in test_cases.items()
        }
        return _collect_results(futures)


class LoincMapper:
    """
    Class for mapping lab test names to LOINC codes.

    This class provides an interface for mapping lab test names to
    standardized LOINC codes using fuzzy matching with enhanced relevance
    scoring based on lab test characteristics.
    """

    def __init__(
        self,
        loinc_path: Optional[str] = None,
        index_path: Optional[str] = None,
        auto_initialize: bool = True
    ):
        """
        Initialize the LOINC mapper.

        Args:
            loinc_path: Path to the LOINC CSV file. If None, uses default path.
            index_path: Path to the index file. If None, uses default path.
            auto_initialize: Whether to automatically initialize the index on creation
        """
        self.loinc_path = loinc_path or str(LOINC_CSV_PATH)
        self.index_path = index_path or str(LOINC_INDEX_PATH)
        # None until _initialize_index() has run.
        self.loinc_index: Optional[LoincIndex] = None

        if auto_initialize:
            self._initialize_index()

    @timing_decorator
    def _initialize_index(self) -> None:
        """
        Initialize the LOINC index, creating it if it doesn't exist.

        Raises:
            FileNotFoundError: If the LOINC CSV file doesn't exist
            IOError: If there's an error reading/writing the index
        """
        if not Path(self.loinc_path).exists():
            raise FileNotFoundError(f"LOINC CSV file not found at {self.loinc_path}")

        if not Path(self.index_path).exists():
            logger.info(f"Enhanced LOINC index not found at {self.index_path}, creating it...")
            save_loinc_code(self.loinc_path, self.index_path)

        self.loinc_index = load_loinc_index(self.index_path)
        logger.info(f"LOINC index initialized with {len(self.loinc_index or {})} entries")

    def ensure_index_loaded(self) -> None:
        """Ensure the index is loaded, initializing it if necessary."""
        if self.loinc_index is None:
            self._initialize_index()

    def find_loinc_code(
        self, test_name: str, threshold: int = 80
    ) -> Optional[Dict[str, Any]]:
        """
        Find the closest LOINC match for a given test name.

        Args:
            test_name: Name of the test to find
            threshold: Minimum match score threshold

        Returns:
            Dictionary with LOINC code details if a match is found, None otherwise

        Raises:
            RuntimeError: If the index is missing or empty after loading
        """
        self.ensure_index_loaded()
        if not self.loinc_index:
            raise RuntimeError("LOINC index is not initialized")

        return find_loinc_code_from_index(test_name, self.loinc_index, threshold)

    def process_test_names(
        self,
        test_cases: Dict[str, str],
        threshold: int = 80,
        max_workers: Optional[int] = None
    ) -> BatchResults:
        """
        Process multiple test names in parallel.

        Args:
            test_cases: Dictionary mapping a caller-chosen key to a test name
            threshold: Minimum match score threshold
            max_workers: Maximum number of worker processes (defaults to CPU count)

        Returns:
            Dictionary keyed like test_cases. Each value is the match
            dictionary, the string "No match found", or the string
            "Error: <message>".

        Raises:
            RuntimeError: If the index is missing or empty after loading
        """
        self.ensure_index_loaded()
        if not self.loinc_index:
            raise RuntimeError("LOINC index is not initialized")

        return process_test_names_with_index(
            test_cases, self.loinc_index, threshold, max_workers
        )

    def __len__(self) -> int:
        """Return the number of entries in the LOINC index."""
        if self.loinc_index is None:
            return 0
        return len(self.loinc_index)


# Global instance that will be initialized at startup
_global_loinc_mapper: Optional[LoincMapper] = None


def get_loinc_mapper(loinc_path: Optional[str] = None, index_path: Optional[str] = None) -> LoincMapper:
    """
    Get the global LoincMapper instance, creating it if it doesn't exist.

    The arguments are only used by the call that creates the instance; once
    it exists they are ignored and the existing instance is returned.

    Args:
        loinc_path: Path to the LOINC CSV file. If None, uses default path.
        index_path: Path to the index file. If None, uses default path.

    Returns:
        An initialized LoincMapper instance
    """
    global _global_loinc_mapper

    if _global_loinc_mapper is None:
        _global_loinc_mapper = LoincMapper(loinc_path, index_path)

    return _global_loinc_mapper


if __name__ == '__main__':
    mapper = get_loinc_mapper()

    test_cases = {
        "interleukin_6": "Interleukin - 6 (IL-6)",
        "d_dimer": "D-DIMER (MINI VIDAS)",
        "hemoglobin": "HAEMOGLOBIN (HB)",
        "crp": "CRP (NYCOCARD)"
    }

    logger.info("Running test with LoincMapper class...")
    results = mapper.process_test_names(test_cases)

    matches = sum(1 for r in results.values() if isinstance(r, dict))
    non_matches = sum(1 for r in results.values() if r == "No match found")

    logger.info(f"Matched: {matches}/{len(results)} ({matches/len(results)*100:.1f}%)")
    logger.info(f"Not matched: {non_matches}/{len(results)} ({non_matches/len(results)*100:.1f}%)")

    for test_key, result in results.items():
        print(f"Test: {test_key}")
        if isinstance(result, dict):
            # Truncate long names to 80 characters for display.
            long_name = result.get('LONG_NAME')
            if len(long_name or '') > 80:
                long_name = f"{long_name[:80]}..."

            print(f"LOINC Code: {result.get('LOINC_CODE')}")
            print(f"Field Matched: {result.get('FIELD')}")
            print(f"Component: {result.get('TEST_NAME')}")
            print(f"Long Name: {long_name}")
            print(f"Class: {result.get('CLASS')}")
            print(f"Match Score: {result.get('MATCH_SCORE')}")
            print(f"Adjusted Score: {result.get('ADJUSTED_SCORE')}")
        else:
            print(f"Result: {result}")
        print("-" * 50)
Editor is loading...
Leave a Comment