Untitled

 avatar
unknown
plain_text
a month ago
8.3 kB
3
Indexable
import streamlit as st
import pandas as pd
import re

st.title("🔧 Network Config Compliance Checker")

# --- Upload Config Files ---
uploaded_files = st.file_uploader("📂 Upload Config Files", type=["txt"], accept_multiple_files=True)

# --- Advanced Policy Section ---
# Saved policies and the condition rows being edited are kept in
# st.session_state so they survive the script re-execution that follows every
# widget interaction.
st.subheader("🛠️ Define Compliance Policies")
if "advanced_policies" not in st.session_state:
    st.session_state.advanced_policies = []

with st.expander("➕ Create New Policy"):
    policy_name = st.text_input("Policy Name (e.g., Template Check)")

    policy_scope = st.selectbox("🔍 Policy Scope", ["Interface Configuration", "General Configuration", "Both"])

    if "advanced_conditions" not in st.session_state:
        st.session_state.advanced_conditions = []
    # Monotonic counter used to give every condition row a stable identity.
    if "next_condition_id" not in st.session_state:
        st.session_state.next_condition_id = 0

    if st.button("➕ Add Condition"):
        st.session_state.advanced_conditions.append({
            "id": st.session_state.next_condition_id,
            "condition": "Exists",
            "keyword": "",
            "dependent_on": None,
        })
        st.session_state.next_condition_id += 1

    for idx, condition in enumerate(st.session_state.advanced_conditions):
        # Rows that were stored without an id (e.g. by an earlier version of
        # this script in the same session) get one on first sight.
        if "id" not in condition:
            condition["id"] = st.session_state.next_condition_id
            st.session_state.next_condition_id += 1
        row_id = condition["id"]

        # Widget keys use the row id rather than the list position: the
        # widgets are what feed the values back into the condition dict, so
        # positional keys would hand a removed row's values to the row that
        # slides into its place.
        cols = st.columns([0.3, 0.3, 0.2, 0.2])
        condition["condition"] = cols[0].selectbox(
            f"Condition {idx+1}", ["Exists", "Does Not Exist"], key=f"cond_{row_id}"
        )
        condition["keyword"] = cols[1].text_input("Keyword / Command", key=f"kw_{row_id}")

        # Dependency selection: "None" (the string) means "no dependency".
        condition["dependent_on"] = cols[2].selectbox(
            "Depends On",
            ["None"] + [c["keyword"] for c in st.session_state.advanced_conditions if c["keyword"]],
            key=f"dep_{row_id}",
        )

        if cols[3].button("❌ Remove", key=f"del_{row_id}"):
            # Build a new list instead of popping from the one being
            # iterated, then restart the script so the remaining rows are
            # redrawn from a consistent state.
            st.session_state.advanced_conditions = [
                c for c in st.session_state.advanced_conditions if c is not condition
            ]
            st.rerun()

    compliance_action_adv = st.selectbox("🔍 Compliance Action", ["Compliant", "Non-Compliant", "Warning"])

    if st.button("✅ Add Policy"):
        if policy_name and st.session_state.advanced_conditions:
            # Copy only the policy-relevant fields; the row id is UI state.
            conditions = [
                {
                    "condition": cond["condition"],
                    "keyword": cond["keyword"],
                    "dependent_on": cond["dependent_on"],
                }
                for cond in st.session_state.advanced_conditions
            ]
            st.session_state.advanced_policies.append({
                "name": policy_name,
                "scope": policy_scope,
                "conditions": conditions,
                "action": compliance_action_adv,
            })
            st.session_state.advanced_conditions = []
            st.success("Policy Added!")
        else:
            # Previously this case did nothing at all, leaving the user
            # without any hint why the policy was not created.
            st.warning("⚠️ Please enter a policy name and add at least one condition.")

