Skip to content

Custom Applications Tutorial

This tutorial demonstrates how to build domain-specific applications using the Quantum Entangled Knowledge Graphs (QE-KGR) library. We'll create custom quantum knowledge systems for specific domains and use cases.

Prerequisites

Before starting this tutorial, ensure you have completed:

Building Domain-Specific Applications

1. Biomedical Knowledge Graph

Let's build a comprehensive biomedical knowledge graph with quantum entanglement:

from qekgr import EntangledGraph, QuantumNode, EntangledEdge
from qekgr import EntangledQueryEngine, QuantumInference
from qekgr.utils import QuantumGraphVisualizer
import numpy as np
from typing import Dict, List, Any

class BiomedicalQuantumGraph:
    """Custom biomedical knowledge graph with quantum enhancements."""

    def __init__(self, hilbert_dim=16):
        self.graph = EntangledGraph(hilbert_dim)
        self.entity_types = {
            'protein': {'color': '#ff6b6b', 'quantum_weight': 0.9},
            'gene': {'color': '#4ecdc4', 'quantum_weight': 0.8},
            'disease': {'color': '#45b7d1', 'quantum_weight': 0.7},
            'drug': {'color': '#96ceb4', 'quantum_weight': 0.8},
            'pathway': {'color': '#ffeaa7', 'quantum_weight': 0.6},
            'tissue': {'color': '#dda0dd', 'quantum_weight': 0.5}
        }
        self.query_engine = None
        self.inference = None

    def add_biomedical_entity(self, entity_id: str, entity_type: str, 
                             properties: Dict[str, Any] = None):
        """Add biomedical entity with quantum properties."""

        if properties is None:
            properties = {}

        # Enhance properties with quantum characteristics
        quantum_props = self.entity_types.get(entity_type, {})

        node = QuantumNode(
            entity_id,
            node_type=entity_type,
            quantum_weight=quantum_props.get('quantum_weight', 0.5),
            biomedical_domain=True,
            **properties
        )

        self.graph.add_node(node)

        # Initialize quantum state based on entity type
        self._initialize_quantum_state(entity_id, entity_type)

    def _initialize_quantum_state(self, entity_id: str, entity_type: str):
        """Initialize quantum state based on biomedical context."""

        # Create domain-specific quantum states
        if entity_type == 'protein':
            # Proteins have complex folding states
            amplitudes = np.random.normal(0.5, 0.1, self.graph.hilbert_dim)
            amplitudes = amplitudes / np.linalg.norm(amplitudes)

        elif entity_type == 'gene':
            # Genes have binary expression states with superposition
            amplitudes = np.zeros(self.graph.hilbert_dim, dtype=complex)
            amplitudes[0] = 0.6  # Expressed state
            amplitudes[1] = 0.8  # Unexpressed state
            amplitudes = amplitudes / np.linalg.norm(amplitudes)

        elif entity_type == 'disease':
            # Diseases have progressive states
            amplitudes = np.exp(-np.linspace(0, 2, self.graph.hilbert_dim))
            amplitudes = amplitudes / np.linalg.norm(amplitudes)

        else:
            # Default quantum state
            amplitudes = np.ones(self.graph.hilbert_dim) / np.sqrt(self.graph.hilbert_dim)

        self.graph.set_node_state(entity_id, amplitudes)

    def add_biomedical_relationship(self, source: str, target: str, 
                                   relation_type: str, confidence: float = 0.8,
                                   evidence_strength: float = 1.0):
        """Add biomedical relationship with quantum entanglement."""

        # Calculate quantum entanglement strength based on biological relevance
        entanglement_strength = self._calculate_biomedical_entanglement(
            source, target, relation_type, confidence, evidence_strength
        )

        edge = EntangledEdge(
            source, target,
            relation=relation_type,
            entanglement_strength=entanglement_strength,
            confidence=confidence,
            evidence_strength=evidence_strength,
            biomedical_relation=True
        )

        self.graph.add_edge(edge)

    def _calculate_biomedical_entanglement(self, source: str, target: str,
                                         relation_type: str, confidence: float,
                                         evidence_strength: float) -> float:
        """Calculate entanglement strength for biomedical relationships."""

        # Base strength from confidence and evidence
        base_strength = confidence * evidence_strength

        # Enhance based on relation type
        relation_modifiers = {
            'interacts_with': 0.9,
            'regulates': 0.8,
            'causes': 0.95,
            'treats': 0.85,
            'associated_with': 0.6,
            'inhibits': 0.8,
            'activates': 0.85,
            'expresses': 0.9,
            'metabolizes': 0.7
        }

        modifier = relation_modifiers.get(relation_type, 0.5)
        final_strength = min(0.99, base_strength * modifier)

        return final_strength

    def load_biomedical_data(self, data_sources: Dict[str, Any]):
        """Load biomedical data from multiple sources."""

        # Example: Load protein-protein interactions
        if 'proteins' in data_sources:
            for protein_data in data_sources['proteins']:
                self.add_biomedical_entity(
                    protein_data['id'],
                    'protein',
                    {
                        'name': protein_data.get('name', ''),
                        'function': protein_data.get('function', ''),
                        'molecular_weight': protein_data.get('molecular_weight', 0),
                        'organism': protein_data.get('organism', 'human')
                    }
                )

        # Load gene data
        if 'genes' in data_sources:
            for gene_data in data_sources['genes']:
                self.add_biomedical_entity(
                    gene_data['id'],
                    'gene',
                    {
                        'symbol': gene_data.get('symbol', ''),
                        'chromosome': gene_data.get('chromosome', ''),
                        'expression_level': gene_data.get('expression_level', 0.5)
                    }
                )

        # Load relationships
        if 'interactions' in data_sources:
            for interaction in data_sources['interactions']:
                self.add_biomedical_relationship(
                    interaction['source'],
                    interaction['target'],
                    interaction['type'],
                    interaction.get('confidence', 0.8),
                    interaction.get('evidence_strength', 1.0)
                )

    def setup_reasoning_engines(self):
        """Initialize query and inference engines."""
        self.query_engine = EntangledQueryEngine(self.graph)
        self.inference = QuantumInference(self.graph)

    def find_drug_targets(self, disease: str, max_results: int = 5):
        """Find potential drug targets for a disease using quantum reasoning."""

        if not self.query_engine:
            self.setup_reasoning_engines()

        # Use quantum walk to find connected proteins
        walk_result = self.inference.quantum_walk(disease, steps=15)

        # Extract high-probability proteins as targets
        targets = []
        for node_id, probability in walk_result.final_distribution.items():
            if (node_id in self.graph.nodes and 
                self.graph.nodes[node_id].node_type == 'protein' and
                probability > 0.1):
                targets.append((node_id, probability))

        # Sort by quantum probability
        targets.sort(key=lambda x: x[1], reverse=True)

        return targets[:max_results]

    def predict_drug_interactions(self, drug1: str, drug2: str):
        """Predict drug-drug interactions using quantum entanglement."""

        if not self.inference:
            self.setup_reasoning_engines()

        # Measure quantum entanglement between drugs
        direct_entanglement = self.graph.measure_entanglement(drug1, drug2)

        # Find common targets/pathways
        drug1_walk = self.inference.quantum_walk(drug1, steps=10)
        drug2_walk = self.inference.quantum_walk(drug2, steps=10)

        # Calculate interaction probability
        common_targets = set(drug1_walk.final_distribution.keys()) & \
                        set(drug2_walk.final_distribution.keys())

        interaction_score = direct_entanglement
        for target in common_targets:
            prob1 = drug1_walk.final_distribution.get(target, 0)
            prob2 = drug2_walk.final_distribution.get(target, 0)
            interaction_score += prob1 * prob2 * 0.5  # Weight common targets

        return {
            'interaction_probability': min(1.0, interaction_score),
            'direct_entanglement': direct_entanglement,
            'common_targets': list(common_targets)
        }

    def analyze_pathway_dysregulation(self, pathway: str, disease: str):
        """Analyze pathway dysregulation in disease context."""

        if not self.inference:
            self.setup_reasoning_engines()

        # Quantum walk from disease to pathway components
        disease_walk = self.inference.quantum_walk(disease, steps=12)

        # Find pathway components affected by disease
        affected_components = []
        for node_id, probability in disease_walk.final_distribution.items():
            if node_id in self.graph.nodes:
                node = self.graph.nodes[node_id]
                if (hasattr(node, 'pathway') and node.pathway == pathway) or \
                   (node.node_type in ['protein', 'gene'] and probability > 0.05):
                    affected_components.append((node_id, probability))

        # Calculate dysregulation score
        dysregulation_score = sum(prob for _, prob in affected_components)

        return {
            'dysregulation_score': dysregulation_score,
            'affected_components': affected_components,
            'pathway_coherence': self._calculate_pathway_coherence(pathway)
        }

    def _calculate_pathway_coherence(self, pathway: str):
        """Calculate quantum coherence within a pathway."""

        # Find all pathway components
        pathway_nodes = [node_id for node_id, node in self.graph.nodes.items()
                        if hasattr(node, 'pathway') and node.pathway == pathway]

        if len(pathway_nodes) < 2:
            return 0.0

        # Calculate average entanglement within pathway
        total_entanglement = 0.0
        pairs = 0

        for i, node1 in enumerate(pathway_nodes):
            for node2 in pathway_nodes[i+1:]:
                entanglement = self.graph.measure_entanglement(node1, node2)
                total_entanglement += entanglement
                pairs += 1

        return total_entanglement / pairs if pairs > 0 else 0.0

