Untitled

 avatar
unknown
plain_text
19 days ago
8.8 kB
2
Indexable
import pandas as pd
import numpy as np
import logging
from semopy import ModelMeans
from semopy.report import report
from semopy.plot import  semplot


# Root logging setup: INFO and above, each record stamped with the time,
# severity and the name of the function that emitted it.
logging.basicConfig(
    level=logging.INFO,
    datefmt="%Y-%m-%d %H:%M:%S",
    format="%(asctime)s - %(levelname)s - [%(funcName)s] - %(message)s",
)

# Module-level logger used by everything below.
logger = logging.getLogger(__name__)

class SEMAnalysis:
    """End-to-end SEM pipeline: load CSV, preprocess, fit, report.

    Typical use is ``SEMAnalysis(path).run()``. The individual steps
    (``load_data``, ``preprocess_data``, ``fit_model``, ``plot_model``) can
    also be called one by one, in that order.

    Attributes set by the pipeline:
        df: the working ``pandas.DataFrame`` (``None`` until ``load_data``).
        model: the fitted ``ModelMeans`` instance (``None`` until ``fit_model``).
        results: the table returned by ``model.inspect()``.
        composite_reliabilities: ``{construct name: reliability or None}``.
    """

    def __init__(self, data_path):
        """Store the data path and define the indicator lists per construct.

        Args:
            data_path: path to the semicolon-delimited CSV file.
        """
        self.data_path = data_path
        self.df = None
        self.model = None
        self.results = None
        self.composite_reliabilities = {}
        logger.info("Initializing SEMAnalysis with data path: %s", data_path)

        # Define variable groups
        self.attention_items = [
            'SM1A1', 'SM1A2', 'SM1A3', 'SM2A1', 'SM2A2', 'SM2A3',
            'SM3A1', 'SM3A2', 'SM3A3', 'SM4A1', 'SM4A2', 'SM4A3',
            'BM1A1', 'BM1A2', 'BM1A3', 'BM2A1', 'BM2A2', 'BM2A3',
            'BM3A1', 'BM3A2', 'BM3A3', 'BM4A1', 'BM4A2', 'BM4A3'
        ]
        self.engagement_items = [
            'SM1E1', 'SM1E2', 'SM1E3', 'SM1E4', 'SM1E5', 'SM2E1',
            'SM2E2', 'SM2E3', 'SM2E4', 'SM2E5', 'SM3E1', 'SM3E2',
            'SM3E3', 'SM3E4', 'SM3E5', 'SM4E1', 'SM4E2', 'SM4E3',
            'SM4E4', 'SM4E5',
            'BM1E1', 'BM1E2', 'BM1E3', 'BM1E4', 'BM2E1', 'BM2E2',
            'BM2E3', 'BM2E4', 'BM3E1', 'BM3E2', 'BM3E3', 'BM3E4',
            'BM4E1', 'BM4E2', 'BM4E3', 'BM4E4'
        ]
        self.visibility_items = ['dV1', 'dV2', 'dV3', 'dV4']

    def load_data(self):
        """Read the CSV at ``self.data_path`` into ``self.df``.

        The file is parsed with ``;`` as delimiter and every underscore is
        removed from the column names (so ``V1_1`` becomes ``V11``).

        Raises:
            Exception: any error from reading or renaming is logged and
                re-raised unchanged.
        """
        try:
            logger.info("Loading data from: %s", self.data_path)
            self.df = pd.read_csv(self.data_path, delimiter=';')
            self.df.columns = self.df.columns.str.replace("_", "", regex=False)
            logger.info("Data loaded successfully. Shape: %s", self.df.shape)
        except Exception as e:
            logger.error("Error loading data: %s", e)
            raise

    def preprocess_data(self):
        """Clean ``self.df`` and derive the variables the model needs.

        Steps, in order: blank cells and ``-99`` become NaN, every column is
        coerced to numeric, gender / Inoculation / Visibility dummies and the
        ``dV1``..``dV4`` difference scores are added, near-constant columns
        are dropped, numeric columns are z-scored, and remaining NaNs are
        filled with the column mean.

        Raises:
            Exception: any error (e.g. a missing source column) is logged
                and re-raised unchanged.
        """
        try:
            logger.info("Starting data preprocessing")

            # Single-space cells mark missing answers; one frame-wide replace
            # instead of a Python loop over the columns.
            self.df = self.df.replace(' ', np.nan)

            # Convert to numeric (anything unparsable becomes NaN)
            self.df = self.df.apply(pd.to_numeric, errors='coerce')

            # Handle -99 as missing value
            self.df = self.df.replace(-99, np.nan)

            # Gender dummies; a missing Gender compares False everywhere and
            # therefore yields 0 in all three columns.
            gender = self.df['Gender']
            self.df['Gender_Female'] = (gender == 1).astype(int)
            self.df['Gender_Male'] = (gender == 2).astype(int)
            self.df['Gender_NonBinary'] = (gender == 0).astype(int)

            # Condition dummies derived from Flow: 1/4 -> Inoculation,
            # 2/3 -> Visibility; any other value (or NaN) stays 0.
            flow = self.df['Flow']
            self.df['Inoculation'] = flow.isin([1.0, 4.0]).astype(int)
            # NOTE(review): this column shares its name with the latent
            # variable `Visibility` declared in specify_model(); confirm which
            # of the two the structural paths are meant to use.
            self.df['Visibility'] = flow.isin([2.0, 3.0]).astype(int)

            # Create difference variables
            for idx in range(1, 5):
                self.df[f'dV{idx}'] = self.df[f'V{idx}1'] - self.df[f'V{idx}2']

            # Remove low variance variables (variance computed once)
            variances = self.df.var()
            low_variance = variances[variances < 1e-5].index
            if len(low_variance) > 0:
                # Surface this: a dropped column that the model references
                # would otherwise only show up as an obscure fit error.
                logger.warning(
                    "Dropping near-constant columns: %s", list(low_variance)
                )
            self.df = self.df.drop(columns=low_variance)

            # Standardize numeric columns. 'number' is used instead of an
            # explicit float64/int64 list so the int dummies are included
            # even where the platform default int is 32-bit.
            numeric_cols = self.df.select_dtypes(include='number').columns
            numeric = self.df[numeric_cols]
            self.df[numeric_cols] = (numeric - numeric.mean()) / numeric.std()

            # Handle missing values
            self.df = self.df.fillna(self.df.mean())

            logger.info("Preprocessing completed successfully")
        except Exception as e:
            logger.error("Error during preprocessing: %s", e)
            raise

    def specify_model(self):
        """Return the model description string passed to ``ModelMeans``.

        It declares three latent variables (Visibility, Attention,
        Engagement) with their indicators and regresses Attention and
        Engagement on Inoculation and Visibility.
        """
        logger.info("Specifying SEM model")
        return """
        # Measurement model for Visibility
        Visibility =~ dV1 + dV2 + dV3 + dV4

        # Measurement model for Attention
        Attention =~ SM1A1 + SM1A2 + SM1A3 + SM2A1 + SM2A2 + SM2A3 + SM3A1 + SM3A2 + SM3A3 + SM4A1 + SM4A2 + SM4A3
        Attention =~ BM1A1 + BM1A2 + BM1A3 + BM2A1 + BM2A2 + BM2A3 + BM3A1 + BM3A2 + BM3A3 + BM4A1 + BM4A2 + BM4A3

        # Measurement model for Engagement
        Engagement =~ SM1E1 + SM1E2 + SM1E3 + SM1E4 + SM1E5 + SM2E1 + SM2E2 + SM2E3 + SM2E4 + SM2E5 + SM3E1 + SM3E2 + SM3E3 + SM3E4 + SM3E5 + SM4E1 + SM4E2 + SM4E3 + SM4E4 + SM4E5
        Engagement =~ BM1E1 + BM1E2 + BM1E3 + BM1E4 + BM2E1 + BM2E2 + BM2E3 + BM2E4 + BM3E1 + BM3E2 + BM3E3 + BM3E4 + BM4E1 + BM4E2 + BM4E3 + BM4E4

        # Structural model
        Attention ~ Inoculation + Visibility
        Engagement ~ Inoculation + Visibility
        """

    def fit_model(self):
        """Fit ``ModelMeans`` on ``self.df`` and store the estimates table.

        Sets ``self.model`` and ``self.results`` (``model.inspect()``).

        Raises:
            Exception: any fitting error is logged with traceback and
                re-raised unchanged.
        """
        try:
            logger.info("Starting SEM model fitting")
            model_spec = self.specify_model()
            self.model = ModelMeans(model_spec)
            self.model.fit(self.df)
            self.results = self.model.inspect()
            logger.info("Model fitting completed")
        except Exception as e:
            logger.exception("Error fitting the SEM model: %s", e)
            raise

    def plot_model(self, output_file='sem_model.png'):
        """Create and save the path diagram and the semopy report.

        Args:
            output_file: file name handed to ``semplot`` for the diagram.
                The report is written under the name ``"model_results"``.

        Raises:
            Exception: any plotting/report error is logged with traceback
                and re-raised unchanged.
        """
        try:
            logger.info("Creating path diagram")

            semplot(self.model, output_file)
            report(self.model, "model_results")

            logger.info("Path diagram saved to %s", output_file)
        except Exception as e:
            logger.exception("Error creating path diagram: %s", e)
            raise

    def calculate_composite_reliability(self, items, construct=None):
        """Compute composite reliability from the loadings of ``items``.

        Uses ``(sum(l))**2 / ((sum(l))**2 + sum(1 - l**2))`` on the
        ``Estimate`` column of ``self.results``. The ``1 - l**2`` error term
        is only meaningful for standardized loadings.

        Only regression rows (``op == '~'``) whose right-hand side is not the
        intercept marker ``'1'`` are used; filtering on ``lval`` alone would
        also pick up residual variances (``~~``) and intercepts of the same
        indicators.

        Args:
            items: indicator names to include.
            construct: optional latent variable name; when given, only
                loadings on that latent variable (``rval == construct``)
                are used.

        Returns:
            The reliability as a float, or ``None`` when no loading is found
            or the computation fails (the reason is logged).
        """
        try:
            logger.info(
                "Calculating composite reliability for %d items", len(items)
            )
            table = self.results
            is_loading = (
                table['lval'].isin(items)
                & (table['op'] == '~')
                & (table['rval'].astype(str) != '1')
            )
            if construct is not None:
                is_loading &= table['rval'] == construct

            loadings = pd.to_numeric(
                table.loc[is_loading, 'Estimate'], errors='coerce'
            ).to_numpy(dtype=float)
            loadings = loadings[~np.isnan(loadings)]
            if loadings.size == 0:
                logger.warning("No loadings found for the requested items")
                return None

            squared_sum = np.sum(loadings) ** 2
            error_variance = np.sum(1 - loadings ** 2)
            return squared_sum / (squared_sum + error_variance)
        except Exception as e:
            logger.error("Error calculating composite reliability: %s", e)
            return None

    def run(self):
        """Run the entire SEM analysis pipeline.

        Loads and preprocesses the data, fits the model, computes the
        composite reliability per construct, writes the diagram/report and
        prints the estimates table and the reliabilities.

        Raises:
            Exception: any error from a pipeline step is logged with
                traceback and re-raised unchanged.
        """
        logger.info("Starting SEM analysis pipeline")

        try:
            self.load_data()
            self.preprocess_data()
            self.fit_model()

            # Calculate composite reliabilities
            constructs = {
                "Attention": self.attention_items,
                "Engagement": self.engagement_items,
                "Visibility": self.visibility_items
            }

            for construct, items in constructs.items():
                logger.info(
                    "Processing reliability for construct: %s", construct
                )
                self.composite_reliabilities[construct] = (
                    self.calculate_composite_reliability(items, construct)
                )

            # Create model visualization (also writes the semopy report)
            self.plot_model()

            # Print results: show the estimates table itself. The report was
            # already generated by plot_model(); calling report() again here
            # only rewrote it (and, as far as semopy documents, has no return
            # value worth printing).
            print("\nModel Results:")
            print("=" * 80)
            print(self.results.to_string())

            print("\nComposite Reliabilities:")
            print("-" * 40)
            for construct, rel in self.composite_reliabilities.items():
                # The helper returns None on failure; a float format spec
                # would raise TypeError on it.
                shown = "n/a" if rel is None else f"{rel:.4f}"
                print(f"{construct:<20} = {shown:>10}")

            logger.info("Analysis pipeline completed successfully")

        except Exception:
            logger.exception("Error in analysis pipeline")
            raise

if __name__ == "__main__":
    # Script entry point: run the full pipeline on the bundled CSV and let
    # any failure propagate after it has been logged with its traceback.
    logger.info("Starting SEM Analysis script")

    data_path = './ThesisData6.csv'
    try:
        logger.info(f"Using data path: {data_path}")

        sem_analysis = SEMAnalysis(data_path)
        sem_analysis.run()
    except Exception:
        logger.exception("Script failed with error")
        raise
    else:
        logger.info("Script completed successfully")
Editor is loading...
Leave a Comment