Core Features

OpenML Crawler provides a comprehensive set of features for data collection, processing, and analysis. This guide covers the core functionality and capabilities.

🔌 Data Connectors

Built-in Connectors

OpenML Crawler includes connectors for various data sources:

Weather Data

from openmlcrawler import load_dataset

# Load current weather and forecast
df = load_dataset("weather", location="New York", days=7)
print(df.head())

Features:

  • Multiple providers (Open-Meteo, OpenWeather, NOAA)
  • Historical and forecast data (see the sketch after this list)
  • Automatic location geocoding
  • Weather alerts and warnings
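
For historical data or a specific provider, the call looks roughly like the sketch below. The provider, start_date, and end_date parameter names are assumptions for illustration; check the connector documentation for the exact signature.

from openmlcrawler import load_dataset

# Hypothetical parameters: "provider", "start_date" and "end_date" are assumed
# names used for illustration and may differ in the actual connector API.
historical_df = load_dataset(
    "weather",
    location="New York",
    provider="open-meteo",     # assumed: choose one of the supported providers
    start_date="2024-01-01",   # assumed: request a historical range
    end_date="2024-01-31",
)
print(historical_df.describe())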

Social Media Data

# Twitter/X data
twitter_df = load_dataset("twitter", query="machine learning", max_results=100)

# Reddit data
reddit_df = load_dataset("reddit", subreddit="MachineLearning", limit=50)

# Facebook data
facebook_df = load_dataset("facebook", page_id="cnn", limit=25)

Features:

  • Real-time data collection
  • Sentiment analysis integration
  • Engagement metrics
  • Hashtag and mention tracking (see the sketch after this list)
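
Once a feed is loaded, hashtag and mention tracking can be done directly on the returned DataFrame. The sketch below assumes the post body lives in a "text" column; the actual column name may differ by connector.

# Assumes twitter_df from the example above exposes the post body in a "text"
# column; adjust the column name to whatever the connector actually returns.
hashtags = (
    twitter_df["text"]
    .str.extractall(r"#(\w+)")[0]   # every hashtag in every post
    .str.lower()
    .value_counts()
)
mentions = (
    twitter_df["text"]
    .str.extractall(r"@(\w+)")[0]   # every @mention
    .str.lower()
    .value_counts()
)
print(hashtags.head(10))
print(mentions.head(10))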

Government Data

# US Government data
us_df = load_dataset("us_gov", query="healthcare", limit=50)

# EU Open Data
eu_df = load_dataset("eu_gov", query="environment", limit=50)

# UK Government data
uk_df = load_dataset("uk_gov", query="education", limit=50)

# Indian Government data
india_df = load_dataset("india_gov", query="agriculture", limit=50)

Features:

  • Official government datasets
  • Multi-country support (see the sketch after this list)
  • Automatic metadata extraction
  • Compliance with data regulations
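
Because every connector returns a regular pandas DataFrame, results from different portals can be combined with plain pandas. A minimal sketch, assuming the frames from the example above have broadly comparable columns (anything that does not match ends up as NaN):

import pandas as pd

# Tag each frame with its origin, then stack them into one table.
frames = {"us": us_df, "eu": eu_df, "uk": uk_df, "india": india_df}
combined = pd.concat(
    [frame.assign(source_portal=name) for name, frame in frames.items()],
    ignore_index=True,
)
print(combined["source_portal"].value_counts())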

Custom Connectors

Create custom connectors for proprietary data sources:

from openmlcrawler.core.connectors.base import BaseConnector

class CustomAPIConnector(BaseConnector):
    def __init__(self, api_key):
        super().__init__()
        self.api_key = api_key
        self.session.headers.update({"Authorization": f"Bearer {api_key}"})

    def fetch_data(self, endpoint, **params):
        """Fetch data from custom API."""
        response = self.session.get(f"https://api.example.com/{endpoint}", params=params)
        response.raise_for_status()
        return response.json()

# Register custom connector
from openmlcrawler.core.connectors.registry import register_connector
register_connector("custom_api", CustomAPIConnector)
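
After registration, the connector should be addressable like any built-in source. Whether load_dataset forwards extra keyword arguments straight to fetch_data, and how constructor arguments such as api_key are supplied, is an assumption here; the sketch illustrates the intent rather than a guaranteed call signature.

from openmlcrawler import load_dataset

# Assumption: a registered connector can be referenced by its registry name,
# with extra keyword arguments passed through to fetch_data(). How the
# registry supplies constructor arguments (e.g. api_key) is not shown here.
df = load_dataset("custom_api", endpoint="metrics", limit=100)
print(df.head())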

🕷️ Web Crawling

Basic Web Crawling

Crawl data from any web source:

from openmlcrawler import crawl_and_prepare

# Crawl CSV data
df = crawl_and_prepare(
    source="https://example.com/data.csv",
    type="csv",
    encoding="utf-8"
)

# Crawl JSON API
df = crawl_and_prepare(
    source="https://api.example.com/data",
    type="json",
    headers={"Authorization": "Bearer token"}
)

# Crawl HTML table
df = crawl_and_prepare(
    source="https://example.com/table.html",
    type="html",
    table_index=0  # First table on page
)

Advanced Crawling Features

import asyncio

from openmlcrawler.core.crawler import AsyncCrawler

# Async crawling for multiple URLs
crawler = AsyncCrawler(max_concurrent=10)

urls = [
    "https://data1.example.com/dataset.csv",
    "https://data2.example.com/dataset.json",
    "https://data3.example.com/table.html"
]

async def main():
    # Crawl all URLs concurrently; crawl_batch is a coroutine,
    # so it has to run inside an event loop.
    results = await crawler.crawl_batch(urls)
    for result in results:
        print(f"Crawled {len(result)} records from {result.url}")

asyncio.run(main())

Headless Browser Support

For JavaScript-heavy websites:

from openmlcrawler.core.crawler import HeadlessCrawler

crawler = HeadlessCrawler(
    browser="chromium",  # or "firefox", "webkit"
    headless=True,
    wait_for_selector=".data-table"  # Wait for element to load
)

df = crawler.crawl_table("https://example.com/dynamic-table")

🧹 Data Cleaning & Processing

Automatic Data Cleaning

from openmlcrawler import prepare_for_ml

# Load raw data
df = load_dataset("weather", location="London", days=30)

# Automatic cleaning and preparation
X, y, X_train, X_test, y_train, y_test = prepare_for_ml(
    df,
    target_column="temperature",
    test_size=0.2,
    normalize=True,
    handle_missing="auto",
    remove_duplicates=True
)

print(f"Training data shape: {X_train.shape}")
print(f"Test data shape: {X_test.shape}")

Advanced Cleaning Options

from openmlcrawler.core.cleaners import DataCleaner

cleaner = DataCleaner()

# Handle missing values
df = cleaner.handle_missing(df, strategy="interpolate")

# Remove outliers
df = cleaner.remove_outliers(df, method="iqr", threshold=1.5)

# Normalize text data
df = cleaner.clean_text_columns(df, columns=["description", "title"])

# Standardize formats
df = cleaner.standardize_formats(df, date_columns=["created_at"])

Data Type Detection

from openmlcrawler.core.schema import detect_column_types

# Automatic type detection
type_mapping = detect_column_types(df)

print("Detected column types:")
for column, dtype in type_mapping.items():
    print(f"  {column}: {dtype}")

# Apply type conversions
df = df.astype(type_mapping)

📊 Data Quality Assessment

Comprehensive Quality Reports

from openmlcrawler import assess_data_quality

# Generate quality report
quality_report = assess_data_quality(df)

print("📊 Data Quality Report")
print("=" * 50)
print(f"Dataset shape: {quality_report['dataset_shape']}")
print(f"Completeness score: {quality_report['completeness_score']:.2%}")
print(f"Missing rate: {quality_report['missing_rate']:.2%}")
print(f"Duplicate rate: {quality_report['duplicate_rate']:.2%}")
print(f"Data types: {quality_report['data_types']}")

# Detailed missing data analysis
print("\nMissing Data by Column:")
for col, rate in quality_report['missing_by_column'].items():
    print(f"  {col}: {rate:.2%}")

Quality Validation Rules

from openmlcrawler.core.quality import DataQualityValidator

validator = DataQualityValidator()

# Define validation rules
rules = {
    "age": {"min": 0, "max": 120},
    "email": {"pattern": r"^[^@]+@[^@]+\.[^@]+$"},
    "phone": {"pattern": r"^\+?1?\d{9,15}$"},
    "salary": {"min": 0, "max": 1000000}
}

# Validate data
validation_results = validator.validate(df, rules)

print("Validation Results:")
print(f"Valid records: {validation_results['valid_count']}")
print(f"Invalid records: {validation_results['invalid_count']}")

# Get detailed error report
for error in validation_results['errors'][:10]:  # First 10 errors
    print(f"Row {error['row']}, Column {error['column']}: {error['message']}")
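
The error entries can also be used to split the frame into clean and flagged rows. A small sketch, assuming the row values in the error report correspond to DataFrame index labels:

# Collect the indices flagged by the validator and split the frame accordingly.
invalid_rows = sorted({error["row"] for error in validation_results["errors"]})

df_valid = df.drop(index=invalid_rows)
df_invalid = df.loc[invalid_rows]

print(f"Kept {len(df_valid)} clean rows, set aside {len(df_invalid)} for review")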

🔒 Privacy & Security

PII Detection

from openmlcrawler import detect_pii

# Detect personally identifiable information
pii_report = detect_pii(df)

print("PII Detection Results:")
for column, pii_types in pii_report.items():
    if pii_types:
        print(f"  {column}: {', '.join(pii_types)}")
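
The report can feed straight into the anonymization step described next, for example by masking every column where PII was found. This assumes the methods mapping is keyed by column name, as the anonymization example below suggests.

from openmlcrawler import anonymize_data

# Columns where the detector found at least one PII type.
pii_columns = [column for column, pii_types in pii_report.items() if pii_types]

# Assumption: keys of the methods mapping are column names; every flagged
# column is masked here.
anonymized_df = anonymize_data(df, methods={column: "mask" for column in pii_columns})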

Data Anonymization

from openmlcrawler import anonymize_data

# Anonymize sensitive data
anonymized_df = anonymize_data(df, method="hash")

# Advanced anonymization
anonymized_df = anonymize_data(
    df,
    methods={
        "email": "mask",
        "phone": "redact",
        "name": "pseudonymize"
    }
)

Compliance Checking

from openmlcrawler.core.privacy import check_compliance

# Check GDPR compliance
gdpr_compliance = check_compliance(df, standard="gdpr")

print("GDPR Compliance Report:")
print(f"Overall compliant: {gdpr_compliance['compliant']}")
print(f"Risk level: {gdpr_compliance['risk_level']}")

for issue in gdpr_compliance['issues']:
    print(f"  ⚠️  {issue['type']}: {issue['description']}")

🔍 Smart Search & Discovery

from openmlcrawler import search_open_data

# Search across multiple platforms
results = search_open_data("climate change", max_results=20)

for result in results:
    print(f"📊 {result['title']}")
    print(f"   Source: {result['source']}")
    print(f"   URL: {result['url']}")
    print(f"   Relevance: {result['relevance_score']:.2f}")

Semantic Search

from openmlcrawler.core.search import SmartSearchEngine

# Create search engine
search_engine = SmartSearchEngine()

# Index datasets
search_engine.index_dataset(df, "my_dataset", metadata={
    "description": "Weather data for analysis",
    "tags": ["weather", "climate", "meteorology"],
    "quality_score": 0.95
})

# Semantic search
results = search_engine.search_datasets(
    "temperature patterns and climate trends",
    top_k=5
)

for result in results:
    print(f"Found: {result['dataset_id']} (similarity: {result['similarity_score']:.3f})")

Metadata Enrichment

from openmlcrawler.core.search import DatasetIndexer

indexer = DatasetIndexer()

# Extract metadata automatically
metadata = indexer.extract_metadata(df)

print("Extracted Metadata:")
print(f"  Columns: {metadata['num_columns']}")
print(f"  Rows: {metadata['num_rows']}")
print(f"  Data types: {metadata['data_types']}")
print(f"  Estimated quality: {metadata['quality_score']:.2f}")

# Add custom metadata
metadata.update({
    "domain": "environmental science",
    "temporal_coverage": "2020-2024",
    "geographic_coverage": "global"
})
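
The enriched metadata can then be handed back to the search engine from the earlier example so later searches pick it up. This assumes that indexing again under the same dataset id simply replaces the previous entry.

# Re-index the dataset with the enriched metadata so subsequent semantic
# searches can match on the extra fields.
search_engine.index_dataset(df, "my_dataset", metadata=metadata)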

☁️ Cloud Integration

Multi-Provider Support

from openmlcrawler import create_aws_connector, create_gcs_connector, create_azure_connector

# AWS S3
aws_conn = create_aws_connector(bucket_name="my-data-bucket")
aws_conn.upload_dataset(df, "weather_data.csv")

# Google Cloud Storage
gcs_conn = create_gcs_connector(bucket_name="my-data-bucket")
gcs_conn.upload_dataset(df, "weather_data.csv")

# Azure Blob Storage
azure_conn = create_azure_connector(container_name="data")
azure_conn.upload_dataset(df, "weather_data.csv")

Unified Cloud API

from openmlcrawler.core.cloud import create_cloud_manager

# Create unified cloud manager
cloud_manager = create_cloud_manager()

# Upload to multiple providers
upload_results = cloud_manager.upload_to_providers(
    df,
    providers=["aws", "gcp"],
    object_name="weather_data.parquet"
)

for provider, result in upload_results.items():
    print(f"Uploaded to {provider}: {result['url']}")

Cloud-Native Features

# Direct cloud-to-cloud transfers
cloud_manager.transfer_between_providers(
    source_provider="aws",
    source_path="s3://source-bucket/data.csv",
    dest_provider="gcp",
    dest_path="gs://dest-bucket/data.csv"
)

# Serverless processing
result = cloud_manager.process_in_cloud(
    dataset_url="s3://my-bucket/data.csv",
    operation="normalize",
    output_format="parquet"
)

⚙️ Workflow Orchestration

YAML-Based Pipelines

Create workflow.yaml:

version: "1.0"

name: "Data Processing Pipeline"
description: "Complete data collection and processing workflow"

steps:
  - name: "collect_weather"
    type: "connector"
    connector: "weather"
    params:
      location: "New York"
      days: 30
    output: "weather_raw.csv"

  - name: "clean_data"
    type: "processor"
    operation: "clean"
    input: "weather_raw.csv"
    params:
      remove_duplicates: true
      handle_missing: "interpolate"
    output: "weather_clean.csv"

  - name: "quality_check"
    type: "validator"
    input: "weather_clean.csv"
    rules:
      completeness: 0.9
      max_missing_rate: 0.05
    output: "quality_report.json"

  - name: "export_final"
    type: "exporter"
    input: "weather_clean.csv"
    format: "parquet"
    output: "weather_final.parquet"

Execute the workflow:

from openmlcrawler import execute_workflow_from_file

# Run the complete pipeline
result = execute_workflow_from_file("workflow.yaml")

print(f"Workflow status: {result['status']}")
print(f"Execution time: {result['execution_time']:.2f} seconds")

# Check results
for step_result in result['step_results']:
    print(f"Step '{step_result['name']}': {step_result['status']}")

Conditional Workflows

steps:
  - name: "assess_quality"
    type: "validator"
    input: "data.csv"
    output: "quality.json"

  - name: "conditional_processing"
    type: "conditional"
    condition: "quality.completeness_score > 0.8"
    true_steps:
      - name: "high_quality_processing"
        type: "processor"
        operation: "advanced_clean"
    false_steps:
      - name: "low_quality_processing"
        type: "processor"
        operation: "basic_clean"

Parallel Execution

steps:
  - name: "parallel_collection"
    type: "parallel"
    steps:
      - name: "weather_ny"
        type: "connector"
        connector: "weather"
        params: {location: "New York"}
      - name: "weather_la"
        type: "connector"
        connector: "weather"
        params: {location: "Los Angeles"}
      - name: "weather_chicago"
        type: "connector"
        connector: "weather"
        params: {location: "Chicago"}
    output: "combined_weather.csv"

🎯 Active Learning & Sampling

Intelligent Sampling

from openmlcrawler import smart_sample_dataset

# Diversity-based sampling
diverse_sample = smart_sample_dataset(
    df,
    sample_size=1000,
    strategy="diversity",
    feature_columns=["feature1", "feature2", "feature3"]
)

# Uncertainty-based sampling for active learning
uncertainty_sample = smart_sample_dataset(
    df,
    sample_size=500,
    strategy="uncertainty",
    target_column="target",
    model_type="random_forest"
)

# Anomaly-based sampling
anomaly_sample = smart_sample_dataset(
    df,
    sample_size=200,
    strategy="anomaly",
    contamination=0.1
)

Stratified Sampling

from openmlcrawler.core.sampling import StratifiedSampler

sampler = StratifiedSampler()

# Maintain class distribution
stratified_sample = sampler.sample(
    df,
    sample_size=1000,
    stratify_column="category",
    min_samples_per_class=50
)

# Multi-column stratification
multi_stratified = sampler.sample(
    df,
    sample_size=2000,
    stratify_columns=["region", "category"],
    weights={"region": 0.6, "category": 0.4}
)
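
A quick way to confirm that a sample preserves the original class balance is to compare normalized value counts; the proportions should be close if stratification worked as intended.

import pandas as pd

# Side-by-side class proportions for the full data and the stratified sample.
comparison = pd.DataFrame({
    "original": df["category"].value_counts(normalize=True),
    "sample": stratified_sample["category"].value_counts(normalize=True),
})
print(comparison.round(3))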

🚀 Distributed Processing

Ray Integration

from openmlcrawler.core.distributed import create_distributed_crawler

# Create distributed crawler
crawler = create_distributed_crawler(num_workers=8)

# Process large dataset
result = crawler.process_large_dataset(
    source="https://large-dataset.example.com/data.csv",
    operations=["clean", "normalize", "validate"],
    output_format="parquet"
)

print(f"Processed {result['total_records']} records in {result['execution_time']:.2f}s")

Dask Integration

from openmlcrawler.core.distributed import create_dask_processor

# Create Dask processor
processor = create_dask_processor(cluster="local", n_workers=4)

# Process with Dask
df_computed = processor.process_dataframe(
    df,
    operations=[
        {"type": "fillna", "value": 0},
        {"type": "normalize", "method": "standard"},
        {"type": "encode", "columns": ["category"]}
    ]
)

🧠 ML Pipeline Integration

AutoML Integration

from openmlcrawler import create_automl_pipeline

# Create AutoML pipeline
automl = create_automl_pipeline()

# Run automated model selection
results = automl.run_automl(
    X_train, y_train,
    task="classification",  # or "regression"
    time_limit=600,        # 10 minutes
    metric="accuracy"
)

print(f"Best model: {results['best_model'].__class__.__name__}")
print(f"Best score: {results['best_score']:.3f}")

# Make predictions
predictions = automl.predict(X_test)
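
For the classification task shown above, the predictions can be scored with standard scikit-learn metrics against the hold-out split produced earlier by prepare_for_ml:

from sklearn.metrics import accuracy_score, classification_report

# Score the AutoML model on the hold-out split.
print(f"Hold-out accuracy: {accuracy_score(y_test, predictions):.3f}")
print(classification_report(y_test, predictions))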

Feature Store Integration

from openmlcrawler import create_feature_store

# Create feature store
feature_store = create_feature_store()

# Store features
feature_store.store_features(
    features=X_train,
    feature_names=["feature1", "feature2", "feature3"],
    metadata={
        "source": "weather_data",
        "version": "v1.0",
        "created_at": "2024-01-01"
    }
)

# Retrieve features
stored_features = feature_store.get_features(
    feature_names=["feature1", "feature2"],
    version="latest"
)

🛠️ Developer Tools

CLI Integration

# Load data
openmlcrawler load weather --location "Tokyo" --days 14 --output weather.csv

# Assess quality
openmlcrawler quality weather.csv --format json --output quality.json

# Export data
openmlcrawler export weather.csv --format parquet --output weather.parquet

# Run workflow
openmlcrawler workflow run pipeline.yaml

Programmatic API

from openmlcrawler import OpenMLCrawler

# Create crawler instance
crawler = OpenMLCrawler()

# Configure settings
crawler.config.cache.enabled = True
crawler.config.processing.max_workers = 8

# Load and process data
df = crawler.load_dataset("weather", location="Paris", days=7)
clean_df = crawler.clean_data(df)
result_df = crawler.prepare_for_ml(clean_df, target_column="temperature")

# Export results
crawler.export_dataset(result_df, "processed_weather.csv")

This covers the core features of OpenML Crawler. Each feature is designed to work seamlessly together, allowing you to build comprehensive data processing pipelines with minimal code.