# Example usage of BiomedicalQuantumGraph
bio_graph = BiomedicalQuantumGraph(hilbert_dim=20)

# Sample biomedical data
biomedical_data = {
    'proteins': [
        {'id': 'p53', 'name': 'Tumor protein p53', 'function': 'tumor_suppressor'},
        {'id': 'BRCA1', 'name': 'BRCA1 protein', 'function': 'dna_repair'},
        {'id': 'EGFR', 'name': 'Epidermal growth factor receptor', 'function': 'signal_transduction'},
        {'id': 'MYC', 'name': 'MYC proto-oncogene', 'function': 'transcription_regulation'}
    ],
    'genes': [
        {'id': 'TP53', 'symbol': 'TP53', 'chromosome': '17p13.1'},
        {'id': 'BRCA1_gene', 'symbol': 'BRCA1', 'chromosome': '17q21.31'},
        {'id': 'EGFR_gene', 'symbol': 'EGFR', 'chromosome': '7p11.2'}
    ],
    'diseases': [
        {'id': 'breast_cancer', 'name': 'Breast Cancer'},
        {'id': 'lung_cancer', 'name': 'Lung Cancer'}
    ],
    'drugs': [
        {'id': 'tamoxifen', 'name': 'Tamoxifen', 'type': 'hormone_therapy'},
        {'id': 'gefitinib', 'name': 'Gefitinib', 'type': 'tyrosine_kinase_inhibitor'}
    ],
    'interactions': [
        {'source': 'TP53', 'target': 'p53', 'type': 'expresses', 'confidence': 0.95},
        {'source': 'p53', 'target': 'breast_cancer', 'type': 'associated_with', 'confidence': 0.9},
        {'source': 'BRCA1', 'target': 'breast_cancer', 'type': 'associated_with', 'confidence': 0.95},
        {'source': 'tamoxifen', 'target': 'breast_cancer', 'type': 'treats', 'confidence': 0.8},
        {'source': 'EGFR', 'target': 'lung_cancer', 'type': 'associated_with', 'confidence': 0.85},
        {'source': 'gefitinib', 'target': 'EGFR', 'type': 'inhibits', 'confidence': 0.9}
    ]
}

# Load data and setup
bio_graph.load_biomedical_data(biomedical_data)
bio_graph.setup_reasoning_engines()

