
Test Creation Guide: PySpark Parity via Fixtures

This document explains how to create and maintain behavioral parity tests between PySpark and robin-sparkless.

Sparkless integration: Robin-sparkless is designed to replace the backend of Sparkless. Sparkless has 270+ expected-output files (tests/expected_outputs/*.json) that can be converted to robin-sparkless fixtures. See SPARKLESS_INTEGRATION_ANALYSIS.md §4 for a comparison of the fixture formats and the conversion strategy.

The goal is to:

  • Use PySpark as the oracle for correct behavior.
  • Generate JSON fixtures from PySpark runs.
  • Consume those fixtures in Rust tests to ensure robin-sparkless matches PySpark.

Note: Running the Rust test suite (make test, make check-full) does not require PySpark or Java. PySpark/Java 17+ are only required when generating or refreshing fixtures (e.g. via tests/convert_sparkless_fixtures.py and tests/regenerate_expected_from_pyspark.py, or when running make sparkless-parity with regeneration enabled).


1. Overview

We separate the flow into three layers:

  1. Scenario definition (Python + PySpark, optional)
    Small scripts that build PySpark DataFrames, apply operations, and record the expected results when you need new fixtures or want to refresh existing ones.

  2. Fixtures (JSON)
    A stable, machine-readable description of:
      • Input schema and rows
      • The sequence of operations
      • Expected schema and rows

  3. Rust test harness
    tests/parity.rs reads fixtures, reconstructs the input as a robin-sparkless DataFrame, applies the same operations, and compares results.


2. Fixture Format

Fixtures live under tests/fixtures/ and are plain JSON files.

Minimal schema

{
  "name": "filter_age_gt_30",
  "pyspark_version": "3.5.0",

  "input": {
    "schema": [
      { "name": "id",   "type": "int" },
      { "name": "age",  "type": "int" },
      { "name": "name", "type": "string" }
    ],
    "rows": [
      [1, 25, "Alice"],
      [2, 30, "Bob"],
      [3, 35, "Charlie"]
    ]
  },

  "operations": [
    { "op": "filter",  "expr": "col('age') > 30" },
    { "op": "select",  "columns": ["name", "age"] },
    { "op": "orderBy", "columns": ["name"], "ascending": [true] }
  ],

  "expected": {
    "schema": [
      { "name": "name", "type": "string" },
      { "name": "age",  "type": "int" }
    ],
    "rows": [
      ["Charlie", 35]
    ]
  }
}

Notes:

  • type uses PySpark's simple type names (int, bigint, string, double, boolean, etc.).
  • rows are lists of JSON values in schema column order; null is allowed.
  • operations are descriptive, not executable Python; only simple PySpark-style expression strings (e.g. for filter and withColumn) are embedded, to keep parity obvious.

Date and timestamp columns

The parity harness supports date and timestamp (or datetime) column types so that datetime functions (e.g. date_add, datediff, current_date) can be tested with real date/datetime inputs.

  • Schema: Use "type": "date" or "type": "timestamp" (or "datetime", "timestamp_ntz") in input.schema and expected.schema.
  • Row values: Use JSON strings in ISO format:
      • Date: "YYYY-MM-DD" (e.g. "2024-01-15").
      • Timestamp: "YYYY-MM-DDTHH:MM:SS" or "YYYY-MM-DDTHH:MM:SS.ffffff" (e.g. "2024-01-15T12:00:00", "2024-01-15T12:00:00.123456").
      • Timestamps can also be given as a number (epoch microseconds); the harness accepts this when building the column.
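When a generator script collects date or timestamp columns, PySpark returns datetime.date and datetime.datetime objects, which json.dumps cannot serialize as-is. A minimal sketch of a conversion step that produces the ISO strings listed above; to_fixture_value and rows_to_fixture are hypothetical helper names, not part of the existing scripts:

```python
import datetime as dt


def to_fixture_value(value):
    """Convert one collected PySpark value into a JSON-friendly fixture value."""
    # datetime.datetime is a subclass of datetime.date, so check it first.
    if isinstance(value, dt.datetime):
        # isoformat() gives "YYYY-MM-DDTHH:MM:SS" and appends ".ffffff"
        # only when the microsecond field is non-zero.
        return value.isoformat()
    if isinstance(value, dt.date):
        return value.isoformat()  # "YYYY-MM-DD"
    return value  # ints, floats, strings, bools and None pass through unchanged


def rows_to_fixture(rows):
    """Convert collected Row objects (which are tuples) into lists of JSON values."""
    return [[to_fixture_value(v) for v in row] for row in rows]


print(rows_to_fixture([(1, dt.date(2024, 1, 15), dt.datetime(2024, 1, 15, 12, 0))]))
# prints [[1, '2024-01-15', '2024-01-15T12:00:00']]
```

In the skeleton below, rows_to_fixture(df.collect()) could replace the [list(r) for r in df.collect()] comprehensions.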

3. PySpark Generator Script

Requirements: PySpark (pip install pyspark) and Java 17 or newer. Set JAVA_HOME to a JDK 17+ installation if you see JVM errors or an UnsupportedClassVersionError when starting PySpark.

Create a Python script, e.g. tests/gen_pyspark_cases.py, that:

  1. Starts a PySpark SparkSession (pinned PySpark version).
  2. Defines scenario functions that:
      • Build a PySpark DataFrame from literal data.
      • Apply a sequence of transformations/actions.
      • Collect both input and expected sections.
  3. Writes JSON fixtures into tests/fixtures/.

Example skeleton

from pyspark.sql import SparkSession
import json
from pathlib import Path

spark = SparkSession.builder.appName("rs_parity_gen").getOrCreate()


def case_filter_age_gt_30():
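    data = [(1, 25, "Alice"), (2, 30, "Bob"), (3, 35, "Charlie")]
    # Explicit DDL schema: with only column names, PySpark infers Python ints
    # as bigint, which would not match the "int" types in the fixture above.
    df = spark.createDataFrame(data, "id INT, age INT, name STRING")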

    out_df = df.filter("age > 30").select("name", "age").orderBy("name")

    def schema_to_json(schema):
        return [{"name": f.name, "type": f.dataType.simpleString()}
                for f in schema.fields]

    input_schema = schema_to_json(df.schema)
    input_rows = [list(r) for r in df.collect()]

    expected_schema = schema_to_json(out_df.schema)
    expected_rows = [list(r) for r in out_df.collect()]

    return {
        "name": "filter_age_gt_30",
        "pyspark_version": spark.version,
        "input": {"schema": input_schema, "rows": input_rows},
        "operations": [
            {"op": "filter", "expr": "col('age') > 30"},
            {"op": "select", "columns": ["name", "age"]},
            {"op": "orderBy", "columns": ["name"], "ascending": [True]}
        ],
        "expected": {"schema": expected_schema, "rows": expected_rows},
    }


def main():
    out_dir = Path("tests/fixtures")
    out_dir.mkdir(parents=True, exist_ok=True)

    fixtures = [
        case_filter_age_gt_30(),
        # add more scenarios here
    ]

    for fx in fixtures:
        path = out_dir / f"{fx['name']}.json"
        path.write_text(json.dumps(fx, indent=2))


if __name__ == "__main__":
    main()

Run this script manually (or via a Makefile target) whenever you add or update scenarios.
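Because fixtures are written by hand-maintained scenario functions, a quick structural check before writing can catch mistakes early (a row with the wrong number of values, an operation without an "op" field, a value that is not JSON-serializable). This is a sketch; validate_fixture is a hypothetical helper, and it checks only the fields shown in the fixture format above:

```python
import json


def validate_fixture(fx):
    """Return a list of problems found in a fixture dict (empty if it looks valid)."""
    problems = []
    for key in ("name", "input", "operations", "expected"):
        if key not in fx:
            problems.append(f"missing top-level key {key!r}")
    # Every row must have exactly one value per schema column.
    for section in ("input", "expected"):
        sec = fx.get(section, {})
        width = len(sec.get("schema", []))
        for i, row in enumerate(sec.get("rows", [])):
            if len(row) != width:
                problems.append(
                    f"{section}.rows[{i}] has {len(row)} values, schema has {width}"
                )
    for i, op in enumerate(fx.get("operations", [])):
        if "op" not in op:
            problems.append(f"operations[{i}] has no 'op' field")
    # The whole fixture must serialize (catches raw dates, Decimals, etc.).
    try:
        json.dumps(fx)
    except TypeError as exc:
        problems.append(f"not JSON-serializable: {exc}")
    return problems
```

In main(), calling validate_fixture(fx) inside the write loop and raising on a non-empty result keeps a malformed fixture from ever reaching tests/fixtures/.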


4. Rust Test Harness (tests/parity.rs)

The file tests/parity.rs implements the full parity flow:

  • It defines Fixture, InputSection, ColumnSpec, ExpectedSection, and Operation to mirror the JSON format.
  • The test pyspark_parity_fixtures:
      • Scans tests/fixtures/, tests/fixtures/converted/, and tests/fixtures/pyspark_extracted/.
      • Deserializes each JSON fixture and skips any marked "skip": true.
      • For each fixture, calls run_fixture(&fixture), which:
          1. Input reconstruction: Builds a DataFrame from input.schema and input.rows (or from input.file_source for read_csv/read_parquet/read_json).
          2. Operation dispatch: Applies each Operation in sequence (filter, select, orderBy, groupBy, agg, join, withColumn, window, union, distinct, drop, dropna, fillna, limit, withColumnRenamed, replace, crossJoin, describe, subtract, intersect, first, head, offset, summary, etc.).
          3. Comparison: Collects the result to schema + rows and compares them with expected (schema equality and row equality, with optional order handling).
      • Collects all failures and reports them at the end with fixture names (so CI shows every failing fixture, not just the first).
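The discovery rules above can be mirrored in a few lines of Python, for example in a helper script that lists which fixtures a run would pick up. This is a sketch, not the harness's code: discover_fixtures is a hypothetical name, matching PARITY_FIXTURE against the fixture's "name" field (rather than the file name) is an assumption, and skipping JSON files without an "operations" key (such as phase_manifest.json) is an assumption about how non-fixture files are handled.

```python
import json
import os
from pathlib import Path

# Directories the Rust harness scans, per the description above.
FIXTURE_DIRS = ["tests/fixtures", "tests/fixtures/converted", "tests/fixtures/pyspark_extracted"]


def discover_fixtures(root=".", only=None):
    """Yield (path, fixture) pairs a parity run would execute."""
    # Fall back to the PARITY_FIXTURE environment variable when no filter is given.
    only = only if only is not None else os.environ.get("PARITY_FIXTURE")
    for rel in FIXTURE_DIRS:
        directory = Path(root) / rel
        if not directory.is_dir():
            continue
        # glob("*.json") is not recursive, so subdirectories are not read twice.
        for path in sorted(directory.glob("*.json")):
            fixture = json.loads(path.read_text())
            if not isinstance(fixture, dict) or "operations" not in fixture:
                continue  # ASSUMPTION: not a fixture (e.g. phase_manifest.json)
            if fixture.get("skip", False):
                continue  # fixtures marked "skip": true are ignored
            if only and fixture.get("name") != only:
                continue  # narrow the run to a single fixture
            yield path, fixture
```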

Running a single fixture: Set the PARITY_FIXTURE environment variable to the fixture name (e.g. PARITY_FIXTURE=groupby_count) to run only that fixture. Example:

PARITY_FIXTURE=groupby_count cargo test pyspark_parity_fixtures

Running by phase: Set the PARITY_PHASE environment variable to run only the fixtures listed for that phase. Phases are a through g; the mapping lives in tests/fixtures/phase_manifest.json. Example:

PARITY_PHASE=a cargo test pyspark_parity_fixtures
make test-parity-phase-a   # same as above
make test-parity-phases    # runs all phases (a through g)

Phase A = signature alignment; B = high-value functions; C = Reader/Writer; D = DataFrame methods; E = SparkSession/Catalog (no fixtures, Python-only); F = behavioral; G = fixture expansion. See PARITY_STATUS.md for the phase-to-fixture mapping.
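When adding fixture names to the manifest, a small script can catch typos before a phase run silently skips a fixture. The manifest's exact shape is not described here, so this sketch ASSUMES it maps lowercase phase letters to lists of fixture names (e.g. {"a": ["groupby_count", ...]}); fixtures_for_phase and unknown_names are hypothetical helpers that would need adjusting if the real file differs.

```python
import json
from pathlib import Path

MANIFEST = "tests/fixtures/phase_manifest.json"


def fixtures_for_phase(phase, manifest_path=MANIFEST):
    """Return the fixture names listed for one phase.

    ASSUMPTION: the manifest is {"a": [name, ...], "b": [...], ...}.
    """
    manifest = json.loads(Path(manifest_path).read_text())
    return list(manifest.get(phase.lower(), []))


def unknown_names(phase, known_names, manifest_path=MANIFEST):
    """Names listed for a phase that do not match any known fixture name."""
    return sorted(set(fixtures_for_phase(phase, manifest_path)) - set(known_names))
```

known_names would typically come from the "name" fields of the fixtures on disk.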

Extending the harness

When adding new operations or expression forms:

  1. New operation type: Add a variant to the Operation enum in parity.rs, deserialize it from the fixture JSON, and handle it in apply_operations.
  2. New expression in filter/withColumn: Extend parse_with_column_expr (and any related helpers) so the fixture expr string is parsed into the correct Rust Expr.
  3. Comparison: Schema and row comparison already support nulls, numbers, strings, arrays; extend values_equal / compare_values if new types need special handling.
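Before extending the Rust comparison for a new type, it can help to pin down the intended semantics in a few lines. The following Python sketch illustrates one reasonable reading of "row equality with optional order handling"; values_match and rows_match are hypothetical names, and the float tolerance is an assumption, not necessarily what values_equal / compare_values do.

```python
import math


def values_match(a, b, rel_tol=1e-9):
    """Compare two fixture values: nulls, booleans, numbers, strings and arrays."""
    if a is None or b is None:
        return a is None and b is None  # null only equals null
    if isinstance(a, bool) or isinstance(b, bool):
        # bool is a subclass of int; keep true distinct from 1.
        return isinstance(a, bool) and isinstance(b, bool) and a == b
    if isinstance(a, (int, float)) and isinstance(b, (int, float)):
        # ASSUMPTION: small relative/absolute tolerance for floating-point results.
        return math.isclose(a, b, rel_tol=rel_tol, abs_tol=1e-12)
    if isinstance(a, list) and isinstance(b, list):
        return len(a) == len(b) and all(values_match(x, y, rel_tol) for x, y in zip(a, b))
    return a == b  # strings and anything else: exact equality


def rows_match(actual, expected, ordered=True):
    """Compare row lists; with ordered=False, rows are matched greedily as a multiset."""
    if len(actual) != len(expected):
        return False
    if ordered:
        return all(values_match(r, e) for r, e in zip(actual, expected))
    remaining = list(expected)
    for row in actual:
        hit = next((i for i, e in enumerate(remaining) if values_match(row, e)), None)
        if hit is None:
            return False
        remaining.pop(hit)  # each expected row can be matched only once
    return True
```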

5. Workflow for Adding a New Parity Test

  1. Pick a PySpark behavior to cover
      • Example: groupBy("name").count() with nulls in the grouping column.

  2. Add a scenario function in gen_pyspark_cases.py
      • Build the PySpark input DataFrame.
      • Apply the desired operations.
      • Emit a fixture JSON.

  3. Regenerate fixtures
      • Run python tests/gen_pyspark_cases.py.

  4. Update the Rust harness if needed
      • If the new scenario uses an operation not yet supported by the dispatcher, add it to the Operation enum and its dispatch logic (see "Extending the harness" above).

  5. Update the phase manifest (optional)
      • Add the new fixture name to the appropriate phase in tests/fixtures/phase_manifest.json so it is included in phase-specific tests (make test-parity-phase-X).

  6. Run tests
      • cargo test pyspark_parity_fixtures (runs all fixtures).
      • To run a single fixture: PARITY_FIXTURE=<name> cargo test pyspark_parity_fixtures.
      • To run by phase: PARITY_PHASE=a cargo test pyspark_parity_fixtures or make test-parity-phase-a.
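When regenerating fixtures, it helps to see which files actually changed rather than rewriting everything. A minimal sketch of a replacement for the write loop in the generator's main(); write_fixture is a hypothetical helper. Note that it appends a trailing newline, so fixtures written without one will report "updated" on the first run.

```python
import json
from pathlib import Path


def write_fixture(fx, out_dir="tests/fixtures"):
    """Write one fixture and report whether it was created, updated, or unchanged."""
    path = Path(out_dir) / f"{fx['name']}.json"
    new_text = json.dumps(fx, indent=2) + "\n"
    if not path.exists():
        status = "created"
    elif path.read_text() == new_text:
        return "unchanged"  # leave the file (and its modification time) untouched
    else:
        status = "updated"
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(new_text)
    return status
```

Printing the status per fixture makes an unexpected "updated" after a PySpark upgrade easy to spot in review.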

6. Tracking Coverage

Maintain a simple matrix (e.g. in PARITY_STATUS.md or ROADMAP.md) with:

  • Rows: operations / behaviors (e.g. filter >, filter == null, groupBy+count, inner join, etc.).
  • Columns: data-type combinations, null presence, edge cases.
  • Cell values: “covered by fixture X”, “not yet covered”, or “intentionally diverges from PySpark”.

This makes it clear where robin-sparkless matches PySpark today and where work remains.
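The "Rows" axis of the matrix can be seeded from the fixtures themselves, since every operation carries an "op" field. A sketch, with op_coverage and coverage_table as hypothetical helpers; it counts each operation at most once per fixture, so the numbers read as "fixtures exercising this operation", and it does not capture the data-type or null-presence columns, which still need manual annotation.

```python
from collections import Counter


def op_coverage(fixtures):
    """Count how many fixtures exercise each operation type."""
    counts = Counter()
    for fx in fixtures:
        # A set, so an op used twice in one fixture is counted once.
        counts.update({op["op"] for op in fx.get("operations", [])})
    return counts


def coverage_table(counts):
    """Render the counts as a Markdown table for PARITY_STATUS.md."""
    lines = ["| operation | fixtures |", "|---|---|"]
    for op, n in sorted(counts.items()):
        lines.append(f"| {op} | {n} |")
    return "\n".join(lines)
```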


7. Sparkless Fixture Conversion

Sparkless uses a different fixture format (input_data as dict rows, expected_output with schema/data). A converter can:

  1. Read Sparkless tests/expected_outputs/*.json
  2. Map input_data to input.schema + input.rows
  3. Infer or annotate operations from the test name/category
  4. Map expected_output to expected.schema + expected.rows
  5. Write robin-sparkless fixtures to tests/fixtures/

This lets both projects validate against the same logical test cases. See SPARKLESS_INTEGRATION_ANALYSIS.md §4.1–4.4 for details.
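Steps 2 and 4 can be sketched in Python. This is illustrative only and rests on ASSUMPTIONS about the Sparkless format: input_data is a list of dicts mapping column names to values, and expected_output["data"] has the same shape. Types are inferred from values (bool to boolean, int to bigint as PySpark infers Python ints, float to double, anything else to string); a real converter should prefer expected_output's recorded schema where one exists. Operations (step 3) are passed in by the caller. dict_rows_to_section and convert_sparkless are hypothetical names.

```python
def infer_type(values):
    """Guess a PySpark simple type name from the first non-null value of a column."""
    for v in values:
        if v is None:
            continue
        if isinstance(v, bool):  # bool is a subclass of int, so test it first
            return "boolean"
        if isinstance(v, int):
            return "bigint"  # PySpark infers Python ints as LongType (bigint)
        if isinstance(v, float):
            return "double"
        return "string"
    return "string"  # ASSUMPTION: all-null columns default to string


def dict_rows_to_section(dict_rows):
    """Convert [{"col": value, ...}, ...] into {"schema": [...], "rows": [[...], ...]}."""
    columns = []
    for row in dict_rows:  # keep first-seen column order across all rows
        for col in row:
            if col not in columns:
                columns.append(col)
    rows = [[row.get(c) for c in columns] for row in dict_rows]  # missing keys -> null
    schema = [{"name": c, "type": infer_type(r[i] for r in rows)} for i, c in enumerate(columns)]
    return {"schema": schema, "rows": rows}


def convert_sparkless(name, sparkless, operations):
    """Build a robin-sparkless fixture from one Sparkless expected-output record."""
    return {
        "name": name,
        "input": dict_rows_to_section(sparkless["input_data"]),
        "operations": operations,  # step 3: inferred or annotated by the caller
        "expected": dict_rows_to_section(sparkless["expected_output"]["data"]),
    }
```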