Fetching Data¶
Fetch events and user profiles from Mixpanel into a local DuckDB database for fast, repeated SQL queries.
Explore on DeepWiki
Ask questions about fetch options and parallel processing, or get help troubleshooting data ingestion issues.
Fetching Events¶
Basic Usage¶
Fetch all events for a date range:
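A minimal Python sketch, assuming a default persistent `Workspace()` with project credentials already configured:

```python
import mixpanel_data as mp

# Open a persistent workspace (assumes credentials are configured elsewhere)
ws = mp.Workspace()

# Fetch all events for January 2025 into a table named "events"
result = ws.fetch_events(
    name="events",
    from_date="2025-01-01",
    to_date="2025-01-31"
)
print(f"Fetched {result.row_count} rows in {result.duration_seconds:.1f}s")
```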
Filtering Events¶
Fetch specific event types:
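A sketch assuming an `events` parameter that accepts a list of event names (the name is inferred from `result.metadata.events` in the FetchResult section below):

```python
# Fetch only Purchase and Signup events (events= is an assumed parameter name)
result = ws.fetch_events(
    name="conversion_events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    events=["Purchase", "Signup"]
)
```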
Using Where Clauses¶
Filter with Mixpanel expression syntax:
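A sketch assuming a `where` parameter that takes a Mixpanel expression string (inferred from `result.metadata.where` in the FetchResult section below):

```python
# Only events from users on the pro plan (where= is an assumed parameter name)
result = ws.fetch_events(
    name="pro_events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    where='properties["plan"] == "pro"'
)
```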
Limiting Results¶
Cap the number of events returned (max 100,000):
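For example:

```python
# Sample up to 10,000 events for a quick look at the data
result = ws.fetch_events(
    name="sample_events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    limit=10_000
)
```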
This is useful for testing queries or sampling data before a full fetch.
Progress Tracking¶
Monitor fetch progress with a callback:
```python
def on_progress(count: int) -> None:
    print(f"Fetched {count} events...")

result = ws.fetch_events(
    name="events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    progress_callback=on_progress
)
```
The CLI automatically displays a progress bar.
Batch Size¶
Control the memory/IO tradeoff with batch_size:
```python
# Smaller batch size = less memory, more disk IO
result = ws.fetch_events(
    name="events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    batch_size=500
)

# Larger batch size = more memory, less disk IO
result = ws.fetch_events(
    name="events",
    from_date="2025-01-01",
    to_date="2025-01-31",
    batch_size=5000
)
```
The default is 1000 rows per commit. Valid range: 100-100,000.
Parallel Fetching¶
For large date ranges, parallel fetching can dramatically speed up exports—up to 10x faster for multi-month ranges.
Basic Parallel Fetch¶
Enable parallel fetching with the parallel flag:
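For example:

```python
# Fetch a full quarter concurrently
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True
)
```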
Parallel fetching splits the date range into 7-day chunks and fetches them concurrently using multiple threads. This bypasses Mixpanel's 100-day limit and enables faster exports.
How It Works¶
- Date Range Chunking: The date range is split into chunks (default: 7 days each)
- Concurrent Fetching: Multiple threads fetch chunks simultaneously from Mixpanel
- Single-Writer Queue: A dedicated writer thread serializes writes to DuckDB (respecting its single-writer constraint)
- Partial Failure Handling: Failed batches are tracked for potential retry
Performance¶
| Date Range | Sequential | Parallel (10 workers) | Speedup |
|---|---|---|---|
| 7 days | ~5s | ~5s | 1x (no benefit) |
| 30 days | ~20s | ~5s | 4x |
| 90 days | ~60s | ~8s | 7.5x |
When to Use Parallel Fetching
- Use parallel for date ranges > 7 days
- Use sequential for small ranges or when you need the limit parameter
Configuring Workers¶
Control the number of concurrent fetch threads:
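A sketch assuming the thread count is exposed as a `max_workers` keyword (the actual parameter name may differ):

```python
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True,
    max_workers=5  # assumed parameter name for the worker count
)
```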
Higher worker counts may hit Mixpanel rate limits. The default of 10 works well for most cases.
Configuring Chunk Size¶
Control how many days each chunk covers:
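A sketch assuming the chunk length is exposed as a `chunk_days` keyword (a hypothetical name; check the API reference for the actual one):

```python
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True,
    chunk_days=3  # assumed parameter name for days per chunk
)
```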
Smaller chunk sizes create more parallel batches (potentially faster) but increase API overhead. Valid range: 1-100 days.
Progress Callbacks¶
Monitor batch completion with a callback:
```python
from mixpanel_data import BatchProgress

def on_batch(progress: BatchProgress) -> None:
    status = "✓" if progress.success else "✗"
    print(f"[{status}] Batch {progress.batch_index + 1}/{progress.total_batches}: "
          f"{progress.from_date} to {progress.to_date} ({progress.rows} rows)")

result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True,
    on_batch_complete=on_batch
)
```
The CLI automatically displays batch progress when --parallel is used.
Handling Failures¶
Parallel fetching tracks failures and provides retry information:
```python
result = ws.fetch_events(
    name="events",
    from_date="2024-01-01",
    to_date="2024-03-31",
    parallel=True
)

if result.has_failures:
    print(f"Warning: {result.failed_batches} batches failed")
    for from_date, to_date in result.failed_date_ranges:
        print(f"  Failed: {from_date} to {to_date}")

    # Retry failed ranges with append mode
    for from_date, to_date in result.failed_date_ranges:
        ws.fetch_events(
            name="events",
            from_date=from_date,
            to_date=to_date,
            append=True  # Append to existing table
        )
```
Parallel Fetch Limitations
- No limit parameter: Parallel fetch does not support the limit parameter. Using both raises an error.
- Exit code 1 on partial failure: The CLI returns exit code 1 if any batches fail, even if some succeeded.
Fetching Profiles¶
Fetch user profiles into local storage:
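For example:

```python
# Fetch all user profiles into a table named "users"
result = ws.fetch_profiles(name="users")
print(f"Fetched {result.row_count} profiles")
```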
Filtering Profiles¶
Use Mixpanel expression syntax:
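For example:

```python
# Only profiles on a paid plan
result = ws.fetch_profiles(
    name="paid_users",
    where='properties["plan"] != "free"'
)
```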
Filtering by Cohort¶
Fetch only profiles that are members of a specific cohort:
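For example (the cohort ID shown is a placeholder):

```python
# Only members of cohort 12345
result = ws.fetch_profiles(
    name="cohort_members",
    cohort_id=12345
)
```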
Selecting Specific Properties¶
Reduce bandwidth and memory by fetching only the properties you need:
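A sketch assuming the property list is passed as `output_properties`, mirroring the Mixpanel Engage API (the actual keyword may differ):

```python
result = ws.fetch_profiles(
    name="users_slim",
    output_properties=["$email", "plan", "signup_date"]  # assumed parameter name
)
```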
Combining Filters¶
Filters can be combined for precise data selection:
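For example, a sketch combining a where expression with the (assumed) property selection shown above:

```python
result = ws.fetch_profiles(
    name="pro_users_slim",
    where='properties["plan"] == "pro"',
    output_properties=["$email", "plan"]  # assumed parameter name, as above
)
```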
Fetching Specific Users by ID¶
Fetch one or more specific users by their distinct ID:
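For example:

```python
# A single user
result = ws.fetch_profiles(name="one_user", distinct_id="user_123")

# Several users at once
result = ws.fetch_profiles(
    name="key_users",
    distinct_ids=["user_123", "user_456", "user_789"]
)
```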
Mutually Exclusive
distinct_id and distinct_ids cannot be used together. Choose one approach based on your needs.
Fetching Group Profiles¶
Fetch group profiles (companies, accounts, etc.) instead of user profiles:
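A sketch assuming the group key is passed as `data_group_id`, mirroring the Mixpanel Engage API (the actual keyword may differ):

```python
result = ws.fetch_profiles(
    name="companies",
    data_group_id="companies"  # assumed parameter name for the group key
)
```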
Behavioral Filtering¶
Filter profiles by event behavior—users who performed specific actions. Behaviors use a named pattern that you reference in a where clause:
```python
# Users who purchased in the last 30 days
result = ws.fetch_profiles(
    name="recent_purchasers",
    behaviors=[{
        "window": "30d",
        "name": "made_purchase",
        "event_selectors": [{"event": "Purchase"}]
    }],
    where='(behaviors["made_purchase"] > 0)'
)

# Users with multiple behavior criteria
result = ws.fetch_profiles(
    name="engaged_users",
    behaviors=[
        {
            "window": "30d",
            "name": "purchased",
            "event_selectors": [{"event": "Purchase"}]
        },
        {
            "window": "7d",
            "name": "active",
            "event_selectors": [{"event": "Page View"}]
        }
    ],
    where='(behaviors["purchased"] > 0) and (behaviors["active"] >= 5)'
)
```
```bash
# Users who purchased in the last 30 days
mp fetch profiles recent_purchasers \
  --behaviors '[{"window":"30d","name":"made_purchase","event_selectors":[{"event":"Purchase"}]}]' \
  --where '(behaviors["made_purchase"] > 0)'

# Users with multiple behavior criteria
mp fetch profiles engaged_users \
  --behaviors '[{"window":"30d","name":"purchased","event_selectors":[{"event":"Purchase"}]},{"window":"7d","name":"active","event_selectors":[{"event":"Page View"}]}]' \
  --where '(behaviors["purchased"] > 0) and (behaviors["active"] >= 5)'
```
Behavior Format
Each behavior requires:
- window: Time window (e.g., "30d", "7d", "90d")
- name: Identifier to reference in the where clause
- event_selectors: Array of event filters with {"event": "Event Name"}
The where clause filters using behaviors["name"] to check counts.
Mutually Exclusive
behaviors and cohort_id cannot be used together. Use one or the other for filtering.
Historical Profile State¶
Query profile properties as they existed at a specific point in time:
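A sketch assuming an `as_of_timestamp` keyword that takes a Unix timestamp, mirroring the Mixpanel Engage API (the actual name and type may differ):

```python
import datetime

# Profile state as of midnight UTC on 2025-01-01
as_of = int(datetime.datetime(2025, 1, 1, tzinfo=datetime.timezone.utc).timestamp())
result = ws.fetch_profiles(
    name="users_jan1",
    as_of_timestamp=as_of  # assumed parameter name
)
```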
Cohort Membership Analysis¶
Include all users and mark whether they're in a cohort:
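For example:

```python
# All users, flagged by membership in cohort 12345
result = ws.fetch_profiles(
    name="users_with_flag",
    cohort_id=12345,
    include_all_users=True
)
```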
This is useful for comparing users inside and outside a cohort. The response includes a membership indicator for each profile.
Requires Cohort
include_all_users requires cohort_id. It has no effect without specifying a cohort.
Parallel Profile Fetching¶
For large profile datasets (thousands of profiles), parallel fetching can dramatically speed up exports—up to 5x faster.
Basic Parallel Profile Fetch¶
Enable parallel fetching with the parallel flag:
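For example:

```python
result = ws.fetch_profiles(
    name="users",
    parallel=True
)
```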
Parallel profile fetching uses page-based parallelism—fetching multiple pages of profiles concurrently using a session ID for consistency.
How It Works¶
- Session-Based Pagination: The initial page establishes a session ID for consistent results
- Dynamic Page Discovery: Pages are fetched as they're discovered (not pre-scheduled)
- Concurrent Fetching: Multiple threads fetch pages simultaneously (default: 5 workers)
- Single-Writer Queue: A dedicated writer thread serializes writes to DuckDB
- Partial Failure Handling: Failed pages are tracked for potential retry
Performance¶
| Profile Count | Sequential | Parallel (5 workers) | Speedup |
|---|---|---|---|
| 1,000 | ~2s | ~2s | 1x (no benefit) |
| 10,000 | ~10s | ~3s | 3x |
| 50,000 | ~50s | ~12s | 4x |
When to Use Parallel Profile Fetching
- Use parallel for datasets with 5,000+ profiles
- Use sequential for small datasets or when you need maximum consistency
Configuring Workers¶
Control the number of concurrent fetch threads:
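A sketch assuming the same `max_workers` keyword as the events fetch (an assumed name):

```python
result = ws.fetch_profiles(
    name="users",
    parallel=True,
    max_workers=3  # assumed parameter name; capped at 5 (see below)
)
```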
Worker Limit
Workers are capped at 5 to avoid Mixpanel API rate limits (60 requests/hour for Engage API). Requesting more than 5 workers will be automatically capped.
Progress Callbacks¶
Monitor page completion with a callback:
```python
from mixpanel_data import ProfileProgress

def on_page(progress: ProfileProgress) -> None:
    status = "✓" if progress.success else "✗"
    print(f"[{status}] Page {progress.page_index}: "
          f"{progress.rows} rows (cumulative: {progress.cumulative_rows})")

result = ws.fetch_profiles(
    name="users",
    parallel=True,
    on_page_complete=on_page
)
```
The CLI automatically displays page progress when --parallel is used.
Handling Failures¶
Parallel fetching tracks failures and provides information for debugging:
```python
result = ws.fetch_profiles(
    name="users",
    parallel=True
)

if result.has_failures:
    print(f"Warning: {result.failed_pages} pages failed")
    print(f"Failed page indices: {result.failed_page_indices}")
```
Parallel Profile Fetch Limitations
- Rate limits: The Engage API has a 60 requests/hour limit. Large exports with many pages may hit this limit.
- Exit code 1 on partial failure: The CLI returns exit code 1 if any pages fail, even if some succeeded.
Combining with Filters¶
Parallel fetching works with all profile filters:
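For example:

```python
# Parallel fetch of only paying users
result = ws.fetch_profiles(
    name="paying_users",
    where='properties["plan"] != "free"',
    parallel=True
)
```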
Table Naming¶
Tables are stored with the name you provide:
```python
ws.fetch_events(name="jan_events", ...)   # Creates table: jan_events
ws.fetch_events(name="feb_events", ...)   # Creates table: feb_events
ws.fetch_profiles(name="users")           # Creates table: users
```
Table Names Must Be Unique
Fetching to an existing table name raises TableExistsError. Use --replace to overwrite, --append to add data, or choose a different name.
Replacing and Appending¶
Replace Mode¶
Drop and recreate a table with fresh data:
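A sketch assuming a `replace` keyword that mirrors the CLI --replace flag:

```python
result = ws.fetch_events(
    name="events",
    from_date="2025-02-01",
    to_date="2025-02-28",
    replace=True  # assumed keyword mirroring --replace
)
```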
Append Mode¶
Add data to an existing table. Duplicates (by insert_id for events, distinct_id for profiles) are automatically skipped:
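For example:

```python
# Add February data to the existing "events" table; duplicate rows are skipped
result = ws.fetch_events(
    name="events",
    from_date="2025-02-01",
    to_date="2025-02-28",
    append=True
)
```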
Resuming Failed Fetches
If a fetch crashes or times out, use append mode to resume from where you left off:
```bash
# Check the last event timestamp
mp query sql "SELECT MAX(event_time) FROM events"
# 2025-01-15T14:30:00

# Resume from that point
mp fetch events --from 2025-01-15 --to 2025-01-31 --append
```
Overlapping date ranges are safe—duplicates are automatically skipped.
Table Management¶
Listing Tables¶
```python
tables = ws.tables()
for table in tables:
    print(f"{table.name}: {table.row_count} rows ({table.type})")
```
Viewing Table Schema¶
Dropping Tables¶
FetchResult¶
Both fetch_events() and fetch_profiles() return a FetchResult:
```python
result = ws.fetch_events(...)

# Attributes
result.table_name        # "jan_events"
result.row_count         # 125000
result.duration_seconds  # 45.2

# Metadata
result.metadata.from_date   # "2025-01-01"
result.metadata.to_date     # "2025-01-31"
result.metadata.events      # ["Purchase", "Signup"] or None
result.metadata.where       # 'properties["plan"]...' or None
result.metadata.fetched_at  # datetime

# Serialization
result.to_dict()  # JSON-serializable dict
```
Event Table Schema¶
Fetched events have this schema:
| Column | Type | Description |
|---|---|---|
| event_id | VARCHAR | Unique event identifier |
| event_name | VARCHAR | Event name |
| event_time | TIMESTAMP | When the event occurred |
| distinct_id | VARCHAR | User identifier |
| insert_id | VARCHAR | Deduplication ID |
| properties | JSON | All event properties |
Profile Table Schema¶
Fetched profiles have this schema:
| Column | Type | Description |
|---|---|---|
| distinct_id | VARCHAR | User identifier (primary key) |
| properties | JSON | All profile properties |
Best Practices¶
Use Parallel Fetching for Large Date Ranges¶
For date ranges longer than a week, use parallel fetching for the best performance:
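For example:

```python
result = ws.fetch_events(
    name="events_q1",
    from_date="2025-01-01",
    to_date="2025-03-31",
    parallel=True
)
```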
Parallel fetching automatically handles chunking, concurrent API requests, and serialized writes to DuckDB—no manual chunking required.
Manual Chunking (Alternative)¶
If you need the limit parameter (incompatible with parallel), or want fine-grained control, you can manually chunk:
```python
import datetime

# Fetch first chunk
ws.fetch_events(
    name="events_2025",
    from_date="2025-01-01",
    to_date="2025-01-31"
)

# Append subsequent chunks
start = datetime.date(2025, 2, 1)
end = datetime.date(2025, 12, 31)
current = start
while current <= end:
    chunk_end = min(current + datetime.timedelta(days=30), end)
    ws.fetch_events(
        name="events_2025",
        from_date=str(current),
        to_date=str(chunk_end),
        append=True  # Add to existing table
    )
    current = chunk_end + datetime.timedelta(days=1)
```
```python
import datetime

# Alternative: write each chunk to its own table instead of appending to one
start = datetime.date(2025, 1, 1)
end = datetime.date(2025, 12, 31)
current = start
while current <= end:
    chunk_end = min(current + datetime.timedelta(days=30), end)
    table_name = f"events_{current.strftime('%Y%m')}"
    ws.fetch_events(
        name=table_name,
        from_date=str(current),
        to_date=str(chunk_end)
    )
    current = chunk_end + datetime.timedelta(days=1)
```
Choose the Right Storage Mode¶
mixpanel_data offers three storage modes:
| Mode | Method | Disk Usage | Best For |
|---|---|---|---|
| Persistent | Workspace() | Yes (permanent) | Repeated analysis, large datasets |
| Ephemeral | Workspace.ephemeral() | Yes (temp file, auto-deleted) | One-off analysis with large data |
| In-Memory | Workspace.memory() | None | Small datasets, testing, zero disk footprint |
Ephemeral mode creates a temp file that benefits from DuckDB's compression—up to 8× faster for large datasets:
```python
import mixpanel_data as mp

with mp.Workspace.ephemeral() as ws:
    ws.fetch_events("events", from_date="2025-01-01", to_date="2025-01-31")
    result = ws.sql("SELECT event_name, COUNT(*) FROM events GROUP BY 1")
# Database automatically deleted
```
In-memory mode creates no files at all—ideal for small datasets, unit tests, or privacy-sensitive scenarios:
```python
import mixpanel_data as mp

with mp.Workspace.memory() as ws:
    ws.fetch_events("events", from_date="2025-01-01", to_date="2025-01-07")
    total = ws.sql_scalar("SELECT COUNT(*) FROM events")
# Database gone - no files ever created
```
When to use each mode
- Persistent: You'll query the same data multiple times across sessions
- Ephemeral: Large datasets where you need compression benefits but won't keep the data
- In-Memory: Small datasets, unit tests, or when zero disk footprint is required
Streaming as an Alternative¶
If you don't need to store data locally, use streaming instead:
| Approach | Storage | Best For |
|---|---|---|
| fetch_events() | DuckDB table | Repeated SQL analysis |
| stream_events() | None | ETL pipelines, one-time processing |
```python
# Stream directly without storage
for event in ws.stream_events(from_date="2025-01-01", to_date="2025-01-31"):
    send_to_warehouse(event)
```
See Streaming Data for details.
Next Steps¶
- Streaming Data — Process data without local storage
- SQL Queries — Query your fetched data with SQL
- Live Analytics — Query Mixpanel directly for real-time data