print(f"Biomedical graph loaded: {len(bio_graph.graph.nodes)} nodes, {len(bio_graph.graph.edges)} edges")

# Find drug targets for breast cancer
targets = bio_graph.find_drug_targets('breast_cancer')
print("\nPotential drug targets for breast cancer:")
for target, probability in targets:
    print(f"  {target}: {probability:.3f}")

# Predict drug interactions
interaction = bio_graph.predict_drug_interactions('tamoxifen', 'gefitinib')
print(f"\nTamoxifen-Gefitinib interaction prediction:")
print(f"  Interaction probability: {interaction['interaction_probability']:.3f}")
print(f"  Direct entanglement: {interaction['direct_entanglement']:.3f}")

2. Financial Knowledge Graph

Build a quantum-enhanced financial knowledge system:

class FinancialQuantumGraph:
    """Quantum knowledge graph for financial analysis."""

    def __init__(self, hilbert_dim=12):
        self.graph = EntangledGraph(hilbert_dim)
        self.time_series_data = {}
        self.market_state = "normal"  # normal, volatile, crisis

    def add_financial_entity(self, entity_id: str, entity_type: str, 
                           market_data: Dict[str, Any] = None):
        """Add financial entity with market-aware quantum properties."""

        if market_data is None:
            market_data = {}

        node = QuantumNode(
            entity_id,
            node_type=entity_type,
            market_cap=market_data.get('market_cap', 0),
            volatility=market_data.get('volatility', 0.1),
            sector=market_data.get('sector', 'unknown'),
            financial_entity=True
        )

        self.graph.add_node(node)
        self._initialize_financial_quantum_state(entity_id, entity_type, market_data)

    def _initialize_financial_quantum_state(self, entity_id: str, entity_type: str,
                                          market_data: Dict[str, Any]):
        """Initialize quantum state based on financial characteristics."""

        volatility = market_data.get('volatility', 0.1)

        if entity_type == 'stock':
            # Stock states represent price movements and market sentiment
            base_amplitudes = np.random.normal(0, volatility, self.graph.hilbert_dim)

        elif entity_type == 'bond':
            # Bond states are more stable, less quantum uncertainty
            base_amplitudes = np.random.normal(0, volatility * 0.3, self.graph.hilbert_dim)

        elif entity_type == 'commodity':
            # Commodity states show supply/demand fluctuations
            base_amplitudes = np.random.normal(0, volatility * 1.2, self.graph.hilbert_dim)

        elif entity_type == 'currency':
            # Currency states represent exchange rate dynamics
            base_amplitudes = np.random.normal(0, volatility * 0.8, self.graph.hilbert_dim)

        else:
            base_amplitudes = np.random.normal(0, 0.1, self.graph.hilbert_dim)

        # Normalize to valid quantum state
        amplitudes = base_amplitudes / np.linalg.norm(base_amplitudes)
        self.graph.set_node_state(entity_id, amplitudes)

    def add_market_relationship(self, source: str, target: str, 
                              correlation: float, relationship_type: str):
        """Add market relationship with correlation-based entanglement."""

        # Convert correlation to entanglement strength
        entanglement_strength = abs(correlation) * 0.8 + 0.1

        edge = EntangledEdge(
            source, target,
            relation=relationship_type,
            entanglement_strength=entanglement_strength,
            correlation=correlation,
            market_relationship=True
        )

        self.graph.add_edge(edge)

    def update_market_state(self, new_market_state: str):
        """Update market state and adjust quantum properties."""

        self.market_state = new_market_state

        # Adjust quantum states based on market conditions
        volatility_multipliers = {
            'normal': 1.0,
            'volatile': 1.5,
            'crisis': 2.0,
            'bull': 0.8,
            'bear': 1.3
        }

        multiplier = volatility_multipliers.get(new_market_state, 1.0)

        for node_id in self.graph.nodes:
            current_state = self.graph.get_node_state(node_id)

            # Apply market volatility to quantum state
            noise = np.random.normal(0, 0.1 * multiplier, len(current_state))
            new_state = current_state + noise
            new_state = new_state / np.linalg.norm(new_state)

            self.graph.set_node_state(node_id, new_state)

    def calculate_portfolio_risk(self, portfolio: List[str], weights: List[float]):
        """Calculate portfolio risk using quantum entanglement."""

        if len(portfolio) != len(weights):
            raise ValueError("Portfolio and weights must have same length")

        # Normalize weights
        weights = np.array(weights)
        weights = weights / np.sum(weights)

        # Calculate quantum portfolio risk
        total_risk = 0.0

        # Individual asset risks (diagonal terms)
        for i, asset in enumerate(portfolio):
            if asset in self.graph.nodes:
                node = self.graph.nodes[asset]
                individual_volatility = getattr(node, 'volatility', 0.1)
                total_risk += (weights[i] ** 2) * (individual_volatility ** 2)

        # Correlation risks (off-diagonal terms)
        for i, asset1 in enumerate(portfolio):
            for j, asset2 in enumerate(portfolio[i+1:], i+1):
                if asset1 in self.graph.nodes and asset2 in self.graph.nodes:
                    # Use quantum entanglement as correlation measure
                    entanglement = self.graph.measure_entanglement(asset1, asset2)

                    vol1 = getattr(self.graph.nodes[asset1], 'volatility', 0.1)
                    vol2 = getattr(self.graph.nodes[asset2], 'volatility', 0.1)

                    correlation_risk = 2 * weights[i] * weights[j] * vol1 * vol2 * entanglement
                    total_risk += correlation_risk

        return np.sqrt(total_risk)

    def detect_market_anomalies(self):
        """Detect market anomalies using quantum coherence."""

        # Calculate graph coherence
        coherence = self.graph.measure_coherence()

        # Low coherence indicates market disruption/anomalies
        if coherence < 0.3:
            anomaly_level = "high"
        elif coherence < 0.6:
            anomaly_level = "medium"
        else:
            anomaly_level = "low"

        # Find nodes with highest quantum uncertainty
        uncertain_assets = []
        for node_id in self.graph.nodes:
            state = self.graph.get_node_state(node_id)
            uncertainty = np.std(np.abs(state))
            if uncertainty > 0.2:
                uncertain_assets.append((node_id, uncertainty))

        uncertain_assets.sort(key=lambda x: x[1], reverse=True)

        return {
            'anomaly_level': anomaly_level,
            'market_coherence': coherence,
            'uncertain_assets': uncertain_assets[:5]
        }

    def predict_price_movements(self, asset: str, time_horizon: int = 5):
        """Predict price movements using quantum walks."""

        inference = QuantumInference(self.graph)

        # Perform quantum walk to explore price space
        walk_result = inference.quantum_walk(asset, steps=time_horizon)

        # Interpret walk results as price movement probabilities
        connected_assets = []
        for node_id, probability in walk_result.final_distribution.items():
            if node_id != asset and probability > 0.05:
                connected_assets.append((node_id, probability))

        # Calculate momentum based on entangled asset movements
        momentum_score = 0.0
        for connected_asset, probability in connected_assets:
            if connected_asset in self.graph.nodes:
                entanglement = self.graph.measure_entanglement(asset, connected_asset)
                momentum_score += probability * entanglement

        # Predict movement direction
        if momentum_score > 0.3:
            direction = "up"
        elif momentum_score < -0.3:
            direction = "down"
        else:
            direction = "sideways"

        return {
            'predicted_direction': direction,
            'momentum_score': momentum_score,
            'connected_assets': connected_assets[:3]
        }

