Table of Contents

Github ML Pipeline

Github repository structure

screenshot_2025-08-08_at_10.37.34 am

mlops-pipeline.yml


name: MLOps CI/CD Pipeline

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

env:
  AZURE_ML_WORKSPACE: ${{ secrets.AZURE_ML_WORKSPACE }}
  AZURE_RESOURCE_GROUP: ${{ secrets.AZURE_RESOURCE_GROUP }}
  AZURE_SUBSCRIPTION_ID: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
  MODEL_NAME: "customer-churn-model"
  ENDPOINT_NAME: "churn-prediction-endpoint"

permissions:
  id-token: write
  contents: read

jobs:
  unit-tests:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
        pip install pytest pytest-cov flake8

    - name: Run linting
      run: |
        flake8 src/ --count --select=E9,F63,F7,F82 --show-source --statistics
        flake8 src/ --count --exit-zero --max-complexity=10 --max-line-length=127 --statistics

    - name: Run unit tests
      run: |
        pytest tests/ -v --cov=src --cov-report=xml --cov-report=term-missing

    - name: Upload coverage reports
      uses: codecov/codecov-action@v3
      with:
        file: ./coverage.xml
        flags: unittests

  build-and-train:
    needs: unit-tests
    runs-on: ubuntu-latest
    if: github.event_name == 'push' && github.ref == 'refs/heads/main'
    
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.9'

    - name: Install Azure CLI and ML extension
      run: |
        curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
        az extension add -n ml
    - name: Azure Login
      uses: azure/login@v2
      with:
        client-id: ${{ secrets.AZURE_CLIENT_ID }}
        tenant-id: ${{ secrets.AZURE_TENANT_ID }}
        subscription-id: ${{ secrets.AZURE_SUBSCRIPTION_ID }}
    # auth-type: OIDC is implied in v2 when no client-secret is provided

    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt

    - name: Set Azure ML workspace
      run: |
        az configure --defaults group=$AZURE_RESOURCE_GROUP workspace=$AZURE_ML_WORKSPACE

    - name: Create compute cluster if not exists
      run: |
        az ml compute create --name cpu-cluster --type amlcompute --min-instances 0 --max-instances 4 --size Standard_DS3_v2 || true

    - name: Submit training job
      id: training
      run: |
        RUN_ID=$(az ml job create --file ml/training-job.yml --query name -o tsv)
        echo "run_id=$RUN_ID" >> $GITHUB_OUTPUT
        echo "Training job submitted with ID: $RUN_ID"
        
        # Wait for job completion with timeout
        timeout 1800 az ml job stream --name $RUN_ID || {
          echo "Training job timed out or failed"
          az ml job show --name $RUN_ID --query status
          exit 1
        }

    - name: Get training metrics
      run: |
        az ml job show --name ${{ steps.training.outputs.run_id }} --query tags

    - name: Debug job outputs
      run: |
        echo "Checking job outputs..."
        az ml job show --name ${{ steps.training.outputs.run_id }} --query outputs
        
        echo "Listing job artifacts..."
        az ml job download --name ${{ steps.training.outputs.run_id }} --output-name model_output --download-path ./debug_model --all || echo "Download failed, checking alternative paths"

    - name: Debug job outputs and register model
      id: register
      run: |
        echo "=== Debugging Job Outputs ==="
        JOB_ID=${{ steps.training.outputs.run_id }}
        echo "Job ID: $JOB_ID"
        
        # Check job details
        echo "Job details:"
        az ml job show --name $JOB_ID --query "{status: status, outputs: outputs}" -o json
        
        # List all outputs
        echo "Available outputs:"
        az ml job show --name $JOB_ID --query "outputs" -o json
        
        # Check if there's a model_output
        echo "Checking model_output specifically:"
        az ml job show --name $JOB_ID --query "outputs.model_output" -o json || echo "model_output not found"
        
        # Get MLflow run ID if available
        echo "Getting MLflow run ID:"
        MLFLOW_RUN_ID=$(az ml job show --name $JOB_ID --query "properties.mlflow.runId" -o tsv 2>/dev/null || echo "not_found")
        echo "MLflow Run ID: $MLFLOW_RUN_ID"
        
        # Try to register model with correct path
        echo "=== Attempting Model Registration ==="
        
        if [ "$MLFLOW_RUN_ID" != "not_found" ] && [ ! -z "$MLFLOW_RUN_ID" ]; then
          echo "Trying MLflow run path..."
          MODEL_VERSION=$(az ml model create \
            --name $MODEL_NAME \
            --path "runs:/$MLFLOW_RUN_ID/model" \
            --type mlflow_model \
            --description "Customer churn prediction model trained via GitHub Actions" \
            --query version -o tsv 2>/dev/null || echo "mlflow_failed")
        else
          MODEL_VERSION="mlflow_failed"
        fi
        
        if [ "$MODEL_VERSION" = "mlflow_failed" ] || [ -z "$MODEL_VERSION" ]; then
          echo "MLflow path failed, trying job output path..."
          MODEL_VERSION=$(az ml model create \
            --name $MODEL_NAME \
            --path "azureml://jobs/$JOB_ID/outputs/model_output" \
            --type mlflow_model \
            --description "Customer churn prediction model trained via GitHub Actions" \
            --query version -o tsv 2>/dev/null || echo "job_output_failed")
        fi
        
        if [ "$MODEL_VERSION" = "job_output_failed" ] || [ -z "$MODEL_VERSION" ]; then
          echo "Job output path failed, checking what files actually exist..."
          # Try to download and check the structure
          mkdir -p ./debug_download
          az ml job download --name $JOB_ID --download-path ./debug_download --all || echo "Download failed"
          echo "Downloaded structure:"
          find ./debug_download -type f -name "*.pkl" -o -name "MLmodel" -o -name "*.json" | head -20
          
          # Look for any MLmodel files which indicate MLflow model format
          MLMODEL_PATH=$(find ./debug_download -name "MLmodel" | head -1)
          if [ ! -z "$MLMODEL_PATH" ]; then
            MODEL_DIR=$(dirname "$MLMODEL_PATH")
            RELATIVE_PATH=${MODEL_DIR#./debug_download/}
            echo "Found MLmodel at: $MLMODEL_PATH"
            echo "Relative path: $RELATIVE_PATH"
            
            # Try registering with the discovered path
            MODEL_VERSION=$(az ml model create \
              --name $MODEL_NAME \
              --path "azureml://jobs/$JOB_ID/outputs/$RELATIVE_PATH" \
              --type mlflow_model \
              --description "Customer churn prediction model trained via GitHub Actions" \
              --query version -o tsv)
          else
            echo "No MLmodel file found. Available files:"
            find ./debug_download -type f | head -20
            exit 1
          fi
        fi
        
        if [ ! -z "$MODEL_VERSION" ] && [ "$MODEL_VERSION" != "mlflow_failed" ] && [ "$MODEL_VERSION" != "job_output_failed" ]; then
          echo "model_version=$MODEL_VERSION" >> $GITHUB_OUTPUT
          echo "Model successfully registered with version: $MODEL_VERSION"
        else
          echo "Failed to register model after all attempts"
          exit 1
        fi
    - name: Create/Update Online Endpoint
      run: |
        # Check if endpoint exists
        if az ml online-endpoint show --name $ENDPOINT_NAME &>/dev/null; then
          echo "Endpoint $ENDPOINT_NAME already exists"
        else
          echo "Creating new endpoint $ENDPOINT_NAME"
          az ml online-endpoint create --file ml/endpoint.yml --name $ENDPOINT_NAME
        fi

    - name: Deploy model to endpoint
      run: |
        # Create deployment configuration
        cat > deployment.yml << EOF
        \$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineDeployment.schema.json
        name: blue
        endpoint_name: $ENDPOINT_NAME
        model: azureml:$MODEL_NAME:${{ steps.register.outputs.model_version }}
        instance_type: Standard_DS3_v2
        instance_count: 1
        environment_variables:
          MLFLOW_MODEL_DIRECTORY: /var/azureml-app/azureml-models/model/1
        request_settings:
          request_timeout_ms: 90000
          max_concurrent_requests_per_instance: 1
        liveness_probe:
          initial_delay: 10
          period: 10
          timeout: 2
          success_threshold: 1
          failure_threshold: 30
        readiness_probe:
          initial_delay: 10
          period: 10
          timeout: 2
          success_threshold: 1
          failure_threshold: 3
        EOF
        
        az ml online-deployment create --file deployment.yml --all-traffic

    - name: Test endpoint
      run: |
        az ml online-endpoint invoke --name $ENDPOINT_NAME --request-file ml/sample-request.json

    - name: Send custom metrics to Azure Monitor
      run: |
        # Send custom metrics about the deployment
        az monitor metrics send \
          --resource "/subscriptions/$AZURE_SUBSCRIPTION_ID/resourceGroups/$AZURE_RESOURCE_GROUP/providers/Microsoft.MachineLearningServices/workspaces/$AZURE_ML_WORKSPACE" \
          --metric-name "ModelDeploymentSuccess" \
          --metric-value 1 \
          --metric-timestamp $(date -u +"%Y-%m-%dT%H:%M:%SZ") || true

    - name: Create Application Insights alert
      run: |
        # Create alert rule for endpoint failures
        az monitor metrics alert create \
          --name "MLEndpointFailureAlert" \
          --resource-group $AZURE_RESOURCE_GROUP \
          --condition "avg requests/failed > 5" \
          --window-size 5m \
          --evaluation-frequency 1m \
          --severity 2 \
          --description "Alert when ML endpoint has more than 5 failed requests" \
          --scopes "/subscriptions/$AZURE_SUBSCRIPTION_ID/resourceGroups/$AZURE_RESOURCE_GROUP/providers/Microsoft.MachineLearningServices/workspaces/$AZURE_ML_WORKSPACE/onlineEndpoints/$ENDPOINT_NAME" || true

  notify-on-failure:
    runs-on: ubuntu-latest
    needs: [unit-tests, build-and-train]
    if: failure()
    
    steps:
    - name: Notify on pipeline failure
      uses: 8398a7/action-slack@v3
      with:
        status: failure
        channel: '#mlops-alerts'
        webhook_url: ${{ secrets.SLACK_WEBHOOK }}
      env:
        SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

    - name: Create GitHub issue on failure
      if: github.event_name == 'push'
      uses: actions/github-script@v6
      with:
        script: |
          github.rest.issues.create({
            owner: context.repo.owner,
            repo: context.repo.repo,
            title: `MLOps Pipeline Failed - ${context.sha.substring(0, 7)}`,
            body: `The MLOps pipeline failed for commit ${context.sha}.\n\nWorkflow: ${context.workflow}\nRun: ${context.runNumber}\n\nPlease investigate the failure.`,
            labels: ['mlops', 'pipeline-failure', 'bug']
          })

  security-scan:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout code
      uses: actions/checkout@v4

    - name: Run Trivy vulnerability scanner
      uses: aquasecurity/trivy-action@master
      with:
        scan-type: 'fs'
        scan-ref: '.'
        format: 'sarif'
        output: 'trivy-results.sarif'

    - name: Upload Trivy scan results
      uses: github/codeql-action/upload-sarif@v2
      with:
        sarif_file: 'trivy-results.sarif'
        
        
 

endpoint.yml

$schema: https://azuremlschemas.azureedge.net/latest/managedOnlineEndpoint.schema.json
name: churn-prediction-endpoint
description: Endpoint for customer churn prediction
auth_mode: key

tags:
  model: customer-churn-model
  environment: production
  

environment.yml

# ml/environment.yml
name: ml-environment
dependencies:
  - python=3.9
  - pip
  - pip:
    - scikit-learn==1.3.0
    - pandas==2.0.3
    - numpy==1.24.3
    - mlflow==2.5.0
    - azure-ai-ml==1.8.0
    - joblib==1.3.1

sample-request.json

{
  "input_data": {
    "columns": [
      "Account Length", "Area Code", "VMail Message", "Day Mins", "Day Calls", 
      "Day Charge", "Eve Mins", "Eve Calls", "Eve Charge", "Night Mins", 
      "Night Calls", "Night Charge", "Intl Mins", "Intl Calls", "Intl Charge", 
      "CustServ Calls", "State", "Int'l Plan", "VMail Plan", 
      "Avg_Day_Call_Duration", "Avg_Eve_Call_Duration", "Avg_Night_Call_Duration",
      "Total_Charges", "Total_Usage_Mins"
    ],
    "index": [0],
    "data": [
      [142, 408, 28, 180.5, 95, 30.69, 210.2, 88, 17.87, 201.9, 82, 9.09, 
       8.5, 4, 2.30, 2, 5, 0, 1, 1.9, 2.4, 2.5, 59.95, 601.1]
    ]
  }
}

== training-job.yml ==

# ml/training-job.yml
$schema: https://azuremlschemas.azureedge.net/latest/commandJob.schema.json
type: command

display_name: Customer Churn Model Training
description: Training job for customer churn prediction model

experiment_name: customer-churn-experiment

compute: azureml:cpu-cluster

environment: azureml:AzureML-sklearn-1.0-ubuntu20.04-py38-cpu@latest

code: ../src

command: >
  python train.py
  --data_path ${{inputs.training_data}}
  --model_output ${{outputs.model_output}}
  --test_size 0.2
  --random_state 42

inputs:
  training_data:
    type: uri_file
    path: https://raw.githubusercontent.com/albayraktaroglu/Datasets/master/churn.csv

outputs:
  model_output:
    type: uri_folder
    mode: rw_mount

environment_variables:
  MLFLOW_TRACKING_URI: "azureml://experiments/customer-churn-experiment"

tags:
  model_type: "classification"
  framework: "scikit-learn"
  training_method: "automated"
  

train.py


# src/train.py
import argparse
import os
import pandas as pd
import numpy as np
import mlflow
import mlflow.sklearn
import joblib
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import LabelEncoder, StandardScaler
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, roc_auc_score
from sklearn.pipeline import Pipeline
import logging

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def parse_args():
    parser = argparse.ArgumentParser(description="Train customer churn model")
    parser.add_argument("--data_path", type=str, required=True, help="Path to training data")
    parser.add_argument("--model_output", type=str, required=True, help="Path to save model")
    parser.add_argument("--test_size", type=float, default=0.2, help="Test set size")
    parser.add_argument("--random_state", type=int, default=42, help="Random state")
    parser.add_argument("--n_estimators", type=int, default=100, help="Number of trees")
    parser.add_argument("--max_depth", type=int, default=10, help="Maximum depth")
    return parser.parse_args()

def load_and_preprocess_data(data_path):
    """Load and preprocess the customer churn data"""
    logger.info(f"Loading data from {data_path}")
    
    # For demo purposes, creating sample data if file doesn't exist
    if not os.path.exists(data_path):
        logger.info("Creating sample data for demonstration")
        np.random.seed(42)
        n_samples = 10000
        
        data = {
            'CreditScore': np.random.randint(350, 850, n_samples),
            'Geography': np.random.choice(['France', 'Spain', 'Germany'], n_samples),
            'Gender': np.random.choice(['Male', 'Female'], n_samples),
            'Age': np.random.randint(18, 92, n_samples),
            'Tenure': np.random.randint(0, 10, n_samples),
            'Balance': np.random.uniform(0, 250000, n_samples),
            'NumOfProducts': np.random.randint(1, 4, n_samples),
            'HasCrCard': np.random.choice([0, 1], n_samples),
            'IsActiveMember': np.random.choice([0, 1], n_samples),
            'EstimatedSalary': np.random.uniform(0, 200000, n_samples),
        }
        
        # Create target with some logic
        df = pd.DataFrame(data)
        churn_prob = (
            0.1 + 
            0.3 * (df['Age'] > 50).astype(int) +
            0.2 * (df['NumOfProducts'] == 1).astype(int) +
            0.15 * (df['IsActiveMember'] == 0).astype(int) +
            0.1 * (df['Balance'] == 0).astype(int)
        )
        df['Exited'] = np.random.binomial(1, churn_prob)
    else:
        df = pd.read_csv(data_path)
    
    logger.info(f"Data shape: {df.shape}")
    logger.info(f"Column names: {list(df.columns)}")
    
    # Handle different target column names
    if 'Churn?' in df.columns:
        # Convert Churn? to binary Exited column
        df['Exited'] = (df['Churn?'] == 'True.').astype(int)
        logger.info(f"Churn rate: {df['Exited'].mean():.3f}")
        # Drop the original column
        df = df.drop('Churn?', axis=1)
    elif 'Exited' in df.columns:
        logger.info(f"Churn rate: {df['Exited'].mean():.3f}")
    else:
        logger.error("No target column found. Expected 'Exited' or 'Churn?'")
        raise ValueError("Target column not found")
    
    return df

def preprocess_features(df):
    """Preprocess features for training"""
    df_processed = df.copy()
    label_encoders = {}
    
    # Handle different dataset structures
    if 'Geography' in df.columns and 'Gender' in df.columns:
        # Bank churn dataset structure
        le_geography = LabelEncoder()
        le_gender = LabelEncoder()
        
        df_processed['Geography'] = le_geography.fit_transform(df['Geography'])
        df_processed['Gender'] = le_gender.fit_transform(df['Gender'])
        
        label_encoders['Geography'] = le_geography
        label_encoders['Gender'] = le_gender
        
        # Feature engineering for bank dataset
        if 'CreditScore' in df.columns and 'Age' in df.columns:
            df_processed['CreditScore_Age_Ratio'] = df_processed['CreditScore'] / df_processed['Age']
        if 'Balance' in df.columns and 'EstimatedSalary' in df.columns:
            df_processed['Balance_Salary_Ratio'] = df_processed['Balance'] / (df_processed['EstimatedSalary'] + 1)
            df_processed['Balance_Log'] = np.log1p(df_processed['Balance'])
            df_processed['EstimatedSalary_Log'] = np.log1p(df_processed['EstimatedSalary'])
    
    else:
        # Telecom churn dataset structure
        logger.info("Processing telecom churn dataset")
        
        # Handle categorical columns
        categorical_columns = []
        for col in df_processed.columns:
            if df_processed[col].dtype == 'object' and col != 'Exited':
                categorical_columns.append(col)
        
        logger.info(f"Categorical columns found: {categorical_columns}")
        
        # Encode categorical variables
        for col in categorical_columns:
            if col not in ['Phone']:  # Skip phone numbers
                le = LabelEncoder()
                df_processed[col] = le.fit_transform(df_processed[col].astype(str))
                label_encoders[col] = le
        
        # Drop non-predictive columns
        columns_to_drop = ['Phone']
        df_processed = df_processed.drop(columns=[col for col in columns_to_drop if col in df_processed.columns])
        
        # Feature engineering for telecom dataset
        if 'Day Mins' in df_processed.columns and 'Day Calls' in df_processed.columns:
            df_processed['Avg_Day_Call_Duration'] = df_processed['Day Mins'] / (df_processed['Day Calls'] + 1)
        
        if 'Eve Mins' in df_processed.columns and 'Eve Calls' in df_processed.columns:
            df_processed['Avg_Eve_Call_Duration'] = df_processed['Eve Mins'] / (df_processed['Eve Calls'] + 1)
        
        if 'Night Mins' in df_processed.columns and 'Night Calls' in df_processed.columns:
            df_processed['Avg_Night_Call_Duration'] = df_processed['Night Mins'] / (df_processed['Night Calls'] + 1)
        
        # Total usage features
        usage_cols = [col for col in df_processed.columns if 'Mins' in col]
        if usage_cols:
            df_processed['Total_Usage_Mins'] = df_processed[usage_cols].sum(axis=1)
        
        charge_cols = [col for col in df_processed.columns if 'Charge' in col]
        if charge_cols:
            df_processed['Total_Charges'] = df_processed[charge_cols].sum(axis=1)
    
    # Handle boolean columns
    for col in df_processed.columns:
        if df_processed[col].dtype == 'bool':
            df_processed[col] = df_processed[col].astype(int)
    
    return df_processed, label_encoders

def train_model(X_train, y_train, n_estimators, max_depth, random_state):
    """Train the random forest model"""
    logger.info("Training Random Forest model")
    
    # Create pipeline with scaling
    pipeline = Pipeline([
        ('scaler', StandardScaler()),
        ('classifier', RandomForestClassifier(
            n_estimators=n_estimators,
            max_depth=max_depth,
            random_state=random_state,
            n_jobs=-1
        ))
    ])
    
    pipeline.fit(X_train, y_train)
    return pipeline

def evaluate_model(model, X_test, y_test):
    """Evaluate model performance"""
    y_pred = model.predict(X_test)
    y_pred_proba = model.predict_proba(X_test)[:, 1]
    
    metrics = {
        'accuracy': accuracy_score(y_test, y_pred),
        'precision': precision_score(y_test, y_pred),
        'recall': recall_score(y_test, y_pred),
        'f1_score': f1_score(y_test, y_pred),
        'roc_auc': roc_auc_score(y_test, y_pred_proba)
    }
    
    logger.info("Model Performance:")
    for metric, value in metrics.items():
        logger.info(f"{metric}: {value:.4f}")
    
    return metrics

def main():
    args = parse_args()
    
    # Start MLflow run
    with mlflow.start_run():
        # Log parameters
        mlflow.log_param("test_size", args.test_size)
        mlflow.log_param("random_state", args.random_state)
        mlflow.log_param("n_estimators", args.n_estimators)
        mlflow.log_param("max_depth", args.max_depth)
        
        # Load and preprocess data
        df = load_and_preprocess_data(args.data_path)
        df_processed, label_encoders = preprocess_features(df)
        
        # Prepare features and target
        feature_columns = [col for col in df_processed.columns if col != 'Exited']
        X = df_processed[feature_columns]
        y = df_processed['Exited']
        
        logger.info(f"Feature columns: {feature_columns}")
        logger.info(f"Features shape: {X.shape}")
        logger.info(f"Target shape: {y.shape}")
        
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=args.test_size, random_state=args.random_state, stratify=y
        )
        
        logger.info(f"Training set size: {X_train.shape[0]}")
        logger.info(f"Test set size: {X_test.shape[0]}")
        
        # Train model
        model = train_model(X_train, y_train, args.n_estimators, args.max_depth, args.random_state)
        
        # Evaluate model
        metrics = evaluate_model(model, X_test, y_test)
        
        # Log metrics
        for metric, value in metrics.items():
            mlflow.log_metric(metric, value)
        
        # Log feature importance
        feature_importance = model.named_steps['classifier'].feature_importances_
        for i, (feature, importance) in enumerate(zip(feature_columns, feature_importance)):
            # Sanitize feature name for MLflow (remove invalid characters)
            sanitized_feature = feature.replace("'", "").replace(" ", "_").replace("-", "_")
            mlflow.log_metric(f"feature_importance_{sanitized_feature}", importance)
        
        # Log additional info
        mlflow.log_metric("training_samples", len(X_train))
        mlflow.log_metric("test_samples", len(X_test))
        mlflow.log_metric("n_features", X_train.shape[1])
        
        # Save model artifacts
        os.makedirs(args.model_output, exist_ok=True)
        
        # Save the model using MLflow
        model_path = os.path.join(args.model_output, "model")
        mlflow.sklearn.save_model(model, model_path)
        
        # Log the model path for debugging
        logger.info(f"Model saved to: {model_path}")
        logger.info(f"Contents of model output directory: {os.listdir(args.model_output)}")
        
        # Save preprocessing objects
        joblib.dump(label_encoders, os.path.join(args.model_output, "label_encoders.pkl"))
        joblib.dump(feature_columns, os.path.join(args.model_output, "feature_columns.pkl"))
        
        # Log model with MLflow
        mlflow.sklearn.log_model(
            model, 
            "model",
            registered_model_name="customer-churn-model"
        )
        
        logger.info(f"Model saved to {args.model_output}")
        logger.info("Training completed successfully!")

