Advanced Features¶
This guide covers the advanced features of OpenML Crawler, including real-time monitoring, federated learning, self-healing pipelines, and distributed processing capabilities.
📊 Real-Time Monitoring & Analytics¶
Real-Time Data Streams¶
import time

from openmlcrawler.core.monitoring import RealTimeMonitor

# Create real-time monitor
monitor = RealTimeMonitor()

# Monitor data quality in real-time
@monitor.on_data_received
def process_streaming_data(data_batch):
    """Process incoming data batches."""
    start_time = time.time()

    # Quality assessment
    quality_score = assess_data_quality(data_batch)

    # Alert if quality drops
    if quality_score < 0.8:
        monitor.alert("Data quality dropped below threshold", quality_score)

    # Update metrics
    monitor.update_metrics({
        "records_processed": len(data_batch),
        "quality_score": quality_score,
        "processing_time": time.time() - start_time,
    })

# Start monitoring
monitor.start_stream_monitoring(
    source="kafka://data-stream:9092",
    topic="weather_data",
    batch_size=100,
    interval_seconds=30
)
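The callback above uses assess_data_quality without an explicit import; it is expected to come from OpenML Crawler's quality utilities and is used the same way throughout this guide. If you need a stand-in for local experimentation, a toy version that scores a batch by its share of non-null values might look like this (a sketch, not the library's scoring logic):

import pandas as pd

def assess_data_quality(data_batch) -> float:
    """Toy quality score: fraction of non-null cells in the batch (0.0 to 1.0)."""
    df = pd.DataFrame(data_batch)
    if df.size == 0:
        return 0.0
    return float(df.notna().to_numpy().mean())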
Performance Monitoring¶
from openmlcrawler.core.monitoring import PerformanceMonitor
# Monitor system performance
perf_monitor = PerformanceMonitor()
# Track pipeline performance
with perf_monitor.track_operation("data_processing_pipeline"):
    # Your data processing code here
    df = load_dataset("weather", location="New York")
    clean_df = clean_data(df)
    result = process_data(clean_df)
# Get performance metrics
metrics = perf_monitor.get_metrics()
print("Performance Metrics:")
print(f"Total operations: {metrics['total_operations']}")
print(f"Average latency: {metrics['avg_latency_ms']:.2f}ms")
print(f"Memory usage: {metrics['memory_mb']:.2f}MB")
print(f"CPU usage: {metrics['cpu_percent']:.1f}%")
Custom Metrics & Alerts¶
import time

from openmlcrawler.core.monitoring import MetricsCollector, AlertManager

# Create metrics collector
metrics = MetricsCollector()

# Define custom metrics
metrics.register_metric(
    name="data_freshness",
    type="gauge",
    description="Time since last data update"
)
metrics.register_metric(
    name="error_rate",
    type="counter",
    description="Rate of processing errors"
)

# Create alert manager
alerts = AlertManager()

# Define alerts
alerts.create_alert(
    name="high_error_rate",
    condition="error_rate > 0.05",
    severity="critical",
    channels=["email", "slack"]
)
alerts.create_alert(
    name="stale_data",
    condition="data_freshness > 3600",  # 1 hour
    severity="warning",
    channels=["dashboard"]
)

# Update metrics
metrics.set_gauge("data_freshness", time.time() - last_update)
metrics.increment_counter("error_rate", error_count)
🌐 Federated Learning¶
Federated Learning Setup¶
from openmlcrawler.core.federated import FederatedLearner, FederatedServer

# Create federated server
server = FederatedServer(
    num_clients=10,
    min_clients_per_round=5,
    rounds=100
)

# Configure federated learning
server.configure_model(
    model_type="neural_network",
    input_shape=(784,),
    num_classes=10,
    learning_rate=0.01
)

# Start federated learning
server.start_federated_training()

# Client-side participation
client = FederatedLearner(server_address="federated-server:8080")

# Join federated learning
client.join_federation(client_id="client_001")

# Train on local data (local_dataset is the client's own training data)
for round_num in range(100):
    # Get global model
    global_model = client.get_global_model()

    # Train on local dataset
    local_model = client.train_local_model(
        global_model,
        local_dataset,
        epochs=5
    )

    # Send updates to server
    client.send_model_update(local_model)
Privacy-Preserving Federated Learning¶
from openmlcrawler.core.federated import PrivateFederatedLearner

# Create privacy-preserving client
private_client = PrivateFederatedLearner(
    server_address="secure-server:8080",
    privacy_budget=1.0,
    noise_multiplier=0.1
)

# Differential privacy training
private_client.enable_differential_privacy(
    epsilon=0.5,
    delta=1e-5
)

# Secure aggregation
private_client.enable_secure_aggregation(
    key_size=2048,
    threshold=3  # Minimum clients for reconstruction
)

# Train with privacy guarantees
private_model = private_client.train_private_model(
    dataset=local_data,
    target_column="label",
    privacy_level="high"
)
Federated Analytics¶
from openmlcrawler.core.federated import FederatedAnalytics

# Create federated analytics engine
analytics = FederatedAnalytics()

# Distributed statistical analysis
stats = analytics.compute_federated_statistics(
    clients=["client_1", "client_2", "client_3"],
    query="SELECT AVG(temperature), COUNT(*) FROM weather_data WHERE city='NYC'",
    privacy_budget=0.1
)

print("Federated Statistics:")
print(f"Average temperature: {stats['avg_temperature']:.2f}")
print(f"Total records: {stats['total_count']}")

# Federated data visualization
chart_data = analytics.create_federated_chart(
    clients=["client_1", "client_2"],
    chart_type="histogram",
    column="temperature",
    bins=20
)
🔧 Self-Healing Pipelines¶
Auto-Recovery Configuration¶
from openmlcrawler.core.self_healing import SelfHealingPipeline

# Create self-healing pipeline
pipeline = SelfHealingPipeline()

# Configure recovery strategies
pipeline.add_recovery_strategy(
    failure_type="connection_error",
    strategy="retry",
    max_retries=3,
    backoff_seconds=5
)
pipeline.add_recovery_strategy(
    failure_type="data_quality_error",
    strategy="fallback",
    fallback_source="backup_data_source"
)
pipeline.add_recovery_strategy(
    failure_type="memory_error",
    strategy="scale_down",
    max_batch_size=1000
)

# Define pipeline steps with error handling
@pipeline.step(recovery_strategies=["retry", "fallback"])
def load_weather_data():
    return load_dataset("weather", location="New York")

@pipeline.step(recovery_strategies=["scale_down"])
def process_large_dataset(data):
    return process_data(data)

# Execute with auto-recovery
result = pipeline.execute_with_recovery()
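For intuition, the "retry" strategy configured above is essentially the familiar retry-with-backoff pattern. A standalone sketch of that idea in plain Python (not SelfHealingPipeline's internal implementation):

import functools
import time

def retry(max_retries=3, backoff_seconds=5, exceptions=(ConnectionError,)):
    """Re-run a function on failure, sleeping between attempts."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_retries + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions:
                    if attempt == max_retries:
                        raise
                    time.sleep(backoff_seconds * attempt)  # linear backoff
        return wrapper
    return decorator

@retry(max_retries=3, backoff_seconds=5)
def fetch_remote_data():
    ...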
Intelligent Error Detection¶
from openmlcrawler.core.self_healing import ErrorDetector

# Create error detector
detector = ErrorDetector()

# Configure error patterns
detector.add_error_pattern(
    name="api_rate_limit",
    pattern=r"rate limit exceeded",
    severity="warning",
    auto_recover=True,
    recovery_action="wait_and_retry",
    wait_seconds=60
)
detector.add_error_pattern(
    name="data_corruption",
    pattern=r"unexpected data format",
    severity="error",
    auto_recover=True,
    recovery_action="switch_source"
)
detector.add_error_pattern(
    name="memory_exhaustion",
    pattern=r"out of memory",
    severity="critical",
    auto_recover=True,
    recovery_action="reduce_batch_size"
)

# Monitor for errors
detector.start_monitoring()

# Process with error detection
try:
    result = process_data_with_monitoring(data, detector)
except Exception as e:
    recovery_action = detector.suggest_recovery(str(e))
    if recovery_action:
        print(f"Suggested recovery: {recovery_action}")
        # Execute recovery
        detector.execute_recovery(recovery_action)
Adaptive Resource Management¶
from openmlcrawler.core.self_healing import AdaptiveResourceManager

# Create adaptive resource manager
resource_manager = AdaptiveResourceManager()

# Configure resource limits
resource_manager.set_limits(
    max_memory_gb=8,
    max_cpu_percent=80,
    max_concurrent_tasks=10
)

# Adaptive scaling based on workload
resource_manager.enable_adaptive_scaling(
    scale_up_threshold=0.8,    # Scale up at 80% utilization
    scale_down_threshold=0.3,  # Scale down at 30% utilization
    min_workers=2,
    max_workers=20
)

# Monitor and adjust resources
resource_manager.start_resource_monitoring(interval_seconds=30)

# Process with adaptive resources
result = resource_manager.process_with_adaptation(
    data=data,
    operation="heavy_computation",
    estimated_complexity="high"
)
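The scale-up and scale-down thresholds above are utilization fractions. If you want to observe the same CPU and memory signals yourself, one common option is psutil (an extra dependency, not required by the snippet above); a minimal sketch:

import psutil

def utilization_snapshot():
    """Return current CPU and memory utilization as fractions (0.0 to 1.0)."""
    cpu = psutil.cpu_percent(interval=1) / 100.0
    mem = psutil.virtual_memory().percent / 100.0
    return {"cpu": cpu, "memory": mem}

snapshot = utilization_snapshot()
if max(snapshot.values()) > 0.8:
    print("Above the scale-up threshold:", snapshot)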
🚀 Distributed Processing¶
Ray Integration¶
import ray
from openmlcrawler.core.distributed import RayProcessor

# Initialize Ray
ray.init()

# Create Ray processor
processor = RayProcessor()

# Distributed data processing
@ray.remote
def process_partition(partition):
    """Process a data partition."""
    return clean_data(partition)

# Split data into partitions (split_data / combine_results are user-defined helpers)
partitions = split_data(df, num_partitions=10)

# Process in parallel
futures = [process_partition.remote(partition) for partition in partitions]
results = ray.get(futures)

# Combine results
final_result = combine_results(results)
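With many partitions, a single ray.get over every future blocks until the whole batch is done and holds all results at once. A common Ray idiom is to drain futures incrementally with ray.wait; a sketch using the same remote function as above:

# Drain results as they complete instead of blocking on the full batch.
pending = [process_partition.remote(p) for p in partitions]
results = []
while pending:
    done, pending = ray.wait(pending, num_returns=1)
    results.extend(ray.get(done))

final_result = combine_results(results)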
Dask Integration¶
import dask.dataframe as dd
from openmlcrawler.core.distributed import DaskProcessor

# Create Dask processor
processor = DaskProcessor()

# Convert to Dask DataFrame
ddf = dd.from_pandas(df, npartitions=8)

# Distributed operations
clean_ddf = processor.clean_distributed(ddf)
processed_ddf = processor.process_distributed(clean_ddf)

# Compute results
result = processed_ddf.compute()

# Distributed ML training
from dask_ml.model_selection import train_test_split
from dask_ml.linear_model import LogisticRegression

X_train, X_test, y_train, y_test = train_test_split(
    processed_ddf.drop('target', axis=1),
    processed_ddf['target'],
    test_size=0.2
)

# Train distributed model
model = LogisticRegression()
model.fit(X_train, y_train)

# Predict
predictions = model.predict(X_test)
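By default, Dask collections like the ones above run on the local threaded scheduler. To spread the same computation over multiple processes or a cluster, attach a distributed client before calling compute; a sketch assuming dask.distributed is installed:

from dask.distributed import Client

# Local cluster with 4 worker processes; pass a scheduler address to join a remote cluster.
client = Client(n_workers=4, threads_per_worker=2)
print(client.dashboard_link)

result = processed_ddf.compute()  # now executed by the distributed scheduler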
Kubernetes Integration¶
from openmlcrawler.core.distributed import KubernetesProcessor

# Create Kubernetes processor
k8s_processor = KubernetesProcessor(
    namespace="data-processing",
    image="openmlcrawler:latest"
)

# Deploy distributed job
job_spec = {
    "name": "data-processing-job",
    "replicas": 5,
    "resources": {
        "requests": {"cpu": "500m", "memory": "1Gi"},
        "limits": {"cpu": "1000m", "memory": "2Gi"}
    },
    "env": {
        "DATA_SOURCE": "s3://bucket/data.csv",
        "OUTPUT_PATH": "s3://bucket/processed/"
    }
}

# Deploy and monitor
job_id = k8s_processor.deploy_job(job_spec)
status = k8s_processor.monitor_job(job_id)

print(f"Job status: {status['phase']}")
print(f"Running pods: {status['running_pods']}")
print(f"Completed pods: {status['completed_pods']}")

# Get results
results = k8s_processor.get_job_results(job_id)
🔐 Advanced Security Features¶
Data Encryption¶
from openmlcrawler.core.security import DataEncryptor

# Create encryptor
encryptor = DataEncryptor(
    algorithm="AES-256-GCM",
    key_rotation_days=30
)

# Encrypt sensitive data
encrypted_df = encryptor.encrypt_dataframe(
    df,
    columns=["ssn", "credit_card", "personal_info"],
    key_id="production-key-v1"
)

# Store encrypted data
encryptor.store_encrypted_data(
    encrypted_df,
    storage_path="s3://secure-bucket/encrypted-data/",
    metadata={"encryption_key": "v1", "algorithm": "AES-256-GCM"}
)

# Decrypt when needed
decrypted_df = encryptor.decrypt_dataframe(
    encrypted_data_path="s3://secure-bucket/encrypted-data/",
    key_id="production-key-v1"
)
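For context on what the AES-256-GCM setting implies, the cryptography package exposes the same primitive directly. A minimal sketch, independent of DataEncryptor and assuming the cryptography package is installed:

import os
from cryptography.hazmat.primitives.ciphers.aead import AESGCM

key = AESGCM.generate_key(bit_length=256)  # 256-bit key
aesgcm = AESGCM(key)
nonce = os.urandom(12)                     # 96-bit nonce, standard for GCM

ciphertext = aesgcm.encrypt(nonce, b"123-45-6789", b"ssn-column")
plaintext = aesgcm.decrypt(nonce, ciphertext, b"ssn-column")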
Access Control¶
from openmlcrawler.core.security import AccessController

# Create access controller
ac = AccessController()

# Define roles and permissions
ac.create_role("data_analyst", permissions=[
    "read:weather_data",
    "read:social_media_data",
    "execute:analysis_pipeline"
])
ac.create_role("data_engineer", permissions=[
    "read:*",
    "write:processed_data",
    "execute:etl_pipeline",
    "manage:pipelines"
])

# Assign roles to users
ac.assign_role("user_123", "data_analyst")
ac.assign_role("user_456", "data_engineer")

# Check permissions
if ac.has_permission("user_123", "read:weather_data"):
    weather_data = load_dataset("weather")
else:
    raise PermissionError("Access denied")

# Row-level security
ac.enable_row_level_security(
    table="customer_data",
    policy="user_id = current_user_id()"
)
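The data_engineer role above includes a wildcard permission (read:*). As an illustration of how such a check can be expressed (a simplified sketch, not AccessController's actual matching logic), shell-style pattern matching from fnmatch is enough:

from fnmatch import fnmatch

def matches_any(granted_permissions, requested):
    """True if any granted permission pattern covers the requested action."""
    return any(fnmatch(requested, pattern) for pattern in granted_permissions)

print(matches_any(["read:*", "write:processed_data"], "read:weather_data"))  # True
print(matches_any(["read:weather_data"], "write:processed_data"))            # False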
Audit Logging¶
from datetime import datetime

from openmlcrawler.core.security import AuditLogger

# Create audit logger
audit = AuditLogger(
    log_destination="cloudwatch://audit-logs",
    retention_days=365
)

# Log all data access
@audit.log_access
def access_sensitive_data(user_id, data_type):
    """Access sensitive data with audit logging."""
    audit.log_event({
        "event_type": "data_access",
        "user_id": user_id,
        "data_type": data_type,
        "timestamp": datetime.now(),
        "source_ip": get_client_ip(),
        "action": "read"
    })
    return load_dataset(data_type)

# Log pipeline executions
@audit.log_pipeline_execution
def execute_data_pipeline(pipeline_name, parameters):
    """Execute pipeline with audit logging."""
    audit.log_event({
        "event_type": "pipeline_execution",
        "pipeline_name": pipeline_name,
        "parameters": parameters,
        "user_id": get_current_user(),
        "start_time": datetime.now()
    })
    result = execute_pipeline(pipeline_name, parameters)
    audit.log_event({
        "event_type": "pipeline_completion",
        "pipeline_name": pipeline_name,
        "status": "success",
        "end_time": datetime.now(),
        "records_processed": len(result)
    })
    return result

# Query audit logs
audit_logs = audit.query_logs(
    user_id="user_123",
    date_range=("2024-01-01", "2024-01-31"),
    event_type="data_access"
)
📈 Advanced Analytics¶
Time Series Analysis¶
from openmlcrawler.core.analytics import TimeSeriesAnalyzer

# Create time series analyzer
ts_analyzer = TimeSeriesAnalyzer()

# Analyze time series data
analysis = ts_analyzer.analyze_series(
    df,
    time_column="timestamp",
    value_column="temperature",
    frequency="H"  # Hourly data
)

print("Time Series Analysis:")
print(f"Trend: {analysis['trend']}")
print(f"Seasonality: {analysis['seasonal']}")
print(f"Stationarity: {analysis['stationary']}")

# Forecast future values
forecast = ts_analyzer.forecast(
    df,
    target_column="temperature",
    periods=24,  # 24 hours ahead
    model="prophet"
)

print("24-hour Forecast:")
for i, value in enumerate(forecast['predictions'][:5]):
    print(f"Hour {i+1}: {value:.2f}")
Anomaly Detection¶
from openmlcrawler.core.analytics import AnomalyDetector

# Create anomaly detector
detector = AnomalyDetector()

# Detect anomalies in data
anomalies = detector.detect_anomalies(
    df,
    method="isolation_forest",
    contamination=0.1,  # Expected 10% anomalies
    features=["temperature", "humidity", "pressure"]
)

print(f"Detected {len(anomalies)} anomalies")

# Visualize anomalies
detector.plot_anomalies(
    df,
    anomalies,
    save_path="anomaly_analysis.png"
)

# Real-time anomaly detection
@detector.on_anomaly_detected
def handle_anomaly(anomaly_data):
    """Handle detected anomalies."""
    print(f"Anomaly detected: {anomaly_data}")

    # Send alert
    send_alert(
        message=f"Anomaly in {anomaly_data['feature']}: {anomaly_data['value']}",
        severity="high"
    )

# Start real-time monitoring
detector.start_realtime_monitoring(
    data_stream="kafka://sensor-data:9092",
    threshold=0.95  # 95% confidence threshold
)
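For reference, the isolation_forest method corresponds to the Isolation Forest algorithm. A standalone version with scikit-learn (a sketch of the technique, not AnomalyDetector's internals) looks like this:

from sklearn.ensemble import IsolationForest

features = ["temperature", "humidity", "pressure"]
iso = IsolationForest(contamination=0.1, random_state=42)
labels = iso.fit_predict(df[features])  # -1 = anomaly, 1 = normal

anomaly_rows = df[labels == -1]
print(f"Flagged {len(anomaly_rows)} of {len(df)} rows as anomalous")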
Predictive Modeling¶
from openmlcrawler.core.analytics import PredictiveModeler

# Create predictive modeler
modeler = PredictiveModeler()

# AutoML model selection
best_model = modeler.auto_select_model(
    X_train, y_train,
    task="regression",
    time_limit=300,  # 5 minutes
    metric="rmse"
)

print(f"Best model: {best_model.__class__.__name__}")
print(f"Best score: {best_model.score(X_test, y_test):.3f}")

# Feature importance analysis
importance = modeler.analyze_feature_importance(
    best_model,
    feature_names=X_train.columns
)

print("Top 5 Important Features:")
for feature, score in importance[:5]:
    print(f"  {feature}: {score:.3f}")

# Model explainability
explanation = modeler.explain_prediction(
    model=best_model,
    instance=X_test.iloc[0],
    method="shap"
)

print("Prediction Explanation:")
print(f"Base value: {explanation['base_value']:.3f}")
print(f"Prediction: {explanation['prediction']:.3f}")
for feature, contribution in explanation['feature_contributions'][:3]:
    print(f"  {feature}: {contribution:.3f}")
🔗 API Integration¶
REST API Server¶
from datetime import datetime

import pandas as pd

from openmlcrawler.core.api import APIServer

# Create API server
server = APIServer(host="0.0.0.0", port=8080)

# Register endpoints
@server.get("/health")
def health_check():
    return {"status": "healthy", "timestamp": datetime.now()}

@server.post("/datasets/{dataset_type}")
def load_dataset_endpoint(dataset_type: str, request_data: dict):
    """Load dataset via API."""
    df = load_dataset(dataset_type, **request_data)
    return {
        "dataset_type": dataset_type,
        "shape": df.shape,
        "columns": list(df.columns),
        "data": df.to_dict('records')[:10]  # First 10 rows
    }

@server.post("/process")
def process_data_endpoint(data: dict):
    """Process data via API."""
    df = pd.DataFrame(data['data'])

    # Process data
    clean_df = clean_data(df)
    result_df = prepare_for_ml(clean_df, target_column=data.get('target'))

    return {
        "processed_shape": result_df.shape,
        "quality_score": assess_data_quality(result_df),
        "data": result_df.to_dict('records')
    }

# Start server
server.start()
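Once the server is running, the endpoints can be exercised with any HTTP client, for example with requests (assuming the server is reachable on localhost:8080 and that request_data is sent as a JSON body):

import requests

# Health check
print(requests.get("http://localhost:8080/health").json())

# Load a weather dataset through the API
resp = requests.post(
    "http://localhost:8080/datasets/weather",
    json={"location": "New York"},
)
payload = resp.json()
print(payload["shape"], payload["columns"])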
GraphQL API¶
import json
import uuid

from openmlcrawler.core.api import GraphQLServer

# Create GraphQL server
gql_server = GraphQLServer()

# Define GraphQL schema
schema = """
type Dataset {
    id: ID!
    name: String!
    shape: String!
    columns: [String!]!
    data: [[String]]!
    quality_score: Float!
}

type Query {
    datasets: [Dataset!]!
    dataset(id: ID!): Dataset
}

type Mutation {
    loadDataset(type: String!, params: String): Dataset!
    processDataset(id: ID!, operations: [String!]!): Dataset!
}
"""

# Implement resolvers
@gql_server.query("datasets")
def resolve_datasets():
    """Resolve all datasets query."""
    # Return list of available datasets
    return get_available_datasets()

@gql_server.query("dataset")
def resolve_dataset(id):
    """Resolve single dataset query."""
    return get_dataset_by_id(id)

@gql_server.mutation("loadDataset")
def resolve_load_dataset(type, params):
    """Resolve load dataset mutation."""
    params_dict = json.loads(params) if params else {}
    df = load_dataset(type, **params_dict)
    return {
        "id": str(uuid.uuid4()),
        "name": f"{type}_dataset",
        "shape": f"{df.shape[0]}x{df.shape[1]}",
        "columns": list(df.columns),
        "data": df.values.tolist()[:10],
        "quality_score": assess_data_quality(df)
    }

# Start GraphQL server
gql_server.start(schema, host="0.0.0.0", port=4000)
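GraphQL requests are plain HTTP POSTs, so the schema above can be queried with any HTTP client. A sketch with requests (the /graphql path on port 4000 is an assumption about how GraphQLServer exposes its endpoint):

import requests

query = """
{
  datasets {
    id
    name
    quality_score
  }
}
"""

resp = requests.post(
    "http://localhost:4000/graphql",  # endpoint path is an assumption
    json={"query": query},
)
print(resp.json())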
WebSocket Streaming¶
import threading
import time

from openmlcrawler.core.api import WebSocketServer

# Create WebSocket server
ws_server = WebSocketServer(host="0.0.0.0", port=8081)

# Handle real-time data streaming
@ws_server.on_connect
def handle_connect(client_id):
    """Handle client connection."""
    print(f"Client {client_id} connected")

@ws_server.on_disconnect
def handle_disconnect(client_id):
    """Handle client disconnection."""
    print(f"Client {client_id} disconnected")

@ws_server.on_message("subscribe")
def handle_subscribe(client_id, data):
    """Handle data subscription."""
    dataset_type = data.get("dataset_type")
    filters = data.get("filters", {})

    # Subscribe to real-time data
    subscription_id = ws_server.subscribe_client(
        client_id,
        dataset_type,
        filters
    )
    return {"subscription_id": subscription_id}

@ws_server.on_message("unsubscribe")
def handle_unsubscribe(client_id, data):
    """Handle data unsubscription."""
    subscription_id = data.get("subscription_id")
    ws_server.unsubscribe_client(client_id, subscription_id)
    return {"status": "unsubscribed"}

# Broadcast data updates
def broadcast_data_updates():
    """Broadcast data updates to subscribed clients."""
    while True:
        # Get latest data
        new_data = get_latest_data()

        # Broadcast to subscribers
        ws_server.broadcast_to_subscribers(
            dataset_type="weather",
            data=new_data
        )
        time.sleep(30)  # Update every 30 seconds

# Start WebSocket server
ws_server.start()

# Start broadcasting in background
threading.Thread(target=broadcast_data_updates, daemon=True).start()
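On the client side, any WebSocket library can consume the stream. A minimal sketch with the websockets package (the JSON envelope used for the subscribe request and for the incoming updates is an assumption about the server's wire format):

import asyncio
import json

import websockets

async def subscribe_to_weather():
    async with websockets.connect("ws://localhost:8081") as ws:
        # Ask the server's "subscribe" handler for weather updates
        await ws.send(json.dumps({"type": "subscribe", "dataset_type": "weather"}))
        async for message in ws:
            print("Update:", json.loads(message))

asyncio.run(subscribe_to_weather())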
These advanced features make OpenML Crawler a powerful platform for enterprise-grade data processing, machine learning, and analytics workflows.