# Example financial quantum graph
financial_graph = FinancialQuantumGraph(hilbert_dim=15)

# Add financial entities
financial_entities = [
    ('AAPL', 'stock', {'volatility': 0.25, 'sector': 'technology', 'market_cap': 2000000}),
    ('GOOGL', 'stock', {'volatility': 0.28, 'sector': 'technology', 'market_cap': 1500000}),
    ('JPM', 'stock', {'volatility': 0.30, 'sector': 'banking', 'market_cap': 400000}),
    ('GLD', 'commodity', {'volatility': 0.15, 'sector': 'precious_metals'}),
    ('USD_EUR', 'currency', {'volatility': 0.08}),
    ('10Y_TREASURY', 'bond', {'volatility': 0.05})
]

for entity_id, entity_type, market_data in financial_entities:
    financial_graph.add_financial_entity(entity_id, entity_type, market_data)

# Add market relationships
relationships = [
    ('AAPL', 'GOOGL', 0.7, 'sector_correlation'),
    ('AAPL', 'USD_EUR', -0.3, 'currency_impact'),
    ('JPM', '10Y_TREASURY', 0.6, 'interest_rate_sensitivity'),
    ('GLD', 'USD_EUR', -0.5, 'safe_haven'),
    ('AAPL', 'GLD', -0.2, 'risk_off_correlation')
]

for source, target, correlation, rel_type in relationships:
    financial_graph.add_market_relationship(source, target, correlation, rel_type)

print(f"Financial graph: {len(financial_graph.graph.nodes)} nodes, {len(financial_graph.graph.edges)} edges")

# Portfolio risk analysis
portfolio = ['AAPL', 'GOOGL', 'JPM']
weights = [0.5, 0.3, 0.2]
risk = financial_graph.calculate_portfolio_risk(portfolio, weights)
print(f"Portfolio quantum risk: {risk:.3f}")

# Market anomaly detection
anomalies = financial_graph.detect_market_anomalies()
print(f"Market anomaly level: {anomalies['anomaly_level']}")
print(f"Market coherence: {anomalies['market_coherence']:.3f}")

# Price prediction
prediction = financial_graph.predict_price_movements('AAPL')
print(f"AAPL prediction: {prediction['predicted_direction']} (momentum: {prediction['momentum_score']:.3f})")

3. Recommendation System

Build a quantum-enhanced recommendation engine:

