Performance Optimization Guide

This guide provides comprehensive strategies for optimizing quantum data embedding performance across different backends and use cases.

Circuit-Level Optimizations

Gate Count Reduction

from quantum_data_embedding_suite.embeddings import AngleEmbedding
from quantum_data_embedding_suite.utils import circuit_optimizer
import numpy as np

# Create embedding with optimization enabled
embedding = AngleEmbedding(
    n_qubits=6,
    optimize_circuits=True,
    optimization_level=3
)

# Optimize existing circuit
def optimize_embedding_circuit(embedding, data):
    """Optimize embedding circuit for given data"""

    # Create base circuit
    circuit = embedding.create_circuit(data)

    # Apply optimization passes
    optimized = circuit_optimizer.optimize(
        circuit,
        passes=[
            'remove_barriers',
            'merge_rotations',
            'eliminate_zero_gates',
            'commute_through_cnots',
            'optimize_1q_gates'
        ]
    )

    print(f"Original depth: {circuit.depth()}")
    print(f"Optimized depth: {optimized.depth()}")
    print(f"Gate count reduction: {circuit.count_ops()} -> {optimized.count_ops()}")

    return optimized

# Example optimization
X = np.random.randn(10, 6)
for x in X[:3]:
    optimized_circuit = optimize_embedding_circuit(embedding, x)
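
To sanity-check that optimization preserved a circuit's semantics, compare the unitaries of the original and optimized circuits (tractable only at small qubit counts). A minimal sketch, assuming create_circuit returns a Qiskit QuantumCircuit:

from qiskit.quantum_info import Operator

def circuits_equivalent(original, optimized):
    """Unitary equivalence up to global phase, for small circuits only."""
    return Operator(original).equiv(Operator(optimized))

circuit = embedding.create_circuit(X[0])
print(circuits_equivalent(circuit, optimize_embedding_circuit(embedding, X[0])))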

Parallel Circuit Execution

from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
import time

def parallel_embedding_execution(embedding, data_batch, backend, max_workers=4):
    """Execute multiple embeddings in parallel"""

    def execute_single(data_point):
        """Execute single embedding"""
        circuit = embedding.create_circuit(data_point)
        result = backend.execute(circuit)
        return result

    start_time = time.time()

    # Parallel execution
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(execute_single, data_batch))

    execution_time = time.time() - start_time

    print(f"Parallel execution of {len(data_batch)} circuits: {execution_time:.2f}s")
    print(f"Average time per circuit: {execution_time/len(data_batch):.3f}s")

    return results

# Test parallel execution (assumes a `backend` instance from your setup,
# e.g. the QiskitBackend constructed later in this guide)
X_batch = np.random.randn(20, 6)
parallel_results = parallel_embedding_execution(
    embedding, X_batch, backend, max_workers=8
)
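
ThreadPoolExecutor works well when the backend releases the GIL (remote backends, C++-backed simulators) or when execution is I/O-bound. For pure-Python, CPU-bound simulation, a ProcessPoolExecutor sidesteps the GIL; a sketch, assuming the embedding and backend objects are picklable:

def _execute_one(args):
    """Module-level worker so ProcessPoolExecutor can pickle it."""
    embedding, backend, data_point = args
    return backend.execute(embedding.create_circuit(data_point))

def parallel_execution_processes(embedding, data_batch, backend, max_workers=4):
    """Process-based variant of parallel_embedding_execution"""
    tasks = [(embedding, backend, x) for x in data_batch]
    with ProcessPoolExecutor(max_workers=max_workers) as executor:
        return list(executor.map(_execute_one, tasks))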

Backend-Specific Optimizations

Qiskit Optimization Strategies

from qiskit.compiler import transpile
from qiskit.transpiler import PassManager, CouplingMap
from qiskit.transpiler.passes import (
    TrivialLayout, FullAncillaAllocation, EnlargeWithAncilla,
    BasisTranslator, Optimize1qGates, CXCancellation,
    LookaheadSwap, RemoveBarriers, RemoveFinalMeasurements
)
from qiskit.circuit.equivalence_library import SessionEquivalenceLibrary
from quantum_data_embedding_suite.backends import QiskitBackend  # assumed import path

def create_optimized_qiskit_backend():
    """Create highly optimized Qiskit backend"""

    # Target topology for the layout/routing passes (linear chain as an example)
    coupling_map = CouplingMap.from_line(6)

    # Custom optimization pass manager
    optimization_passes = PassManager([
        # Layout optimization
        TrivialLayout(coupling_map),
        FullAncillaAllocation(coupling_map),
        EnlargeWithAncilla(),

        # Gate optimization: rewrite into the {u1, u2, u3, cx} basis
        # (BasisTranslator supersedes the deprecated Unroller pass)
        BasisTranslator(SessionEquivalenceLibrary, ['u1', 'u2', 'u3', 'cx']),
        Optimize1qGates(),
        CXCancellation(),

        # Routing optimization
        LookaheadSwap(coupling_map),

        # Final cleanup
        RemoveBarriers(),
        RemoveFinalMeasurements()
    ])

    backend = QiskitBackend(
        device_name="aer_simulator",
        shots=1024,
        optimization_level=0,  # We handle optimization manually
        custom_passes=optimization_passes
    )

    return backend

optimized_qiskit = create_optimized_qiskit_backend()
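
For many workloads the stock transpiler is sufficient and far simpler than a hand-built pass manager; the transpile import above can be used directly. A minimal sketch, assuming create_circuit returns a Qiskit QuantumCircuit:

from qiskit_aer import AerSimulator

circuit = embedding.create_circuit(X[0])
transpiled = transpile(circuit, backend=AerSimulator(), optimization_level=3)
print(f"Depth: {circuit.depth()} -> {transpiled.depth()}")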

PennyLane Performance Tuning

import pennylane as qml
from quantum_data_embedding_suite.backends import PennyLaneBackend  # assumed import path

def create_optimized_pennylane_backend():
    """Create optimized PennyLane backend with performance tuning"""

    # Use GPU if available
    try:
        device = qml.device('lightning.gpu', wires=8)
        print("Using Lightning GPU backend")
    except Exception:  # lightning.gpu unavailable (plugin missing or no GPU)
        device = qml.device('lightning.qubit', wires=8)
        print("Using Lightning CPU backend")

    # Configure for performance
    backend = PennyLaneBackend(
        device=device,
        interface='autograd',  # Fastest interface for gradients
        diff_method='best',    # Automatic differentiation method selection
        grad_on_execution=True # Compute gradients during execution
    )

    return backend

optimized_pennylane = create_optimized_pennylane_backend()
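
When dropping down to raw PennyLane, pinning the differentiation method explicitly often beats 'best'; on Lightning devices the adjoint method is typically fastest for statevector simulation. An illustrative QNode (the templates and wire counts here are arbitrary choices, not part of the suite):

dev = qml.device('lightning.qubit', wires=4)

@qml.qnode(dev, diff_method='adjoint')
def embed_and_measure(params, x):
    qml.AngleEmbedding(x, wires=range(4))
    qml.StronglyEntanglingLayers(params, wires=range(4))
    return qml.expval(qml.PauliZ(0))

shape = qml.StronglyEntanglingLayers.shape(n_layers=2, n_wires=4)
print(embed_and_measure(np.random.randn(*shape), np.random.randn(4)))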

Memory and Computational Efficiency

Batch Processing Strategies

class BatchProcessor:
    """Efficient batch processing for quantum embeddings"""

    def __init__(self, embedding, backend, batch_size=32):
        self.embedding = embedding
        self.backend = backend
        self.batch_size = batch_size
        self.results_cache = {}

    def process_dataset(self, X, use_cache=True, show_progress=True):
        """Process entire dataset efficiently"""
        from tqdm import tqdm

        n_samples = len(X)
        n_batches = (n_samples + self.batch_size - 1) // self.batch_size

        all_results = []

        if show_progress:
            pbar = tqdm(total=n_batches, desc="Processing batches")

        for i in range(n_batches):
            start_idx = i * self.batch_size
            end_idx = min((i + 1) * self.batch_size, n_samples)
            batch = X[start_idx:end_idx]

            # Check cache first
            batch_results = []
            uncached_data = []
            uncached_indices = []

            for j, data_point in enumerate(batch):
                data_hash = data_point.tobytes()  # raw bytes as a stable cache key (hash() of bytes is salted per process)

                if use_cache and data_hash in self.results_cache:
                    batch_results.append(self.results_cache[data_hash])
                else:
                    uncached_data.append(data_point)
                    uncached_indices.append(j)

            # Process uncached data
            if uncached_data:
                uncached_results = self._process_batch(uncached_data)

                # Update cache and results
                for idx, result in zip(uncached_indices, uncached_results):
                    data_hash = batch[idx].tobytes()
                    if use_cache:
                        self.results_cache[data_hash] = result
                    batch_results.insert(idx, result)

            all_results.extend(batch_results)

            if show_progress:
                pbar.update(1)

        if show_progress:
            pbar.close()

        return all_results

    def _process_batch(self, batch):
        """Process a single batch"""
        # Create circuits for entire batch
        circuits = [self.embedding.create_circuit(x) for x in batch]

        # Execute batch
        if hasattr(self.backend, 'execute_batch'):
            results = self.backend.execute_batch(circuits)
        else:
            # Fallback to sequential execution
            results = [self.backend.execute(circuit) for circuit in circuits]

        return results

# Use batch processor
processor = BatchProcessor(embedding, optimized_qiskit, batch_size=64)
X_large = np.random.randn(1000, 6)
batch_results = processor.process_dataset(X_large, use_cache=True)
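
The cache above lives only for the lifetime of the process. Because the keys are the raw bytes of the input arrays, it can also be persisted between runs; a sketch using joblib, assuming the backend's results are picklable:

from joblib import dump, load

dump(processor.results_cache, 'embedding_cache.joblib')   # save after a run
processor.results_cache = load('embedding_cache.joblib')  # restore in a new run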

Memory-Efficient Kernel Computation

class MemoryEfficientKernel:
    """Memory-efficient kernel computation for large datasets"""

    def __init__(self, embedding, chunk_size=100):
        self.embedding = embedding
        self.chunk_size = chunk_size

    def compute_kernel_matrix_chunked(self, X, symmetric=True):
        """Compute kernel matrix in chunks to save memory"""
        n_samples = len(X)

        # Pre-allocate result matrix
        K = np.zeros((n_samples, n_samples))

        # Process in chunks
        for i in range(0, n_samples, self.chunk_size):
            i_end = min(i + self.chunk_size, n_samples)

            for j in range(0, n_samples, self.chunk_size):
                j_end = min(j + self.chunk_size, n_samples)

                # Skip upper-triangle chunks if symmetric (filled via transpose below)
                if symmetric and j > i:
                    continue

                # Compute chunk
                X_i = X[i:i_end]
                X_j = X[j:j_end]

                K_chunk = self._compute_kernel_chunk(X_i, X_j)
                K[i:i_end, j:j_end] = K_chunk

                # Fill symmetric part
                if symmetric and i != j:
                    K[j:j_end, i:i_end] = K_chunk.T

        return K

    def _compute_kernel_chunk(self, X_i, X_j):
        """Compute kernel for a chunk pair"""
        n_i, n_j = len(X_i), len(X_j)
        K_chunk = np.zeros((n_i, n_j))

        for i, x_i in enumerate(X_i):
            for j, x_j in enumerate(X_j):
                K_chunk[i, j] = self._kernel_element(x_i, x_j)

        return K_chunk

    def _kernel_element(self, x_i, x_j):
        """Compute single kernel element"""
        # Create circuits
        circuit_i = self.embedding.create_circuit(x_i)
        circuit_j = self.embedding.create_circuit(x_j)

        # Compute fidelity
        fidelity = self._compute_fidelity(circuit_i, circuit_j)

        return fidelity

    def _compute_fidelity(self, circuit_i, circuit_j):
        """Compute fidelity |<psi_i|psi_j>|^2 between two circuits"""
        # Statevector overlap; assumes Qiskit circuits without measurements.
        # Swap in a shot-based estimator (e.g. a swap test) for hardware.
        from qiskit.quantum_info import Statevector
        psi_i = Statevector(circuit_i)
        psi_j = Statevector(circuit_j)
        return abs(psi_i.inner(psi_j)) ** 2

# Use memory-efficient kernel
efficient_kernel = MemoryEfficientKernel(embedding, chunk_size=50)
X_large = np.random.randn(500, 6)
K_large = efficient_kernel.compute_kernel_matrix_chunked(X_large)
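
Chunking matters because a dense kernel matrix grows quadratically: float64 storage costs 8 * n^2 bytes, so 500 samples need about 2 MB while 50,000 samples would need roughly 20 GB. A quick back-of-the-envelope check:

def kernel_memory_gb(n_samples, bytes_per_entry=8):
    """Dense kernel-matrix memory for float64 entries."""
    return n_samples ** 2 * bytes_per_entry / 1e9

for n in (500, 5000, 50000):
    print(f"{n:>6} samples -> {kernel_memory_gb(n):.3f} GB")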

GPU Acceleration

CUDA-Accelerated Backends

def setup_gpu_acceleration():
    """Setup GPU acceleration for quantum simulations"""

    # Check GPU availability
    import torch
    if torch.cuda.is_available():
        print(f"CUDA available: {torch.cuda.get_device_name()}")
        print(f"Memory: {torch.cuda.get_device_properties(0).total_memory / 1e9:.1f} GB")

        # Qiskit GPU backend
        # Qiskit GPU backend
        try:
            from qiskit_aer import AerSimulator
            gpu_backend = AerSimulator(method='statevector', device='GPU')
            print("Qiskit GPU backend available")
            return gpu_backend
        except Exception:  # Aer missing or built without GPU support
            print("Qiskit Aer GPU not available")

    # PennyLane GPU backend
    try:
        import pennylane as qml
        gpu_device = qml.device('lightning.gpu', wires=10)
        print("PennyLane GPU backend available")
        return gpu_device
    except Exception:  # lightning.gpu plugin not installed or unusable
        print("PennyLane GPU not available")

    # Fallback to CPU
    print("Using CPU backend")
    return None

gpu_backend = setup_gpu_acceleration()

Multi-GPU Scaling

import multiprocessing as mp

def _compute_chunk_kernel(chunk_data, embedding, gpu_id):
    """Worker: compute the kernel for one data chunk on a specific GPU.

    Defined at module level so multiprocessing can pickle it. Assumes the
    embedding is picklable and exposes clone()/set_backend(), and that a
    setup_gpu_backend() helper exists for your stack.
    """
    import torch
    torch.cuda.set_device(gpu_id)

    gpu_embedding = embedding.clone()
    gpu_embedding.set_backend(setup_gpu_backend(gpu_id))

    kernel = MemoryEfficientKernel(gpu_embedding)
    return kernel.compute_kernel_matrix_chunked(chunk_data)

def multi_gpu_kernel_computation(X, embedding, n_gpus=2):
    """Distribute the diagonal kernel blocks across multiple GPUs"""

    n_samples = len(X)
    chunk_size = (n_samples + n_gpus - 1) // n_gpus

    # Split data across GPUs
    data_chunks = [X[i * chunk_size:(i + 1) * chunk_size] for i in range(n_gpus)]

    # Execute in parallel ('spawn' avoids CUDA-in-forked-process issues)
    ctx = mp.get_context('spawn')
    with ctx.Pool(processes=n_gpus) as pool:
        chunk_kernels = pool.starmap(
            _compute_chunk_kernel,
            [(chunk, embedding, i) for i, chunk in enumerate(data_chunks)]
        )

    # Assemble the block-diagonal part of the full kernel matrix. The
    # off-diagonal (cross-chunk) blocks must be computed separately if the
    # complete matrix is required.
    K_combined = np.zeros((n_samples, n_samples))
    offset = 0
    for K_chunk in chunk_kernels:
        size = K_chunk.shape[0]
        K_combined[offset:offset + size, offset:offset + size] = K_chunk
        offset += size

    return K_combined

# Use multi-GPU computation
import torch
if torch.cuda.device_count() > 1:
    K_multi_gpu = multi_gpu_kernel_computation(X_large, embedding, n_gpus=2)

Algorithm-Level Optimizations

Adaptive Sampling Strategies

class AdaptiveSampler:
    """Adaptive sampling for efficient quantum embedding evaluation"""

    def __init__(self, embedding, initial_samples=10, max_samples=1000):
        self.embedding = embedding
        self.initial_samples = initial_samples
        self.max_samples = max_samples
        self.sample_history = []
        self.estimate_history = []

    def adaptive_metric_computation(self, X, metric_func, tolerance=1e-3):
        """Compute metric with adaptive sampling"""

        current_samples = self.initial_samples
        previous_estimate = None
        converged = False

        while not converged:
            # Sample a subset (without replacement, capped at the dataset size)
            n_draw = min(current_samples, len(X))
            indices = np.random.choice(len(X), size=n_draw, replace=False)
            X_sample = X[indices]

            # Compute metric
            current_estimate = metric_func(self.embedding, X_sample)

            # Check convergence
            if previous_estimate is not None:
                change = abs(current_estimate - previous_estimate)
                if change < tolerance:
                    converged = True
                    print(f"Converged after {n_draw} samples")

            self.sample_history.append(n_draw)
            self.estimate_history.append(current_estimate)

            previous_estimate = current_estimate

            # Stop once the sample budget is exhausted (avoids an infinite loop)
            if current_samples >= self.max_samples:
                break
            current_samples = min(int(current_samples * 1.5), self.max_samples)

        return current_estimate

    def plot_convergence(self):
        """Plot convergence behavior"""
        import matplotlib.pyplot as plt

        plt.figure(figsize=(10, 6))

        plt.subplot(1, 2, 1)
        plt.plot(self.sample_history, self.estimate_history, 'o-')
        plt.xlabel('Number of Samples')
        plt.ylabel('Metric Value')
        plt.title('Metric Convergence')
        plt.grid(True)

        plt.subplot(1, 2, 2)
        if len(self.estimate_history) > 1:
            changes = np.abs(np.diff(self.estimate_history))
            plt.plot(self.sample_history[1:], changes, 'o-')
            plt.xlabel('Number of Samples')
            plt.ylabel('Absolute Change')
            plt.title('Convergence Rate')
            plt.yscale('log')
            plt.grid(True)

        plt.tight_layout()
        plt.show()

# Use adaptive sampling; wrap the metric to match the (embedding, X_sample) signature
from quantum_data_embedding_suite.metrics import expressibility  # assumed import path

sampler = AdaptiveSampler(embedding)
adaptive_expressibility = sampler.adaptive_metric_computation(
    X_large,
    lambda emb, X_s: expressibility(emb, n_samples=len(X_s)),
    tolerance=1e-4
)
sampler.plot_convergence()

Efficient Hyperparameter Optimization

from skopt import gp_minimize
from skopt.space import Integer
from quantum_data_embedding_suite.metrics import expressibility, trainability  # assumed import path

class EmbeddingOptimizer:
    """Efficient hyperparameter optimization for embeddings"""

    def __init__(self, embedding_class, X_train, y_train=None):
        self.embedding_class = embedding_class
        self.X_train = X_train
        self.y_train = y_train
        self.best_params = None
        self.optimization_history = []

    def bayesian_optimization(self, param_space, n_calls=20, objective='expressibility'):
        """Bayesian optimization of embedding hyperparameters"""

        def objective_function(params):
            """Objective function for optimization"""
            try:
                # Create embedding with current parameters
                param_dict = dict(zip(param_space.keys(), params))
                embedding = self.embedding_class(**param_dict)

                # Compute objective metric
                if objective == 'expressibility':
                    score = expressibility(embedding, n_samples=200)
                elif objective == 'trainability':
                    score = trainability(embedding, self.X_train[:50])
                else:
                    raise ValueError(f"Unknown objective: {objective}")

                # We minimize, so negate for maximization objectives
                result = -score

                # Record history
                self.optimization_history.append({
                    'params': param_dict,
                    'score': score,
                    'objective_value': result
                })

                return result

            except Exception as e:
                print(f"Error in objective function: {e}")
                return 1.0  # Large penalty for failed evaluations

        # Convert parameter space
        space = [param_space[key] for key in param_space.keys()]

        # Run optimization
        result = gp_minimize(
            func=objective_function,
            dimensions=space,
            n_calls=n_calls,
            random_state=42,
            acq_func='EI'  # Expected Improvement
        )

        # Extract best parameters
        best_param_values = result.x
        self.best_params = dict(zip(param_space.keys(), best_param_values))

        print(f"Best parameters: {self.best_params}")
        print(f"Best score: {-result.fun:.6f}")

        return self.best_params

    def plot_optimization_history(self):
        """Plot optimization history"""
        import matplotlib.pyplot as plt

        scores = [entry['score'] for entry in self.optimization_history]
        iterations = range(1, len(scores) + 1)

        plt.figure(figsize=(12, 4))

        plt.subplot(1, 2, 1)
        plt.plot(iterations, scores, 'o-')
        plt.xlabel('Iteration')
        plt.ylabel('Objective Score')
        plt.title('Optimization Progress')
        plt.grid(True)

        # Running best
        running_best = []
        current_best = -np.inf
        for score in scores:
            current_best = max(current_best, score)
            running_best.append(current_best)

        plt.subplot(1, 2, 2)
        plt.plot(iterations, running_best, 'o-', color='red')
        plt.xlabel('Iteration')
        plt.ylabel('Best Score So Far')
        plt.title('Best Score Evolution')
        plt.grid(True)

        plt.tight_layout()
        plt.show()

# Example usage (a plain list is treated by skopt as a categorical dimension)
from quantum_data_embedding_suite.embeddings import IQPEmbedding

param_space = {
    'n_qubits': Integer(3, 8),
    'depth': Integer(1, 5),
    'rotation_gates': ['rx', 'ry', 'rz']
}

optimizer = EmbeddingOptimizer(IQPEmbedding, X_large[:100])
best_params = optimizer.bayesian_optimization(param_space, n_calls=15)
optimizer.plot_optimization_history()

Performance Monitoring and Profiling

Comprehensive Performance Profiler

import cProfile
import pstats
from memory_profiler import profile
import psutil
import time

class QuantumEmbeddingProfiler:
    """Comprehensive profiling for quantum embeddings"""

    def __init__(self):
        self.profile_results = {}
        self.memory_usage = []
        self.timing_data = {}

    def profile_execution(self, func, *args, **kwargs):
        """Profile function execution"""

        # CPU profiling
        profiler = cProfile.Profile()
        profiler.enable()

        # Memory monitoring
        process = psutil.Process()
        initial_memory = process.memory_info().rss / 1024 / 1024  # MB

        start_time = time.time()

        # Execute function
        result = func(*args, **kwargs)

        end_time = time.time()
        execution_time = end_time - start_time

        # Stop profiling
        profiler.disable()

        # Get memory usage
        final_memory = process.memory_info().rss / 1024 / 1024  # MB
        memory_usage = final_memory - initial_memory

        # Store results
        func_name = func.__name__
        self.profile_results[func_name] = {
            'execution_time': execution_time,
            'memory_usage': memory_usage,
            'profiler': profiler
        }

        print(f"{func_name} - Time: {execution_time:.3f}s, Memory: +{memory_usage:.1f}MB")

        return result

    def generate_detailed_report(self, func_name):
        """Generate detailed profiling report"""
        if func_name not in self.profile_results:
            print(f"No profile data for {func_name}")
            return

        profiler = self.profile_results[func_name]['profiler']

        # Create stats object
        stats = pstats.Stats(profiler)
        stats.sort_stats('cumulative')

        print(f"\nDetailed Profile Report for {func_name}:")
        print("=" * 50)
        stats.print_stats(20)  # Top 20 functions

        # Memory report
        print(f"\nMemory Usage: +{self.profile_results[func_name]['memory_usage']:.1f}MB")
        print(f"Execution Time: {self.profile_results[func_name]['execution_time']:.3f}s")

    @profile
    def memory_intensive_operation(self, embedding, X):
        """Example memory-profiled operation"""
        # This decorator will show line-by-line memory usage
        kernel_matrices = []
        for i in range(len(X)):
            if i % 10 == 0:
                kernel = FidelityKernel(embedding)
                K = kernel.compute_kernel_matrix(X[i:i+10])
                kernel_matrices.append(K)
        return kernel_matrices

# Use profiler
from quantum_data_embedding_suite.kernels import FidelityKernel  # assumed import path

profiler = QuantumEmbeddingProfiler()

# Profile embedding creation (named wrappers keep the profile keys distinct;
# bare lambdas would all be recorded under '<lambda>')
def create_embedding():
    return AngleEmbedding(n_qubits=6)

embedding = profiler.profile_execution(create_embedding)

# Profile kernel computation
X_small = X_large[:50]

def compute_kernel():
    return FidelityKernel(embedding).compute_kernel_matrix(X_small)

kernel_result = profiler.profile_execution(compute_kernel)

# Generate detailed reports
profiler.generate_detailed_report('compute_kernel')

Best Practices Summary

1. Circuit Optimization

  • Use circuit optimization passes
  • Minimize gate count and depth
  • Leverage hardware-specific optimizations

2. Computational Efficiency

  • Implement batch processing
  • Use memory-efficient algorithms
  • Cache intermediate results (see the sketch below)
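
A minimal result-caching wrapper, assuming inputs are NumPy arrays and execution results are reusable across calls:

def cached_executor(embedding, backend):
    """Memoize backend executions keyed by the raw bytes of each input."""
    cache = {}
    def run(x):
        key = x.tobytes()
        if key not in cache:
            cache[key] = backend.execute(embedding.create_circuit(x))
        return cache[key]
    return run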

3. Hardware Utilization

  • Utilize GPU acceleration when available
  • Implement parallel processing
  • Monitor resource usage

4. Algorithm Design

  • Use adaptive sampling strategies
  • Implement efficient hyperparameter optimization
  • Profile and monitor performance

5. Scalability

  • Design for large datasets
  • Implement chunked processing
  • Use distributed computing when needed

Performance Benchmarks

def run_performance_benchmarks():
    """Run comprehensive performance benchmarks"""
    # Assumed import paths within the suite
    from quantum_data_embedding_suite.embeddings import (
        AngleEmbedding, IQPEmbedding, AmplitudeEmbedding
    )
    from quantum_data_embedding_suite.metrics import expressibility

    benchmarks = {
        'small_dataset': np.random.randn(100, 4),
        'medium_dataset': np.random.randn(500, 6),
        'large_dataset': np.random.randn(1000, 8)
    }

    embeddings = {
        'angle': AngleEmbedding,
        'iqp': IQPEmbedding,
        'amplitude': AmplitudeEmbedding
    }

    results = {}

    for emb_name, emb_class in embeddings.items():
        results[emb_name] = {}

        for data_name, data in benchmarks.items():
            n_features = data.shape[1]
            embedding = emb_class(n_qubits=n_features)

            # Time various operations
            start_time = time.time()

            # Circuit creation
            circuits = [embedding.create_circuit(x) for x in data[:10]]
            circuit_time = time.time() - start_time

            # Expressibility computation
            start_time = time.time()
            expr_score = expressibility(embedding, n_samples=100)
            expr_time = time.time() - start_time

            results[emb_name][data_name] = {
                'circuit_creation_time': circuit_time,
                'expressibility_time': expr_time,
                'total_time': circuit_time + expr_time
            }

    # Display results
    import pandas as pd

    for metric in ['circuit_creation_time', 'expressibility_time', 'total_time']:
        print(f"\n{metric.replace('_', ' ').title()}:")
        data_for_df = {}
        for emb_name in results:
            data_for_df[emb_name] = [results[emb_name][data_name][metric] 
                                   for data_name in benchmarks]

        df = pd.DataFrame(data_for_df, index=benchmarks.keys())
        print(df.round(4))

# Run benchmarks
run_performance_benchmarks()

This comprehensive optimization guide provides strategies for maximizing the performance of quantum data embeddings across different scales and use cases.