Workspace

The Workspace class is the unified entry point for all Mixpanel data operations.

Explore on DeepWiki

🤖 Workspace Class Deep Dive →

Ask questions about Workspace methods, explore usage patterns, or understand how services are orchestrated.

Overview

Workspace orchestrates four internal services:

  • DiscoveryService — Schema exploration (events, properties, funnels, cohorts)
  • FetcherService — Data ingestion from Mixpanel to DuckDB, or streaming without storage
  • LiveQueryService — Real-time analytics queries
  • StorageEngine — Local SQL query execution

Key Features

Parallel Fetching

For large date ranges, use parallel=True for up to 10x faster exports:

# Parallel fetch for large date ranges (recommended)
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-12-31",
    parallel=True
)

print(f"Fetched {result.total_rows} rows in {result.duration_seconds:.1f}s")

Parallel fetching:

  • Splits date ranges into 7-day chunks (configurable via chunk_days)
  • Fetches chunks concurrently (configurable via max_workers, default: 10)
  • Returns ParallelFetchResult with batch statistics and failure tracking
  • Supports progress callbacks via on_batch_complete (see the sketch below)
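
A progress-callback sketch (BatchProgress exposes batch_index and total_batches, as shown in the fetch_events reference below):

def on_batch(progress):
    print(f"Batch {progress.batch_index + 1}/{progress.total_batches} complete")

result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-12-31",
    parallel=True,
    chunk_days=14,  # fewer, larger batches
    on_batch_complete=on_batch,
)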

Parallel Profile Fetching

For large profile datasets, use parallel=True for up to 5x faster exports:

# Parallel profile fetch for large datasets
result = ws.fetch_profiles(
    name="users",
    parallel=True,
    max_workers=5  # Default and max is 5
)

print(f"Fetched {result.total_rows} profiles in {result.duration_seconds:.1f}s")
print(f"Pages: {result.successful_pages} succeeded, {result.failed_pages} failed")

Parallel profile fetching:

  • Uses page-based parallelism with session IDs for consistency
  • Fetches pages concurrently (configurable via max_workers, default: 5, max: 5)
  • Returns ParallelProfileResult with page statistics and failure tracking
  • Supports progress callbacks via on_page_complete (see the sketch below)
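
A page-callback sketch (ProfileProgress fields are not documented on this page, so the callback prints the object as-is):

def on_page(progress):
    print(progress)

result = ws.fetch_profiles(
    name="users",
    parallel=True,
    on_page_complete=on_page,
)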

Append Mode

The fetch_events() and fetch_profiles() methods support an append parameter for incremental data loading:

# Initial fetch
ws.fetch_events(name="events", from_date="2025-01-01", to_date="2025-01-31")

# Append more data (duplicates are automatically skipped)
ws.fetch_events(name="events", from_date="2025-02-01", to_date="2025-02-28", append=True)

This is useful for:

  • Incremental loading: Fetch data in chunks without creating multiple tables
  • Crash recovery: Resume a failed fetch from the last successful point
  • Extending date ranges: Add more historical or recent data to an existing table
  • Retrying failed parallel batches: Use append mode to retry specific date ranges (see the sketch below)

Duplicate events (by insert_id) and profiles (by distinct_id) are automatically skipped via INSERT OR IGNORE.
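
A retry sketch for failed parallel batches (the result attributes used here - failed_batches and its from_date/to_date fields - are illustrative assumptions, not confirmed by this page):

result = ws.fetch_events(
    name="events", from_date="2024-01-01", to_date="2024-12-31", parallel=True
)

# Attribute names below are assumptions for illustration.
for batch in result.failed_batches:
    ws.fetch_events(
        name="events",
        from_date=batch.from_date,
        to_date=batch.to_date,
        append=True,  # duplicates skipped via INSERT OR IGNORE
    )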

Advanced Profile Fetching

The fetch_profiles() and stream_profiles() methods support advanced filtering options:

# Fetch specific users by ID
ws.fetch_profiles(name="vip_users", distinct_ids=["user_1", "user_2", "user_3"])

# Fetch group profiles (e.g., companies)
ws.fetch_profiles(name="companies", group_id="companies")

# Fetch users based on behavior
ws.fetch_profiles(
    name="purchasers",
    behaviors=[{"window": "30d", "name": "buyers", "event_selectors": [{"event": "Purchase"}]}],
    where='(behaviors["buyers"] > 0)'
)

# Query historical profile state (requires the time module)
import time

ws.fetch_profiles(
    name="profiles_last_week",
    as_of_timestamp=int(time.time()) - 604800  # 7 days ago
)

# Get all users with cohort membership marked
ws.fetch_profiles(
    name="cohort_analysis",
    cohort_id="12345",
    include_all_users=True
)

Parameter constraints:

  • distinct_id and distinct_ids are mutually exclusive
  • behaviors and cohort_id are mutually exclusive
  • include_all_users requires cohort_id to be set
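
Violating a constraint raises ValueError:

# Raises ValueError: distinct_id and distinct_ids are mutually exclusive
ws.fetch_profiles(
    name="bad",
    distinct_id="user_1",
    distinct_ids=["user_2"],
)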

Class Reference

mixpanel_data.Workspace

Workspace(
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    path: str | Path | None = None,
    read_only: bool = False,
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
    _storage: StorageEngine | None = None,
)

Unified entry point for Mixpanel data operations.

The Workspace class is a facade that orchestrates all services:

  • DiscoveryService for schema exploration
  • FetcherService for data ingestion
  • LiveQueryService for real-time analytics
  • StorageEngine for local SQL queries

Examples:

Basic usage with credentials from config:

ws = Workspace()
ws.fetch_events(from_date="2024-01-01", to_date="2024-01-31")
df = ws.sql("SELECT * FROM events LIMIT 10")

Ephemeral workspace for temporary analysis:

with Workspace.ephemeral() as ws:
    ws.fetch_events(from_date="2024-01-01", to_date="2024-01-31")
    total = ws.sql_scalar("SELECT COUNT(*) FROM events")
# Database automatically deleted

Query-only access to existing database:

ws = Workspace.open("path/to/database.db")
df = ws.sql("SELECT * FROM events")

Create a new Workspace with credentials and optional database path.

Credentials are resolved in priority order:

  1. Environment variables (MP_USERNAME, MP_SECRET, MP_PROJECT_ID, MP_REGION)
  2. Named account from config file (if account parameter specified)
  3. Default account from config file
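
For example, environment variables take precedence over any config-file account (the values below are placeholders):

import os

os.environ["MP_USERNAME"] = "service-account-user"
os.environ["MP_SECRET"] = "service-account-secret"
os.environ["MP_PROJECT_ID"] = "123456"
os.environ["MP_REGION"] = "us"

ws = Workspace()  # resolves credentials from the environment first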

PARAMETER DESCRIPTION
account

Named account from config file to use.

TYPE: str | None DEFAULT: None

project_id

Override project ID from credentials.

TYPE: str | None DEFAULT: None

region

Override region from credentials (us, eu, in).

TYPE: str | None DEFAULT: None

path

Path to database file. If None, uses default location.

TYPE: str | Path | None DEFAULT: None

read_only

If True, open database in read-only mode allowing concurrent reads. Defaults to False (write access).

TYPE: bool DEFAULT: False

_config_manager

Injected ConfigManager for testing.

TYPE: ConfigManager | None DEFAULT: None

_api_client

Injected MixpanelAPIClient for testing.

TYPE: MixpanelAPIClient | None DEFAULT: None

_storage

Injected StorageEngine for testing.

TYPE: StorageEngine | None DEFAULT: None

RAISES DESCRIPTION
ConfigError

If no credentials can be resolved.

AccountNotFoundError

If named account doesn't exist.

Source code in src/mixpanel_data/workspace.py
def __init__(
    self,
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    path: str | Path | None = None,
    read_only: bool = False,
    # Dependency injection for testing
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
    _storage: StorageEngine | None = None,
) -> None:
    """Create a new Workspace with credentials and optional database path.

    Credentials are resolved in priority order:
    1. Environment variables (MP_USERNAME, MP_SECRET, MP_PROJECT_ID, MP_REGION)
    2. Named account from config file (if account parameter specified)
    3. Default account from config file

    Args:
        account: Named account from config file to use.
        project_id: Override project ID from credentials.
        region: Override region from credentials (us, eu, in).
        path: Path to database file. If None, uses default location.
        read_only: If True, open database in read-only mode allowing
            concurrent reads. Defaults to False (write access).
        _config_manager: Injected ConfigManager for testing.
        _api_client: Injected MixpanelAPIClient for testing.
        _storage: Injected StorageEngine for testing.

    Raises:
        ConfigError: If no credentials can be resolved.
        AccountNotFoundError: If named account doesn't exist.
    """
    # Store injected or create default ConfigManager
    self._config_manager = _config_manager or ConfigManager()

    # Resolve credentials
    self._credentials: Credentials | None = None
    self._account_name: str | None = account

    # Resolve credentials (may raise ConfigError or AccountNotFoundError)
    self._credentials = self._config_manager.resolve_credentials(account)

    # Apply overrides if provided
    if project_id or region:
        from typing import cast

        from pydantic import SecretStr

        from mixpanel_data._internal.config import RegionType

        resolved_region = region or self._credentials.region
        self._credentials = Credentials(
            username=self._credentials.username,
            secret=SecretStr(self._credentials.secret.get_secret_value()),
            project_id=project_id or self._credentials.project_id,
            region=cast(RegionType, resolved_region),
        )

    # Initialize storage lazily
    # Store path for lazy initialization, or use injected storage directly
    self._db_path: Path | None = None
    self._storage: StorageEngine | None = None
    self._read_only = read_only

    if _storage is not None:
        # Injected storage - use directly
        self._storage = _storage
    else:
        # Determine database path for lazy initialization
        if path is not None:
            self._db_path = Path(path) if isinstance(path, str) else path
        else:
            # Default path: ~/.mp/data/{project_id}.db
            self._db_path = (
                Path.home() / ".mp" / "data" / f"{self._credentials.project_id}.db"
            )
        # NOTE: StorageEngine is NOT created here - see storage property

    # Lazy-initialized services (None until first use)
    self._api_client: MixpanelAPIClient | None = _api_client
    self._discovery: DiscoveryService | None = None
    self._fetcher: FetcherService | None = None
    self._live_query: LiveQueryService | None = None

connection property

connection: DuckDBPyConnection

Direct access to the DuckDB connection.

Use this for operations not covered by the Workspace API.

RETURNS DESCRIPTION
DuckDBPyConnection

The underlying DuckDB connection.
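
Example

Drop to DuckDB for anything the Workspace API doesn't cover (a sketch; the events table assumes a prior fetch_events call):

con = ws.connection
con.execute("COPY events TO 'events.parquet' (FORMAT PARQUET)")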

db_path property

db_path: Path | None

Path to the DuckDB database file.

Returns the filesystem path where data is stored. Useful for:

  • Knowing where your data lives
  • Opening the same database later with Workspace.open(path)
  • Debugging and logging

RETURNS DESCRIPTION
Path | None

The database file path, or None for in-memory workspaces.

Example

Save the path for later use::

ws = mp.Workspace()
path = ws.db_path
ws.close()

# Later, reopen the same database
ws = mp.Workspace.open(path)

api property

api: MixpanelAPIClient

Direct access to the Mixpanel API client.

Use this escape hatch for Mixpanel API operations not covered by the Workspace class. The client handles authentication automatically.

The client provides:
  • request(method, url, **kwargs): Make authenticated requests to any Mixpanel API endpoint.
  • project_id: The configured project ID for constructing URLs.
  • region: The configured region ('us', 'eu', or 'in').
RETURNS DESCRIPTION
MixpanelAPIClient

The underlying MixpanelAPIClient.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

Example

Fetch event schema from the Lexicon Schemas API::

import mixpanel_data as mp
from urllib.parse import quote

ws = mp.Workspace()
client = ws.api

# Build the URL with proper encoding
event_name = quote("Added To Cart", safe="")
url = f"https://mixpanel.com/api/app/projects/{client.project_id}/schemas/event/{event_name}"

# Make the authenticated request
schema = client.request("GET", url)
print(schema)

ephemeral classmethod

ephemeral(
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
) -> Iterator[Workspace]

Create a temporary workspace that auto-deletes on exit.

PARAMETER DESCRIPTION
account

Named account from config file to use.

TYPE: str | None DEFAULT: None

project_id

Override project ID from credentials.

TYPE: str | None DEFAULT: None

region

Override region from credentials.

TYPE: str | None DEFAULT: None

_config_manager

Injected ConfigManager for testing.

TYPE: ConfigManager | None DEFAULT: None

_api_client

Injected MixpanelAPIClient for testing.

TYPE: MixpanelAPIClient | None DEFAULT: None

YIELDS DESCRIPTION
Workspace

A workspace with temporary database.

TYPE: Workspace

Example
with Workspace.ephemeral() as ws:
    ws.fetch_events(from_date="2024-01-01", to_date="2024-01-31")
    print(ws.sql_scalar("SELECT COUNT(*) FROM events"))
# Database file automatically deleted
Source code in src/mixpanel_data/workspace.py
@classmethod
@contextmanager
def ephemeral(
    cls,
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
) -> Iterator[Workspace]:
    """Create a temporary workspace that auto-deletes on exit.

    Args:
        account: Named account from config file to use.
        project_id: Override project ID from credentials.
        region: Override region from credentials.
        _config_manager: Injected ConfigManager for testing.
        _api_client: Injected MixpanelAPIClient for testing.

    Yields:
        Workspace: A workspace with temporary database.

    Example:
        ```python
        with Workspace.ephemeral() as ws:
            ws.fetch_events(from_date="2024-01-01", to_date="2024-01-31")
            print(ws.sql_scalar("SELECT COUNT(*) FROM events"))
        # Database file automatically deleted
        ```
    """
    storage = StorageEngine.ephemeral()
    ws = cls(
        account=account,
        project_id=project_id,
        region=region,
        _config_manager=_config_manager,
        _api_client=_api_client,
        _storage=storage,
    )
    try:
        yield ws
    finally:
        ws.close()

memory classmethod

memory(
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
) -> Iterator[Workspace]

Create a workspace with true in-memory database.

The database exists only in RAM with zero disk footprint. All data is lost when the context manager exits.

Best for:

  • Small datasets where zero disk footprint is required
  • Unit tests without filesystem side effects
  • Quick exploratory queries

For large datasets, prefer ephemeral() which benefits from DuckDB's compression (can be 8x faster for large workloads).

PARAMETER DESCRIPTION
account

Named account from config file to use.

TYPE: str | None DEFAULT: None

project_id

Override project ID from credentials.

TYPE: str | None DEFAULT: None

region

Override region from credentials.

TYPE: str | None DEFAULT: None

_config_manager

Injected ConfigManager for testing.

TYPE: ConfigManager | None DEFAULT: None

_api_client

Injected MixpanelAPIClient for testing.

TYPE: MixpanelAPIClient | None DEFAULT: None

YIELDS DESCRIPTION
Workspace

A workspace with in-memory database.

TYPE: Workspace

Example
with Workspace.memory() as ws:
    ws.fetch_events(from_date="2024-01-01", to_date="2024-01-01")
    total = ws.sql_scalar("SELECT COUNT(*) FROM events")
# Database gone - no cleanup needed, no files left behind
Source code in src/mixpanel_data/workspace.py
@classmethod
@contextmanager
def memory(
    cls,
    account: str | None = None,
    project_id: str | None = None,
    region: str | None = None,
    _config_manager: ConfigManager | None = None,
    _api_client: MixpanelAPIClient | None = None,
) -> Iterator[Workspace]:
    """Create a workspace with true in-memory database.

    The database exists only in RAM with zero disk footprint.
    All data is lost when the context manager exits.

    Best for:
    - Small datasets where zero disk footprint is required
    - Unit tests without filesystem side effects
    - Quick exploratory queries

    For large datasets, prefer ephemeral() which benefits from
    DuckDB's compression (can be 8x faster for large workloads).

    Args:
        account: Named account from config file to use.
        project_id: Override project ID from credentials.
        region: Override region from credentials.
        _config_manager: Injected ConfigManager for testing.
        _api_client: Injected MixpanelAPIClient for testing.

    Yields:
        Workspace: A workspace with in-memory database.

    Example:
        ```python
        with Workspace.memory() as ws:
            ws.fetch_events(from_date="2024-01-01", to_date="2024-01-01")
            total = ws.sql_scalar("SELECT COUNT(*) FROM events")
        # Database gone - no cleanup needed, no files left behind
        ```
    """
    storage = StorageEngine.memory()
    ws = cls(
        account=account,
        project_id=project_id,
        region=region,
        _config_manager=_config_manager,
        _api_client=_api_client,
        _storage=storage,
    )
    try:
        yield ws
    finally:
        ws.close()

open classmethod

open(path: str | Path, *, read_only: bool = True) -> Workspace

Open an existing database for query-only access.

This method opens a database without requiring API credentials. Discovery, fetching, and live query methods will be unavailable.

PARAMETER DESCRIPTION
path

Path to existing database file.

TYPE: str | Path

read_only

If True (default), open in read-only mode allowing concurrent reads. Set to False for write access.

TYPE: bool DEFAULT: True

RETURNS DESCRIPTION
Workspace

A workspace with access to stored data.

TYPE: Workspace

RAISES DESCRIPTION
FileNotFoundError

If database file doesn't exist.

Example
ws = Workspace.open("my_data.db")
df = ws.sql("SELECT * FROM events")
ws.close()
Source code in src/mixpanel_data/workspace.py
@classmethod
def open(cls, path: str | Path, *, read_only: bool = True) -> Workspace:
    """Open an existing database for query-only access.

    This method opens a database without requiring API credentials.
    Discovery, fetching, and live query methods will be unavailable.

    Args:
        path: Path to existing database file.
        read_only: If True (default), open in read-only mode allowing
            concurrent reads. Set to False for write access.

    Returns:
        Workspace: A workspace with access to stored data.

    Raises:
        FileNotFoundError: If database file doesn't exist.

    Example:
        ```python
        ws = Workspace.open("my_data.db")
        df = ws.sql("SELECT * FROM events")
        ws.close()
        ```
    """
    db_path = Path(path) if isinstance(path, str) else path
    storage = StorageEngine.open_existing(db_path, read_only=read_only)

    # Create instance without credential resolution
    instance = object.__new__(cls)
    instance._config_manager = ConfigManager()
    instance._credentials = None
    instance._account_name = None
    instance._db_path = db_path
    instance._storage = storage
    instance._read_only = read_only
    instance._api_client = None
    instance._discovery = None
    instance._fetcher = None
    instance._live_query = None

    return instance

close

close() -> None

Close all resources (database connection, HTTP client).

This method is idempotent and safe to call multiple times.
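
Example

A typical lifecycle sketch:

ws = Workspace()
try:
    ws.fetch_events(from_date="2024-01-01", to_date="2024-01-31")
finally:
    ws.close()  # idempotent; safe to call again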

Source code in src/mixpanel_data/workspace.py
def close(self) -> None:
    """Close all resources (database connection, HTTP client).

    This method is idempotent and safe to call multiple times.
    """
    # Close storage
    if self._storage is not None:
        self._storage.close()

    # Close API client if we created one
    if self._api_client is not None:
        self._api_client.close()
        self._api_client = None

test_credentials staticmethod

test_credentials(account: str | None = None) -> dict[str, Any]

Test account credentials by making a lightweight API call.

This method verifies that credentials are valid and can access the Mixpanel API. It's useful for validating configuration before attempting more expensive operations.

PARAMETER DESCRIPTION
account

Named account to test. If None, tests the default account or credentials from environment variables.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
dict[str, Any]

Dict containing:

  • success: bool - Whether the test succeeded
  • account: str | None - Account name tested
  • project_id: str - Project ID from credentials
  • region: str - Region from credentials
  • events_found: int - Number of events found (validation metric)

RAISES DESCRIPTION
AccountNotFoundError

If named account doesn't exist.

AuthenticationError

If credentials are invalid.

ConfigError

If no credentials can be resolved.

Example
# Test default account
result = Workspace.test_credentials()
if result["success"]:
    print(f"Authenticated to project {result['project_id']}")

# Test specific account
result = Workspace.test_credentials("production")
Source code in src/mixpanel_data/workspace.py
@staticmethod
def test_credentials(account: str | None = None) -> dict[str, Any]:
    """Test account credentials by making a lightweight API call.

    This method verifies that credentials are valid and can access the
    Mixpanel API. It's useful for validating configuration before
    attempting more expensive operations.

    Args:
        account: Named account to test. If None, tests the default account
            or credentials from environment variables.

    Returns:
        Dict containing:
            - success: bool - Whether the test succeeded
            - account: str | None - Account name tested
            - project_id: str - Project ID from credentials
            - region: str - Region from credentials
            - events_found: int - Number of events found (validation metric)

    Raises:
        AccountNotFoundError: If named account doesn't exist.
        AuthenticationError: If credentials are invalid.
        ConfigError: If no credentials can be resolved.

    Example:
        ```python
        # Test default account
        result = Workspace.test_credentials()
        if result["success"]:
            print(f"Authenticated to project {result['project_id']}")

        # Test specific account
        result = Workspace.test_credentials("production")
        ```
    """
    config_manager = ConfigManager()
    credentials = config_manager.resolve_credentials(account)

    # Get account info if we used a named account
    account_info = None
    if account is not None:
        account_info = config_manager.get_account(account)
    else:
        # Check if credentials came from a default account
        accounts = config_manager.list_accounts()
        for acc in accounts:
            if acc.is_default:
                account_info = acc
                break

    # Create API client and test with a lightweight call
    api_client = MixpanelAPIClient(credentials)
    try:
        events = api_client.get_events()
        event_count = len(list(events)) if events else 0

        return {
            "success": True,
            "account": account_info.name if account_info else None,
            "project_id": credentials.project_id,
            "region": credentials.region,
            "events_found": event_count,
        }
    finally:
        api_client.close()

events

events() -> list[str]

List all event names in the Mixpanel project.

Results are cached for the lifetime of the Workspace.

RETURNS DESCRIPTION
list[str]

Alphabetically sorted list of event names.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

AuthenticationError

If credentials are invalid.
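
Example

A quick listing sketch (assumes a configured workspace ws):

names = ws.events()
print(f"Found {len(names)} event types")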

Source code in src/mixpanel_data/workspace.py
def events(self) -> list[str]:
    """List all event names in the Mixpanel project.

    Results are cached for the lifetime of the Workspace.

    Returns:
        Alphabetically sorted list of event names.

    Raises:
        ConfigError: If API credentials not available.
        AuthenticationError: If credentials are invalid.
    """
    return self._discovery_service.list_events()

properties

properties(event: str) -> list[str]

List all property names for an event.

Results are cached per event for the lifetime of the Workspace.

PARAMETER DESCRIPTION
event

Event name to get properties for.

TYPE: str

RETURNS DESCRIPTION
list[str]

Alphabetically sorted list of property names.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

Source code in src/mixpanel_data/workspace.py
def properties(self, event: str) -> list[str]:
    """List all property names for an event.

    Results are cached per event for the lifetime of the Workspace.

    Args:
        event: Event name to get properties for.

    Returns:
        Alphabetically sorted list of property names.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._discovery_service.list_properties(event)

property_values

property_values(
    property_name: str, *, event: str | None = None, limit: int = 100
) -> list[str]

Get sample values for a property.

Results are cached per (property, event, limit) for the lifetime of the Workspace.

PARAMETER DESCRIPTION
property_name

Property to get values for.

TYPE: str

event

Optional event to filter by.

TYPE: str | None DEFAULT: None

limit

Maximum number of values to return.

TYPE: int DEFAULT: 100

RETURNS DESCRIPTION
list[str]

List of sample property values as strings.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
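
Example

A sampling sketch (the property and event names are illustrative):

values = ws.property_values("plan", event="Sign Up", limit=10)
print(values)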

Source code in src/mixpanel_data/workspace.py
def property_values(
    self,
    property_name: str,
    *,
    event: str | None = None,
    limit: int = 100,
) -> list[str]:
    """Get sample values for a property.

    Results are cached per (property, event, limit) for the lifetime of the Workspace.

    Args:
        property_name: Property to get values for.
        event: Optional event to filter by.
        limit: Maximum number of values to return.

    Returns:
        List of sample property values as strings.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._discovery_service.list_property_values(
        property_name, event=event, limit=limit
    )

funnels

funnels() -> list[FunnelInfo]

List saved funnels in the Mixpanel project.

Results are cached for the lifetime of the Workspace.

RETURNS DESCRIPTION
list[FunnelInfo]

List of FunnelInfo objects (funnel_id, name).

RAISES DESCRIPTION
ConfigError

If API credentials not available.
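
Example

Iterate saved funnels (FunnelInfo exposes funnel_id and name):

for funnel in ws.funnels():
    print(funnel.funnel_id, funnel.name)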

Source code in src/mixpanel_data/workspace.py
def funnels(self) -> list[FunnelInfo]:
    """List saved funnels in the Mixpanel project.

    Results are cached for the lifetime of the Workspace.

    Returns:
        List of FunnelInfo objects (funnel_id, name).

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._discovery_service.list_funnels()

cohorts

cohorts() -> list[SavedCohort]

List saved cohorts in the Mixpanel project.

Results are cached for the lifetime of the Workspace.

RETURNS DESCRIPTION
list[SavedCohort]

List of SavedCohort objects.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

Source code in src/mixpanel_data/workspace.py
def cohorts(self) -> list[SavedCohort]:
    """List saved cohorts in the Mixpanel project.

    Results are cached for the lifetime of the Workspace.

    Returns:
        List of SavedCohort objects.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._discovery_service.list_cohorts()

list_bookmarks

list_bookmarks(bookmark_type: BookmarkType | None = None) -> list[BookmarkInfo]

List all saved reports (bookmarks) in the project.

Retrieves metadata for all saved Insights, Funnel, Retention, and Flows reports in the project.

PARAMETER DESCRIPTION
bookmark_type

Optional filter by report type. Valid values are 'insights', 'funnels', 'retention', 'flows', 'launch-analysis'. If None, returns all bookmark types.

TYPE: BookmarkType | None DEFAULT: None

RETURNS DESCRIPTION
list[BookmarkInfo]

List of BookmarkInfo objects with report metadata.

list[BookmarkInfo]

Empty list if no bookmarks exist.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Permission denied or invalid type parameter.
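
Example

A filtering sketch (only the type filter is shown; BookmarkInfo fields beyond this page are not assumed):

insights = ws.list_bookmarks(bookmark_type="insights")
print(f"{len(insights)} saved Insights reports")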

Source code in src/mixpanel_data/workspace.py
def list_bookmarks(
    self,
    bookmark_type: BookmarkType | None = None,
) -> list[BookmarkInfo]:
    """List all saved reports (bookmarks) in the project.

    Retrieves metadata for all saved Insights, Funnel, Retention, and
    Flows reports in the project.

    Args:
        bookmark_type: Optional filter by report type. Valid values are
            'insights', 'funnels', 'retention', 'flows', 'launch-analysis'.
            If None, returns all bookmark types.

    Returns:
        List of BookmarkInfo objects with report metadata.
        Empty list if no bookmarks exist.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Permission denied or invalid type parameter.
    """
    return self._discovery_service.list_bookmarks(bookmark_type=bookmark_type)

top_events

top_events(
    *,
    type: Literal["general", "average", "unique"] = "general",
    limit: int | None = None,
) -> list[TopEvent]

Get today's most active events.

This method is NOT cached (returns real-time data).

PARAMETER DESCRIPTION
type

Counting method (general, average, unique).

TYPE: Literal['general', 'average', 'unique'] DEFAULT: 'general'

limit

Maximum number of events to return.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
list[TopEvent]

List of TopEvent objects (event, count, percent_change).

RAISES DESCRIPTION
ConfigError

If API credentials not available.
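
Example

Print today's top events (TopEvent exposes event, count, percent_change):

for e in ws.top_events(type="unique", limit=5):
    print(e.event, e.count, e.percent_change)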

Source code in src/mixpanel_data/workspace.py
def top_events(
    self,
    *,
    type: Literal["general", "average", "unique"] = "general",
    limit: int | None = None,
) -> list[TopEvent]:
    """Get today's most active events.

    This method is NOT cached (returns real-time data).

    Args:
        type: Counting method (general, average, unique).
        limit: Maximum number of events to return.

    Returns:
        List of TopEvent objects (event, count, percent_change).

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._discovery_service.list_top_events(type=type, limit=limit)

lexicon_schemas

lexicon_schemas(
    *, entity_type: EntityType | None = None
) -> list[LexiconSchema]

List Lexicon schemas in the project.

Retrieves documented event and profile property schemas from the Mixpanel Lexicon (data dictionary).

Results are cached for the lifetime of the Workspace.

PARAMETER DESCRIPTION
entity_type

Optional filter by type ("event" or "profile"). If None, returns all schemas.

TYPE: EntityType | None DEFAULT: None

RETURNS DESCRIPTION
list[LexiconSchema]

Alphabetically sorted list of LexiconSchema objects.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

AuthenticationError

If credentials are invalid.

Note

The Lexicon API has a strict 5 requests/minute rate limit. Caching helps avoid hitting this limit; call clear_discovery_cache() only when fresh data is needed.
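
Example

A filtered listing sketch (results are cached, so repeated calls avoid the rate limit):

event_schemas = ws.lexicon_schemas(entity_type="event")
print(f"{len(event_schemas)} documented event schemas")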

Source code in src/mixpanel_data/workspace.py
def lexicon_schemas(
    self,
    *,
    entity_type: EntityType | None = None,
) -> list[LexiconSchema]:
    """List Lexicon schemas in the project.

    Retrieves documented event and profile property schemas from the
    Mixpanel Lexicon (data dictionary).

    Results are cached for the lifetime of the Workspace.

    Args:
        entity_type: Optional filter by type ("event" or "profile").
            If None, returns all schemas.

    Returns:
        Alphabetically sorted list of LexiconSchema objects.

    Raises:
        ConfigError: If API credentials not available.
        AuthenticationError: If credentials are invalid.

    Note:
        The Lexicon API has a strict 5 requests/minute rate limit.
        Caching helps avoid hitting this limit; call clear_discovery_cache()
        only when fresh data is needed.
    """
    return self._discovery_service.list_schemas(entity_type=entity_type)

lexicon_schema

lexicon_schema(entity_type: EntityType, name: str) -> LexiconSchema

Get a single Lexicon schema by entity type and name.

Retrieves a documented schema for a specific event or profile property from the Mixpanel Lexicon (data dictionary).

Results are cached for the lifetime of the Workspace.

PARAMETER DESCRIPTION
entity_type

Entity type ("event" or "profile").

TYPE: EntityType

name

Entity name.

TYPE: str

RETURNS DESCRIPTION
LexiconSchema

LexiconSchema for the specified entity.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

AuthenticationError

If credentials are invalid.

QueryError

If schema not found.

Note

The Lexicon API has a strict 5 requests/minute rate limit. Caching helps avoid hitting this limit; call clear_discovery_cache() only when fresh data is needed.

Source code in src/mixpanel_data/workspace.py
def lexicon_schema(
    self,
    entity_type: EntityType,
    name: str,
) -> LexiconSchema:
    """Get a single Lexicon schema by entity type and name.

    Retrieves a documented schema for a specific event or profile property
    from the Mixpanel Lexicon (data dictionary).

    Results are cached for the lifetime of the Workspace.

    Args:
        entity_type: Entity type ("event" or "profile").
        name: Entity name.

    Returns:
        LexiconSchema for the specified entity.

    Raises:
        ConfigError: If API credentials not available.
        AuthenticationError: If credentials are invalid.
        QueryError: If schema not found.

    Note:
        The Lexicon API has a strict 5 requests/minute rate limit.
        Caching helps avoid hitting this limit; call clear_discovery_cache()
        only when fresh data is needed.
    """
    return self._discovery_service.get_schema(entity_type, name)

clear_discovery_cache

clear_discovery_cache() -> None

Clear cached discovery results.

Subsequent discovery calls will fetch fresh data from the API.

Source code in src/mixpanel_data/workspace.py
def clear_discovery_cache(self) -> None:
    """Clear cached discovery results.

    Subsequent discovery calls will fetch fresh data from the API.
    """
    if self._discovery is not None:
        self._discovery.clear_cache()

fetch_events

fetch_events(
    name: str = "events",
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    where: str | None = None,
    limit: int | None = None,
    progress: bool = True,
    append: bool = False,
    batch_size: int = 1000,
    parallel: bool = False,
    max_workers: int | None = None,
    on_batch_complete: Callable[[BatchProgress], None] | None = None,
    chunk_days: int = 7,
) -> FetchResult | ParallelFetchResult

Fetch events from Mixpanel and store in local database.

Note

This is a potentially long-running operation that streams data from Mixpanel's Export API. For large date ranges, use parallel=True for significantly faster exports (up to 10x speedup).

PARAMETER DESCRIPTION
name

Table name to create or append to (default: "events").

TYPE: str DEFAULT: 'events'

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

events

Optional list of event names to filter.

TYPE: list[str] | None DEFAULT: None

where

Optional WHERE clause for filtering.

TYPE: str | None DEFAULT: None

limit

Optional maximum number of events to return (max 100000).

TYPE: int | None DEFAULT: None

progress

Show progress bar (default: True).

TYPE: bool DEFAULT: True

append

If True, append to existing table. If False (default), create new.

TYPE: bool DEFAULT: False

batch_size

Number of rows per INSERT/COMMIT cycle. Controls the memory/IO tradeoff: smaller values use less memory but more disk IO; larger values use more memory but less IO. Default: 1000. Valid range: 100-100000.

TYPE: int DEFAULT: 1000

parallel

If True, use parallel fetching with multiple threads. Splits date range into 7-day chunks and fetches concurrently. Enables export of date ranges exceeding 100 days. Default: False.

TYPE: bool DEFAULT: False

max_workers

Maximum concurrent fetch threads when parallel=True. Default: 10. Higher values may hit Mixpanel rate limits. Ignored when parallel=False.

TYPE: int | None DEFAULT: None

on_batch_complete

Callback invoked when each batch completes during parallel fetch. Receives BatchProgress with status. Useful for custom progress reporting. Ignored when parallel=False.

TYPE: Callable[[BatchProgress], None] | None DEFAULT: None

chunk_days

Days per chunk for parallel date range splitting. Default: 7. Valid range: 1-100. Smaller values create more parallel batches but may increase API overhead. Ignored when parallel=False.

TYPE: int DEFAULT: 7

RETURNS DESCRIPTION
FetchResult | ParallelFetchResult

FetchResult when parallel=False, ParallelFetchResult when parallel=True.

FetchResult | ParallelFetchResult

ParallelFetchResult includes per-batch statistics and any failure info.

RAISES DESCRIPTION
TableExistsError

If table exists and append=False.

TableNotFoundError

If table doesn't exist and append=True.

ConfigError

If API credentials not available.

AuthenticationError

If credentials are invalid.

ValueError

If batch_size is outside valid range (100-100000).

ValueError

If limit is outside valid range (1-100000).

ValueError

If max_workers is not positive.

ValueError

If chunk_days is not in range 1-100.

Example
# Sequential fetch (default)
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-01-31",
)

# Parallel fetch for large date ranges
result = ws.fetch_events(
    name="events_q4",
    from_date="2024-10-01",
    to_date="2024-12-31",
    parallel=True,
)

# With custom progress callback
def on_batch(progress: BatchProgress) -> None:
    print(f"Batch {progress.batch_index + 1}/{progress.total_batches}")

result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True,
    on_batch_complete=on_batch,
)
Source code in src/mixpanel_data/workspace.py
def fetch_events(
    self,
    name: str = "events",
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    where: str | None = None,
    limit: int | None = None,
    progress: bool = True,
    append: bool = False,
    batch_size: int = 1000,
    parallel: bool = False,
    max_workers: int | None = None,
    on_batch_complete: Callable[[BatchProgress], None] | None = None,
    chunk_days: int = 7,
) -> FetchResult | ParallelFetchResult:
    """Fetch events from Mixpanel and store in local database.

    Note:
        This is a potentially long-running operation that streams data from
        Mixpanel's Export API. For large date ranges, use ``parallel=True``
        for significantly faster exports (up to 10x speedup).

    Args:
        name: Table name to create or append to (default: "events").
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        events: Optional list of event names to filter.
        where: Optional WHERE clause for filtering.
        limit: Optional maximum number of events to return (max 100000).
        progress: Show progress bar (default: True).
        append: If True, append to existing table. If False (default), create new.
        batch_size: Number of rows per INSERT/COMMIT cycle. Controls the
            memory/IO tradeoff: smaller values use less memory but more
            disk IO; larger values use more memory but less IO.
            Default: 1000. Valid range: 100-100000.
        parallel: If True, use parallel fetching with multiple threads.
            Splits date range into 7-day chunks and fetches concurrently.
            Enables export of date ranges exceeding 100 days. Default: False.
        max_workers: Maximum concurrent fetch threads when parallel=True.
            Default: 10. Higher values may hit Mixpanel rate limits.
            Ignored when parallel=False.
        on_batch_complete: Callback invoked when each batch completes
            during parallel fetch. Receives BatchProgress with status.
            Useful for custom progress reporting. Ignored when parallel=False.
        chunk_days: Days per chunk for parallel date range splitting.
            Default: 7. Valid range: 1-100. Smaller values create more
            parallel batches but may increase API overhead.
            Ignored when parallel=False.

    Returns:
        FetchResult when parallel=False, ParallelFetchResult when parallel=True.
        ParallelFetchResult includes per-batch statistics and any failure info.

    Raises:
        TableExistsError: If table exists and append=False.
        TableNotFoundError: If table doesn't exist and append=True.
        ConfigError: If API credentials not available.
        AuthenticationError: If credentials are invalid.
        ValueError: If batch_size is outside valid range (100-100000).
        ValueError: If limit is outside valid range (1-100000).
        ValueError: If max_workers is not positive.
        ValueError: If chunk_days is not in range 1-100.

    Example:
        ```python
        # Sequential fetch (default)
        result = ws.fetch_events(
            name="events",
            from_date="2024-01-01",
            to_date="2024-01-31",
        )

        # Parallel fetch for large date ranges
        result = ws.fetch_events(
            name="events_q4",
            from_date="2024-10-01",
            to_date="2024-12-31",
            parallel=True,
        )

        # With custom progress callback
        def on_batch(progress: BatchProgress) -> None:
            print(f"Batch {progress.batch_index + 1}/{progress.total_batches}")

        result = ws.fetch_events(
            name="events",
            from_date="2024-01-01",
            to_date="2024-03-31",
            parallel=True,
            on_batch_complete=on_batch,
        )
        ```
    """
    # Validate parameters early to avoid wasted API calls
    _validate_batch_size(batch_size)
    _validate_limit(limit)

    # Validate max_workers for parallel mode
    if max_workers is not None and max_workers <= 0:
        raise ValueError("max_workers must be positive")

    # Validate chunk_days for parallel mode
    if chunk_days <= 0:
        raise ValueError("chunk_days must be positive")
    if chunk_days > 100:
        raise ValueError("chunk_days must be at most 100")

    # Create progress callback if requested (only for interactive terminals)
    progress_callback = None
    pbar = None
    if progress and sys.stderr.isatty() and not parallel:
        try:
            from rich.progress import Progress, SpinnerColumn, TextColumn

            pbar = Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TextColumn("{task.completed} rows"),
            )
            task = pbar.add_task("Fetching events...", total=None)
            pbar.start()

            def callback(count: int) -> None:
                pbar.update(task, completed=count)

            progress_callback = callback
        except Exception:
            # Progress bar unavailable or failed to initialize, skip silently
            pass

    try:
        result = self._fetcher_service.fetch_events(
            name=name,
            from_date=from_date,
            to_date=to_date,
            events=events,
            where=where,
            limit=limit,
            progress_callback=progress_callback,
            append=append,
            batch_size=batch_size,
            parallel=parallel,
            max_workers=max_workers,
            on_batch_complete=on_batch_complete,
            chunk_days=chunk_days,
        )
    finally:
        if pbar is not None:
            pbar.stop()

    return result

fetch_profiles

fetch_profiles(
    name: str = "profiles",
    *,
    where: str | None = None,
    cohort_id: str | None = None,
    output_properties: list[str] | None = None,
    progress: bool = True,
    append: bool = False,
    batch_size: int = 1000,
    distinct_id: str | None = None,
    distinct_ids: list[str] | None = None,
    group_id: str | None = None,
    behaviors: list[dict[str, Any]] | None = None,
    as_of_timestamp: int | None = None,
    include_all_users: bool = False,
    parallel: bool = False,
    max_workers: int | None = None,
    on_page_complete: Callable[[ProfileProgress], None] | None = None,
) -> FetchResult | ParallelProfileResult

Fetch user profiles from Mixpanel and store in local database.

Note

This is a potentially long-running operation that streams data from Mixpanel's Engage API. For large profile sets, use parallel=True for up to 5x faster exports.

PARAMETER DESCRIPTION
name

Table name to create or append to (default: "profiles").

TYPE: str DEFAULT: 'profiles'

where

Optional WHERE clause for filtering.

TYPE: str | None DEFAULT: None

cohort_id

Optional cohort ID to filter by. Only profiles that are members of this cohort will be returned.

TYPE: str | None DEFAULT: None

output_properties

Optional list of property names to include in the response. If None, all properties are returned.

TYPE: list[str] | None DEFAULT: None

progress

Show progress bar (default: True).

TYPE: bool DEFAULT: True

append

If True, append to existing table. If False (default), create new.

TYPE: bool DEFAULT: False

batch_size

Number of rows per INSERT/COMMIT cycle. Controls the memory/IO tradeoff: smaller values use less memory but more disk IO; larger values use more memory but less IO. Default: 1000. Valid range: 100-100000.

TYPE: int DEFAULT: 1000

distinct_id

Optional single user ID to fetch. Mutually exclusive with distinct_ids.

TYPE: str | None DEFAULT: None

distinct_ids

Optional list of user IDs to fetch. Mutually exclusive with distinct_id. Duplicates are automatically removed.

TYPE: list[str] | None DEFAULT: None

group_id

Optional group type identifier (e.g., "companies") to fetch group profiles instead of user profiles.

TYPE: str | None DEFAULT: None

behaviors

Optional list of behavioral filters. Each dict should have 'window' (e.g., "30d"), 'name' (identifier), and 'event_selectors' (list of {"event": "Name"}). Use with where parameter to filter, e.g., where='(behaviors["name"] > 0)'. Mutually exclusive with cohort_id.

TYPE: list[dict[str, Any]] | None DEFAULT: None

as_of_timestamp

Optional Unix timestamp to query profile state at a specific point in time. Must be in the past.

TYPE: int | None DEFAULT: None

include_all_users

If True, include all users and mark cohort membership. Only valid when cohort_id is provided.

TYPE: bool DEFAULT: False

parallel

If True, use parallel fetching with multiple threads. Uses page-based parallelism for concurrent profile fetching. Enables up to 5x faster exports. Default: False.

TYPE: bool DEFAULT: False

max_workers

Maximum concurrent fetch threads when parallel=True. Default: 5, capped at 5. Ignored when parallel=False.

TYPE: int | None DEFAULT: None

on_page_complete

Callback invoked when each page completes during parallel fetch. Receives ProfileProgress with status. Useful for custom progress reporting. Ignored when parallel=False.

TYPE: Callable[[ProfileProgress], None] | None DEFAULT: None

RETURNS DESCRIPTION
FetchResult | ParallelProfileResult

FetchResult when parallel=False, ParallelProfileResult when parallel=True.

FetchResult | ParallelProfileResult

ParallelProfileResult includes per-page statistics and any failure info.

RAISES DESCRIPTION
TableExistsError

If table exists and append=False.

TableNotFoundError

If table doesn't exist and append=True.

ConfigError

If API credentials not available.

ValueError

If batch_size is outside valid range (100-100000) or mutually exclusive parameters are provided.
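
Example

A parallel fetch sketch (the cohort ID is illustrative):

result = ws.fetch_profiles(
    name="cohort_users",
    cohort_id="12345",
    parallel=True,
    max_workers=5,
)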

Source code in src/mixpanel_data/workspace.py
def fetch_profiles(
    self,
    name: str = "profiles",
    *,
    where: str | None = None,
    cohort_id: str | None = None,
    output_properties: list[str] | None = None,
    progress: bool = True,
    append: bool = False,
    batch_size: int = 1000,
    distinct_id: str | None = None,
    distinct_ids: list[str] | None = None,
    group_id: str | None = None,
    behaviors: list[dict[str, Any]] | None = None,
    as_of_timestamp: int | None = None,
    include_all_users: bool = False,
    parallel: bool = False,
    max_workers: int | None = None,
    on_page_complete: Callable[[ProfileProgress], None] | None = None,
) -> FetchResult | ParallelProfileResult:
    """Fetch user profiles from Mixpanel and store in local database.

    Note:
        This is a potentially long-running operation that streams data from
        Mixpanel's Engage API. For large profile sets, use ``parallel=True``
        for up to 5x faster exports.

    Args:
        name: Table name to create or append to (default: "profiles").
        where: Optional WHERE clause for filtering.
        cohort_id: Optional cohort ID to filter by. Only profiles that are
            members of this cohort will be returned.
        output_properties: Optional list of property names to include in
            the response. If None, all properties are returned.
        progress: Show progress bar (default: True).
        append: If True, append to existing table. If False (default), create new.
        batch_size: Number of rows per INSERT/COMMIT cycle. Controls the
            memory/IO tradeoff: smaller values use less memory but more
            disk IO; larger values use more memory but less IO.
            Default: 1000. Valid range: 100-100000.
        distinct_id: Optional single user ID to fetch. Mutually exclusive
            with distinct_ids.
        distinct_ids: Optional list of user IDs to fetch. Mutually exclusive
            with distinct_id. Duplicates are automatically removed.
        group_id: Optional group type identifier (e.g., "companies") to fetch
            group profiles instead of user profiles.
        behaviors: Optional list of behavioral filters. Each dict should have
            'window' (e.g., "30d"), 'name' (identifier), and 'event_selectors'
            (list of {"event": "Name"}). Use with `where` parameter to filter,
            e.g., where='(behaviors["name"] > 0)'. Mutually exclusive with
            cohort_id.
        as_of_timestamp: Optional Unix timestamp to query profile state at
            a specific point in time. Must be in the past.
        include_all_users: If True, include all users and mark cohort membership.
            Only valid when cohort_id is provided.
        parallel: If True, use parallel fetching with multiple threads.
            Uses page-based parallelism for concurrent profile fetching.
            Enables up to 5x faster exports. Default: False.
        max_workers: Maximum concurrent fetch threads when parallel=True.
            Default: 5, capped at 5. Ignored when parallel=False.
        on_page_complete: Callback invoked when each page completes during
            parallel fetch. Receives ProfileProgress with status.
            Useful for custom progress reporting. Ignored when parallel=False.

    Returns:
        FetchResult when parallel=False, ParallelProfileResult when parallel=True.
        ParallelProfileResult includes per-page statistics and any failure info.

    Raises:
        TableExistsError: If table exists and append=False.
        TableNotFoundError: If table doesn't exist and append=True.
        ConfigError: If API credentials not available.
        ValueError: If batch_size is outside valid range (100-100000) or
            mutually exclusive parameters are provided.
    """
    # Validate batch_size
    _validate_batch_size(batch_size)

    # Validate max_workers for parallel mode
    if max_workers is not None and max_workers <= 0:
        raise ValueError("max_workers must be positive")

    # Create progress callback if requested (only for interactive terminals)
    # Sequential mode uses spinner progress bar
    progress_callback = None
    pbar = None
    if progress and sys.stderr.isatty() and not parallel:
        try:
            from rich.progress import Progress, SpinnerColumn, TextColumn

            pbar = Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TextColumn("{task.completed} rows"),
            )
            task = pbar.add_task("Fetching profiles...", total=None)
            pbar.start()

            def callback(count: int) -> None:
                pbar.update(task, completed=count)

            progress_callback = callback
        except Exception:
            # Progress bar unavailable or failed to initialize, skip silently
            pass

    try:
        result = self._fetcher_service.fetch_profiles(
            name=name,
            where=where,
            cohort_id=cohort_id,
            output_properties=output_properties,
            progress_callback=progress_callback,
            append=append,
            batch_size=batch_size,
            distinct_id=distinct_id,
            distinct_ids=distinct_ids,
            group_id=group_id,
            behaviors=behaviors,
            as_of_timestamp=as_of_timestamp,
            include_all_users=include_all_users,
            parallel=parallel,
            max_workers=max_workers,
            on_page_complete=on_page_complete,
        )
    finally:
        if pbar is not None:
            pbar.stop()

    return result

stream_events

stream_events(
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    where: str | None = None,
    limit: int | None = None,
    raw: bool = False,
) -> Iterator[dict[str, Any]]

Stream events directly from Mixpanel API without storing.

Yields events one at a time as they are received from the API. No database files or tables are created.

PARAMETER DESCRIPTION
from_date

Start date inclusive (YYYY-MM-DD format).

TYPE: str

to_date

End date inclusive (YYYY-MM-DD format).

TYPE: str

events

Optional list of event names to filter. If None, all events returned.

TYPE: list[str] | None DEFAULT: None

where

Optional Mixpanel filter expression (e.g., 'properties["country"]=="US"').

TYPE: str | None DEFAULT: None

limit

Optional maximum number of events to return (max 100000).

TYPE: int | None DEFAULT: None

raw

If True, return events in raw Mixpanel API format. If False (default), return normalized format with datetime objects.

TYPE: bool DEFAULT: False

YIELDS DESCRIPTION
dict[str, Any]

dict[str, Any]: Event dictionaries in normalized or raw format.

RAISES DESCRIPTION
ConfigError

If API credentials are not available.

AuthenticationError

If credentials are invalid.

RateLimitError

If rate limit exceeded after max retries.

QueryError

If filter expression is invalid.

ValueError

If limit is outside valid range (1-100000).

Example
ws = Workspace()
for event in ws.stream_events(from_date="2024-01-01", to_date="2024-01-31"):
    process(event)
ws.close()

With raw format:

for event in ws.stream_events(
    from_date="2024-01-01", to_date="2024-01-31", raw=True
):
    legacy_system.ingest(event)
Source code in src/mixpanel_data/workspace.py
def stream_events(
    self,
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    where: str | None = None,
    limit: int | None = None,
    raw: bool = False,
) -> Iterator[dict[str, Any]]:
    """Stream events directly from Mixpanel API without storing.

    Yields events one at a time as they are received from the API.
    No database files or tables are created.

    Args:
        from_date: Start date inclusive (YYYY-MM-DD format).
        to_date: End date inclusive (YYYY-MM-DD format).
        events: Optional list of event names to filter. If None, all events returned.
        where: Optional Mixpanel filter expression (e.g., 'properties["country"]=="US"').
        limit: Optional maximum number of events to return (max 100000).
        raw: If True, return events in raw Mixpanel API format.
             If False (default), return normalized format with datetime objects.

    Yields:
        dict[str, Any]: Event dictionaries in normalized or raw format.

    Raises:
        ConfigError: If API credentials are not available.
        AuthenticationError: If credentials are invalid.
        RateLimitError: If rate limit exceeded after max retries.
        QueryError: If filter expression is invalid.
        ValueError: If limit is outside valid range (1-100000).

    Example:
        ```python
        ws = Workspace()
        for event in ws.stream_events(from_date="2024-01-01", to_date="2024-01-31"):
            process(event)
        ws.close()
        ```

        With raw format:

        ```python
        for event in ws.stream_events(
            from_date="2024-01-01", to_date="2024-01-31", raw=True
        ):
            legacy_system.ingest(event)
        ```
    """
    # Validate limit early to avoid wasted API calls
    _validate_limit(limit)

    api_client = self._require_api_client()
    event_iterator = api_client.export_events(
        from_date=from_date,
        to_date=to_date,
        events=events,
        where=where,
        limit=limit,
    )

    if raw:
        yield from event_iterator
    else:
        for event in event_iterator:
            yield transform_event(event)

stream_profiles

stream_profiles(
    *,
    where: str | None = None,
    cohort_id: str | None = None,
    output_properties: list[str] | None = None,
    raw: bool = False,
    distinct_id: str | None = None,
    distinct_ids: list[str] | None = None,
    group_id: str | None = None,
    behaviors: list[dict[str, Any]] | None = None,
    as_of_timestamp: int | None = None,
    include_all_users: bool = False,
) -> Iterator[dict[str, Any]]

Stream user profiles directly from Mixpanel API without storing.

Yields profiles one at a time as they are received from the API. No database files or tables are created.

PARAMETER DESCRIPTION
where

Optional Mixpanel filter expression for profile properties.

TYPE: str | None DEFAULT: None

cohort_id

Optional cohort ID to filter by. Only profiles that are members of this cohort will be returned.

TYPE: str | None DEFAULT: None

output_properties

Optional list of property names to include in the response. If None, all properties are returned.

TYPE: list[str] | None DEFAULT: None

raw

If True, return profiles in raw Mixpanel API format. If False (default), return normalized format.

TYPE: bool DEFAULT: False

distinct_id

Optional single user ID to fetch. Mutually exclusive with distinct_ids.

TYPE: str | None DEFAULT: None

distinct_ids

Optional list of user IDs to fetch. Mutually exclusive with distinct_id. Duplicates are automatically removed.

TYPE: list[str] | None DEFAULT: None

group_id

Optional group type identifier (e.g., "companies") to fetch group profiles instead of user profiles.

TYPE: str | None DEFAULT: None

behaviors

Optional list of behavioral filters. Each dict should have 'window' (e.g., "30d"), 'name' (identifier), and 'event_selectors' (list of {"event": "Name"}). Use with where parameter to filter, e.g., where='(behaviors["name"] > 0)'. Mutually exclusive with cohort_id.

TYPE: list[dict[str, Any]] | None DEFAULT: None

as_of_timestamp

Optional Unix timestamp to query profile state at a specific point in time. Must be in the past.

TYPE: int | None DEFAULT: None

include_all_users

If True, include all users and mark cohort membership. Only valid when cohort_id is provided.

TYPE: bool DEFAULT: False

YIELDS DESCRIPTION
dict[str, Any]

Profile dictionaries in normalized or raw format.

RAISES DESCRIPTION
ConfigError

If API credentials are not available.

AuthenticationError

If credentials are invalid.

RateLimitError

If rate limit exceeded after max retries.

ValueError

If mutually exclusive parameters are provided.

Example
ws = Workspace()
for profile in ws.stream_profiles():
    sync_to_crm(profile)
ws.close()

Filter to premium users:

for profile in ws.stream_profiles(where='properties["plan"]=="premium"'):
    send_survey(profile)

Filter by cohort and select specific properties:

for profile in ws.stream_profiles(
    cohort_id="12345",
    output_properties=["$email", "$name"]
):
    send_email(profile)

Fetch specific users by ID:

for profile in ws.stream_profiles(distinct_ids=["user_1", "user_2"]):
    print(profile)

Fetch group profiles:

for company in ws.stream_profiles(group_id="companies"):
    print(company)
Source code in src/mixpanel_data/workspace.py
def stream_profiles(
    self,
    *,
    where: str | None = None,
    cohort_id: str | None = None,
    output_properties: list[str] | None = None,
    raw: bool = False,
    distinct_id: str | None = None,
    distinct_ids: list[str] | None = None,
    group_id: str | None = None,
    behaviors: list[dict[str, Any]] | None = None,
    as_of_timestamp: int | None = None,
    include_all_users: bool = False,
) -> Iterator[dict[str, Any]]:
    """Stream user profiles directly from Mixpanel API without storing.

    Yields profiles one at a time as they are received from the API.
    No database files or tables are created.

    Args:
        where: Optional Mixpanel filter expression for profile properties.
        cohort_id: Optional cohort ID to filter by. Only profiles that are
            members of this cohort will be returned.
        output_properties: Optional list of property names to include in
            the response. If None, all properties are returned.
        raw: If True, return profiles in raw Mixpanel API format.
             If False (default), return normalized format.
        distinct_id: Optional single user ID to fetch. Mutually exclusive
            with distinct_ids.
        distinct_ids: Optional list of user IDs to fetch. Mutually exclusive
            with distinct_id. Duplicates are automatically removed.
        group_id: Optional group type identifier (e.g., "companies") to fetch
            group profiles instead of user profiles.
        behaviors: Optional list of behavioral filters. Each dict should have
            'window' (e.g., "30d"), 'name' (identifier), and 'event_selectors'
            (list of {"event": "Name"}). Use with `where` parameter to filter,
            e.g., where='(behaviors["name"] > 0)'. Mutually exclusive with
            cohort_id.
        as_of_timestamp: Optional Unix timestamp to query profile state at
            a specific point in time. Must be in the past.
        include_all_users: If True, include all users and mark cohort membership.
            Only valid when cohort_id is provided.

    Yields:
        dict[str, Any]: Profile dictionaries in normalized or raw format.

    Raises:
        ConfigError: If API credentials are not available.
        AuthenticationError: If credentials are invalid.
        RateLimitError: If rate limit exceeded after max retries.
        ValueError: If mutually exclusive parameters are provided.

    Example:
        ```python
        ws = Workspace()
        for profile in ws.stream_profiles():
            sync_to_crm(profile)
        ws.close()
        ```

        Filter to premium users:

        ```python
        for profile in ws.stream_profiles(where='properties["plan"]=="premium"'):
            send_survey(profile)
        ```

        Filter by cohort and select specific properties:

        ```python
        for profile in ws.stream_profiles(
            cohort_id="12345",
            output_properties=["$email", "$name"]
        ):
            send_email(profile)
        ```

        Fetch specific users by ID:

        ```python
        for profile in ws.stream_profiles(distinct_ids=["user_1", "user_2"]):
            print(profile)
        ```

        Fetch group profiles:

        ```python
        for company in ws.stream_profiles(group_id="companies"):
            print(company)
        ```
    """
    api_client = self._require_api_client()
    profile_iterator = api_client.export_profiles(
        where=where,
        cohort_id=cohort_id,
        output_properties=output_properties,
        distinct_id=distinct_id,
        distinct_ids=distinct_ids,
        group_id=group_id,
        behaviors=behaviors,
        as_of_timestamp=as_of_timestamp,
        include_all_users=include_all_users,
    )

    if raw:
        yield from profile_iterator
    else:
        for profile in profile_iterator:
            yield transform_profile(profile)

sql

sql(query: str) -> pd.DataFrame

Execute SQL query and return results as DataFrame.

PARAMETER DESCRIPTION
query

SQL query string.

TYPE: str

RETURNS DESCRIPTION
DataFrame

pandas DataFrame with query results.

RAISES DESCRIPTION
QueryError

If query is invalid.
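
Example

A minimal sketch, assuming an "events" table has already been fetched into this workspace:

df = ws.sql("SELECT event_name, COUNT(*) AS n FROM events GROUP BY 1 ORDER BY n DESC")
print(df.head())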

Source code in src/mixpanel_data/workspace.py
def sql(self, query: str) -> pd.DataFrame:
    """Execute SQL query and return results as DataFrame.

    Args:
        query: SQL query string.

    Returns:
        pandas DataFrame with query results.

    Raises:
        QueryError: If query is invalid.
    """
    return self.storage.execute_df(query)

sql_scalar

sql_scalar(query: str) -> Any

Execute SQL query and return single scalar value.

PARAMETER DESCRIPTION
query

SQL query that returns a single value.

TYPE: str

RETURNS DESCRIPTION
Any

The scalar result (int, float, str, etc.).

RAISES DESCRIPTION
QueryError

If query is invalid or returns multiple values.
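
Example

A minimal sketch, assuming an "events" table exists locally:

total = ws.sql_scalar("SELECT COUNT(*) FROM events")
print(f"{total} events stored")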

Source code in src/mixpanel_data/workspace.py
def sql_scalar(self, query: str) -> Any:
    """Execute SQL query and return single scalar value.

    Args:
        query: SQL query that returns a single value.

    Returns:
        The scalar result (int, float, str, etc.).

    Raises:
        QueryError: If query is invalid or returns multiple values.
    """
    return self.storage.execute_scalar(query)

sql_rows

sql_rows(query: str) -> SQLResult

Execute SQL query and return structured result with column metadata.

PARAMETER DESCRIPTION
query

SQL query string.

TYPE: str

RETURNS DESCRIPTION
SQLResult

SQLResult with column names and row tuples.

RAISES DESCRIPTION
QueryError

If query is invalid.

Example
result = ws.sql_rows("SELECT name, age FROM users")
print(result.columns)  # ['name', 'age']
for row in result.rows:
    print(row)  # ('Alice', 30)

# Or convert to dicts for JSON output:
for row in result.to_dicts():
    print(row)  # {'name': 'Alice', 'age': 30}
Source code in src/mixpanel_data/workspace.py
def sql_rows(self, query: str) -> SQLResult:
    """Execute SQL query and return structured result with column metadata.

    Args:
        query: SQL query string.

    Returns:
        SQLResult with column names and row tuples.

    Raises:
        QueryError: If query is invalid.

    Example:
        ```python
        result = ws.sql_rows("SELECT name, age FROM users")
        print(result.columns)  # ['name', 'age']
        for row in result.rows:
            print(row)  # ('Alice', 30)

        # Or convert to dicts for JSON output:
        for row in result.to_dicts():
            print(row)  # {'name': 'Alice', 'age': 30}
        ```
    """
    return self.storage.execute_rows(query)

segmentation

segmentation(
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str | None = None,
    unit: Literal["day", "week", "month"] = "day",
    where: str | None = None,
) -> SegmentationResult

Run a segmentation query against Mixpanel API.

PARAMETER DESCRIPTION
event

Event name to query.

TYPE: str

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

on

Optional property to segment by.

TYPE: str | None DEFAULT: None

unit

Time unit for aggregation.

TYPE: Literal['day', 'week', 'month'] DEFAULT: 'day'

where

Optional Mixpanel filter expression.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
SegmentationResult

SegmentationResult with time-series data.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
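
Example

A minimal sketch of a weekly breakdown; the event name and property are illustrative:

result = ws.segmentation(
    "Purchase",
    from_date="2024-01-01",
    to_date="2024-01-31",
    on='properties["country"]',
    unit="week",
)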

Source code in src/mixpanel_data/workspace.py
def segmentation(
    self,
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str | None = None,
    unit: Literal["day", "week", "month"] = "day",
    where: str | None = None,
) -> SegmentationResult:
    """Run a segmentation query against Mixpanel API.

    Args:
        event: Event name to query.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        on: Optional property to segment by.
        unit: Time unit for aggregation.
        where: Optional Mixpanel filter expression.

    Returns:
        SegmentationResult with time-series data.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.segmentation(
        event=event,
        from_date=from_date,
        to_date=to_date,
        on=on,
        unit=unit,
        where=where,
    )

funnel

funnel(
    funnel_id: int,
    *,
    from_date: str,
    to_date: str,
    unit: str | None = None,
    on: str | None = None,
) -> FunnelResult

Run a funnel analysis query.

PARAMETER DESCRIPTION
funnel_id

ID of saved funnel.

TYPE: int

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

unit

Optional time unit.

TYPE: str | None DEFAULT: None

on

Optional property to segment by.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
FunnelResult

FunnelResult with step conversion rates.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
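
Example

A minimal sketch; the funnel ID is illustrative and would come from a saved funnel:

result = ws.funnel(
    12345,
    from_date="2024-01-01",
    to_date="2024-01-31",
)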

Source code in src/mixpanel_data/workspace.py
def funnel(
    self,
    funnel_id: int,
    *,
    from_date: str,
    to_date: str,
    unit: str | None = None,
    on: str | None = None,
) -> FunnelResult:
    """Run a funnel analysis query.

    Args:
        funnel_id: ID of saved funnel.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        unit: Optional time unit.
        on: Optional property to segment by.

    Returns:
        FunnelResult with step conversion rates.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.funnel(
        funnel_id=funnel_id,
        from_date=from_date,
        to_date=to_date,
        unit=unit,
        on=on,
    )

retention

retention(
    *,
    born_event: str,
    return_event: str,
    from_date: str,
    to_date: str,
    born_where: str | None = None,
    return_where: str | None = None,
    interval: int = 1,
    interval_count: int = 10,
    unit: Literal["day", "week", "month"] = "day",
) -> RetentionResult

Run a retention analysis query.

PARAMETER DESCRIPTION
born_event

Event that defines cohort entry.

TYPE: str

return_event

Event that defines return.

TYPE: str

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

born_where

Optional filter for born event.

TYPE: str | None DEFAULT: None

return_where

Optional filter for return event.

TYPE: str | None DEFAULT: None

interval

Retention interval.

TYPE: int DEFAULT: 1

interval_count

Number of intervals.

TYPE: int DEFAULT: 10

unit

Time unit.

TYPE: Literal['day', 'week', 'month'] DEFAULT: 'day'

RETURNS DESCRIPTION
RetentionResult

RetentionResult with cohort retention data.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
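
Example

A minimal sketch of weekly signup-to-purchase retention over eight intervals; event names are illustrative:

result = ws.retention(
    born_event="Signup",
    return_event="Purchase",
    from_date="2024-01-01",
    to_date="2024-03-31",
    unit="week",
    interval_count=8,
)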

Source code in src/mixpanel_data/workspace.py
def retention(
    self,
    *,
    born_event: str,
    return_event: str,
    from_date: str,
    to_date: str,
    born_where: str | None = None,
    return_where: str | None = None,
    interval: int = 1,
    interval_count: int = 10,
    unit: Literal["day", "week", "month"] = "day",
) -> RetentionResult:
    """Run a retention analysis query.

    Args:
        born_event: Event that defines cohort entry.
        return_event: Event that defines return.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        born_where: Optional filter for born event.
        return_where: Optional filter for return event.
        interval: Retention interval.
        interval_count: Number of intervals.
        unit: Time unit.

    Returns:
        RetentionResult with cohort retention data.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.retention(
        born_event=born_event,
        return_event=return_event,
        from_date=from_date,
        to_date=to_date,
        born_where=born_where,
        return_where=return_where,
        interval=interval,
        interval_count=interval_count,
        unit=unit,
    )

jql

jql(script: str, params: dict[str, Any] | None = None) -> JQLResult

Execute a custom JQL script.

PARAMETER DESCRIPTION
script

JQL script code.

TYPE: str

params

Optional parameters to pass to script.

TYPE: dict[str, Any] | None DEFAULT: None

RETURNS DESCRIPTION
JQLResult

JQLResult with raw query results.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

JQLSyntaxError

If script has syntax errors.
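
Example

A minimal sketch that counts events in a window; values reach the script through params:

result = ws.jql(
    script="""
    function main() {
      return Events({
        from_date: params.from_date,
        to_date: params.to_date
      }).reduce(mixpanel.reducer.count());
    }
    """,
    params={"from_date": "2024-01-01", "to_date": "2024-01-07"},
)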

Source code in src/mixpanel_data/workspace.py
def jql(self, script: str, params: dict[str, Any] | None = None) -> JQLResult:
    """Execute a custom JQL script.

    Args:
        script: JQL script code.
        params: Optional parameters to pass to script.

    Returns:
        JQLResult with raw query results.

    Raises:
        ConfigError: If API credentials not available.
        JQLSyntaxError: If script has syntax errors.
    """
    return self._live_query_service.jql(script=script, params=params)

event_counts

event_counts(
    events: list[str],
    *,
    from_date: str,
    to_date: str,
    type: Literal["general", "unique", "average"] = "general",
    unit: Literal["day", "week", "month"] = "day",
) -> EventCountsResult

Get event counts for multiple events.

PARAMETER DESCRIPTION
events

List of event names.

TYPE: list[str]

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

type

Counting method.

TYPE: Literal['general', 'unique', 'average'] DEFAULT: 'general'

unit

Time unit.

TYPE: Literal['day', 'week', 'month'] DEFAULT: 'day'

RETURNS DESCRIPTION
EventCountsResult

EventCountsResult with time-series per event.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
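
Example

A minimal sketch comparing weekly unique users for two illustrative events:

result = ws.event_counts(
    ["Signup", "Purchase"],
    from_date="2024-01-01",
    to_date="2024-03-31",
    type="unique",
    unit="week",
)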

Source code in src/mixpanel_data/workspace.py
def event_counts(
    self,
    events: list[str],
    *,
    from_date: str,
    to_date: str,
    type: Literal["general", "unique", "average"] = "general",
    unit: Literal["day", "week", "month"] = "day",
) -> EventCountsResult:
    """Get event counts for multiple events.

    Args:
        events: List of event names.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        type: Counting method.
        unit: Time unit.

    Returns:
        EventCountsResult with time-series per event.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.event_counts(
        events=events,
        from_date=from_date,
        to_date=to_date,
        type=type,
        unit=unit,
    )

property_counts

property_counts(
    event: str,
    property_name: str,
    *,
    from_date: str,
    to_date: str,
    type: Literal["general", "unique", "average"] = "general",
    unit: Literal["day", "week", "month"] = "day",
    values: list[str] | None = None,
    limit: int | None = None,
) -> PropertyCountsResult

Get event counts broken down by property values.

PARAMETER DESCRIPTION
event

Event name.

TYPE: str

property_name

Property to break down by.

TYPE: str

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

type

Counting method.

TYPE: Literal['general', 'unique', 'average'] DEFAULT: 'general'

unit

Time unit.

TYPE: Literal['day', 'week', 'month'] DEFAULT: 'day'

values

Optional list of property values to include.

TYPE: list[str] | None DEFAULT: None

limit

Maximum number of property values.

TYPE: int | None DEFAULT: None

RETURNS DESCRIPTION
PropertyCountsResult

PropertyCountsResult with time-series per property value.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
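
Example

A minimal sketch breaking an illustrative event down by country, keeping the top five values:

result = ws.property_counts(
    "Purchase",
    "country",
    from_date="2024-01-01",
    to_date="2024-01-31",
    limit=5,
)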

Source code in src/mixpanel_data/workspace.py
def property_counts(
    self,
    event: str,
    property_name: str,
    *,
    from_date: str,
    to_date: str,
    type: Literal["general", "unique", "average"] = "general",
    unit: Literal["day", "week", "month"] = "day",
    values: list[str] | None = None,
    limit: int | None = None,
) -> PropertyCountsResult:
    """Get event counts broken down by property values.

    Args:
        event: Event name.
        property_name: Property to break down by.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        type: Counting method.
        unit: Time unit.
        values: Optional list of property values to include.
        limit: Maximum number of property values.

    Returns:
        PropertyCountsResult with time-series per property value.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.property_counts(
        event=event,
        property_name=property_name,
        from_date=from_date,
        to_date=to_date,
        type=type,
        unit=unit,
        values=values,
        limit=limit,
    )

activity_feed

activity_feed(
    distinct_ids: list[str],
    *,
    from_date: str | None = None,
    to_date: str | None = None,
) -> ActivityFeedResult

Get activity feed for specific users.

PARAMETER DESCRIPTION
distinct_ids

List of user identifiers.

TYPE: list[str]

from_date

Optional start date filter.

TYPE: str | None DEFAULT: None

to_date

Optional end date filter.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
ActivityFeedResult

ActivityFeedResult with user events.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
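
Example

A minimal sketch; the distinct IDs are illustrative:

result = ws.activity_feed(
    ["user_1", "user_2"],
    from_date="2024-01-01",
    to_date="2024-01-31",
)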

Source code in src/mixpanel_data/workspace.py
def activity_feed(
    self,
    distinct_ids: list[str],
    *,
    from_date: str | None = None,
    to_date: str | None = None,
) -> ActivityFeedResult:
    """Get activity feed for specific users.

    Args:
        distinct_ids: List of user identifiers.
        from_date: Optional start date filter.
        to_date: Optional end date filter.

    Returns:
        ActivityFeedResult with user events.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.activity_feed(
        distinct_ids=distinct_ids,
        from_date=from_date,
        to_date=to_date,
    )

query_saved_report

query_saved_report(bookmark_id: int) -> SavedReportResult

Query a saved report (Insights, Retention, or Funnel).

Executes a saved report by its bookmark ID. The report type is automatically detected from the response headers.

PARAMETER DESCRIPTION
bookmark_id

ID of saved report (from list_bookmarks or Mixpanel URL).

TYPE: int

RETURNS DESCRIPTION
SavedReportResult

SavedReportResult with report data and report_type property.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

If bookmark_id is invalid or report not found.
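
Example

A minimal sketch; the bookmark ID is illustrative:

result = ws.query_saved_report(12345)
print(result.report_type)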

Source code in src/mixpanel_data/workspace.py
def query_saved_report(self, bookmark_id: int) -> SavedReportResult:
    """Query a saved report (Insights, Retention, or Funnel).

    Executes a saved report by its bookmark ID. The report type is
    automatically detected from the response headers.

    Args:
        bookmark_id: ID of saved report (from list_bookmarks or Mixpanel URL).

    Returns:
        SavedReportResult with report data and report_type property.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: If bookmark_id is invalid or report not found.
    """
    return self._live_query_service.query_saved_report(bookmark_id=bookmark_id)

query_flows

query_flows(bookmark_id: int) -> FlowsResult

Query a saved Flows report.

Executes a saved Flows report by its bookmark ID, returning step data, breakdowns, and conversion rates.

PARAMETER DESCRIPTION
bookmark_id

ID of saved flows report (from list_bookmarks or Mixpanel URL).

TYPE: int

RETURNS DESCRIPTION
FlowsResult

FlowsResult with steps, breakdowns, and conversion rate.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

If bookmark_id is invalid or report not found.
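
Example

A minimal sketch; the bookmark ID is illustrative:

result = ws.query_flows(67890)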

Source code in src/mixpanel_data/workspace.py
def query_flows(self, bookmark_id: int) -> FlowsResult:
    """Query a saved Flows report.

    Executes a saved Flows report by its bookmark ID, returning
    step data, breakdowns, and conversion rates.

    Args:
        bookmark_id: ID of saved flows report (from list_bookmarks or Mixpanel URL).

    Returns:
        FlowsResult with steps, breakdowns, and conversion rate.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: If bookmark_id is invalid or report not found.
    """
    return self._live_query_service.query_flows(bookmark_id=bookmark_id)

frequency

frequency(
    *,
    from_date: str,
    to_date: str,
    unit: Literal["day", "week", "month"] = "day",
    addiction_unit: Literal["hour", "day"] = "hour",
    event: str | None = None,
    where: str | None = None,
) -> FrequencyResult

Analyze event frequency distribution.

PARAMETER DESCRIPTION
from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

unit

Overall time unit.

TYPE: Literal['day', 'week', 'month'] DEFAULT: 'day'

addiction_unit

Measurement granularity.

TYPE: Literal['hour', 'day'] DEFAULT: 'hour'

event

Optional event filter.

TYPE: str | None DEFAULT: None

where

Optional Mixpanel filter expression.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
FrequencyResult

FrequencyResult with frequency distribution.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
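
Example

A minimal sketch measuring hourly engagement per week; the event name is illustrative:

result = ws.frequency(
    from_date="2024-01-01",
    to_date="2024-01-31",
    unit="week",
    addiction_unit="hour",
    event="App Open",
)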

Source code in src/mixpanel_data/workspace.py
def frequency(
    self,
    *,
    from_date: str,
    to_date: str,
    unit: Literal["day", "week", "month"] = "day",
    addiction_unit: Literal["hour", "day"] = "hour",
    event: str | None = None,
    where: str | None = None,
) -> FrequencyResult:
    """Analyze event frequency distribution.

    Args:
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        unit: Overall time unit.
        addiction_unit: Measurement granularity.
        event: Optional event filter.
        where: Optional Mixpanel filter expression.

    Returns:
        FrequencyResult with frequency distribution.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.frequency(
        from_date=from_date,
        to_date=to_date,
        unit=unit,
        addiction_unit=addiction_unit,
        event=event,
        where=where,
    )

segmentation_numeric

segmentation_numeric(
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
    type: Literal["general", "unique", "average"] = "general",
) -> NumericBucketResult

Bucket events by numeric property ranges.

PARAMETER DESCRIPTION
event

Event name.

TYPE: str

from_date

Start date.

TYPE: str

to_date

End date.

TYPE: str

on

Numeric property expression.

TYPE: str

unit

Time unit.

TYPE: Literal['hour', 'day'] DEFAULT: 'day'

where

Optional filter.

TYPE: str | None DEFAULT: None

type

Counting method.

TYPE: Literal['general', 'unique', 'average'] DEFAULT: 'general'

RETURNS DESCRIPTION
NumericBucketResult

NumericBucketResult with bucketed data.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
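
Example

A minimal sketch; the event name and the numeric expression are illustrative:

result = ws.segmentation_numeric(
    "Purchase",
    from_date="2024-01-01",
    to_date="2024-01-31",
    on='number(properties["amount"])',
)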

Source code in src/mixpanel_data/workspace.py
def segmentation_numeric(
    self,
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
    type: Literal["general", "unique", "average"] = "general",
) -> NumericBucketResult:
    """Bucket events by numeric property ranges.

    Args:
        event: Event name.
        from_date: Start date.
        to_date: End date.
        on: Numeric property expression.
        unit: Time unit.
        where: Optional filter.
        type: Counting method.

    Returns:
        NumericBucketResult with bucketed data.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.segmentation_numeric(
        event=event,
        from_date=from_date,
        to_date=to_date,
        on=on,
        unit=unit,
        where=where,
        type=type,
    )

segmentation_sum

segmentation_sum(
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
) -> NumericSumResult

Calculate sum of numeric property over time.

PARAMETER DESCRIPTION
event

Event name.

TYPE: str

from_date

Start date.

TYPE: str

to_date

End date.

TYPE: str

on

Numeric property expression.

TYPE: str

unit

Time unit.

TYPE: Literal['hour', 'day'] DEFAULT: 'day'

where

Optional filter.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
NumericSumResult

NumericSumResult with sum values per period.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
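
Example

A minimal sketch summing an illustrative revenue property per day:

result = ws.segmentation_sum(
    "Purchase",
    from_date="2024-01-01",
    to_date="2024-01-31",
    on='properties["amount"]',
)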

Source code in src/mixpanel_data/workspace.py
def segmentation_sum(
    self,
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
) -> NumericSumResult:
    """Calculate sum of numeric property over time.

    Args:
        event: Event name.
        from_date: Start date.
        to_date: End date.
        on: Numeric property expression.
        unit: Time unit.
        where: Optional filter.

    Returns:
        NumericSumResult with sum values per period.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.segmentation_sum(
        event=event,
        from_date=from_date,
        to_date=to_date,
        on=on,
        unit=unit,
        where=where,
    )

segmentation_average

segmentation_average(
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
) -> NumericAverageResult

Calculate average of numeric property over time.

PARAMETER DESCRIPTION
event

Event name.

TYPE: str

from_date

Start date.

TYPE: str

to_date

End date.

TYPE: str

on

Numeric property expression.

TYPE: str

unit

Time unit.

TYPE: Literal['hour', 'day'] DEFAULT: 'day'

where

Optional filter.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
NumericAverageResult

NumericAverageResult with average values per period.

RAISES DESCRIPTION
ConfigError

If API credentials not available.
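
Example

A minimal sketch averaging an illustrative order-value property per day:

result = ws.segmentation_average(
    "Purchase",
    from_date="2024-01-01",
    to_date="2024-01-31",
    on='properties["amount"]',
)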

Source code in src/mixpanel_data/workspace.py
def segmentation_average(
    self,
    event: str,
    *,
    from_date: str,
    to_date: str,
    on: str,
    unit: Literal["hour", "day"] = "day",
    where: str | None = None,
) -> NumericAverageResult:
    """Calculate average of numeric property over time.

    Args:
        event: Event name.
        from_date: Start date.
        to_date: End date.
        on: Numeric property expression.
        unit: Time unit.
        where: Optional filter.

    Returns:
        NumericAverageResult with average values per period.

    Raises:
        ConfigError: If API credentials not available.
    """
    return self._live_query_service.segmentation_average(
        event=event,
        from_date=from_date,
        to_date=to_date,
        on=on,
        unit=unit,
        where=where,
    )

property_distribution

property_distribution(
    event: str, property: str, *, from_date: str, to_date: str, limit: int = 20
) -> PropertyDistributionResult

Get distribution of values for a property.

Uses JQL to count occurrences of each property value, returning counts and percentages sorted by frequency.

PARAMETER DESCRIPTION
event

Event name to analyze.

TYPE: str

property

Property name to get distribution for.

TYPE: str

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

limit

Maximum number of values to return. Default: 20.

TYPE: int DEFAULT: 20

RETURNS DESCRIPTION
PropertyDistributionResult

PropertyDistributionResult with value counts and percentages.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Script execution error.

Example
result = ws.property_distribution(
    event="Purchase",
    property="country",
    from_date="2024-01-01",
    to_date="2024-01-31",
)
for v in result.values:
    print(f"{v.value}: {v.count} ({v.percentage:.1f}%)")
Source code in src/mixpanel_data/workspace.py
def property_distribution(
    self,
    event: str,
    property: str,
    *,
    from_date: str,
    to_date: str,
    limit: int = 20,
) -> PropertyDistributionResult:
    """Get distribution of values for a property.

    Uses JQL to count occurrences of each property value, returning
    counts and percentages sorted by frequency.

    Args:
        event: Event name to analyze.
        property: Property name to get distribution for.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        limit: Maximum number of values to return. Default: 20.

    Returns:
        PropertyDistributionResult with value counts and percentages.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Script execution error.

    Example:
        ```python
        result = ws.property_distribution(
            event="Purchase",
            property="country",
            from_date="2024-01-01",
            to_date="2024-01-31",
        )
        for v in result.values:
            print(f"{v.value}: {v.count} ({v.percentage:.1f}%)")
        ```
    """
    return self._live_query_service.property_distribution(
        event=event,
        property=property,
        from_date=from_date,
        to_date=to_date,
        limit=limit,
    )

numeric_summary

numeric_summary(
    event: str,
    property: str,
    *,
    from_date: str,
    to_date: str,
    percentiles: list[int] | None = None,
) -> NumericPropertySummaryResult

Get statistical summary for a numeric property.

Uses JQL to compute count, min, max, avg, stddev, and percentiles for a numeric property.

PARAMETER DESCRIPTION
event

Event name to analyze.

TYPE: str

property

Numeric property name.

TYPE: str

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

percentiles

Percentiles to compute. Default: [25, 50, 75, 90, 95, 99].

TYPE: list[int] | None DEFAULT: None

RETURNS DESCRIPTION
NumericPropertySummaryResult

NumericPropertySummaryResult with statistics.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Script execution error or non-numeric property.

Example
result = ws.numeric_summary(
    event="Purchase",
    property="amount",
    from_date="2024-01-01",
    to_date="2024-01-31",
)
print(f"Avg: {result.avg}, Median: {result.percentiles[50]}")
Source code in src/mixpanel_data/workspace.py
def numeric_summary(
    self,
    event: str,
    property: str,
    *,
    from_date: str,
    to_date: str,
    percentiles: list[int] | None = None,
) -> NumericPropertySummaryResult:
    """Get statistical summary for a numeric property.

    Uses JQL to compute count, min, max, avg, stddev, and percentiles
    for a numeric property.

    Args:
        event: Event name to analyze.
        property: Numeric property name.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        percentiles: Percentiles to compute. Default: [25, 50, 75, 90, 95, 99].

    Returns:
        NumericPropertySummaryResult with statistics.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Script execution error or non-numeric property.

    Example:
        ```python
        result = ws.numeric_summary(
            event="Purchase",
            property="amount",
            from_date="2024-01-01",
            to_date="2024-01-31",
        )
        print(f"Avg: {result.avg}, Median: {result.percentiles[50]}")
        ```
    """
    return self._live_query_service.numeric_summary(
        event=event,
        property=property,
        from_date=from_date,
        to_date=to_date,
        percentiles=percentiles,
    )

daily_counts

daily_counts(
    *, from_date: str, to_date: str, events: list[str] | None = None
) -> DailyCountsResult

Get daily event counts.

Uses JQL to count events by day, optionally filtered to specific events.

PARAMETER DESCRIPTION
from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

events

Optional list of events to count. None = all events.

TYPE: list[str] | None DEFAULT: None

RETURNS DESCRIPTION
DailyCountsResult

DailyCountsResult with date/event/count tuples.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Script execution error.

Example
result = ws.daily_counts(
    from_date="2024-01-01",
    to_date="2024-01-07",
    events=["Purchase", "Signup"],
)
for c in result.counts:
    print(f"{c.date} {c.event}: {c.count}")
Source code in src/mixpanel_data/workspace.py
def daily_counts(
    self,
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
) -> DailyCountsResult:
    """Get daily event counts.

    Uses JQL to count events by day, optionally filtered to specific events.

    Args:
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        events: Optional list of events to count. None = all events.

    Returns:
        DailyCountsResult with date/event/count tuples.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Script execution error.

    Example:
        ```python
        result = ws.daily_counts(
            from_date="2024-01-01",
            to_date="2024-01-07",
            events=["Purchase", "Signup"],
        )
        for c in result.counts:
            print(f"{c.date} {c.event}: {c.count}")
        ```
    """
    return self._live_query_service.daily_counts(
        from_date=from_date,
        to_date=to_date,
        events=events,
    )

engagement_distribution

engagement_distribution(
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    buckets: list[int] | None = None,
) -> EngagementDistributionResult

Get user engagement distribution.

Uses JQL to bucket users by their event count, showing how many users performed N events.

PARAMETER DESCRIPTION
from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

events

Optional list of events to count. None = all events.

TYPE: list[str] | None DEFAULT: None

buckets

Bucket boundaries. Default: [1, 2, 5, 10, 25, 50, 100].

TYPE: list[int] | None DEFAULT: None

RETURNS DESCRIPTION
EngagementDistributionResult

EngagementDistributionResult with user counts per bucket.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Script execution error.

Example
result = ws.engagement_distribution(
    from_date="2024-01-01",
    to_date="2024-01-31",
)
for b in result.buckets:
    print(f"{b.bucket_label}: {b.user_count} ({b.percentage:.1f}%)")
Source code in src/mixpanel_data/workspace.py
def engagement_distribution(
    self,
    *,
    from_date: str,
    to_date: str,
    events: list[str] | None = None,
    buckets: list[int] | None = None,
) -> EngagementDistributionResult:
    """Get user engagement distribution.

    Uses JQL to bucket users by their event count, showing how many
    users performed N events.

    Args:
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).
        events: Optional list of events to count. None = all events.
        buckets: Bucket boundaries. Default: [1, 2, 5, 10, 25, 50, 100].

    Returns:
        EngagementDistributionResult with user counts per bucket.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Script execution error.

    Example:
        ```python
        result = ws.engagement_distribution(
            from_date="2024-01-01",
            to_date="2024-01-31",
        )
        for b in result.buckets:
            print(f"{b.bucket_label}: {b.user_count} ({b.percentage:.1f}%)")
        ```
    """
    return self._live_query_service.engagement_distribution(
        from_date=from_date,
        to_date=to_date,
        events=events,
        buckets=buckets,
    )

property_coverage

property_coverage(
    event: str, properties: list[str], *, from_date: str, to_date: str
) -> PropertyCoverageResult

Get property coverage statistics.

Uses JQL to count how often each property is defined (non-null) vs undefined for the specified event.

PARAMETER DESCRIPTION
event

Event name to analyze.

TYPE: str

properties

List of property names to check.

TYPE: list[str]

from_date

Start date (YYYY-MM-DD).

TYPE: str

to_date

End date (YYYY-MM-DD).

TYPE: str

RETURNS DESCRIPTION
PropertyCoverageResult

PropertyCoverageResult with coverage statistics per property.

RAISES DESCRIPTION
ConfigError

If API credentials not available.

QueryError

Script execution error.

Example
result = ws.property_coverage(
    event="Purchase",
    properties=["coupon_code", "referrer"],
    from_date="2024-01-01",
    to_date="2024-01-31",
)
for c in result.coverage:
    print(f"{c.property}: {c.coverage_percentage:.1f}% defined")
Source code in src/mixpanel_data/workspace.py
def property_coverage(
    self,
    event: str,
    properties: list[str],
    *,
    from_date: str,
    to_date: str,
) -> PropertyCoverageResult:
    """Get property coverage statistics.

    Uses JQL to count how often each property is defined (non-null)
    vs undefined for the specified event.

    Args:
        event: Event name to analyze.
        properties: List of property names to check.
        from_date: Start date (YYYY-MM-DD).
        to_date: End date (YYYY-MM-DD).

    Returns:
        PropertyCoverageResult with coverage statistics per property.

    Raises:
        ConfigError: If API credentials not available.
        QueryError: Script execution error.

    Example:
        ```python
        result = ws.property_coverage(
            event="Purchase",
            properties=["coupon_code", "referrer"],
            from_date="2024-01-01",
            to_date="2024-01-31",
        )
        for c in result.coverage:
            print(f"{c.property}: {c.coverage_percentage:.1f}% defined")
        ```
    """
    return self._live_query_service.property_coverage(
        event=event,
        properties=properties,
        from_date=from_date,
        to_date=to_date,
    )

info

info() -> WorkspaceInfo

Get metadata about this workspace.

RETURNS DESCRIPTION
WorkspaceInfo

WorkspaceInfo with path, project_id, region, account, tables, size_mb, and created_at.
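
Example

A minimal sketch; the fields shown follow the WorkspaceInfo shape above:

meta = ws.info()
print(meta.project_id, meta.region)
print(meta.tables, f"{meta.size_mb:.1f} MB")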

Source code in src/mixpanel_data/workspace.py
def info(self) -> WorkspaceInfo:
    """Get metadata about this workspace.

    Returns:
        WorkspaceInfo with path, project_id, region, account, tables, size_mb, created_at.
    """
    path = self.storage.path
    tables = [t.name for t in self.storage.list_tables()]

    # Calculate database size and creation time
    size_mb = 0.0
    created_at: datetime | None = None
    if path is not None and path.exists():
        try:
            stat = path.stat()
            size_mb = stat.st_size / 1_000_000
            created_at = datetime.fromtimestamp(stat.st_ctime)
        except (OSError, PermissionError):
            # File became inaccessible, use defaults
            pass

    return WorkspaceInfo(
        path=path,
        project_id=self._credentials.project_id if self._credentials else "unknown",
        region=self._credentials.region if self._credentials else "unknown",
        account=self._account_name,
        tables=tables,
        size_mb=size_mb,
        created_at=created_at,
    )

tables

tables() -> list[TableInfo]

List tables in the local database.

RETURNS DESCRIPTION
list[TableInfo]

List of TableInfo objects (name, type, row_count, fetched_at).
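
Example

A minimal sketch listing each stored table:

for t in ws.tables():
    print(t.name, t.type, t.row_count)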

Source code in src/mixpanel_data/workspace.py
def tables(self) -> list[TableInfo]:
    """List tables in the local database.

    Returns:
        List of TableInfo objects (name, type, row_count, fetched_at).
    """
    return self.storage.list_tables()

table_schema

table_schema(table: str) -> TableSchema

Get schema for a table in the local database.

PARAMETER DESCRIPTION
table

Table name.

TYPE: str

RETURNS DESCRIPTION
TableSchema

TableSchema with column definitions.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.
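
Example

A minimal sketch, assuming an "events" table exists:

schema = ws.table_schema("events")
for col in schema.columns:
    print(col.name)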

Source code in src/mixpanel_data/workspace.py
def table_schema(self, table: str) -> TableSchema:
    """Get schema for a table in the local database.

    Args:
        table: Table name.

    Returns:
        TableSchema with column definitions.

    Raises:
        TableNotFoundError: If table doesn't exist.
    """
    return self.storage.get_schema(table)

drop

drop(*names: str) -> None

Drop specified tables.

PARAMETER DESCRIPTION
*names

Table names to drop.

TYPE: str DEFAULT: ()

RAISES DESCRIPTION
TableNotFoundError

If any table doesn't exist.
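
Example

A minimal sketch; table names are illustrative:

ws.drop("events")
ws.drop("users", "companies")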

Source code in src/mixpanel_data/workspace.py
def drop(self, *names: str) -> None:
    """Drop specified tables.

    Args:
        *names: Table names to drop.

    Raises:
        TableNotFoundError: If any table doesn't exist.
    """
    for name in names:
        self.storage.drop_table(name)

drop_all

drop_all(type: TableType | None = None) -> None

Drop all tables from the workspace, optionally filtered by type.

Permanently removes all tables and their data. When used with the type parameter, only tables matching the specified type are dropped.

PARAMETER DESCRIPTION
type

Optional table type filter. Valid values: "events", "profiles". If None, all tables are dropped regardless of type.

TYPE: TableType | None DEFAULT: None

RAISES DESCRIPTION
TableNotFoundError

If a table cannot be dropped (rare in practice).

Example

Drop all event tables:

ws = Workspace()
ws.drop_all(type="events")  # Only drops event tables
ws.close()

Drop all tables:

ws = Workspace()
ws.drop_all()  # Drops everything
ws.close()
Source code in src/mixpanel_data/workspace.py
def drop_all(self, type: TableType | None = None) -> None:
    """Drop all tables from the workspace, optionally filtered by type.

    Permanently removes all tables and their data. When used with the type
    parameter, only tables matching the specified type are dropped.

    Args:
        type: Optional table type filter. Valid values: "events", "profiles".
              If None, all tables are dropped regardless of type.

    Raises:
        TableNotFoundError: If a table cannot be dropped (rare in practice).

    Example:
        Drop all event tables:

        ```python
        ws = Workspace()
        ws.drop_all(type="events")  # Only drops event tables
        ws.close()
        ```

        Drop all tables:

        ```python
        ws = Workspace()
        ws.drop_all()  # Drops everything
        ws.close()
        ```
    """
    tables = self.storage.list_tables()
    for table in tables:
        if type is None or table.type == type:
            self.storage.drop_table(table.name)

sample

sample(table: str, n: int = 10) -> pd.DataFrame

Return random sample rows from a table.

Uses DuckDB's reservoir sampling for representative results. Unlike LIMIT, sampling returns rows from throughout the table.

PARAMETER DESCRIPTION
table

Table name to sample from.

TYPE: str

n

Number of rows to return (default: 10).

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
DataFrame

DataFrame with n random rows. If the table has fewer than n rows, returns all available rows.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.

Example
ws = Workspace()
ws.sample("events")  # 10 random rows
ws.sample("events", n=5)  # 5 random rows
Source code in src/mixpanel_data/workspace.py
def sample(self, table: str, n: int = 10) -> pd.DataFrame:
    """Return random sample rows from a table.

    Uses DuckDB's reservoir sampling for representative results.
    Unlike LIMIT, sampling returns rows from throughout the table.

    Args:
        table: Table name to sample from.
        n: Number of rows to return (default: 10).

    Returns:
        DataFrame with n random rows. If table has fewer than n rows,
        returns all available rows.

    Raises:
        TableNotFoundError: If table doesn't exist.

    Example:
        ```python
        ws = Workspace()
        ws.sample("events")  # 10 random rows
        ws.sample("events", n=5)  # 5 random rows
        ```
    """
    # Validate table exists
    self.storage.get_schema(table)

    # Use DuckDB's reservoir sampling
    sql = f'SELECT * FROM "{table}" USING SAMPLE {n}'
    return self.storage.execute_df(sql)

summarize

summarize(table: str) -> SummaryResult

Get statistical summary of all columns in a table.

Uses DuckDB's SUMMARIZE command to compute min/max, quartiles, null percentage, and approximate distinct counts for each column.

PARAMETER DESCRIPTION
table

Table name to summarize.

TYPE: str

RETURNS DESCRIPTION
SummaryResult

SummaryResult with per-column statistics and total row count.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.

Example
result = ws.summarize("events")
result.row_count         # 1234567
result.columns[0].null_percentage  # 0.5
result.df                # Full summary as DataFrame
Source code in src/mixpanel_data/workspace.py
def summarize(self, table: str) -> SummaryResult:
    """Get statistical summary of all columns in a table.

    Uses DuckDB's SUMMARIZE command to compute min/max, quartiles,
    null percentage, and approximate distinct counts for each column.

    Args:
        table: Table name to summarize.

    Returns:
        SummaryResult with per-column statistics and total row count.

    Raises:
        TableNotFoundError: If table doesn't exist.

    Example:
        ```python
        result = ws.summarize("events")
        result.row_count         # 1234567
        result.columns[0].null_percentage  # 0.5
        result.df                # Full summary as DataFrame
        ```
    """
    # Validate table exists
    self.storage.get_schema(table)

    # Get row count
    row_count = self.storage.execute_scalar(f'SELECT COUNT(*) FROM "{table}"')

    # Get column statistics using SUMMARIZE
    summary_df = self.storage.execute_df(f'SUMMARIZE "{table}"')

    # Convert to ColumnSummary objects (to_dict is more efficient than iterrows)
    columns: list[ColumnSummary] = []
    for row in summary_df.to_dict("records"):
        columns.append(
            ColumnSummary(
                column_name=str(row["column_name"]),
                column_type=str(row["column_type"]),
                min=row["min"],
                max=row["max"],
                approx_unique=int(row["approx_unique"]),
                avg=self._try_float(row["avg"]),
                std=self._try_float(row["std"]),
                q25=row["q25"],
                q50=row["q50"],
                q75=row["q75"],
                count=int(row["count"]),
                null_percentage=float(row["null_percentage"]),
            )
        )

    return SummaryResult(
        table=table,
        row_count=int(row_count),
        columns=columns,
    )

event_breakdown

event_breakdown(table: str) -> EventBreakdownResult

Analyze event distribution in a table.

Computes per-event counts, unique users, date ranges, and percentage of total for each event type.

PARAMETER DESCRIPTION
table

Table name containing events. Must have columns: event_name, event_time, distinct_id.

TYPE: str

RETURNS DESCRIPTION
EventBreakdownResult

EventBreakdownResult with per-event statistics.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.

QueryError

If table lacks required columns (event_name, event_time, distinct_id). Error message lists the specific missing columns.

Example
breakdown = ws.event_breakdown("events")
breakdown.total_events           # 1234567
breakdown.events[0].event_name   # "Page View"
breakdown.events[0].pct_of_total # 45.2
Source code in src/mixpanel_data/workspace.py
def event_breakdown(self, table: str) -> EventBreakdownResult:
    """Analyze event distribution in a table.

    Computes per-event counts, unique users, date ranges, and
    percentage of total for each event type.

    Args:
        table: Table name containing events. Must have columns:
               event_name, event_time, distinct_id.

    Returns:
        EventBreakdownResult with per-event statistics.

    Raises:
        TableNotFoundError: If table doesn't exist.
        QueryError: If table lacks required columns (event_name,
                   event_time, distinct_id). Error message lists
                   the specific missing columns.

    Example:
        ```python
        breakdown = ws.event_breakdown("events")
        breakdown.total_events           # 1234567
        breakdown.events[0].event_name   # "Page View"
        breakdown.events[0].pct_of_total # 45.2
        ```
    """
    # Validate table exists and get schema
    schema = self.storage.get_schema(table)
    column_names = {col.name for col in schema.columns}

    # Check for required columns
    required_columns = {"event_name", "event_time", "distinct_id"}
    missing = required_columns - column_names
    if missing:
        raise QueryError(
            f"event_breakdown() requires columns {required_columns}, "
            f"but '{table}' is missing: {missing}",
            status_code=0,
        )

    # Get aggregate statistics
    agg_sql = f"""
        SELECT
            COUNT(*) as total_events,
            COUNT(DISTINCT distinct_id) as total_users,
            MIN(event_time) as min_time,
            MAX(event_time) as max_time
        FROM "{table}"
    """
    agg_result = self.storage.execute_rows(agg_sql)
    total_events, total_users, min_time, max_time = agg_result.rows[0]

    # Handle empty table
    if total_events == 0:
        return EventBreakdownResult(
            table=table,
            total_events=0,
            total_users=0,
            date_range=(datetime.min, datetime.min),
            events=[],
        )

    # Get per-event statistics
    breakdown_sql = f"""
        SELECT
            event_name,
            COUNT(*) as count,
            COUNT(DISTINCT distinct_id) as unique_users,
            MIN(event_time) as first_seen,
            MAX(event_time) as last_seen,
            ROUND(100.0 * COUNT(*) / SUM(COUNT(*)) OVER (), 2) as pct_of_total
        FROM "{table}"
        GROUP BY event_name
        ORDER BY count DESC
    """
    breakdown_rows = self.storage.execute_rows(breakdown_sql)

    events: list[EventStats] = []
    for row in breakdown_rows.rows:
        event_name, count, unique_users, first_seen, last_seen, pct = row
        events.append(
            EventStats(
                event_name=str(event_name),
                count=int(count),
                unique_users=int(unique_users),
                first_seen=first_seen
                if isinstance(first_seen, datetime)
                else datetime.fromisoformat(str(first_seen)),
                last_seen=last_seen
                if isinstance(last_seen, datetime)
                else datetime.fromisoformat(str(last_seen)),
                pct_of_total=float(pct),
            )
        )

    return EventBreakdownResult(
        table=table,
        total_events=int(total_events),
        total_users=int(total_users),
        date_range=(
            min_time
            if isinstance(min_time, datetime)
            else datetime.fromisoformat(str(min_time)),
            max_time
            if isinstance(max_time, datetime)
            else datetime.fromisoformat(str(max_time)),
        ),
        events=events,
    )

property_keys

property_keys(table: str, event: str | None = None) -> list[str]

List all JSON property keys in a table.

Extracts distinct keys from the 'properties' JSON column. Useful for discovering queryable fields in event properties.

PARAMETER DESCRIPTION
table

Table name with a 'properties' JSON column.

TYPE: str

event

Optional event name to filter by. If provided, only returns keys present in events of that type.

TYPE: str | None DEFAULT: None

RETURNS DESCRIPTION
list[str]

Alphabetically sorted list of property key names. Empty list if no keys found.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.

QueryError

If table lacks 'properties' column.

Example

All keys across all events:

ws.property_keys("events")
# ['$browser', '$city', 'page', 'referrer', 'user_plan']

Keys for specific event type:

ws.property_keys("events", event="Purchase")
# ['amount', 'currency', 'product_id', 'quantity']
Source code in src/mixpanel_data/workspace.py
def property_keys(
    self,
    table: str,
    event: str | None = None,
) -> list[str]:
    """List all JSON property keys in a table.

    Extracts distinct keys from the 'properties' JSON column.
    Useful for discovering queryable fields in event properties.

    Args:
        table: Table name with a 'properties' JSON column.
        event: Optional event name to filter by. If provided, only
               returns keys present in events of that type.

    Returns:
        Alphabetically sorted list of property key names.
        Empty list if no keys found.

    Raises:
        TableNotFoundError: If table doesn't exist.
        QueryError: If table lacks 'properties' column.

    Example:
        All keys across all events:

        ```python
        ws.property_keys("events")
        # ['$browser', '$city', 'page', 'referrer', 'user_plan']
        ```

        Keys for specific event type:

        ```python
        ws.property_keys("events", event="Purchase")
        # ['amount', 'currency', 'product_id', 'quantity']
        ```
    """
    # Validate table exists and get schema
    schema = self.storage.get_schema(table)
    column_names = {col.name for col in schema.columns}

    # Check for required column
    if "properties" not in column_names:
        raise QueryError(
            f"property_keys() requires a 'properties' column, "
            f"but '{table}' does not have one",
            status_code=0,
        )

    # Build query with optional event filter
    if event is not None:
        # Check if event_name column exists
        if "event_name" not in column_names:
            raise QueryError(
                f"Cannot filter by event: '{table}' lacks 'event_name' column",
                status_code=0,
            )
        sql = f"""
            SELECT DISTINCT unnest(json_keys(properties)) as key
            FROM "{table}"
            WHERE event_name = ?
            ORDER BY key
        """
        result = self.storage.execute_rows_params(sql, [event])
        rows = result.rows
    else:
        sql = f"""
            SELECT DISTINCT unnest(json_keys(properties)) as key
            FROM "{table}"
            ORDER BY key
        """
        result = self.storage.execute_rows(sql)
        rows = result.rows

    return [str(row[0]) for row in rows]
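
The two discovery methods compose well: find candidate keys with property_keys(), then profile individual properties with column_stats() (documented below). A sketch, with the caveat that discovered keys are interpolated into SQL, so the Security note on column_stats applies to keys you do not trust:

for key in ws.property_keys("events", event="Purchase"):
    stats = ws.column_stats("events", f"properties->>'$.{key}'")
    print(f"{key}: {stats.unique_count} unique values, {stats.null_pct}% null")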

column_stats

column_stats(table: str, column: str, *, top_n: int = 10) -> ColumnStatsResult

Get detailed statistics for a single column.

Performs deep analysis including null rates, cardinality, top values, and numeric statistics (for numeric columns).

The column parameter supports JSON path expressions for analyzing properties stored in JSON columns:

- properties->>'$.country' for string extraction
- CAST(properties->>'$.amount' AS DOUBLE) for numeric analysis

PARAMETER DESCRIPTION
table

Table name to analyze.

TYPE: str

column

Column name or expression to analyze.

TYPE: str

top_n

Number of top values to return (default: 10).

TYPE: int DEFAULT: 10

RETURNS DESCRIPTION
ColumnStatsResult

ColumnStatsResult with comprehensive column statistics.

RAISES DESCRIPTION
TableNotFoundError

If table doesn't exist.

QueryError

If column expression is invalid.

Example

Analyze standard column:

stats = ws.column_stats("events", "event_name")
stats.unique_count      # 47
stats.top_values[:3]    # [('Page View', 45230), ...]

Analyze JSON property:

stats = ws.column_stats("events", "properties->>'$.country'")

Security

The column parameter is interpolated directly into SQL queries to allow expression syntax. Only use with trusted input from developers or AI coding agents. Do not pass untrusted user input.
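
If a column name can originate outside your own code, one defensive option is to check it against the table's actual schema before calling column_stats(). A sketch, assuming the storage engine is reachable as ws.storage (as in the listing below); user_column is a hypothetical externally supplied name, and this guard only covers plain column names, not expressions:

allowed = {col.name for col in ws.storage.get_schema("events").columns}
if user_column not in allowed:
    raise ValueError(f"unknown column: {user_column}")
stats = ws.column_stats("events", user_column)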

Source code in src/mixpanel_data/workspace.py
def column_stats(
    self,
    table: str,
    column: str,
    *,
    top_n: int = 10,
) -> ColumnStatsResult:
    """Get detailed statistics for a single column.

    Performs deep analysis including null rates, cardinality,
    top values, and numeric statistics (for numeric columns).

    The column parameter supports JSON path expressions for
    analyzing properties stored in JSON columns:
    - `properties->>'$.country'` for string extraction
    - `CAST(properties->>'$.amount' AS DOUBLE)` for numeric

    Args:
        table: Table name to analyze.
        column: Column name or expression to analyze.
        top_n: Number of top values to return (default: 10).

    Returns:
        ColumnStatsResult with comprehensive column statistics.

    Raises:
        TableNotFoundError: If table doesn't exist.
        QueryError: If column expression is invalid.

    Example:
        Analyze standard column:

        ```python
        stats = ws.column_stats("events", "event_name")
        stats.unique_count      # 47
        stats.top_values[:3]    # [('Page View', 45230), ...]
        ```

        Analyze JSON property:

        ```python
        stats = ws.column_stats("events", "properties->>'$.country'")
        ```

    Security:
        The column parameter is interpolated directly into SQL queries
        to allow expression syntax. Only use with trusted input from
        developers or AI coding agents. Do not pass untrusted user input.
    """
    # Validate table exists
    self.storage.get_schema(table)

    # Get total row count
    total_rows = self.storage.execute_scalar(f'SELECT COUNT(*) FROM "{table}"')

    # Get basic stats: count, null_count, approx unique
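    # (APPROX_COUNT_DISTINCT is an estimate, so unique_count and unique_pct are approximate)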
    stats_sql = f"""
        SELECT
            COUNT({column}) as count,
            COUNT(*) - COUNT({column}) as null_count,
            APPROX_COUNT_DISTINCT({column}) as unique_count
        FROM "{table}"
    """
    try:
        stats_result = self.storage.execute_rows(stats_sql)
    except Exception as e:
        raise QueryError(
            f"Invalid column expression: {column}. Error: {e}",
            status_code=0,
        ) from e

    count, null_count, unique_count = stats_result.rows[0]

    # Calculate percentages
    null_pct = (null_count / total_rows * 100) if total_rows > 0 else 0.0
    unique_pct = (unique_count / count * 100) if count > 0 else 0.0

    # Get top values
    top_sql = f"""
        SELECT {column} as value, COUNT(*) as cnt
        FROM "{table}"
        WHERE {column} IS NOT NULL
        GROUP BY {column}
        ORDER BY cnt DESC
        LIMIT {top_n}
    """
    top_result = self.storage.execute_rows(top_sql)
    top_values: list[tuple[Any, int]] = [
        (row[0], int(row[1])) for row in top_result.rows
    ]

    # Detect column type to determine if numeric stats apply
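    # (samples a single non-null value; mixed-type expressions may be misclassified)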
    type_sql = (
        f'SELECT typeof({column}) FROM "{table}" WHERE {column} IS NOT NULL LIMIT 1'
    )
    try:
        type_result = self.storage.execute_rows(type_sql)
        dtype = str(type_result.rows[0][0]) if type_result.rows else "UNKNOWN"
    except Exception:
        dtype = "UNKNOWN"

    # Get numeric stats if applicable
    min_val: float | None = None
    max_val: float | None = None
    mean_val: float | None = None
    std_val: float | None = None

    numeric_types = {
        "INTEGER",
        "BIGINT",
        "DOUBLE",
        "FLOAT",
        "DECIMAL",
        "HUGEINT",
        "SMALLINT",
        "TINYINT",
        "UBIGINT",
        "UINTEGER",
        "USMALLINT",
        "UTINYINT",
    }
    if dtype.upper() in numeric_types:
        numeric_sql = f"""
            SELECT
                MIN({column}) as min_val,
                MAX({column}) as max_val,
                AVG({column}) as mean_val,
                STDDEV({column}) as std_val
            FROM "{table}"
        """
        try:
            numeric_result = self.storage.execute_rows(numeric_sql)
            if numeric_result.rows:
                row = numeric_result.rows[0]
                min_val = float(row[0]) if row[0] is not None else None
                max_val = float(row[1]) if row[1] is not None else None
                mean_val = float(row[2]) if row[2] is not None else None
                std_val = float(row[3]) if row[3] is not None else None
        except Exception:
            # Not numeric, skip
            pass

    return ColumnStatsResult(
        table=table,
        column=column,
        dtype=dtype,
        count=int(count),
        null_count=int(null_count),
        null_pct=round(null_pct, 2),
        unique_count=int(unique_count),
        unique_pct=round(unique_pct, 2),
        top_values=top_values,
        min=min_val,
        max=max_val,
        mean=mean_val,
        std=std_val,
    )
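
Numeric statistics also apply to JSON properties once they are cast, per the docstring above. A hedged sketch (assumes events carry a numeric amount property):

stats = ws.column_stats("events", "CAST(properties->>'$.amount' AS DOUBLE)")
print(stats.min, stats.max, stats.mean, stats.std)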