class QuantumRecommendationSystem:
    """Quantum-enhanced recommendation system."""

    def __init__(self, hilbert_dim=10):
        self.graph = EntangledGraph(hilbert_dim)
        self.user_profiles = {}
        self.item_features = {}

    def add_user(self, user_id: str, preferences: Dict[str, float],
                demographics: Dict[str, Any] = None):
        """Add user with quantum preference representation."""

        if demographics is None:
            demographics = {}

        # Create user node
        user_node = QuantumNode(
            user_id,
            node_type='user',
            **demographics
        )
        self.graph.add_node(user_node)

        # Store user preferences
        self.user_profiles[user_id] = preferences

        # Initialize quantum state based on preferences
        self._initialize_user_quantum_state(user_id, preferences)

    def add_item(self, item_id: str, features: Dict[str, float],
                categories: List[str] = None):
        """Add item with quantum feature representation."""

        if categories is None:
            categories = []

        # Create item node
        item_node = QuantumNode(
            item_id,
            node_type='item',
            categories=categories
        )
        self.graph.add_node(item_node)

        # Store item features
        self.item_features[item_id] = features

        # Initialize quantum state based on features
        self._initialize_item_quantum_state(item_id, features)

    def _initialize_user_quantum_state(self, user_id: str, preferences: Dict[str, float]):
        """Initialize user quantum state from preferences."""

        # Map preferences to quantum amplitudes
        amplitudes = np.zeros(self.graph.hilbert_dim, dtype=complex)

        for i, (pref_name, pref_value) in enumerate(preferences.items()):
            if i < self.graph.hilbert_dim:
                amplitudes[i] = pref_value

        # Fill remaining dimensions with small random values
        for i in range(len(preferences), self.graph.hilbert_dim):
            amplitudes[i] = np.random.normal(0, 0.1)

        # Normalize
        amplitudes = amplitudes / np.linalg.norm(amplitudes)
        self.graph.set_node_state(user_id, amplitudes)

    def _initialize_item_quantum_state(self, item_id: str, features: Dict[str, float]):
        """Initialize item quantum state from features."""

        # Map features to quantum amplitudes
        amplitudes = np.zeros(self.graph.hilbert_dim, dtype=complex)

        for i, (feature_name, feature_value) in enumerate(features.items()):
            if i < self.graph.hilbert_dim:
                amplitudes[i] = feature_value

        # Fill remaining dimensions
        for i in range(len(features), self.graph.hilbert_dim):
            amplitudes[i] = np.random.normal(0, 0.05)

        # Normalize
        amplitudes = amplitudes / np.linalg.norm(amplitudes)
        self.graph.set_node_state(item_id, amplitudes)

    def add_interaction(self, user_id: str, item_id: str, 
                       interaction_type: str, rating: float):
        """Add user-item interaction with quantum entanglement."""

        # Calculate entanglement strength from rating
        # Higher ratings create stronger entanglement
        if interaction_type == 'like':
            entanglement_strength = min(0.9, rating / 5.0 * 0.8 + 0.1)
        elif interaction_type == 'purchase':
            entanglement_strength = min(0.95, rating / 5.0 * 0.9 + 0.2)
        elif interaction_type == 'view':
            entanglement_strength = min(0.6, rating / 5.0 * 0.4 + 0.1)
        else:
            entanglement_strength = min(0.8, rating / 5.0 * 0.6 + 0.1)

        edge = EntangledEdge(
            user_id, item_id,
            relation=interaction_type,
            entanglement_strength=entanglement_strength,
            rating=rating
        )

        self.graph.add_edge(edge)

    def generate_recommendations(self, user_id: str, num_recommendations: int = 5,
                               filter_seen: bool = True):
        """Generate recommendations using quantum interference."""

        if user_id not in self.graph.nodes:
            return []

        # Perform quantum walk from user
        inference = QuantumInference(self.graph)
        walk_result = inference.quantum_walk(user_id, steps=8)

        # Extract item recommendations
        item_scores = []

        for node_id, probability in walk_result.final_distribution.items():
            if (node_id in self.graph.nodes and 
                self.graph.nodes[node_id].node_type == 'item'):

                # Skip items user has already interacted with
                if filter_seen and (user_id, node_id) in self.graph.edges:
                    continue

                # Calculate quantum recommendation score
                user_state = self.graph.get_node_state(user_id)
                item_state = self.graph.get_node_state(node_id)

                # Quantum interference score
                interference = abs(np.vdot(user_state, item_state))**2

                # Combined score
                final_score = 0.6 * probability + 0.4 * interference

                item_scores.append((node_id, final_score, probability, interference))

        # Sort by score and return top recommendations
        item_scores.sort(key=lambda x: x[1], reverse=True)

        return item_scores[:num_recommendations]

    def find_similar_users(self, user_id: str, num_similar: int = 3):
        """Find similar users using quantum entanglement."""

        if user_id not in self.graph.nodes:
            return []

        user_state = self.graph.get_node_state(user_id)
        similarities = []

        for other_user_id in self.graph.nodes:
            if (other_user_id != user_id and 
                self.graph.nodes[other_user_id].node_type == 'user'):

                other_state = self.graph.get_node_state(other_user_id)

                # Quantum similarity using state overlap
                similarity = abs(np.vdot(user_state, other_state))**2
                similarities.append((other_user_id, similarity))

        similarities.sort(key=lambda x: x[1], reverse=True)
        return similarities[:num_similar]

    def explain_recommendation(self, user_id: str, item_id: str):
        """Explain why an item was recommended using quantum reasoning."""

        if user_id not in self.graph.nodes or item_id not in self.graph.nodes:
            return {"error": "User or item not found"}

        # Calculate quantum similarity
        user_state = self.graph.get_node_state(user_id)
        item_state = self.graph.get_node_state(item_id)
        quantum_similarity = abs(np.vdot(user_state, item_state))**2

        # Find reasoning path through quantum walk
        inference = QuantumInference(self.graph)
        walk_result = inference.quantum_walk(user_id, steps=6)

        # Find intermediate nodes that led to recommendation
        reasoning_path = []
        for node_id, probability in walk_result.final_distribution.items():
            if (node_id != user_id and node_id != item_id and 
                probability > 0.1):

                if node_id in self.graph.nodes:
                    node_type = self.graph.nodes[node_id].node_type
                    reasoning_path.append((node_id, node_type, probability))

        reasoning_path.sort(key=lambda x: x[2], reverse=True)

        # Check for similar users who liked this item
        similar_users = self.find_similar_users(user_id, 3)
        user_connections = []

        for similar_user, similarity in similar_users:
            if (similar_user, item_id) in self.graph.edges:
                edge = self.graph.edges[(similar_user, item_id)]
                user_connections.append((similar_user, similarity, edge.rating))

        return {
            'quantum_similarity': quantum_similarity,
            'reasoning_path': reasoning_path[:3],
            'similar_user_connections': user_connections,
            'recommendation_strength': quantum_similarity * 0.7 + 
                                     len(user_connections) * 0.1
        }

    def update_user_preferences(self, user_id: str, feedback: Dict[str, float]):
        """Update user quantum state based on feedback."""

        if user_id not in self.graph.nodes:
            return

        current_state = self.graph.get_node_state(user_id)

        # Create feedback vector
        feedback_vector = np.zeros(self.graph.hilbert_dim, dtype=complex)
        for i, (item_id, rating) in enumerate(feedback.items()):
            if i < self.graph.hilbert_dim:
                feedback_vector[i] = rating / 5.0  # Normalize to [0,1]

        # Update quantum state using quantum learning rate
        learning_rate = 0.1
        new_state = (1 - learning_rate) * current_state + learning_rate * feedback_vector
        new_state = new_state / np.linalg.norm(new_state)

        self.graph.set_node_state(user_id, new_state)

# Example recommendation system
rec_system = QuantumRecommendationSystem(hilbert_dim=12)