if __name__ == "__main__":
    main()
    
    

test_integration.py


# tests/test_integration.py
import pytest
import tempfile
import os
import pandas as pd
import numpy as np
from src.train import main
import sys
from unittest.mock import patch

class TestIntegration:
    
    def test_end_to_end_training(self):
        """Test the complete training pipeline"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create test arguments
            test_args = [
                'train.py',
                '--data_path', 'non_existent_path.csv',  # Will trigger sample data creation
                '--model_output', temp_dir,
                '--test_size', '0.2',
                '--random_state', '42',
                '--n_estimators', '10',
                '--max_depth', '5'
            ]
            
            # Mock sys.argv
            with patch.object(sys, 'argv', test_args):
                try:
                    main()
                    
                    # Check that model files are created
                    assert os.path.exists(os.path.join(temp_dir, 'model'))
                    assert os.path.exists(os.path.join(temp_dir, 'label_encoders.pkl'))
                    assert os.path.exists(os.path.join(temp_dir, 'feature_columns.pkl'))
                    
                except SystemExit as e:
                    # MLflow might cause a system exit, which is okay for testing
                    if e.code != 0:
                        raise
    
    def test_end_to_end_training_with_telecom_data(self):
        """Test the complete training pipeline with telecom-style data"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create a temporary CSV file with telecom data structure
            test_data_path = os.path.join(temp_dir, 'test_churn.csv')
            
            # Create sample telecom data
            np.random.seed(42)
            n_samples = 100
            
            telecom_data = {
                'State': np.random.choice(['TX', 'CA', 'NY', 'FL'], n_samples),
                'Account Length': np.random.randint(1, 243, n_samples),
                'Area Code': np.random.choice([408, 415, 510], n_samples),
                'Phone': [f"{np.random.randint(100,999)}-{np.random.randint(1000,9999)}" for _ in range(n_samples)],
                "Int'l Plan": np.random.choice(['yes', 'no'], n_samples),
                'VMail Plan': np.random.choice(['yes', 'no'], n_samples),
                'VMail Message': np.random.randint(0, 51, n_samples),
                'Day Mins': np.random.uniform(0, 351, n_samples),
                'Day Calls': np.random.randint(0, 166, n_samples),
                'Day Charge': np.random.uniform(0, 60, n_samples),
                'Eve Mins': np.random.uniform(0, 364, n_samples),
                'Eve Calls': np.random.randint(0, 171, n_samples),
                'Eve Charge': np.random.uniform(0, 31, n_samples),
                'Night Mins': np.random.uniform(0, 396, n_samples),
                'Night Calls': np.random.randint(0, 176, n_samples),
                'Night Charge': np.random.uniform(0, 18, n_samples),
                'Intl Mins': np.random.uniform(0, 20, n_samples),
                'Intl Calls': np.random.randint(0, 21, n_samples),
                'Intl Charge': np.random.uniform(0, 6, n_samples),
                'CustServ Calls': np.random.randint(0, 10, n_samples),
                'Churn?': np.random.choice(['True.', 'False.'], n_samples)
            }
            
            df = pd.DataFrame(telecom_data)
            df.to_csv(test_data_path, index=False)
            
            # Create test arguments
            test_args = [
                'train.py',
                '--data_path', test_data_path,
                '--model_output', temp_dir,
                '--test_size', '0.2',
                '--random_state', '42',
                '--n_estimators', '10',
                '--max_depth', '5'
            ]
            
            # Mock sys.argv
            with patch.object(sys, 'argv', test_args):
                try:
                    main()
                    
                    # Check that model files are created
                    assert os.path.exists(os.path.join(temp_dir, 'model'))
                    assert os.path.exists(os.path.join(temp_dir, 'label_encoders.pkl'))
                    assert os.path.exists(os.path.join(temp_dir, 'feature_columns.pkl'))
                    
                    # Verify the saved files contain expected data
                    import joblib
                    label_encoders = joblib.load(os.path.join(temp_dir, 'label_encoders.pkl'))
                    feature_columns = joblib.load(os.path.join(temp_dir, 'feature_columns.pkl'))
                    
                    # Should have label encoders for categorical columns
                    assert isinstance(label_encoders, dict)
                    assert len(label_encoders) > 0
                    
                    # Should have feature columns
                    assert isinstance(feature_columns, list)
                    assert len(feature_columns) > 0
                    assert 'Exited' not in feature_columns  # Target should not be in features
                    
                except SystemExit as e:
                    # MLflow might cause a system exit, which is okay for testing
                    if e.code != 0:
                        raise
    
    def test_end_to_end_training_with_bank_data(self):
        """Test the complete training pipeline with bank-style data"""
        with tempfile.TemporaryDirectory() as temp_dir:
            # Create a temporary CSV file with bank data structure
            test_data_path = os.path.join(temp_dir, 'test_bank_churn.csv')
            
            # Create sample bank data
            np.random.seed(42)
            n_samples = 100
            
            bank_data = {
                'CreditScore': np.random.randint(350, 850, n_samples),
                'Geography': np.random.choice(['France', 'Spain', 'Germany'], n_samples),
                'Gender': np.random.choice(['Male', 'Female'], n_samples),
                'Age': np.random.randint(18, 92, n_samples),
                'Tenure': np.random.randint(0, 10, n_samples),
                'Balance': np.random.uniform(0, 250000, n_samples),
                'NumOfProducts': np.random.randint(1, 4, n_samples),
                'HasCrCard': np.random.choice([0, 1], n_samples),
                'IsActiveMember': np.random.choice([0, 1], n_samples),
                'EstimatedSalary': np.random.uniform(0, 200000, n_samples),
                'Exited': np.random.choice([0, 1], n_samples)
            }
            
            df = pd.DataFrame(bank_data)
            df.to_csv(test_data_path, index=False)
            
            # Create test arguments
            test_args = [
                'train.py',
                '--data_path', test_data_path,
                '--model_output', temp_dir,
                '--test_size', '0.2',
                '--random_state', '42',
                '--n_estimators', '10',
                '--max_depth', '5'
            ]
            
            # Mock sys.argv
            with patch.object(sys, 'argv', test_args):
                try:
                    main()
                    
                    # Check that model files are created
                    assert os.path.exists(os.path.join(temp_dir, 'model'))
                    assert os.path.exists(os.path.join(temp_dir, 'label_encoders.pkl'))
                    assert os.path.exists(os.path.join(temp_dir, 'feature_columns.pkl'))
                    
                    # Verify the saved files contain expected data
                    import joblib
                    label_encoders = joblib.load(os.path.join(temp_dir, 'label_encoders.pkl'))
                    feature_columns = joblib.load(os.path.join(temp_dir, 'feature_columns.pkl'))
                    
                    # Should have label encoders for Geography and Gender
                    assert isinstance(label_encoders, dict)
                    assert 'Geography' in label_encoders
                    assert 'Gender' in label_encoders
                    
                    # Should have feature columns including engineered features
                    assert isinstance(feature_columns, list)
                    assert len(feature_columns) > 10  # Should have original + engineered features
                    assert 'Exited' not in feature_columns  # Target should not be in features
                    
                    # Check for engineered features
                    assert 'CreditScore_Age_Ratio' in feature_columns
                    assert 'Balance_Salary_Ratio' in feature_columns
                    
                except SystemExit as e:
                    # MLflow might cause a system exit, which is okay for testing
                    if e.code != 0:
                        raise
                        
                        

