Core Features¶
OpenML Crawler provides a comprehensive set of features for data collection, processing, and analysis. This guide walks through the core functionality.
Data Connectors¶
Built-in Connectors¶
OpenML Crawler includes connectors for various data sources:
Weather Data¶
from openmlcrawler import load_dataset
# Load current weather and forecast
df = load_dataset("weather", location="New York", days=7)
print(df.head())
Features:
- Multiple providers (Open-Meteo, OpenWeather, NOAA)
- Historical and forecast data
- Automatic location geocoding
- Weather alerts and warnings
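For example, historical ranges or a specific provider can usually be requested through extra keyword arguments. A minimal sketch; the parameter names provider, start_date, and end_date are illustrative assumptions, so check the connector reference for the exact names:
from openmlcrawler import load_dataset
# NOTE: provider/start_date/end_date are assumed parameter names, not confirmed API
historical_df = load_dataset(
    "weather",
    location="New York",
    provider="open-meteo",
    start_date="2024-01-01",
    end_date="2024-01-31"
)
print(historical_df.head())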
Social Media Data¶
# Twitter/X data
twitter_df = load_dataset("twitter", query="machine learning", max_results=100)
# Reddit data
reddit_df = load_dataset("reddit", subreddit="MachineLearning", limit=50)
# Facebook data
facebook_df = load_dataset("facebook", page_id="cnn", limit=25)
Features:
- Real-time data collection
- Sentiment analysis integration
- Engagement metrics
- Hashtag and mention tracking
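Because results arrive as regular DataFrames, engagement metrics can be ranked with plain pandas. A minimal sketch; the column names text, likes, and retweets are assumptions, so inspect the actual schema first:
# Inspect the actual schema before relying on specific column names
print(twitter_df.columns.tolist())
# Rank posts by engagement (column names are assumed for illustration)
top_posts = twitter_df.sort_values("retweets", ascending=False).head(10)
print(top_posts[["text", "likes", "retweets"]])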
Government Data¶
# US Government data
us_df = load_dataset("us_gov", query="healthcare", limit=50)
# EU Open Data
eu_df = load_dataset("eu_gov", query="environment", limit=50)
# UK Government data
uk_df = load_dataset("uk_gov", query="education", limit=50)
# Indian Government data
india_df = load_dataset("india_gov", query="agriculture", limit=50)
Features:
- Official government datasets
- Multi-country support
- Automatic metadata extraction
- Compliance with data regulations
Custom Connectors¶
Create custom connectors for proprietary data sources:
from openmlcrawler.core.connectors.base import BaseConnector
class CustomAPIConnector(BaseConnector):
    def __init__(self, api_key):
        super().__init__()
        self.api_key = api_key
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def fetch_data(self, endpoint, **params):
        """Fetch data from custom API."""
        response = self.session.get(f"https://api.example.com/{endpoint}", params=params)
        response.raise_for_status()
        return response.json()
# Register custom connector
from openmlcrawler.core.connectors.registry import register_connector
register_connector("custom_api", CustomAPIConnector)
Web Crawling¶
Basic Web Crawling¶
Crawl data from any web source:
from openmlcrawler import crawl_and_prepare
# Crawl CSV data
df = crawl_and_prepare(
source="https://example.com/data.csv",
type="csv",
encoding="utf-8"
)
# Crawl JSON API
df = crawl_and_prepare(
source="https://api.example.com/data",
type="json",
headers={"Authorization": "Bearer token"}
)
# Crawl HTML table
df = crawl_and_prepare(
source="https://example.com/table.html",
type="html",
table_index=0 # First table on page
)
Advanced Crawling Features¶
import asyncio
from openmlcrawler.core.crawler import AsyncCrawler

# Async crawling for multiple URLs. crawl_batch is a coroutine, so it must be
# awaited inside an async function (or an async-aware shell such as IPython).
async def crawl_all():
    crawler = AsyncCrawler(max_concurrent=10)
    urls = [
        "https://data1.example.com/dataset.csv",
        "https://data2.example.com/dataset.json",
        "https://data3.example.com/table.html"
    ]
    # Crawl all URLs concurrently
    results = await crawler.crawl_batch(urls)
    for result in results:
        print(f"Crawled {len(result)} records from {result.url}")

asyncio.run(crawl_all())
Headless Browser Support¶
For JavaScript-heavy websites:
from openmlcrawler.core.crawler import HeadlessCrawler
crawler = HeadlessCrawler(
browser="chromium", # or "firefox", "webkit"
headless=True,
wait_for_selector=".data-table" # Wait for element to load
)
df = crawler.crawl_table("https://example.com/dynamic-table")
Data Cleaning & Processing¶
Automatic Data Cleaning¶
from openmlcrawler import prepare_for_ml
# Load raw data
df = load_dataset("weather", location="London", days=30)
# Automatic cleaning and preparation
X, y, X_train, X_test, y_train, y_test = prepare_for_ml(
df,
target_column="temperature",
test_size=0.2,
normalize=True,
handle_missing="auto",
remove_duplicates=True
)
print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")
Advanced Cleaning Options¶
from openmlcrawler.core.cleaners import DataCleaner
cleaner = DataCleaner()
# Handle missing values
df = cleaner.handle_missing(df, strategy="interpolate")
# Remove outliers
df = cleaner.remove_outliers(df, method="iqr", threshold=1.5)
# Normalize text data
df = cleaner.clean_text_columns(df, columns=["description", "title"])
# Standardize formats
df = cleaner.standardize_formats(df, date_columns=["created_at"])
Data Type Detection¶
from openmlcrawler.core.schema import detect_column_types
# Automatic type detection
type_mapping = detect_column_types(df)
print("Detected column types:")
for column, dtype in type_mapping.items():
    print(f" {column}: {dtype}")
# Apply type conversions
df = df.astype(type_mapping)
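If some columns contain values that resist a direct cast, a defensive variant (plain pandas, not a library API) coerces unparseable numeric values to NaN instead of raising, assuming the detected types are pandas-compatible dtypes:
import pandas as pd
for column, dtype in type_mapping.items():
    if pd.api.types.is_numeric_dtype(dtype):
        # Coerce unparseable values to NaN rather than failing the whole cast
        df[column] = pd.to_numeric(df[column], errors="coerce")
    else:
        df[column] = df[column].astype(dtype)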
Data Quality Assessment¶
Comprehensive Quality Reports¶
from openmlcrawler import assess_data_quality
# Generate quality report
quality_report = assess_data_quality(df)
print("๐ Data Quality Report")
print("=" * 50)
print(f"Dataset shape: {quality_report['dataset_shape']}")
print(f"Completeness score: {quality_report['completeness_score']:.2%}")
print(f"Missing rate: {quality_report['missing_rate']:.2%}")
print(f"Duplicate rate: {quality_report['duplicate_rate']:.2%}")
print(f"Data types: {quality_report['data_types']}")
# Detailed missing data analysis
print("\nMissing Data by Column:")
for col, rate in quality_report['missing_by_column'].items():
    print(f" {col}: {rate:.2%}")
Quality Validation Rules¶
from openmlcrawler.core.quality import DataQualityValidator
validator = DataQualityValidator()
# Define validation rules
rules = {
"age": {"min": 0, "max": 120},
"email": {"pattern": r"^[^@]+@[^@]+\.[^@]+$"},
"phone": {"pattern": r"^\+?1?\d{9,15}$"},
"salary": {"min": 0, "max": 1000000}
}
# Validate data
validation_results = validator.validate(df, rules)
print("Validation Results:")
print(f"Valid records: {validation_results['valid_count']}")
print(f"Invalid records: {validation_results['invalid_count']}")
# Get detailed error report
for error in validation_results['errors'][:10]:  # First 10 errors
    print(f"Row {error['row']}, Column {error['column']}: {error['message']}")
Privacy & Security¶
PII Detection¶
from openmlcrawler import detect_pii
# Detect personally identifiable information
pii_report = detect_pii(df)
print("PII Detection Results:")
for column, pii_types in pii_report.items():
    if pii_types:
        print(f" {column}: {', '.join(pii_types)}")
Data Anonymization¶
from openmlcrawler import anonymize_data
# Anonymize sensitive data
anonymized_df = anonymize_data(df, method="hash")
# Advanced anonymization
anonymized_df = anonymize_data(
df,
methods={
"email": "mask",
"phone": "redact",
"name": "pseudonymize"
}
)
Compliance Checking¶
from openmlcrawler.core.privacy import check_compliance
# Check GDPR compliance
gdpr_compliance = check_compliance(df, standard="gdpr")
print("GDPR Compliance Report:")
print(f"Overall compliant: {gdpr_compliance['compliant']}")
print(f"Risk level: {gdpr_compliance['risk_level']}")
for issue in gdpr_compliance['issues']:
    print(f" {issue['type']}: {issue['description']}")
Smart Search & Discovery¶
Dataset Search¶
from openmlcrawler import search_open_data
# Search across multiple platforms
results = search_open_data("climate change", max_results=20)
for result in results:
    print(result['title'])
    print(f" Source: {result['source']}")
    print(f" URL: {result['url']}")
    print(f" Relevance: {result['relevance_score']:.2f}")
AI-Powered Search¶
from openmlcrawler.core.search import SmartSearchEngine
# Create search engine
search_engine = SmartSearchEngine()
# Index datasets
search_engine.index_dataset(df, "my_dataset", metadata={
"description": "Weather data for analysis",
"tags": ["weather", "climate", "meteorology"],
"quality_score": 0.95
})
# Semantic search
results = search_engine.search_datasets(
"temperature patterns and climate trends",
top_k=5
)
for result in results:
    print(f"Found: {result['dataset_id']} (similarity: {result['similarity_score']:.3f})")
Metadata Enrichment¶
from openmlcrawler.core.search import DatasetIndexer
indexer = DatasetIndexer()
# Extract metadata automatically
metadata = indexer.extract_metadata(df)
print("Extracted Metadata:")
print(f" Columns: {metadata['num_columns']}")
print(f" Rows: {metadata['num_rows']}")
print(f" Data types: {metadata['data_types']}")
print(f" Estimated quality: {metadata['quality_score']:.2f}")
# Add custom metadata
metadata.update({
"domain": "environmental science",
"temporal_coverage": "2020-2024",
"geographic_coverage": "global"
})
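The enriched metadata can then be written back to the search index with the same index_dataset call shown earlier, assuming the SmartSearchEngine instance from the previous example is still available:
# Re-index so semantic search can use the enriched metadata
search_engine.index_dataset(df, "my_dataset", metadata=metadata)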
Cloud Integration¶
Multi-Provider Support¶
from openmlcrawler import create_aws_connector, create_gcs_connector, create_azure_connector
# AWS S3
aws_conn = create_aws_connector(bucket_name="my-data-bucket")
aws_conn.upload_dataset(df, "weather_data.csv")
# Google Cloud Storage
gcs_conn = create_gcs_connector(bucket_name="my-data-bucket")
gcs_conn.upload_dataset(df, "weather_data.csv")
# Azure Blob Storage
azure_conn = create_azure_connector(container_name="data")
azure_conn.upload_dataset(df, "weather_data.csv")
Unified Cloud API¶
from openmlcrawler.core.cloud import create_cloud_manager
# Create unified cloud manager
cloud_manager = create_cloud_manager()
# Upload to multiple providers
upload_results = cloud_manager.upload_to_providers(
df,
providers=["aws", "gcp"],
object_name="weather_data.parquet"
)
for provider, result in upload_results.items():
    print(f"Uploaded to {provider}: {result['url']}")
Cloud-Native Features¶
# Direct cloud-to-cloud transfers
cloud_manager.transfer_between_providers(
source_provider="aws",
source_path="s3://source-bucket/data.csv",
dest_provider="gcp",
dest_path="gs://dest-bucket/data.csv"
)
# Serverless processing
result = cloud_manager.process_in_cloud(
dataset_url="s3://my-bucket/data.csv",
operation="normalize",
output_format="parquet"
)
Workflow Orchestration¶
YAML-Based Pipelines¶
Create a workflow.yaml file:
version: "1.0"
name: "Data Processing Pipeline"
description: "Complete data collection and processing workflow"
steps:
  - name: "collect_weather"
    type: "connector"
    connector: "weather"
    params:
      location: "New York"
      days: 30
    output: "weather_raw.csv"
  - name: "clean_data"
    type: "processor"
    operation: "clean"
    input: "weather_raw.csv"
    params:
      remove_duplicates: true
      handle_missing: "interpolate"
    output: "weather_clean.csv"
  - name: "quality_check"
    type: "validator"
    input: "weather_clean.csv"
    rules:
      completeness: 0.9
      max_missing_rate: 0.05
    output: "quality_report.json"
  - name: "export_final"
    type: "exporter"
    input: "weather_clean.csv"
    format: "parquet"
    output: "weather_final.parquet"
Execute the workflow:
from openmlcrawler import execute_workflow_from_file
# Run the complete pipeline
result = execute_workflow_from_file("workflow.yaml")
print(f"Workflow status: {result['status']}")
print(f"Execution time: {result['execution_time']:.2f} seconds")
# Check results
for step_result in result['step_results']:
    print(f"Step '{step_result['name']}': {step_result['status']}")
Conditional Workflows¶
steps:
  - name: "assess_quality"
    type: "validator"
    input: "data.csv"
    output: "quality.json"
  - name: "conditional_processing"
    type: "conditional"
    condition: "quality.completeness_score > 0.8"
    true_steps:
      - name: "high_quality_processing"
        type: "processor"
        operation: "advanced_clean"
    false_steps:
      - name: "low_quality_processing"
        type: "processor"
        operation: "basic_clean"
Parallel Execution¶
steps:
  - name: "parallel_collection"
    type: "parallel"
    steps:
      - name: "weather_ny"
        type: "connector"
        connector: "weather"
        params: {location: "New York"}
      - name: "weather_la"
        type: "connector"
        connector: "weather"
        params: {location: "Los Angeles"}
      - name: "weather_chicago"
        type: "connector"
        connector: "weather"
        params: {location: "Chicago"}
    output: "combined_weather.csv"
Active Learning & Sampling¶
Intelligent Sampling¶
from openmlcrawler import smart_sample_dataset
# Diversity-based sampling
diverse_sample = smart_sample_dataset(
df,
sample_size=1000,
strategy="diversity",
feature_columns=["feature1", "feature2", "feature3"]
)
# Uncertainty-based sampling for active learning
uncertainty_sample = smart_sample_dataset(
df,
sample_size=500,
strategy="uncertainty",
target_column="target",
model_type="random_forest"
)
# Anomaly-based sampling
anomaly_sample = smart_sample_dataset(
df,
sample_size=200,
strategy="anomaly",
contamination=0.1
)
Stratified Sampling¶
from openmlcrawler.core.sampling import StratifiedSampler
sampler = StratifiedSampler()
# Maintain class distribution
stratified_sample = sampler.sample(
df,
sample_size=1000,
stratify_column="category",
min_samples_per_class=50
)
# Multi-column stratification
multi_stratified = sampler.sample(
df,
sample_size=2000,
stratify_columns=["region", "category"],
weights={"region": 0.6, "category": 0.4}
)
Distributed Processing¶
Ray Integration¶
from openmlcrawler.core.distributed import create_distributed_crawler
# Create distributed crawler
crawler = create_distributed_crawler(num_workers=8)
# Process large dataset
result = crawler.process_large_dataset(
source="https://large-dataset.example.com/data.csv",
operations=["clean", "normalize", "validate"],
output_format="parquet"
)
print(f"Processed {result['total_records']} records in {result['execution_time']:.2f}s")
Dask Integration¶
from openmlcrawler.core.distributed import create_dask_processor
# Create Dask processor
processor = create_dask_processor(cluster="local", n_workers=4)
# Process with Dask
df_computed = processor.process_dataframe(
df,
operations=[
{"type": "fillna", "value": 0},
{"type": "normalize", "method": "standard"},
{"type": "encode", "columns": ["category"]}
]
)
ML Pipeline Integration¶
AutoML Integration¶
from openmlcrawler import create_automl_pipeline
# Create AutoML pipeline
automl = create_automl_pipeline()
# Run automated model selection
results = automl.run_automl(
X_train, y_train,
task="classification", # or "regression"
time_limit=600, # 10 minutes
metric="accuracy"
)
print(f"Best model: {results['best_model'].__class__.__name__}")
print(f"Best score: {results['best_score']:.3f}")
# Make predictions
predictions = automl.predict(X_test)
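To sanity-check the selected model, score the held-out predictions. A minimal sketch using scikit-learn's accuracy_score, assuming a classification task and the X_test/y_test split produced by prepare_for_ml earlier:
from sklearn.metrics import accuracy_score
# Compare AutoML predictions against the held-out labels
print(f"Hold-out accuracy: {accuracy_score(y_test, predictions):.3f}")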
Feature Store Integration¶
from openmlcrawler import create_feature_store
# Create feature store
feature_store = create_feature_store()
# Store features
feature_store.store_features(
features=X_train,
feature_names=["feature1", "feature2", "feature3"],
metadata={
"source": "weather_data",
"version": "v1.0",
"created_at": "2024-01-01"
}
)
# Retrieve features
stored_features = feature_store.get_features(
feature_names=["feature1", "feature2"],
version="latest"
)
Developer Tools¶
CLI Integration¶
# Load data
openmlcrawler load weather --location "Tokyo" --days 14 --output weather.csv
# Assess quality
openmlcrawler quality weather.csv --format json --output quality.json
# Export data
openmlcrawler export weather.csv --format parquet --output weather.parquet
# Run workflow
openmlcrawler workflow run pipeline.yaml
Programmatic API¶
from openmlcrawler import OpenMLCrawler
# Create crawler instance
crawler = OpenMLCrawler()
# Configure settings
crawler.config.cache.enabled = True
crawler.config.processing.max_workers = 8
# Load and process data
df = crawler.load_dataset("weather", location="Paris", days=7)
clean_df = crawler.clean_data(df)
result_df = crawler.prepare_for_ml(clean_df, target_column="temperature")
# Export results
crawler.export_dataset(result_df, "processed_weather.csv")
This covers the core features of OpenML Crawler. The features are designed to work together seamlessly, so you can build comprehensive data processing pipelines with minimal code.