# Add users
users_data = [
    ('user1', {'action': 0.8, 'comedy': 0.3, 'drama': 0.6, 'sci_fi': 0.9}),
    ('user2', {'action': 0.2, 'comedy': 0.9, 'drama': 0.4, 'sci_fi': 0.3}),
    ('user3', {'action': 0.7, 'comedy': 0.5, 'drama': 0.8, 'sci_fi': 0.6})
]

for user_id, preferences in users_data:
    rec_system.add_user(user_id, preferences)

# Add items (movies)
movies_data = [
    ('movie1', {'action': 0.9, 'comedy': 0.1, 'drama': 0.3, 'sci_fi': 0.8}, ['action', 'sci-fi']),
    ('movie2', {'action': 0.1, 'comedy': 0.9, 'drama': 0.2, 'sci_fi': 0.1}, ['comedy']),
    ('movie3', {'action': 0.3, 'comedy': 0.2, 'drama': 0.9, 'sci_fi': 0.4}, ['drama']),
    ('movie4', {'action': 0.6, 'comedy': 0.7, 'drama': 0.5, 'sci_fi': 0.3}, ['action', 'comedy'])
]

for movie_id, features, categories in movies_data:
    rec_system.add_item(movie_id, features, categories)

# Add interactions
interactions = [
    ('user1', 'movie1', 'like', 5.0),
    ('user1', 'movie3', 'like', 4.0),
    ('user2', 'movie2', 'like', 5.0),
    ('user2', 'movie4', 'like', 4.5),
    ('user3', 'movie1', 'like', 4.0),
    ('user3', 'movie3', 'like', 5.0)
]

for user_id, item_id, interaction_type, rating in interactions:
    rec_system.add_interaction(user_id, item_id, interaction_type, rating)

print(f"Recommendation system: {len(rec_system.graph.nodes)} nodes, {len(rec_system.graph.edges)} edges")

# Generate recommendations
recommendations = rec_system.generate_recommendations('user1', 3)
print("\nRecommendations for user1:")
for item_id, score, prob, interference in recommendations:
    print(f"  {item_id}: score={score:.3f}, probability={prob:.3f}, interference={interference:.3f}")

# Explain recommendation
explanation = rec_system.explain_recommendation('user1', 'movie2')
print(f"\nExplanation for recommending movie2 to user1:")
print(f"  Quantum similarity: {explanation['quantum_similarity']:.3f}")
print(f"  Recommendation strength: {explanation['recommendation_strength']:.3f}")

# Find similar users
similar_users = rec_system.find_similar_users('user1', 2)
print(f"\nUsers similar to user1:")
for similar_user, similarity in similar_users:
    print(f"  {similar_user}: similarity={similarity:.3f}")

Integration and Deployment

Creating Production-Ready Applications

import logging
import json
from typing import Optional
from datetime import datetime