# --- Display & Delete Policies ---
# Lists every saved policy with its conditions and a per-policy delete button.
st.subheader("📋 Active Policies")
if st.session_state.advanced_policies:
    for idx, policy in enumerate(st.session_state.advanced_policies):
        with st.container():
            st.write(f"**🔹 {policy['name']} → {policy['action']} ({policy['scope']})**")
            for cond in policy["conditions"]:
                # The "Depends On" selectbox stores the literal string "None"
                # when no dependency is selected; that string is truthy, so it
                # has to be excluded explicitly or every condition would be
                # shown as "(Depends on: None)".
                dependency = cond["dependent_on"]
                has_dependency = bool(dependency) and dependency != "None"
                dep_info = f" (Depends on: {dependency})" if has_dependency else ""
                st.write(f"   - {cond['condition']} '{cond['keyword']}'{dep_info}")
            if st.button(f"🗑️ Delete '{policy['name']}'", key=f"del_policy_{idx}"):
                st.session_state.advanced_policies.pop(idx)
                # Restart immediately so the loop never continues over the
                # list it has just modified.
                st.rerun()
else:
    st.info("No policies defined yet. Add a new one above.")

# --- Helper Function to Evaluate Conditions ---
def _keyword_found(keyword, text):
    """Return True if *keyword* occurs in *text*.

    The keyword is tried as a regular expression first; if it is not a valid
    pattern (e.g. an unbalanced parenthesis in a pasted command) it is matched
    as a plain substring instead of aborting the whole check.
    """
    try:
        return bool(re.search(keyword, text, re.DOTALL))
    except re.error:
        return keyword in text


def evaluate_conditions(config_text, conditions, template_blocks=None):
    """Evaluate a policy's conditions against a piece of configuration text.

    Args:
        config_text: Text that is searched unless a condition is redirected
            to a template block.
        conditions: Iterable of dicts with the keys ``"condition"``
            (``"Exists"`` or ``"Does Not Exist"``), ``"keyword"`` (pattern to
            search for) and ``"dependent_on"``. A falsy ``"dependent_on"`` or
            the string ``"None"`` means the condition has no dependency.
        template_blocks: Optional iterable of ``(header, body)`` pairs whose
            header reads ``"template <name>"``. When a condition's dependency
            names one of these templates, the keyword is searched in that
            template's body instead of ``config_text``.

    Returns:
        A tuple ``(conditions_met, missing_conditions)``: a bool that is True
        only if every condition held, and a list of strings describing each
        failure (the keyword, ``"NOT <keyword>"``, or
        ``"Dependency failed: <dependency>"``).
    """
    conditions_met = True
    missing_conditions = []
    dependency_status = {}  # keyword -> whether it was found

    for cond in conditions:
        keyword = cond["keyword"]
        dependency = cond.get("dependent_on")
        if dependency == "None":
            # Sentinel produced by the "Depends On" selectbox.
            dependency = None

        # Every condition starts from the full text; carrying the scope over
        # from the previous condition would make an unrelated condition search
        # inside a template block it never asked for.
        current_scope = config_text
        if dependency and template_blocks:
            wanted_header = f"template {dependency}"
            for template_name, template_config in template_blocks:
                if template_name.strip() == wanted_header:
                    current_scope = template_config
                    break

        exists = _keyword_found(keyword, current_scope)

        # A condition whose dependency was evaluated earlier and not found
        # fails outright and is not checked itself.
        if dependency and dependency in dependency_status:
            if not dependency_status[dependency]:
                conditions_met = False
                missing_conditions.append(f"Dependency failed: {dependency}")
                continue

        if cond["condition"] == "Exists" and not exists:
            conditions_met = False
            missing_conditions.append(keyword)
        elif cond["condition"] == "Does Not Exist" and exists:
            conditions_met = False
            missing_conditions.append(f"NOT {keyword}")

        # Remember the raw search result so later conditions can depend on it.
        dependency_status[keyword] = exists

    return conditions_met, missing_conditions