test_train.py


# tests/test_train.py
import pytest
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from src.train import load_and_preprocess_data, preprocess_features, train_model, evaluate_model

class TestTrainingPipeline:
    
    @pytest.fixture
    def sample_bank_data(self):
        """Create sample bank churn data for testing"""
        np.random.seed(42)
        n_samples = 100
        
        data = {
            'CreditScore': np.random.randint(350, 850, n_samples),
            'Geography': np.random.choice(['France', 'Spain', 'Germany'], n_samples),
            'Gender': np.random.choice(['Male', 'Female'], n_samples),
            'Age': np.random.randint(18, 92, n_samples),
            'Tenure': np.random.randint(0, 10, n_samples),
            'Balance': np.random.uniform(0, 250000, n_samples),
            'NumOfProducts': np.random.randint(1, 4, n_samples),
            'HasCrCard': np.random.choice([0, 1], n_samples),
            'IsActiveMember': np.random.choice([0, 1], n_samples),
            'EstimatedSalary': np.random.uniform(0, 200000, n_samples),
            'Exited': np.random.choice([0, 1], n_samples)
        }
        
        return pd.DataFrame(data)
    
    @pytest.fixture
    def sample_telecom_data(self):
        """Create sample telecom churn data for testing"""
        np.random.seed(42)
        n_samples = 100
        
        data = {
            'State': np.random.choice(['TX', 'CA', 'NY', 'FL'], n_samples),
            'Account Length': np.random.randint(1, 243, n_samples),
            'Area Code': np.random.choice([408, 415, 510], n_samples),
            'Phone': [f"{np.random.randint(100,999)}-{np.random.randint(1000,9999)}" for _ in range(n_samples)],
            "Int'l Plan": np.random.choice(['yes', 'no'], n_samples),
            'VMail Plan': np.random.choice(['yes', 'no'], n_samples),
            'VMail Message': np.random.randint(0, 51, n_samples),
            'Day Mins': np.random.uniform(0, 351, n_samples),
            'Day Calls': np.random.randint(0, 166, n_samples),
            'Day Charge': np.random.uniform(0, 60, n_samples),
            'Eve Mins': np.random.uniform(0, 364, n_samples),
            'Eve Calls': np.random.randint(0, 171, n_samples),
            'Eve Charge': np.random.uniform(0, 31, n_samples),
            'Night Mins': np.random.uniform(0, 396, n_samples),
            'Night Calls': np.random.randint(0, 176, n_samples),
            'Night Charge': np.random.uniform(0, 18, n_samples),
            'Intl Mins': np.random.uniform(0, 20, n_samples),
            'Intl Calls': np.random.randint(0, 21, n_samples),
            'Intl Charge': np.random.uniform(0, 6, n_samples),
            'CustServ Calls': np.random.randint(0, 10, n_samples),
            'Churn?': np.random.choice(['True.', 'False.'], n_samples)
        }
        
        return pd.DataFrame(data)
    
    def test_preprocess_bank_features(self, sample_bank_data):
        """Test feature preprocessing for bank data"""
        df_processed, label_encoders = preprocess_features(sample_bank_data)
        
        # Check that categorical variables are encoded
        assert df_processed['Geography'].dtype in ['int64', 'int32']
        assert df_processed['Gender'].dtype in ['int64', 'int32']
        
        # Check that new features are created
        assert 'CreditScore_Age_Ratio' in df_processed.columns
        assert 'Balance_Salary_Ratio' in df_processed.columns
        assert 'Balance_Log' in df_processed.columns
        assert 'EstimatedSalary_Log' in df_processed.columns
        
        # Check that encoders work
        assert 'Geography' in label_encoders
        assert 'Gender' in label_encoders
        assert len(label_encoders['Geography'].classes_) <= 3
        assert len(label_encoders['Gender'].classes_) <= 2
    
    def test_preprocess_telecom_features(self, sample_telecom_data):
        """Test feature preprocessing for telecom data"""
        # First convert Churn? to Exited
        sample_telecom_data['Exited'] = (sample_telecom_data['Churn?'] == 'True.').astype(int)
        sample_telecom_data = sample_telecom_data.drop('Churn?', axis=1)
        
        df_processed, label_encoders = preprocess_features(sample_telecom_data)
        
        # Check that categorical variables are encoded
        categorical_cols = ['State', "Int'l Plan", 'VMail Plan']
        for col in categorical_cols:
            if col in df_processed.columns:
                assert df_processed[col].dtype in ['int64', 'int32']
                assert col in label_encoders
        
        # Check that phone numbers are dropped
        assert 'Phone' not in df_processed.columns
        
        # Check that new telecom features are created
        assert 'Avg_Day_Call_Duration' in df_processed.columns
        assert 'Avg_Eve_Call_Duration' in df_processed.columns
        assert 'Avg_Night_Call_Duration' in df_processed.columns
        assert 'Total_Usage_Mins' in df_processed.columns
        assert 'Total_Charges' in df_processed.columns
    
    def test_train_model_bank_data(self, sample_bank_data):
        """Test model training with bank data"""
        df_processed, _ = preprocess_features(sample_bank_data)
        
        feature_columns = [col for col in df_processed.columns if col != 'Exited']
        X = df_processed[feature_columns]
        y = df_processed['Exited']
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        model = train_model(X_train, y_train, n_estimators=10, max_depth=5, random_state=42)
        
        # Check that model is trained
        assert hasattr(model, 'predict')
        assert hasattr(model, 'predict_proba')
        
        # Check predictions
        predictions = model.predict(X_test)
        assert len(predictions) == len(X_test)
        assert all(pred in [0, 1] for pred in predictions)
    
    def test_train_model_telecom_data(self, sample_telecom_data):
        """Test model training with telecom data"""
        # First convert Churn? to Exited
        sample_telecom_data['Exited'] = (sample_telecom_data['Churn?'] == 'True.').astype(int)
        sample_telecom_data = sample_telecom_data.drop('Churn?', axis=1)
        
        df_processed, _ = preprocess_features(sample_telecom_data)
        
        feature_columns = [col for col in df_processed.columns if col != 'Exited']
        X = df_processed[feature_columns]
        y = df_processed['Exited']
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        model = train_model(X_train, y_train, n_estimators=10, max_depth=5, random_state=42)
        
        # Check that model is trained
        assert hasattr(model, 'predict')
        assert hasattr(model, 'predict_proba')
        
        # Check predictions
        predictions = model.predict(X_test)
        assert len(predictions) == len(X_test)
        assert all(pred in [0, 1] for pred in predictions)
    
    def test_evaluate_model(self, sample_bank_data):
        """Test model evaluation"""
        df_processed, _ = preprocess_features(sample_bank_data)
        
        feature_columns = [col for col in df_processed.columns if col != 'Exited']
        X = df_processed[feature_columns]
        y = df_processed['Exited']
        
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )
        
        model = train_model(X_train, y_train, n_estimators=10, max_depth=5, random_state=42)
        metrics = evaluate_model(model, X_test, y_test)
        
        # Check that all metrics are present
        expected_metrics = ['accuracy', 'precision', 'recall', 'f1_score', 'roc_auc']
        for metric in expected_metrics:
            assert metric in metrics
            assert 0 <= metrics[metric] <= 1
    
    def test_data_shapes_bank(self, sample_bank_data):
        """Test data shapes after preprocessing bank data"""
        df_processed, _ = preprocess_features(sample_bank_data)
        
        # Should have more columns after feature engineering
        assert df_processed.shape[1] > sample_bank_data.shape[1]
        
        # Should have same number of rows
        assert df_processed.shape[0] == sample_bank_data.shape[0]
    
    def test_data_shapes_telecom(self, sample_telecom_data):
        """Test data shapes after preprocessing telecom data"""
        # First convert Churn? to Exited
        original_cols = sample_telecom_data.shape[1]
        sample_telecom_data['Exited'] = (sample_telecom_data['Churn?'] == 'True.').astype(int)
        sample_telecom_data = sample_telecom_data.drop('Churn?', axis=1)
        
        df_processed, _ = preprocess_features(sample_telecom_data)
        
        # Should have more columns after feature engineering (considering Phone is dropped)
        assert df_processed.shape[1] > original_cols - 2  # -1 for Phone drop, -1 for Churn? -> Exited
        
        # Should have same number of rows
        assert df_processed.shape[0] == sample_telecom_data.shape[0]
    
    def test_load_and_preprocess_data_nonexistent_file(self):
        """Test load_and_preprocess_data with non-existent file (should create sample data)"""
        df = load_and_preprocess_data('nonexistent_file.csv')
        
        # Should create sample data
        assert isinstance(df, pd.DataFrame)
        assert 'Exited' in df.columns
        assert df.shape[0] > 0
        
        # Should have bank data structure when creating sample data
        expected_bank_cols = ['CreditScore', 'Geography', 'Gender', 'Age', 'Tenure', 
                             'Balance', 'NumOfProducts', 'HasCrCard', 'IsActiveMember', 
                             'EstimatedSalary', 'Exited']
        for col in expected_bank_cols:
            assert col in df.columns

requirements.txt

# requirements.txt
scikit-learn==1.3.0
pandas==2.0.3
numpy==1.24.3
mlflow==2.5.0
azure-ai-ml==1.8.0
azure-identity==1.13.0
azure-storage-blob==12.17.0
joblib==1.3.1
pytest==7.4.0
pytest-cov==4.1.0
flake8==6.0.0