class ProductionQuantumGraph:
    """Production-ready quantum graph with monitoring and persistence."""

    def __init__(self, config_file: str):
        self.config = self._load_config(config_file)
        self.graph = EntangledGraph(self.config['hilbert_dim'])
        self.query_engine = None
        self.inference = None

        # Setup logging
        self._setup_logging()

        # Performance monitoring
        self.performance_metrics = {
            'query_count': 0,
            'avg_query_time': 0.0,
            'total_queries': 0,
            'error_count': 0
        }

    def _load_config(self, config_file: str) -> Dict[str, Any]:
        """Load configuration from file."""
        with open(config_file, 'r') as f:
            return json.load(f)

    def _setup_logging(self):
        """Setup application logging."""
        logging.basicConfig(
            level=getattr(logging, self.config.get('log_level', 'INFO')),
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.config.get('log_file', 'quantum_graph.log')),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

    def initialize_engines(self):
        """Initialize query and inference engines."""
        try:
            self.query_engine = EntangledQueryEngine(self.graph)
            self.inference = QuantumInference(self.graph)
            self.logger.info("Quantum engines initialized successfully")
        except Exception as e:
            self.logger.error(f"Failed to initialize engines: {e}")
            raise

    def health_check(self) -> Dict[str, Any]:
        """Perform system health check."""
        try:
            health_status = {
                'timestamp': datetime.now().isoformat(),
                'graph_nodes': len(self.graph.nodes),
                'graph_edges': len(self.graph.edges),
                'coherence': self.graph.measure_coherence(),
                'engines_ready': self.query_engine is not None,
                'performance_metrics': self.performance_metrics.copy()
            }

            # Check for potential issues
            issues = []
            if health_status['coherence'] < 0.3:
                issues.append("Low quantum coherence")

            if self.performance_metrics['error_count'] > 10:
                issues.append("High error rate")

            health_status['issues'] = issues
            health_status['status'] = 'healthy' if not issues else 'warning'

            return health_status

        except Exception as e:
            self.logger.error(f"Health check failed: {e}")
            return {
                'status': 'error',
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

    def safe_query(self, query: str, max_results: int = 5) -> Dict[str, Any]:
        """Safe query execution with error handling and monitoring."""
        start_time = time.time()

        try:
            if not self.query_engine:
                raise RuntimeError("Query engine not initialized")

            # Execute query
            results = self.query_engine.query(query, max_results)

            # Update metrics
            query_time = time.time() - start_time
            self._update_performance_metrics(query_time, success=True)

            # Format results
            formatted_results = []
            for result in results:
                formatted_results.append({
                    'answer_nodes': result.answer_nodes,
                    'confidence_score': result.confidence_score,
                    'reasoning_path': result.reasoning_path
                })

            self.logger.info(f"Query completed: '{query}' -> {len(results)} results in {query_time:.3f}s")

            return {
                'status': 'success',
                'query': query,
                'results': formatted_results,
                'execution_time': query_time,
                'timestamp': datetime.now().isoformat()
            }

        except Exception as e:
            query_time = time.time() - start_time
            self._update_performance_metrics(query_time, success=False)

            self.logger.error(f"Query failed: '{query}' -> {e}")

            return {
                'status': 'error',
                'query': query,
                'error': str(e),
                'execution_time': query_time,
                'timestamp': datetime.now().isoformat()
            }

    def _update_performance_metrics(self, query_time: float, success: bool):
        """Update performance metrics."""
        self.performance_metrics['total_queries'] += 1

        if success:
            self.performance_metrics['query_count'] += 1

            # Update average query time
            current_avg = self.performance_metrics['avg_query_time']
            total_successful = self.performance_metrics['query_count']

            new_avg = ((current_avg * (total_successful - 1)) + query_time) / total_successful
            self.performance_metrics['avg_query_time'] = new_avg
        else:
            self.performance_metrics['error_count'] += 1

    def save_graph(self, filepath: str):
        """Save graph state to file."""
        try:
            graph_data = {
                'nodes': {node_id: {
                    'properties': vars(node),
                    'quantum_state': self.graph.get_node_state(node_id).tolist()
                } for node_id, node in self.graph.nodes.items()},
                'edges': {f"{source}->{target}": {
                    'properties': vars(edge)
                } for (source, target), edge in self.graph.edges.items()},
                'hilbert_dim': self.graph.hilbert_dim,
                'timestamp': datetime.now().isoformat()
            }

            with open(filepath, 'w') as f:
                json.dump(graph_data, f, indent=2, default=str)

            self.logger.info(f"Graph saved to {filepath}")

        except Exception as e:
            self.logger.error(f"Failed to save graph: {e}")
            raise

    def load_graph(self, filepath: str):
        """Load graph state from file."""
        try:
            with open(filepath, 'r') as f:
                graph_data = json.load(f)

            # Recreate graph
            self.graph = EntangledGraph(graph_data['hilbert_dim'])

            # Restore nodes
            for node_id, node_data in graph_data['nodes'].items():
                properties = node_data['properties']
                quantum_state = np.array(node_data['quantum_state'])

                node = QuantumNode(node_id, **properties)
                self.graph.add_node(node)
                self.graph.set_node_state(node_id, quantum_state)

            # Restore edges
            for edge_key, edge_data in graph_data['edges'].items():
                source, target = edge_key.split('->')
                properties = edge_data['properties']

                edge = EntangledEdge(source, target, **properties)
                self.graph.add_edge(edge)

            self.logger.info(f"Graph loaded from {filepath}")

        except Exception as e:
            self.logger.error(f"Failed to load graph: {e}")
            raise

# Example configuration file (config.json)
config_example = {
    "hilbert_dim": 16,
    "log_level": "INFO",
    "log_file": "quantum_app.log",
    "cache_size": 1000,
    "max_query_time": 30.0,
    "coherence_threshold": 0.3
}

# Save example config
with open('config.json', 'w') as f:
    json.dump(config_example, f, indent=2)

# Example production application
print("Production quantum graph application initialized!")
print("Features:")
print("- Configuration management")
print("- Comprehensive logging")
print("- Performance monitoring") 
print("- Health checks")
print("- Safe query execution")
print("- Graph persistence")

Testing and Validation

Quantum Graph Testing Framework

import unittest
import numpy as np
from unittest.mock import Mock, patch

class QuantumGraphTestCase(unittest.TestCase):
    """Base test case for quantum graph applications."""

    def setUp(self):
        """Set up test fixtures."""
        self.graph = EntangledGraph(hilbert_dim=8)
        self.test_nodes = [
            QuantumNode("node1", node_type="test"),
            QuantumNode("node2", node_type="test"),
            QuantumNode("node3", node_type="test")
        ]

        for node in self.test_nodes:
            self.graph.add_node(node)

    def tearDown(self):
        """Clean up after tests."""
        self.graph = None

    def assert_quantum_state_valid(self, node_id):
        """Assert that a node's quantum state is valid."""
        state = self.graph.get_node_state(node_id)

        # Check normalization
        norm = np.linalg.norm(state)
        self.assertAlmostEqual(norm, 1.0, places=6, 
                              msg=f"Node {node_id} state not normalized")

        # Check for NaN values
        self.assertFalse(np.any(np.isnan(state)), 
                        msg=f"Node {node_id} contains NaN values")

        # Check dimension
        self.assertEqual(len(state), self.graph.hilbert_dim,
                        msg=f"Node {node_id} state dimension mismatch")

    def assert_entanglement_valid(self, source, target):
        """Assert that entanglement between nodes is valid."""
        entanglement = self.graph.measure_entanglement(source, target)

        self.assertGreaterEqual(entanglement, 0.0,
                               msg="Entanglement cannot be negative")
        self.assertLessEqual(entanglement, 1.0,
                            msg="Entanglement cannot exceed 1.0")

    def assert_coherence_maintained(self, min_coherence=0.1):
        """Assert that graph maintains quantum coherence."""
        coherence = self.graph.measure_coherence()
        self.assertGreaterEqual(coherence, min_coherence,
                               msg=f"Graph coherence too low: {coherence}")

class TestBiomedicalGraph(QuantumGraphTestCase):
    """Test biomedical quantum graph functionality."""

    def setUp(self):
        super().setUp()
        self.bio_graph = BiomedicalQuantumGraph(hilbert_dim=8)

    def test_biomedical_entity_creation(self):
        """Test creating biomedical entities."""
        self.bio_graph.add_biomedical_entity("test_protein", "protein")

        self.assertIn("test_protein", self.bio_graph.graph.nodes)
        self.assert_quantum_state_valid("test_protein")

    def test_drug_target_prediction(self):
        """Test drug target prediction functionality."""
        # Add test data
        self.bio_graph.add_biomedical_entity("disease1", "disease")
        self.bio_graph.add_biomedical_entity("protein1", "protein")
        self.bio_graph.add_biomedical_relationship("protein1", "disease1", 
                                                  "associated_with", 0.8)

        targets = self.bio_graph.find_drug_targets("disease1", 1)

        self.assertIsInstance(targets, list)
        self.assertGreater(len(targets), 0)

    def test_biomedical_entanglement_calculation(self):
        """Test biomedical-specific entanglement calculation."""
        strength = self.bio_graph._calculate_biomedical_entanglement(
            "protein1", "disease1", "causes", 0.9, 1.0
        )

        self.assertGreater(strength, 0.0)
        self.assertLess(strength, 1.0)

class TestFinancialGraph(QuantumGraphTestCase):
    """Test financial quantum graph functionality."""

    def setUp(self):
        super().setUp()
        self.financial_graph = FinancialQuantumGraph(hilbert_dim=8)

    def test_financial_entity_creation(self):
        """Test creating financial entities."""
        self.financial_graph.add_financial_entity(
            "TEST_STOCK", "stock", 
            {"volatility": 0.2, "market_cap": 1000000}
        )

        self.assertIn("TEST_STOCK", self.financial_graph.graph.nodes)
        self.assert_quantum_state_valid("TEST_STOCK")

    def test_portfolio_risk_calculation(self):
        """Test portfolio risk calculation."""
        # Add test assets
        assets = ["ASSET1", "ASSET2"]
        for asset in assets:
            self.financial_graph.add_financial_entity(
                asset, "stock", {"volatility": 0.1}
            )

        risk = self.financial_graph.calculate_portfolio_risk(assets, [0.5, 0.5])

        self.assertIsInstance(risk, float)
        self.assertGreater(risk, 0.0)

    def test_market_state_update(self):
        """Test market state updates."""
        # Add test asset
        self.financial_graph.add_financial_entity("TEST", "stock")
        initial_state = self.financial_graph.graph.get_node_state("TEST").copy()

        # Update market state
        self.financial_graph.update_market_state("crisis")

        updated_state = self.financial_graph.graph.get_node_state("TEST")

        # States should be different after market update
        self.assertFalse(np.allclose(initial_state, updated_state, atol=1e-6))

class TestRecommendationSystem(QuantumGraphTestCase):
    """Test quantum recommendation system."""

    def setUp(self):
        super().setUp()
        self.rec_system = QuantumRecommendationSystem(hilbert_dim=8)

    def test_user_addition(self):
        """Test adding users to recommendation system."""
        preferences = {"action": 0.8, "comedy": 0.3}
        self.rec_system.add_user("test_user", preferences)

        self.assertIn("test_user", self.rec_system.graph.nodes)
        self.assert_quantum_state_valid("test_user")

    def test_recommendation_generation(self):
        """Test recommendation generation."""
        # Add test data
        self.rec_system.add_user("user1", {"action": 0.8})
        self.rec_system.add_item("item1", {"action": 0.9})
        self.rec_system.add_interaction("user1", "item1", "like", 5.0)

        # Add another item to recommend
        self.rec_system.add_item("item2", {"action": 0.7})

        recommendations = self.rec_system.generate_recommendations("user1", 1)

        self.assertIsInstance(recommendations, list)
        if len(recommendations) > 0:
            self.assertIsInstance(recommendations[0], tuple)
            self.assertEqual(len(recommendations[0]), 4)  # (item_id, score, prob, interference)

# Performance testing
class TestQuantumPerformance(unittest.TestCase):
    """Test quantum graph performance characteristics."""

    def test_large_graph_performance(self):
        """Test performance with large graphs."""
        import time

        graph = EntangledGraph(hilbert_dim=10)

        # Add many nodes
        start_time = time.time()
        for i in range(100):
            node = QuantumNode(f"node_{i}", node_type="test")
            graph.add_node(node)
        node_creation_time = time.time() - start_time

        # Add many edges
        start_time = time.time()
        for i in range(50):
            edge = EntangledEdge(f"node_{i}", f"node_{i+1}", 
                               relation="connects", entanglement_strength=0.5)
            graph.add_edge(edge)
        edge_creation_time = time.time() - start_time

        # Test coherence measurement
        start_time = time.time()
        coherence = graph.measure_coherence()
        coherence_time = time.time() - start_time

        # Performance assertions
        self.assertLess(node_creation_time, 1.0, "Node creation too slow")
        self.assertLess(edge_creation_time, 1.0, "Edge creation too slow")
        self.assertLess(coherence_time, 5.0, "Coherence measurement too slow")

        print(f"\nPerformance Results:")
        print(f"  Node creation (100): {node_creation_time:.3f}s")
        print(f"  Edge creation (50): {edge_creation_time:.3f}s")
        print(f"  Coherence measurement: {coherence_time:.3f}s")

# Run tests
if __name__ == "__main__":
    # Create test suite
    suite = unittest.TestSuite()

    # Add test cases
    suite.addTest(unittest.makeSuite(TestBiomedicalGraph))
    suite.addTest(unittest.makeSuite(TestFinancialGraph))
    suite.addTest(unittest.makeSuite(TestRecommendationSystem))
    suite.addTest(unittest.makeSuite(TestQuantumPerformance))

    # Run tests
    runner = unittest.TextTestRunner(verbosity=2)
    result = runner.run(suite)

    print(f"\nTest Results:")
    print(f"  Tests run: {result.testsRun}")
    print(f"  Failures: {len(result.failures)}")
    print(f"  Errors: {len(result.errors)}")
    print(f"  Success rate: {((result.testsRun - len(result.failures) - len(result.errors)) / result.testsRun * 100):.1f}%")

Congratulations! You've mastered building custom quantum knowledge graph applications! 🎉⚛️

This tutorial covered:

  • Domain-Specific Applications: Biomedical, financial, and recommendation systems
  • Production Deployment: Configuration, logging, monitoring, and persistence
  • Testing Framework: Comprehensive testing for quantum applications
  • Performance Optimization: Large-scale graph handling and monitoring

You're now equipped to build sophisticated quantum knowledge systems for any domain! Explore the Use Cases section for real-world implementations and best practices.