# --- Compliance Check ---
# Block headers are only recognised at the start of a line, so mid-line
# mentions such as "passive-interface Gi1" or "source template X" neither
# create bogus blocks nor cut a real block short.
# NOTE(review): a block runs until the next header of the same kind or the end
# of the file, so the last block also contains whatever configuration follows.
_TEMPLATE_BLOCK_RE = re.compile(r"^(template \S+)(.*?)(?=^template \S+|\Z)", re.DOTALL | re.MULTILINE)
_INTERFACE_BLOCK_RE = re.compile(r"^(interface \S+)(.*?)(?=^interface \S+|\Z)", re.DOTALL | re.MULTILINE)
# A line that *starts* with "shutdown"; "no shutdown" must not match.
_SHUTDOWN_RE = re.compile(r"^[ \t]*shutdown\b", re.MULTILINE)
_REPORT_COLUMNS = ["Switch", "Interface", "Policy", "Status"]

if st.button("🚀 Run Compliance Check"):
    if not uploaded_files:
        st.warning("⚠️ Please upload config files first.")
    elif not st.session_state.advanced_policies:
        st.warning("⚠️ Please define at least one policy first.")
    else:
        results = []

        # Split the policies by scope once instead of re-testing per block.
        general_policies = [
            p for p in st.session_state.advanced_policies
            if p["scope"] in ("General Configuration", "Both")
        ]
        interface_policies = [
            p for p in st.session_state.advanced_policies
            if p["scope"] in ("Interface Configuration", "Both")
        ]

        for uploaded_file in uploaded_files:
            # getvalue() returns the whole buffer regardless of the current
            # read position; undecodable bytes are replaced rather than
            # aborting the run.
            config_text = uploaded_file.getvalue().decode("utf-8", errors="replace")
            # Strip only the final extension so dotted names stay intact.
            switch_name = uploaded_file.name.rsplit(".", 1)[0]

            # Parse template blocks as (header, body) pairs
            template_blocks = _TEMPLATE_BLOCK_RE.findall(config_text)

            # --- General Config Compliance Check ---
            for policy in general_policies:
                conditions_met, _ = evaluate_conditions(
                    config_text,
                    policy["conditions"],
                    template_blocks=template_blocks,
                )

                # The policy's action applies when every condition holds;
                # anything else is reported as non-compliant.
                status = policy["action"] if conditions_met else "Non-Compliant"

                results.append({
                    "Switch": switch_name,
                    "Interface": "N/A (General Config)",
                    "Policy": policy["name"],
                    "Status": status
                })

            # --- Interface Config Compliance Check ---
            interface_blocks = _INTERFACE_BLOCK_RE.findall(config_text)
            processed_interfaces = set()  # Report each interface name once

            for interface, config in interface_blocks:
                interface_name = interface.split()[1]

                if interface_name in processed_interfaces:
                    continue
                processed_interfaces.add(interface_name)

                for policy in interface_policies:
                    conditions_met, _ = evaluate_conditions(
                        config,
                        policy["conditions"],
                        template_blocks=template_blocks,
                    )

                    # Final Status Evaluation for Interface
                    if conditions_met:
                        status = policy["action"]
                    elif _SHUTDOWN_RE.search(config):
                        status = "Shutdown"  # Interface is administratively shut down
                    else:
                        status = "Infrastructure Interface"  # Conditions not met and not shut down

                    results.append({
                        "Switch": switch_name,
                        "Interface": interface_name,
                        "Policy": policy["name"],
                        "Status": status
                    })

        # Convert to DataFrame and Display; explicit columns keep the header
        # row even when no result rows were produced.
        df = pd.DataFrame(results, columns=_REPORT_COLUMNS)
        st.write(df)

        # Download Report Button
        st.download_button("📥 Download Report", df.to_csv(index=False), "compliance_report.csv", "text/csv")
Editor is loading...
Leave a Comment