sparkless.testing Guide¶
The sparkless.testing module provides a unified framework for writing tests that run against both sparkless (Rust/Polars backend) and PySpark (JVM backend). This enables you to:
- Write tests once, run against both backends
- Validate your code produces identical results on both engines
- Run fast local tests with sparkless, and integration tests with PySpark
- Use consistent fixtures and comparison utilities
Quick Start¶
1. Add the pytest plugin to your conftest.py¶
This automatically registers fixtures (spark, spark_mode, spark_imports, etc.) and pytest markers.
2. Write a test using the spark fixture¶
def test_filter(spark):
    df = spark.createDataFrame([
        {"id": 1, "name": "Alice"},
        {"id": 2, "name": "Bob"},
    ])
    result = df.filter(df.id > 1).collect()
    assert len(result) == 1
    assert result[0]["name"] == "Bob"
3. Run tests¶
# Run with sparkless (default, fast)
pytest tests/
# Run with PySpark (validates parity)
SPARKLESS_TEST_MODE=pyspark pytest tests/
Maintainer gate (full suite)¶
CI runs a fast subset by default (pytest tests -m "not delta and not integration"). Before merging substantial engine or Python binding changes, run the full suite locally:
Optional: trigger the Full Python tests or PySpark smoke workflow in GitHub Actions (workflow_dispatch) for release-build verification on CI runners.
Expected skips¶
As of May 2026, pytest tests -n 12 reports 64 skipped tests. These are intentional, not suite failures:
| Category | Examples | Reason |
|---|---|---|
| Delta / integration | -m delta, -m integration | Not in default CI subset; run the python-delta job or SPARKLESS_ENABLE_DELTA=1 pytest -m delta -n 0 |
| JDBC / Docker | tests/sql/test_jdbc_sqlite.py | Requires Docker JDBC fixture |
| PySpark-only / pyspark4_only | Some parity window tests | Oracle or JVM-only behavior |
| Deferred / env | SQL feature flags, regex lookaround, CTAS with Hive (#1508) | Documented in test skip reason |
CI default: pytest tests -m "not delta and not integration" -n 4 and pytest tests/parity/ after maturin develop --release.
Before release, maintainers should run make check-full (Rust --all-features, ruff, mypy, and full Python suite locally).
Environment Variable¶
The test backend is controlled by the SPARKLESS_TEST_MODE environment variable:
| Value | Backend | Use Case |
|---|---|---|
| sparkless (default) | Sparkless (Rust/Polars) | Fast local tests, CI |
| pyspark | PySpark (JVM) | Parity validation, integration tests |
# Fast local tests
pytest tests/
# Validate against PySpark
SPARKLESS_TEST_MODE=pyspark pytest tests/
# Explicit sparkless mode
SPARKLESS_TEST_MODE=sparkless pytest tests/
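To make the selection rule concrete, here is a small sketch of how an environment variable can be mapped to a backend mode. It is not the library's implementation: BackendMode and resolve_mode are hypothetical names, and the case-insensitive matching and the error on unknown values are assumptions of this sketch rather than documented behavior.

```python
import os
from enum import Enum


class BackendMode(Enum):
    """Hypothetical stand-in for sparkless.testing.Mode."""

    SPARKLESS = "sparkless"
    PYSPARK = "pyspark"


def resolve_mode(environ=None):
    """Map SPARKLESS_TEST_MODE to a BackendMode, defaulting to sparkless."""
    environ = os.environ if environ is None else environ
    # Assumption: surrounding whitespace and letter case are ignored.
    raw = environ.get("SPARKLESS_TEST_MODE", "sparkless").strip().lower()
    try:
        return BackendMode(raw)
    except ValueError:
        # Assumption: unknown values fail loudly instead of falling back.
        valid = ", ".join(mode.value for mode in BackendMode)
        raise ValueError(
            f"SPARKLESS_TEST_MODE={raw!r} is not one of: {valid}"
        ) from None


print(resolve_mode({}))
print(resolve_mode({"SPARKLESS_TEST_MODE": "pyspark"}))
```

Passing the environment as a plain mapping keeps the function easy to exercise without mutating os.environ.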
Fixtures¶
spark¶
The main fixture providing a SparkSession for the current mode.
def test_create_dataframe(spark):
    df = spark.createDataFrame([{"x": 1}, {"x": 2}])
    assert df.count() == 2
spark_mode¶
Returns the current Mode enum (Mode.SPARKLESS or Mode.PYSPARK).
from sparkless.testing import Mode

def test_mode_specific_behavior(spark, spark_mode):
    df = spark.createDataFrame([{"id": 1}])
    if spark_mode == Mode.PYSPARK:
        # PySpark-specific assertion
        assert hasattr(df, "_jdf")
    else:
        # Sparkless-specific assertion
        pass
spark_imports¶
Provides mode-appropriate imports (SparkSession, functions, types).
def test_with_imports(spark, spark_imports):
    F = spark_imports.F
    df = spark.createDataFrame([{"name": "alice"}])
    result = df.select(F.upper("name")).collect()
    assert result[0][0] == "ALICE"
isolated_session¶
Creates a fresh, isolated SparkSession (useful for tests that modify session state).
def test_isolated(isolated_session):
    spark = isolated_session
    spark.conf.set("my.custom.config", "value")
    # This session is independent of other tests
table_prefix¶
Provides a unique prefix for table names (useful when sharing sessions).
def test_with_table(spark, table_prefix):
    df = spark.createDataFrame([{"id": 1}])
    table_name = f"{table_prefix}_my_table"
    df.write.saveAsTable(table_name)
    # Table name is unique per test
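The format of the prefix produced by the fixture is not documented in this guide. As an illustration of the idea only, the sketch below shows one way a per-test prefix could be made both unique and safe to use as a SQL identifier; make_table_prefix is a hypothetical helper, not part of sparkless.testing.

```python
import re
import uuid


def make_table_prefix(test_name):
    """Build a prefix that is unique per call and usable in a table name."""
    # Collapse every run of non-word characters (brackets, dashes, spaces)
    # into a single underscore so parametrized test ids stay readable.
    safe = re.sub(r"\W+", "_", test_name).strip("_").lower()
    # A short random suffix keeps repeated or parallel runs from colliding.
    # The leading "t_" guarantees the name never starts with a digit.
    return f"t_{safe}_{uuid.uuid4().hex[:8]}"


print(make_table_prefix("test_with_table[param-1]"))
```

A random suffix, rather than a counter, avoids coordination between parallel workers such as those started by pytest -n.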
Markers¶
@pytest.mark.sparkless_only¶
Skip test when running in PySpark mode.
@pytest.mark.sparkless_only
def test_sparkless_specific_feature(spark):
    # This test only runs in sparkless mode
    pass
@pytest.mark.pyspark_only¶
Skip test when running in sparkless mode.
@pytest.mark.pyspark_only
def test_pyspark_specific_feature(spark):
    # This test only runs in PySpark mode
    pass
@pytest.mark.backend("sparkless") / @pytest.mark.backend("pyspark")¶
Force a specific backend for a test (overrides environment variable).
@pytest.mark.backend("pyspark")
def test_always_pyspark(spark):
    # This test always uses PySpark
    pass
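The interaction between the three markers and the environment variable can be summarized as a precedence rule. The sketch below models that rule with plain functions. Both function names are hypothetical, and applying the *_only check to the backend chosen by the backend marker is an assumption of this sketch, since the guide does not say how the plugin combines them.

```python
def effective_backend(marker_backend, env_value, default="sparkless"):
    """Pick the backend: marker first, then environment variable, then default."""
    if marker_backend is not None:
        return marker_backend
    if env_value:
        return env_value
    return default


def skip_reason(marker_names, backend):
    """Return a skip reason for the given backend, or None if the test runs."""
    if "sparkless_only" in marker_names and backend != "sparkless":
        return "sparkless_only test skipped in PySpark mode"
    if "pyspark_only" in marker_names and backend != "pyspark":
        return "pyspark_only test skipped in sparkless mode"
    return None


# The marker wins over SPARKLESS_TEST_MODE=sparkless.
print(effective_backend("pyspark", "sparkless"))
# A sparkless_only test is skipped when the backend is PySpark.
print(skip_reason({"sparkless_only"}, "pyspark"))
```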
Direct API Usage¶
You can also use sparkless.testing directly without pytest fixtures.
Mode Detection¶
from sparkless.testing import Mode, get_mode, is_pyspark_mode, is_sparkless_mode

mode = get_mode()  # Mode.SPARKLESS or Mode.PYSPARK
if is_pyspark_mode():
    print("Running with PySpark")
elif is_sparkless_mode():
    print("Running with sparkless")
Session Creation¶
from sparkless.testing import create_session, Mode
# Create session for current mode
spark = create_session(app_name="my_test")
# Create session for specific mode
sparkless_spark = create_session(app_name="test", mode=Mode.SPARKLESS)
pyspark_spark = create_session(app_name="test", mode=Mode.PYSPARK)
Unified Imports¶
from sparkless.testing import get_imports
imports = get_imports()
# Access Spark classes and functions
SparkSession = imports.SparkSession
F = imports.F # functions module
Window = imports.Window
Row = imports.Row
# Data types
StructType = imports.StructType
StructField = imports.StructField
StringType = imports.StringType
IntegerType = imports.IntegerType
# ... and more
DataFrame Comparison¶
The module provides utilities for comparing DataFrames, which is essential for parity testing.
assert_dataframes_equal¶
Assert two DataFrames are equivalent.
from sparkless.testing import assert_dataframes_equal

def test_transform(spark):
    input_df = spark.createDataFrame([{"x": 1}, {"x": 2}])
    result = input_df.select(input_df.x * 2)
    expected = spark.createDataFrame([{"(x * 2)": 2}, {"(x * 2)": 4}])
    assert_dataframes_equal(result, expected)
Options¶
assert_dataframes_equal(
    actual_df,
    expected_df,
    tolerance=1e-6,     # Float comparison tolerance
    check_schema=True,  # Compare schemas
    check_order=False,  # Ignore row order
)
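To show what tolerance and check_order mean in practice, the sketch below compares collected rows modeled as plain dicts. It illustrates the semantics only and is not the library's implementation: rows_equivalent and its helpers are hypothetical, and treating tolerance as an absolute difference is an assumption.

```python
import math
from numbers import Real


def _values_match(a, b, tolerance):
    """Compare two cell values, applying the tolerance only to floats."""
    both_numeric = isinstance(a, Real) and isinstance(b, Real)
    if both_numeric and (isinstance(a, float) or isinstance(b, float)):
        # Assumption: tolerance is an absolute difference, not a relative one.
        return math.isclose(a, b, rel_tol=0.0, abs_tol=tolerance)
    return a == b


def _rows_match(actual, expected, tolerance):
    """Two rows match when they have the same columns and matching values."""
    return actual.keys() == expected.keys() and all(
        _values_match(actual[key], expected[key], tolerance) for key in actual
    )


def rows_equivalent(actual, expected, tolerance=1e-6, check_order=False):
    """Return True when both row lists hold matching rows."""
    if len(actual) != len(expected):
        return False
    if check_order:
        return all(
            _rows_match(a, e, tolerance) for a, e in zip(actual, expected)
        )
    # Order-insensitive: greedily pair each actual row with an unused
    # expected row, so duplicate rows must appear the same number of times.
    remaining = list(expected)
    for row in actual:
        for index, candidate in enumerate(remaining):
            if _rows_match(row, candidate, tolerance):
                del remaining[index]
                break
        else:
            return False
    return True


actual = [{"id": 1, "value": 0.1 + 0.2}, {"id": 2, "value": 20.0}]
expected = [{"id": 2, "value": 20.0}, {"id": 1, "value": 0.3}]
print(rows_equivalent(actual, expected))                    # True
print(rows_equivalent(actual, expected, check_order=True))  # False
```

The greedy pairing is quadratic in the number of rows and, when several rows sit within the tolerance of each other, can miss a pairing that an exhaustive matcher would find. That is acceptable for an illustration but worth keeping in mind for large or densely clustered data.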
compare_dataframes¶
Get detailed comparison results without raising an exception.
from sparkless.testing import compare_dataframes

result = compare_dataframes(df1, df2)
if result.equivalent:
    print("DataFrames match!")
else:
    print("Differences found:")
    for error in result.errors:
        print(f"  - {error}")
assert_rows_equal¶
Compare row collections directly.
from sparkless.testing import assert_rows_equal
rows1 = df1.collect()
rows2 = df2.collect()
assert_rows_equal(rows1, rows2, check_order=False)
Complete Example: Dual-Mode Test Suite¶
Here's a complete example of a test file using sparkless.testing:
"""Tests for my_transform module."""
import pytest
from sparkless.testing import (
    Mode,
    get_imports,
    assert_dataframes_equal,
)

class TestMyTransform:
    """Test suite for data transformations."""

    def test_basic_filter(self, spark):
        """Test basic filtering works on both backends."""
        df = spark.createDataFrame([
            {"id": 1, "status": "active"},
            {"id": 2, "status": "inactive"},
            {"id": 3, "status": "active"},
        ])
        result = df.filter(df.status == "active")
        assert result.count() == 2

    def test_aggregation(self, spark, spark_imports):
        """Test aggregation with functions."""
        F = spark_imports.F
        df = spark.createDataFrame([
            {"dept": "IT", "salary": 100},
            {"dept": "IT", "salary": 200},
            {"dept": "HR", "salary": 150},
        ])
        result = df.groupBy("dept").agg(
            F.sum("salary").alias("total"),
            F.avg("salary").alias("avg"),
        )
        rows = {r["dept"]: r for r in result.collect()}
        assert rows["IT"]["total"] == 300
        assert rows["HR"]["total"] == 150

    def test_window_function(self, spark, spark_imports):
        """Test window functions."""
        F = spark_imports.F
        Window = spark_imports.Window
        df = spark.createDataFrame([
            {"dept": "IT", "name": "Alice", "salary": 100},
            {"dept": "IT", "name": "Bob", "salary": 200},
            {"dept": "HR", "name": "Charlie", "salary": 150},
        ])
        window = Window.partitionBy("dept").orderBy(F.desc("salary"))
        result = df.withColumn("rank", F.rank().over(window))
        rows = {r["name"]: r for r in result.collect()}
        assert rows["Bob"]["rank"] == 1  # Highest in IT
        assert rows["Alice"]["rank"] == 2

    @pytest.mark.sparkless_only
    def test_sparkless_native_feature(self, spark, spark_imports):
        """Test sparkless-specific functionality."""
        # Access sparkless native module
        if spark_imports._native is not None:
            # Test native functionality
            pass

    def test_dataframe_comparison(self, spark):
        """Test DataFrame comparison utilities."""
        df1 = spark.createDataFrame([
            {"id": 1, "value": 10.0},
            {"id": 2, "value": 20.0},
        ])
        df2 = spark.createDataFrame([
            {"id": 2, "value": 20.0},
            {"id": 1, "value": 10.0},
        ])
        # Order doesn't matter
        assert_dataframes_equal(df1, df2, check_order=False)

    def test_with_schema(self, spark, spark_imports):
        """Test explicit schema definition."""
        StructType = spark_imports.StructType
        StructField = spark_imports.StructField
        StringType = spark_imports.StringType
        IntegerType = spark_imports.IntegerType
        schema = StructType([
            StructField("name", StringType(), True),
            StructField("age", IntegerType(), True),
        ])
        df = spark.createDataFrame(
            [{"name": "Alice", "age": 30}],
            schema=schema,
        )
        assert df.schema.fields[0].name == "name"
        assert df.schema.fields[1].name == "age"
CI Configuration¶
GitHub Actions Example¶
name: Tests
on: [push, pull_request]
jobs:
  test-sparkless:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install -e ./python[test]
      - run: pytest tests/ -v
  test-pyspark:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - uses: actions/setup-java@v4
        with:
          distribution: "temurin"
          java-version: "11"
      - run: pip install -e ./python[test,pyspark]
      - run: SPARKLESS_TEST_MODE=pyspark pytest tests/ -v
API Reference¶
Mode Enum¶
from sparkless.testing import Mode
Mode.SPARKLESS # Sparkless backend
Mode.PYSPARK # PySpark backend
Functions¶
| Function | Description |
|---|---|
| get_mode() | Get current test mode from environment |
| is_pyspark_mode() | Check if running in PySpark mode |
| is_sparkless_mode() | Check if running in sparkless mode |
| set_mode(mode) | Set the test mode programmatically |
| create_session(app_name, mode) | Create a SparkSession |
| get_imports(mode) | Get mode-appropriate imports |
Comparison Functions¶
| Function | Description |
|---|---|
| compare_dataframes(actual, expected, ...) | Compare DataFrames, return result |
| assert_dataframes_equal(actual, expected, ...) | Assert DataFrames are equal |
| assert_rows_equal(actual, expected, ...) | Assert row collections are equal |
SparkImports Attributes¶
| Attribute | Description |
|---|---|
| SparkSession | The SparkSession class |
| F / functions | The functions module |
| Window | Window class for window functions |
| Row | Row class |
| StructType, StructField | Schema types |
| StringType, IntegerType, LongType, etc. | Data types |
| _native | Sparkless native module (None for PySpark) |
Best Practices¶
1. Use fixtures instead of creating sessions manually¶
# Good
def test_something(spark):
    df = spark.createDataFrame(...)

# Avoid (session cleanup issues)
def test_something():
    spark = SparkSession.builder.getOrCreate()
    df = spark.createDataFrame(...)
2. Use spark_imports for portable code¶
# Good - works in both modes
def test_something(spark, spark_imports):
    F = spark_imports.F
    df = spark.createDataFrame([{"name": "alice"}])
    df.select(F.upper("name"))

# Avoid - mode-specific imports
def test_something(spark):
    from pyspark.sql import functions as F  # Only works in PySpark mode
3. Use comparison utilities for result validation¶
# Good - handles float tolerance, order, etc.
assert_dataframes_equal(result, expected, tolerance=1e-6, check_order=False)

# Fragile - manual comparison
assert result.collect() == expected.collect()
4. Mark mode-specific tests appropriately¶
@pytest.mark.sparkless_only
def test_native_feature(spark):
    """Test that only makes sense in sparkless mode."""
    pass

@pytest.mark.pyspark_only
def test_jvm_feature(spark):
    """Test that requires JVM features."""
    pass
5. Use table_prefix for table isolation¶
def test_with_tables(spark, table_prefix):
    df = spark.createDataFrame([{"id": 1}])
    table_name = f"{table_prefix}_users"
    df.write.saveAsTable(table_name)
    # No conflicts with other tests
Troubleshooting¶
PySpark session creation fails¶
Ensure Java is installed and JAVA_HOME is set:
# macOS
brew install openjdk@11
export JAVA_HOME=/opt/homebrew/opt/openjdk@11
# Ubuntu
sudo apt install openjdk-11-jdk
export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
Import errors for sparkless.testing¶
Ensure sparkless is installed:
Tests pass in sparkless but fail in PySpark¶
This indicates a parity issue. Check PYSPARK_DIFFERENCES.md for known divergences, or file an issue if you've found a new one.
Slow PySpark tests¶
PySpark session creation is slow (~5-10s). Use shared sessions when possible:
See Also¶
- PYSPARK_DIFFERENCES.md - Known differences from PySpark
- PARITY_STATUS.md - PySpark parity coverage
- Python README - Sparkless Python package