1. Overview

This service provides a simple, powerful, and extensible ETL (Extract, Transform, Load) solution for Laravel applications, specifically designed to handle nested documents from a MongoDB database. The core philosophy is to be schema-driven. A simple YAML file defines the data structure, validation rules, API visibility, and even frontend behavior for a given data model. This creates a “single source of truth” that simplifies development and ensures consistency across your application.

Key Features:
  • Schema-Driven: Define your data models once in a YAML file.
  • MongoDB-Friendly: Easily import and export complex documents with nested objects and arrays.
  • User-Friendly Templates: Uses a simple “parent row” convention in spreadsheets, making it easy for non-technical users to populate data with nested items.
  • Extensible Drivers: Supports both maatwebsite/excel (feature-rich) and openspout (high-performance) for a balance of features and speed.
  • Fluent Export API: A clean, chainable interface for building and dispatching complex export jobs (e.g., Etl::export()->from(...)->chunk(...)).
  • Integrated Validation: The schema can define Laravel validation rules, which can be used in your Form Requests.
  • API & UI Generation: The schema contains metadata to dynamically control API resources and render frontend forms.

2. Core Concepts

2.1. The Schema (.yml file)

The schema is the heart of the service. It’s a YAML file (e.g., user_schema.yml) that defines everything about a data model.
| Property | Description |
| --- | --- |
| primary_key | Required. The unique field (e.g., _id) used to identify a document. The import process uses this to group rows. |
| fields | Required. An object containing the definition for each field in the document. |
| fields.<key>.type | The data type (string, integer, float, boolean, datetime, array, object). Used for casting. |
| fields.<key>.header | (Optional) A human-readable label to use as the column header in the spreadsheet. |
| fields.<key>.validator | (Optional) A Laravel-compatible validation string. Use {{id}} as a placeholder for the record’s ID in unique rules. |
| fields.<key>.edit | (Optional) Defines behavior in an edit form: true (editable), false (hidden), 'RO' (Read-Only), 'VO' (View-Only). |
| fields.<key>.create | (Optional) Defines behavior in a create form: true (editable) or false (hidden). |
| fields.<key>.view | (Optional) true or false. Determines if the field is shown on detail pages or tables. |
| fields.<key>.api | (Optional) true or false. Determines if the field is exposed in your API resources. |
| fields.<key>.object | For fields of type: array or type: object, this nested key contains the schema for the sub-fields. |
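
As a rough illustration of how the validator strings and the {{id}} placeholder described above might be consumed, the sketch below builds a Laravel-style rules array from parsed field definitions. The helper name buildRules is hypothetical (it is not part of the service), and substituting NULL when no record ID is known is an assumption.

```php
<?php
// Hypothetical helper: turns schema field definitions into a flat
// ['field.path' => 'rule|rule'] array, replacing {{id}} with the record's ID.
function buildRules(array $fields, ?string $id = null, string $prefix = ''): array
{
    $rules = [];
    foreach ($fields as $key => $config) {
        $path = $prefix . $key;
        if (isset($config['validator'])) {
            // Assumption: "NULL" stands in for the ID when no record exists yet.
            $rules[$path] = str_replace('{{id}}', $id ?? 'NULL', $config['validator']);
        }
        $type = $config['type'] ?? 'string';
        if (isset($config['object']) && in_array($type, ['array', 'object'], true)) {
            // Items of an array field are addressed with the "*" wildcard.
            $childPrefix = $type === 'array' ? "{$path}.*." : "{$path}.";
            $rules += buildRules($config['object'], $id, $childPrefix);
        }
    }
    return $rules;
}

$fields = [
    'email' => ['type' => 'string', 'validator' => 'required|email|unique:users,email,{{id}},_id'],
    'addresses' => [
        'type' => 'array',
        'validator' => 'nullable|array',
        'object' => ['city' => ['type' => 'string', 'validator' => 'string|max:100']],
    ],
];

$rules = buildRules($fields, 'abc123');
```

In a Form Request, an array like $rules could be returned from rules(), with the current record's ID passed in on update and omitted on create.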

2.2. The Spreadsheet Format

To represent a nested MongoDB document in a flat spreadsheet, we use a “parent row” convention.
  • Headers: Column headers are generated from the schema’s header or field key. Nested object fields use dot notation (e.g., settings.theme).
  • Parent Row: The first row for a document contains all the main data and the data for the first item in the primary embedded array (the first array-type field declared in the schema).
  • Child Rows: To add more items to the embedded array, add new rows directly below the parent row. On these child rows, leave the parent columns blank. This signals to the importer that they belong to the document above.
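
To make the convention concrete, here is a minimal sketch in plain PHP that groups already-parsed spreadsheet rows into nested documents. The function name groupRows, the column names, and the sample data are illustrative assumptions; the real import drivers additionally resolve headers and cast values through the schema.

```php
<?php
// Illustrative only: groups flat spreadsheet rows into nested documents.
// A non-empty primary-key cell starts a new document (parent row);
// rows with a blank primary key are child rows of the document above.
function groupRows(array $rows, string $primaryKey, string $arrayField, array $itemColumns): array
{
    $documents = [];
    $current = null;
    foreach ($rows as $row) {
        if (($row[$primaryKey] ?? '') !== '') {
            if ($current !== null) {
                $documents[] = $current;
            }
            // Parent columns are everything that is not an array-item column.
            $current = array_diff_key($row, array_flip($itemColumns));
            $current[$arrayField] = [];
        }
        if ($current === null) {
            continue; // child row with no parent above it: skipped
        }
        $item = array_filter(
            array_intersect_key($row, array_flip($itemColumns)),
            fn ($value) => $value !== null && $value !== ''
        );
        if ($item !== []) {
            $current[$arrayField][] = $item;
        }
    }
    if ($current !== null) {
        $documents[] = $current;
    }
    return $documents;
}

$rows = [
    ['_id' => 'u1', 'name' => 'Ada', 'street' => '1 Main St', 'city' => 'Leeds'],
    ['_id' => '',   'name' => '',    'street' => '9 Side Rd', 'city' => 'York'],
    ['_id' => 'u2', 'name' => 'Bob', 'street' => '',          'city' => ''],
];

$documents = groupRows($rows, '_id', 'addresses', ['street', 'city']);
```

Here the second row has a blank _id, so its address is appended to document u1, while u2 ends up with an empty addresses array.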

3. Directory Structure

app/
├── Console/
│   └── Commands/
│       └── Test/
│           └── EtlTestPipeline.php     # Artisan command for testing
├── Providers/
│   └── EtlServiceProvider.php      # Registers the service
└── Services/
    └── Etl/
        ├── Contracts/
        │   ├── ExportDriver.php
        │   └── ImportDriver.php
        ├── Drivers/
        │   ├── Exports/
        │   │   ├── OpenSpoutSchemaExportDriver.php
        │   │   └── StandardSchemaExportDriver.php
        │   └── Imports/
        │       ├── OpenSpoutSchemaImportDriver.php
        │       └── StandardSchemaImportDriver.php
        ├── Etl.php                     # The service Facade
        ├── EtlManager.php              # Main service manager
        ├── ExportBuilder.php           # Fluent API for exports
        └── SchemaService.php           # Loads and parses YAML schemas
config/
└── etl_schemas/
    └── user_schema.yml             # Example schema definition

4. How to Use

4.1. Importing Data

In a controller, use the Etl facade to select a driver and process the uploaded file.
use App\Models\User;
use Etl; // The alias from config/app.php
use Illuminate\Http\Request;

public function import(Request $request)
{
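    // Only .xlsx is accepted here because the OpenSpout XLSX reader used below cannot read legacy .xls files.
    $request->validate(['file' => 'required|mimes:xlsx']);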
    $filePath = $request->file('file')->getRealPath();

    // Use the high-performance OpenSpout driver
    $dataToInsert = Etl::driver('open_spout_schema_import')
                       ->process($filePath, 'user_schema');

    foreach ($dataToInsert as $doc) {
        User::updateOrCreate(['_id' => $doc['_id']], $doc);
    }
    return back()->with('success', 'Data imported successfully!');
}

4.2. Exporting Data (Fluent API)

Use the fluent export() method to build and dispatch export jobs.
Simple Export:
use App\Models\User;
use Etl;

// Creates 'all-users.xlsx' in storage/app/
$files = Etl::export()
    ->from(User::class)
    ->usingSchema('user_schema')
    ->to('all-users.xlsx')
    ->dispatch();
Paginated (Chunked) Export:
// Creates multiple files (e.g., users-part-1.xlsx, users-part-2.xlsx)
$files = Etl::export()
    ->from(User::class)
    ->usingSchema('user_schema')
    ->to('users.xlsx')
    ->chunk(5000) // Create a new file for every 5000 records
    ->dispatch();
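
The part-file naming used for chunked exports can be sketched on its own. The snippet below mirrors the pathinfo-based scheme from ExportBuilder; the function name chunkFilenames and the record counts are made up for illustration.

```php
<?php
// Sketch of the naming scheme for chunked exports: one file per chunk,
// named "<base>-part-<n>.<extension>".
function chunkFilenames(string $baseFilename, int $totalRecords, int $chunkSize): array
{
    $baseName = pathinfo($baseFilename, PATHINFO_FILENAME);
    $extension = pathinfo($baseFilename, PATHINFO_EXTENSION);
    $fileCount = (int) ceil($totalRecords / $chunkSize);

    $names = [];
    for ($part = 1; $part <= $fileCount; $part++) {
        $names[] = "{$baseName}-part-{$part}.{$extension}";
    }
    return $names;
}

// 12,000 records in chunks of 5,000 yield three part files.
$names = chunkFilenames('users.xlsx', 12000, 5000);
```

Because only PATHINFO_FILENAME is kept, a directory prefix in the name passed to to() would not carry over to the part filenames under this scheme.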

4.3. Testing the Pipeline

Use the provided Artisan command to test the full export/import cycle.
# Test with the Maatwebsite driver (default)
php artisan etl:test-pipeline user_schema User

# Test with the OpenSpout driver
php artisan etl:test-pipeline user_schema User --driver=openspout

# Test with 100 records and keep the file for inspection
php artisan etl:test-pipeline user_schema User --count=100 --keep-file

5. Full Source Code

Here is the complete source code for every class in the service.

5.1. Configuration Schema

File: config/etl_schemas/user_schema.yml
primary_key: _id
fields:
  _id: { type: string, header: 'User ID', validator: 'required|string|unique:users,_id,{{id}},_id', edit: 'RO', create: true, view: true, api: true }
  name: { type: string, header: 'Full Name', validator: 'required|string|max:255', edit: true, create: true, view: true, api: true }
  email: { type: string, header: 'Email Address', validator: 'required|email|unique:users,email,{{id}},_id', edit: true, create: true, view: true, api: true }
  age: { type: integer, default: null, validator: 'nullable|integer|min:0|max:150', edit: true, create: true, view: true, api: true }
  registered_at: { type: datetime, default: null, validator: 'nullable|date', edit: 'VO', create: false, view: true, api: true }
  addresses:
    type: array
    validator: 'nullable|array|min:1'
    edit: true
    create: true
    view: true
    api: true
    object:
      street: { type: string, header: 'Address Street', validator: 'required_with:addresses|string|max:255' }
      city: { type: string, header: 'Address City', validator: 'required_with:addresses|string|max:100' }
      postal_code: { type: string, header: 'Address Postal Code', validator: 'required_with:addresses|string|max:20' }
  settings:
    type: object
    validator: 'nullable|array'
    edit: true
    create: true
    view: true
    api: true
    object:
      notifications_enabled: { type: boolean, header: 'Enable Notifications', default: true, validator: 'required|boolean' }
      theme: { type: string, header: 'UI Theme', default: 'light', validator: 'required|string|in:light,dark,system' }
  internal_notes: { type: string, validator: 'nullable|string', edit: false, create: false, view: false, api: false }
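
As one possible use of the api flags in this schema, the following sketch strips hidden fields from a document before it is returned from an API. The helper filterForApi is hypothetical, it only looks at top-level fields, and treating a missing api flag as hidden is an assumption.

```php
<?php
// Hypothetical helper: keeps only top-level fields whose schema entry has api: true.
// Assumption: a field without an explicit "api" flag is treated as hidden.
function filterForApi(array $document, array $fields): array
{
    $visible = array_filter($fields, fn ($config) => ($config['api'] ?? false) === true);
    return array_intersect_key($document, $visible);
}

$fields = [
    'name' => ['type' => 'string', 'api' => true],
    'internal_notes' => ['type' => 'string', 'api' => false],
];
$document = ['name' => 'Ada', 'internal_notes' => 'VIP customer'];

$public = filterForApi($document, $fields);
```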

5.2. Contracts (Interfaces)

File: app/Services/Etl/Contracts/ImportDriver.php
<?php
namespace App\Services\Etl\Contracts;
use Illuminate\Support\Collection;
interface ImportDriver
{
    public function process(string $filePath, string $schemaName, array $options = []): Collection;
}
File: app/Services/Etl/Contracts/ExportDriver.php
<?php
namespace App\Services\Etl\Contracts;
use Illuminate\Support\Collection;
interface ExportDriver
{
    public function generate(Collection $data, string $schemaName, string $filename);
}

5.3. Core Services & Builders

File: app/Services/Etl/SchemaService.php
<?php
namespace App\Services\Etl;

use Illuminate\Support\Facades\Cache;
use Symfony\Component\Yaml\Yaml;

class SchemaService
{
    public function get(string $name): array
    {
        return Cache::rememberForever("etl_schema_{$name}", function () use ($name) {
            $path = config_path("etl_schemas/{$name}.yml");
            if (!file_exists($path)) {
                throw new \Exception("Schema file not found for '{$name}' at {$path}");
            }
            $schema = Yaml::parseFile($path);
            return $this->processSchema($schema);
        });
    }

    protected function processSchema(array $schema): array
    {
        $processed = [
            'primary_key' => $schema['primary_key'],
            'fields' => $schema['fields'],
            'flat_fields' => [],
            'array_fields' => [],
        ];
        foreach ($schema['fields'] as $key => $config) {
            if ($config['type'] === 'array') {
                $processed['array_fields'][$key] = $config;
            } elseif ($config['type'] === 'object') {
                foreach ($config['object'] as $subKey => $subConfig) {
                    $processed['flat_fields']["{$key}.{$subKey}"] = $subConfig;
                }
            } else {
                $processed['flat_fields'][$key] = $config;
            }
        }
        return $processed;
    }
}
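
To show what processSchema() produces, here is a standalone restatement of its field-splitting step, runnable without Laravel. The function name splitFields and the tiny input schema are illustrative only.

```php
<?php
// Standalone restatement of the splitting step, for illustration only:
// scalar fields stay as-is, object sub-fields get dot-notation keys,
// and array fields are collected separately.
function splitFields(array $fields): array
{
    $flat = [];
    $arrays = [];
    foreach ($fields as $key => $config) {
        if ($config['type'] === 'array') {
            $arrays[$key] = $config;
        } elseif ($config['type'] === 'object') {
            foreach ($config['object'] as $subKey => $subConfig) {
                $flat["{$key}.{$subKey}"] = $subConfig;
            }
        } else {
            $flat[$key] = $config;
        }
    }
    return ['flat_fields' => $flat, 'array_fields' => $arrays];
}

$result = splitFields([
    'name' => ['type' => 'string'],
    'settings' => ['type' => 'object', 'object' => ['theme' => ['type' => 'string']]],
    'addresses' => ['type' => 'array', 'object' => ['city' => ['type' => 'string']]],
]);
```

The flat_fields keys (name, settings.theme) become the parent columns of the spreadsheet, while array_fields (addresses) drive the child rows.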
File: app/Services/Etl/EtlManager.php
<?php
namespace App\Services\Etl;

use App\Services\Etl\Drivers\Exports\OpenSpoutSchemaExportDriver;
use App\Services\Etl\Drivers\Exports\StandardSchemaExportDriver;
use App\Services\Etl\Drivers\Imports\OpenSpoutSchemaImportDriver;
use App\Services\Etl\Drivers\Imports\StandardSchemaImportDriver;
use Illuminate\Support\Manager;

class EtlManager extends Manager
{
    public function getDefaultDriver(){ return 'standard_schema_import'; }
    public function createStandardSchemaImportDriver(){ return new StandardSchemaImportDriver(app(SchemaService::class)); }
    public function createStandardSchemaExportDriver(){ return new StandardSchemaExportDriver(app(SchemaService::class)); }
    public function createOpenSpoutSchemaImportDriver(){ return new OpenSpoutSchemaImportDriver(app(SchemaService::class)); }
    public function createOpenSpoutSchemaExportDriver(){ return new OpenSpoutSchemaExportDriver(app(SchemaService::class)); }
    public function export(): ExportBuilder { return new ExportBuilder($this); }
}
File: app/Services/Etl/ExportBuilder.php
<?php
namespace App\Services\Etl;

use Illuminate\Database\Eloquent\Builder;
use Illuminate\Support\Facades\Storage;
use Illuminate\Support\Str;

class ExportBuilder
{
    protected string $modelClass;
    protected ?Builder $query = null;
    protected string $schemaName;
    protected string $driverName = 'standard_schema_export';
    protected string $disk = 'local';
    protected string $baseFilename;
    protected ?int $chunkSize = null;

    public function __construct(protected EtlManager $etlManager) {}
    public function from(string $modelClass): self { $this->modelClass = $modelClass; return $this; }
    public function query(Builder $query): self { $this->query = $query; return $this; }
    public function usingSchema(string $schemaName): self { $this->schemaName = $schemaName; return $this; }
    public function withDriver(string $driverName): self { $this->driverName = $driverName; return $this; }
    public function to(string $filename): self { $this->baseFilename = Str::endsWith($filename, '.xlsx') ? $filename : $filename . '.xlsx'; return $this; }
    public function onDisk(string $disk): self { $this->disk = $disk; return $this; }
    public function chunk(int $size): self { $this->chunkSize = $size; return $this; }

    public function dispatch(): array
    {
        $driver = $this->etlManager->driver($this->driverName);
        $baseQuery = $this->query ?? $this->modelClass::query();
        $generatedFiles = [];

        if ($this->chunkSize > 0) {
            $fileNumber = 1;
            $baseName = pathinfo($this->baseFilename, PATHINFO_FILENAME);
            $extension = pathinfo($this->baseFilename, PATHINFO_EXTENSION);

            $baseQuery->chunkById($this->chunkSize, function ($records) use (&$fileNumber, &$generatedFiles, $driver, $baseName, $extension) {
                if ($records->isEmpty()) return;
                $chunkFilename = "{$baseName}-part-{$fileNumber}.{$extension}";
                $fullPath = Storage::disk($this->disk)->path($chunkFilename);
                $driver->generate($records, $this->schemaName, $fullPath);
                $generatedFiles[] = $chunkFilename;
                $fileNumber++;
            });
        } else {
            $data = $baseQuery->get();
            $fullPath = Storage::disk($this->disk)->path($this->baseFilename);
            $driver->generate($data, $this->schemaName, $fullPath);
            $generatedFiles[] = $this->baseFilename;
        }
        return $generatedFiles;
    }
}

5.4. Drivers

File: app/Services/Etl/Drivers/Imports/StandardSchemaImportDriver.php
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Maatwebsite\Excel\Concerns\ToCollection;
use Maatwebsite\Excel\Concerns\WithHeadingRow;
use Maatwebsite\Excel\Facades\Excel;

class StandardSchemaImportDriver implements ImportDriver, ToCollection, WithHeadingRow
{
    protected Collection $results;
    protected array $schema;

    public function __construct(protected SchemaService $schemaService) { $this->results = new Collection(); }

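    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);
        // The manager caches driver instances, so start each import with an empty result set.
        $this->results = new Collection();
        Excel::import($this, $filePath);
        return $this->results;
    }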

    public function collection(Collection $rows)
    {
        $currentDocument = null;
        $primaryKey = $this->schema['primary_key'];
        // WithHeadingRow formats headings with Str::slug($value, '_') by default, so match that here.
        $primaryKeyHeader = Str::slug($this->schema['fields'][$primaryKey]['header'] ?? $primaryKey, '_');

        foreach ($rows as $row) {
            $rowArray = $row->toArray();
            if (!empty($rowArray[$primaryKeyHeader])) {
                if ($currentDocument !== null) $this->results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($rowArray);
            }
            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $rowArray);
            }
        }
        if ($currentDocument !== null) $this->results->push($currentDocument);
    }

    private function buildDocumentFromRow(array $row): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = Str::slug($config['header'] ?? $key, '_');
            $value = $row[$header] ?? $config['default'] ?? null;
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $row)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                $header = Str::slug($config['header'] ?? $key, '_');
                if (isset($row[$header]) && $row[$header] !== '' && !is_null($row[$header])) {
                     $document[$key][] = $this->castValue($row[$header], $config['object']['type']);
                }
            } else {
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = Str::slug($subConfig['header'] ?? "{$key}.{$subKey}", '_');
                    if (isset($row[$header]) && $row[$header] !== '' && !is_null($row[$header])) {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($row[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
        if (is_null($value)) return null;
        return match ($type) {
            'integer' => (int) $value, 'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime'=> Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
File: app/Services/Etl/Drivers/Imports/OpenSpoutSchemaImportDriver.php
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use OpenSpout\Common\Entity\Row;
use OpenSpout\Reader\XLSX\Reader;

class OpenSpoutSchemaImportDriver implements ImportDriver
{
    protected array $schema;
    public function __construct(protected SchemaService $schemaService) {}

    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);
        $reader = new Reader();
        $reader->open($filePath);
        $results = new Collection();
        $currentDocument = null;
        $headers = [];
        $primaryKey = $this->schema['primary_key'];
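        $sheetIterator = $reader->getSheetIterator();
        $sheetIterator->rewind(); // position the iterator explicitly before reading the first sheet
        $sheet = $sheetIterator->current();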

        foreach ($sheet->getRowIterator() as $rowIndex => $row) {
            $rowData = array_map(fn($cell) => $cell->getValue(), $row->getCells());
            if ($rowIndex === 1) {
                $headers = array_map('trim', $rowData); continue;
            }
            if (empty(array_filter($rowData))) continue;

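            // Pad short rows and trim long ones so array_combine() always receives matching lengths.
            $rowData = array_slice(array_pad($rowData, count($headers), null), 0, count($headers));
            $assocRow = array_combine($headers, $rowData);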
            $primaryKeyHeader = $this->schema['fields'][$primaryKey]['header'] ?? $primaryKey;

            if (!empty($assocRow[$primaryKeyHeader])) {
                if ($currentDocument !== null) $results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($assocRow);
            }
            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $assocRow);
            }
        }
        if ($currentDocument !== null) $results->push($currentDocument);
        $reader->close();
        return $results;
    }

    private function buildDocumentFromRow(array $assocRow): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = $config['header'] ?? $key;
            $value = $assocRow[$header] ?? $config['default'] ?? null;
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $assocRow)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                $header = $config['header'] ?? $key;
                if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                    $document[$key][] = $this->castValue($assocRow[$header], $config['object']['type']);
                }
            } else {
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = $subConfig['header'] ?? "{$key}.{$subKey}";
                    if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($assocRow[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
        if (is_null($value)) return null;
        return match ($type) {
            'integer' => (int) $value, 'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime' => $value instanceof \DateTime ? $value : Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
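
The casting rules shared by both import drivers can be tried in isolation. The sketch below is a simplified stand-in for castValue(); it uses PHP's built-in DateTime instead of Carbon so that it runs without dependencies, and the name castCell is hypothetical.

```php
<?php
// Simplified stand-in for castValue(): null passes through untouched,
// everything else is cast according to the schema type.
function castCell(mixed $value, string $type): mixed
{
    if ($value === null) {
        return null;
    }
    return match ($type) {
        'integer' => (int) $value,
        'float' => (float) $value,
        // Accepts "1", "true", "on", "yes" as true; anything else is false.
        'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
        'datetime' => $value instanceof DateTimeInterface ? $value : new DateTime($value),
        default => (string) $value,
    };
}

$age = castCell('42', 'integer');
$enabled = castCell('yes', 'boolean');
$blank = castCell('', 'integer');
```

Note that an empty string is not null, so under these rules a blank cell that reaches the cast as '' would become 0 for an integer field rather than null; if that matters for your data, normalizing empty strings to null before casting may be worth considering.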
File: app/Services/Etl/Drivers/Exports/StandardSchemaExportDriver.php
<?php
namespace App\Services\Etl\Drivers\Exports;

use App\Services\Etl\Contracts\ExportDriver;
use App\Services\Etl\SchemaService;
use Illuminate\Support\Collection;
use Maatwebsite\Excel\Concerns\Exportable;
use Maatwebsite\Excel\Concerns\FromCollection;
use Maatwebsite\Excel\Concerns\WithHeadings;
use Maatwebsite\Excel\Concerns\WithMapping;
use Maatwebsite\Excel\Facades\Excel;

class StandardSchemaExportDriver implements ExportDriver, FromCollection, WithHeadings, WithMapping
{
    use Exportable;
    protected Collection $data;
    protected array $schema;
    protected array $flattenedData = [];
    protected ?string $primaryArrayKey = null;

    public function __construct(protected SchemaService $schemaService) {}

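    public function generate(Collection $data, string $schemaName, string $filename)
    {
        $this->data = $data;
        $this->schema = $this->schemaService->get($schemaName);
        $this->primaryArrayKey = empty($this->schema['array_fields'])
            ? null
            : array_key_first($this->schema['array_fields']);
        // The manager reuses driver instances, so clear rows left over from a previous call.
        $this->flattenedData = [];
        $this->flattenData();

        // Mirror the OpenSpout driver: absolute paths are written to disk,
        // anything else is returned as a browser download.
        if (str_starts_with($filename, '/') || str_starts_with($filename, '\\') || str_contains($filename, ':')) {
            file_put_contents($filename, Excel::raw($this, \Maatwebsite\Excel\Excel::XLSX));
            return $filename;
        }
        return Excel::download($this, $filename);
    }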

    protected function flattenData()
    {
        foreach ($this->data as $document) {
            $arrayItems = $this->primaryArrayKey ? data_get($document, $this->primaryArrayKey, []) : [];
            if (empty($arrayItems)) {
                $this->flattenedData[] = ['document' => $document, 'item' => null, 'is_first' => true];
            } else {
                foreach ($arrayItems as $index => $item) {
                    $this->flattenedData[] = ['document' => $document, 'item' => $item, 'is_first' => ($index === 0)];
                }
            }
        }
    }

    public function collection() { return collect($this->flattenedData); }

    public function headings(): array
    {
        $headings = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $headings[] = $config['header'] ?? $key;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $this->schema['array_fields'][$this->primaryArrayKey];
            if (isset($arrayConfig['object']['type'])) {
                $headings[] = $arrayConfig['header'] ?? $this->primaryArrayKey;
            } else {
                 foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                     $headings[] = $subConfig['header'] ?? "{$this->primaryArrayKey}.{$subKey}";
                 }
            }
        }
        return $headings;
    }

    public function map($row): array
    {
        $mappedRow = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $value = $row['is_first'] ? data_get($row['document'], $key) : '';
            if ($value instanceof \DateTime) $value = $value->format('Y-m-d H:i:s');
            $mappedRow[] = $value;
        }
        if ($this->primaryArrayKey) {
            $item = $row['item'];
            $arrayConfig = $this->schema['array_fields'][$this->primaryArrayKey];
            if (is_object($item)) $item = (array) $item;
            if (isset($arrayConfig['object']['type'])) {
                $mappedRow[] = $item ?? '';
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $mappedRow[] = $item ? ($item[$subKey] ?? null) : '';
                }
            }
        }
        return $mappedRow;
    }
}
File: app/Services/Etl/Drivers/Exports/OpenSpoutSchemaExportDriver.php
<?php
namespace App\Services\Etl\Drivers\Exports;

use App\Services\Etl\Contracts\ExportDriver;
use App\Services\Etl\SchemaService;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use OpenSpout\Common\Entity\Row;
use OpenSpout\Writer\XLSX\Writer;

class OpenSpoutSchemaExportDriver implements ExportDriver
{
    protected ?string $primaryArrayKey;
    public function __construct(protected SchemaService $schemaService) {}

    public function generate(Collection $data, string $schemaName, string $filename)
    {
        $schema = $this->schemaService->get($schemaName);
        $this->primaryArrayKey = empty($schema['array_fields']) ? null : array_key_first($schema['array_fields']);

        $writer = new Writer();
        if (Str::startsWith($filename, ['/', '\\']) || Str::of($filename)->contains(':')) {
            $writer->openToFile($filename);
        } else {
            $writer->openToBrowser($filename);
        }

        $writer->addRow(Row::fromValues($this->getHeadings($schema)));

        foreach ($data as $document) {
            $arrayItems = $this->primaryArrayKey ? data_get($document, $this->primaryArrayKey, []) : [];
            if (empty($arrayItems)) {
                $writer->addRow(Row::fromValues($this->mapRow($document, null, true, $schema)));
            } else {
                foreach ($arrayItems as $index => $item) {
                    $writer->addRow(Row::fromValues($this->mapRow($document, $item, $index === 0, $schema)));
                }
            }
        }
        $writer->close();
    }

    private function getHeadings(array $schema): array
    {
        $headings = [];
        foreach ($schema['flat_fields'] as $key => $config) {
            $headings[] = $config['header'] ?? $key;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $schema['array_fields'][$this->primaryArrayKey];
            if (isset($arrayConfig['object']['type'])) {
                $headings[] = $arrayConfig['header'] ?? $this->primaryArrayKey;
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $headings[] = $subConfig['header'] ?? "{$this->primaryArrayKey}.{$subKey}";
                }
            }
        }
        return $headings;
    }

    private function mapRow($document, $item, bool $isFirst, array $schema): array
    {
        $mappedRow = [];
        foreach ($schema['flat_fields'] as $key => $config) {
            $value = $isFirst ? data_get($document, $key) : '';
            if ($value instanceof \DateTime) $value = $value->format('Y-m-d H:i:s');
            $mappedRow[] = $value;
        }
        if ($this->primaryArrayKey) {
            $arrayConfig = $schema['array_fields'][$this->primaryArrayKey];
            if (is_object($item)) $item = (array) $item;
            if (isset($arrayConfig['object']['type'])) {
                $mappedRow[] = $item ?? '';
            } else {
                foreach ($arrayConfig['object'] as $subKey => $subConfig) {
                    $mappedRow[] = $item ? ($item[$subKey] ?? null) : '';
                }
            }
        }
        return $mappedRow;
    }
}
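
Both export drivers share the same flattening idea: one spreadsheet row per array item, with the parent columns filled in only on the first row. A dependency-free sketch of that idea follows; the function name flattenDocument and the sample document are illustrative, and header resolution and date formatting are left out.

```php
<?php
// Illustration of the export-side flattening: one row per array item,
// parent columns populated only on the first row of each document.
function flattenDocument(array $document, array $parentKeys, string $arrayField, array $itemKeys): array
{
    $items = $document[$arrayField] ?? [];
    if ($items === []) {
        $items = [null]; // still emit a single parent row
    }
    $rows = [];
    foreach (array_values($items) as $index => $item) {
        $row = [];
        foreach ($parentKeys as $key) {
            $row[] = $index === 0 ? ($document[$key] ?? null) : '';
        }
        foreach ($itemKeys as $key) {
            $row[] = $item === null ? '' : ($item[$key] ?? null);
        }
        $rows[] = $row;
    }
    return $rows;
}

$rows = flattenDocument(
    ['_id' => 'u1', 'name' => 'Ada', 'addresses' => [
        ['street' => '1 Main St', 'city' => 'Leeds'],
        ['street' => '9 Side Rd', 'city' => 'York'],
    ]],
    ['_id', 'name'],
    'addresses',
    ['street', 'city']
);
```

The second row comes out with blank _id and name cells, which is exactly the signal the import drivers use to attach it to the document above.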

5.5. Service Integration & Testing

File: app/Services/Etl/Etl.php
<?php
namespace App\Services\Etl;
use Illuminate\Support\Facades\Facade;

/**
 * @method static \App\Services\Etl\Contracts\ImportDriver|\App\Services\Etl\Contracts\ExportDriver driver(string|null $driver = null)
 * @method static \App\Services\Etl\ExportBuilder export()
 * @see \App\Services\Etl\EtlManager
 */
class Etl extends Facade
{
    protected static function getFacadeAccessor()
    {
        return 'etl';
    }
}
File: app/Providers/EtlServiceProvider.php
<?php
namespace App\Providers;

use App\Services\Etl\EtlManager;
use App\Services\Etl\SchemaService;
use Illuminate\Support\ServiceProvider;

class EtlServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->singleton('etl', function ($app) {
            return new EtlManager($app);
        });
        $this->app->singleton(SchemaService::class, function ($app) {
            return new SchemaService();
        });
    }
}
File: config/app.php (Add this to the aliases array)
'aliases' => Facade::defaultAliases()->merge([
    // ... other aliases
    'Etl' => App\Services\Etl\Etl::class,
])->toArray(),
File: app/Console/Commands/Test/EtlTestPipeline.php
<?php
namespace App\Console\Commands\Test;

use Etl;
use Illuminate\Console\Command;
use Illuminate\Database\Eloquent\Model;
use Illuminate\Support\Collection;
use Illuminate\Support\Facades\File;
use Illuminate\Support\Str;
use MongoDB\BSON\UTCDateTime;

class EtlTestPipeline extends Command
{
    protected $signature = 'etl:test-pipeline {schema} {model} {--count=5} {--driver=standard} {--keep-file}';
    protected $description = 'Tests the full ETL export/import pipeline for a given schema and model.';

    public function handle()
    {
        $schemaName = $this->argument('schema');
        $modelClass = $this->qualifyModel($this->argument('model'));
        $count = (int) $this->option('count');
        $driverType = $this->option('driver');

        $this->info("Starting ETL pipeline test for '{$schemaName}' schema using '{$driverType}' driver.");
        if (!class_exists($modelClass)) {
            $this->error("Model class '{$modelClass}' does not exist.");
            return Command::FAILURE;
        }
        $model = new $modelClass();
        $originalData = $model->take($count)->get();
        if ($originalData->isEmpty()) {
            $this->warn("No data found in '{$model->getTable()}'. Test cannot proceed.");
            return Command::SUCCESS;
        }
        $tempFilePath = storage_path('app/' . Str::slug($schemaName) . '_test_' . time() . '.xlsx');

        try {
            $this->comment("Exporting data to: {$tempFilePath}");
            $exportDriverName = ($driverType === 'openspout') ? 'open_spout_schema_export' : 'standard_schema_export';
            Etl::driver($exportDriverName)->generate($originalData, $schemaName, $tempFilePath);
            if (!File::exists($tempFilePath)) { $this->error("Export failed."); return Command::FAILURE; }
            $this->info("Export successful.");

            $this->comment("Importing data from temporary file...");
            $importDriverName = ($driverType === 'openspout') ? 'open_spout_schema_import' : 'standard_schema_import';
            $importedData = Etl::driver($importDriverName)->process($tempFilePath, $schemaName);
            $this->info("Import successful.");

            $this->comment("Verifying data integrity...");
            // Surface verification failures in the command's exit code.
            if (!$this->verifyData($originalData, $importedData)) {
                return Command::FAILURE;
            }
        } catch (\Throwable $e) {
            // \Throwable also covers PHP errors such as ValueError, not just exceptions.
            $this->error("An exception occurred: " . $e->getMessage());
            $this->line($e->getTraceAsString());
            return Command::FAILURE;
        } finally {
            if (!$this->option('keep-file') && File::exists($tempFilePath)) {
                File::delete($tempFilePath);
                $this->comment("Temporary file deleted.");
            }
        }
        return Command::SUCCESS;
    }

    /**
     * Compares each original record with its imported counterpart.
     * Returns true only when the counts and every record match.
     */
    private function verifyData(Collection $originalData, Collection $importedData): bool
    {
        if ($originalData->count() !== $importedData->count()) {
            $this->error(sprintf("Count mismatch. Original: %d, Imported: %d.", $originalData->count(), $importedData->count()));
            return false;
        }
        $errors = 0;
        $bar = $this->output->createProgressBar($originalData->count());
        $bar->start();
        foreach ($originalData as $index => $originalModel) {
            $originalArray = $this->normalizeArray($originalModel->toArray());
            $importedArray = $this->normalizeArray($importedData->get($index));
            if (json_encode($originalArray) !== json_encode($importedArray)) {
                $this->newLine(2);
                $this->warn("Mismatch for record #{$index} (ID: {$originalModel->getKey()})");
                $this->table(['Key', 'Original (JSON)', 'Imported (JSON)'], $this->getDiff($originalArray, $importedArray));
                $errors++;
            }
            $bar->advance();
        }
        $bar->finish();
        $this->newLine(2);
        if ($errors === 0) {
            $this->info("✅ Verification successful! All {$originalData->count()} records match.");
            return true;
        }
        $this->error("❌ Verification failed! Found {$errors} mismatched records.");
        return false;
    }

    private function qualifyModel(string $model): string { return Str::contains($model, '\\') ? $model : 'App\\Models\\' . Str::studly($model); }

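    private function normalizeArray(array $array): array
    {
        ksort($array);
        foreach ($array as $key => &$value) {
            // BSON dates (e.g. UTCDateTime) expose toDateTime(); convert them to native dates first.
            if (is_object($value) && method_exists($value, 'toDateTime')) $value = $value->toDateTime();
            // Imported rows carry native DateTime objects, so format both kinds identically.
            if ($value instanceof \DateTimeInterface) $value = $value->format('Y-m-d H:i:s');
            if (is_array($value)) $value = $this->normalizeArray($value);
        }
        unset($value); // Break the reference left behind by the by-reference foreach.
        return $array;
    }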

    private function getDiff(array $original, array $imported): array
    {
        $keys = array_unique(array_merge(array_keys($original), array_keys($imported)));
        sort($keys);
        $diff = [];
        foreach ($keys as $key) {
            // Keys are top-level here; index directly so a key containing dots is not read as a nested path.
            $originalValue = json_encode($original[$key] ?? null);
            $importedValue = json_encode($imported[$key] ?? null);
            if ($originalValue !== $importedValue) {
                $diff[] = ["<fg=red>{$key}</>", $originalValue, $importedValue];
            }
        }
        return $diff;
    }
}

Addendum A: CSV Import Functionality

This section details the necessary additions to support importing data from .csv files.

1. Conceptual Changes

CSV support comes from a new import driver, OpenSpoutCsvSchemaImportDriver. It applies the same schema-driven logic as the XLSX driver but reads the file with the CSV reader from the high-performance OpenSpout library. The CSV file must follow the same “parent-child row” convention: a child row leaves the parent columns empty, so the line begins with consecutive commas.

Example users.csv format:
User ID,Full Name,Email Address,Address Street,"Address City"
"USR-001","John Doe","[email protected]","123 Main St","Anytown"
,,,"456 Oak Ave","Anytown"
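
The grouping rule can be sketched in plain PHP without OpenSpout or the schema. This is only an illustration of the convention, not the driver itself: the function name groupParentChildRows, the 'addresses' key, and the sample rows are all assumptions made for the example.

```php
<?php
// Hypothetical helper: groups "parent-child" CSV rows into nested documents.
// A row with a non-empty primary key starts a new document; every row
// (parent or child) may contribute one item to the nested array.
function groupParentChildRows(string $csv, string $primaryKeyHeader, array $childHeaders, string $arrayKey): array
{
    $lines = array_values(array_filter(explode("\n", trim($csv)), fn ($line) => trim($line) !== ''));
    $headers = array_map('trim', str_getcsv(array_shift($lines), ',', '"', ''));

    $documents = [];
    $current = null;

    foreach ($lines as $line) {
        $cells = str_getcsv($line, ',', '"', '');
        // Pad short rows and truncate long ones so array_combine() gets equal lengths.
        $cells = array_slice(array_pad($cells, count($headers), ''), 0, count($headers));
        $row = array_combine($headers, $cells);

        if ($row[$primaryKeyHeader] !== '') {
            if ($current !== null) {
                $documents[] = $current;
            }
            // Parent columns are everything that is not a child column.
            $current = array_diff_key($row, array_flip($childHeaders));
            $current[$arrayKey] = [];
        }
        if ($current === null) {
            continue; // A child row before any parent row has nothing to attach to.
        }

        $child = array_intersect_key($row, array_flip($childHeaders));
        if (array_filter($child, fn ($value) => $value !== '') !== []) {
            $current[$arrayKey][] = $child;
        }
    }
    if ($current !== null) {
        $documents[] = $current;
    }

    return $documents;
}

// Illustrative data only.
$csv = <<<'CSV'
User ID,Full Name,Address Street,Address City
USR-001,John Doe,123 Main St,Anytown
,,456 Oak Ave,Anytown
USR-002,Jane Roe,9 Elm Rd,Springfield
CSV;

$docs = groupParentChildRows($csv, 'User ID', ['Address Street', 'Address City'], 'addresses');
```

Here $docs holds two documents: the first collects both address rows, the second collects one.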

2. Directory Structure Updates

A new driver has been added to the Imports directory:
app/Services/Etl/Drivers/
└── Imports/
    ├── OpenSpoutCsvSchemaImportDriver.php   <-- NEW FILE
    ├── OpenSpoutSchemaImportDriver.php
    └── StandardSchemaImportDriver.php

3. New Driver Source Code

File: app/Services/Etl/Drivers/Imports/OpenSpoutCsvSchemaImportDriver.php
<?php
namespace App\Services\Etl\Drivers\Imports;

use App\Services\Etl\Contracts\ImportDriver;
use App\Services\Etl\SchemaService;
use Carbon\Carbon;
use Illuminate\Support\Collection;
use OpenSpout\Reader\CSV\Reader; // Using the CSV Reader

class OpenSpoutCsvSchemaImportDriver implements ImportDriver
{
    protected array $schema;

    public function __construct(protected SchemaService $schemaService) {}

    public function process(string $filePath, string $schemaName, array $options = []): Collection
    {
        $this->schema = $this->schemaService->get($schemaName);

        $reader = new Reader(); // Use the CSV Reader
        $reader->open($filePath);

        $results = new Collection();
        $currentDocument = null;
        $headers = [];
        $primaryKey = $this->schema['primary_key'];
        $sheet = $reader->getSheetIterator()->current();

        foreach ($sheet->getRowIterator() as $rowIndex => $row) {
            $rowData = array_map(fn($cell) => $cell->getValue(), $row->getCells());

            if ($rowIndex === 1) {
                $headers = array_map('trim', $rowData);
                continue;
            }
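            // Skip blank rows; compare explicitly so a cell holding "0" still counts as data.
            if (empty(array_filter($rowData, fn($cell) => $cell !== null && $cell !== ''))) continue;

            // Pad short rows and truncate long ones so array_combine() receives equal-length arrays.
            $cells = array_slice(array_pad($rowData, count($headers), null), 0, count($headers));
            $assocRow = array_combine($headers, $cells);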
            $primaryKeyHeader = $this->schema['fields'][$primaryKey]['header'] ?? $primaryKey;

            if (!empty($assocRow[$primaryKeyHeader])) {
                if ($currentDocument !== null) $results->push($currentDocument);
                $currentDocument = $this->buildDocumentFromRow($assocRow);
            }

            if ($currentDocument !== null) {
                $this->addArrayItemsToDocument($currentDocument, $assocRow);
            }
        }

        if ($currentDocument !== null) $results->push($currentDocument);
        $reader->close();
        return $results;
    }

    private function buildDocumentFromRow(array $assocRow): array
    {
        $doc = [];
        foreach ($this->schema['flat_fields'] as $key => $config) {
            $header = $config['header'] ?? $key;
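            $raw = $assocRow[$header] ?? null;
            // Empty CSV cells arrive as '', so treat '' as missing before falling back to the default.
            $value = ($raw === null || $raw === '') ? ($config['default'] ?? null) : $raw;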
            data_set($doc, $key, $this->castValue($value, $config['type']));
        }
        foreach ($this->schema['array_fields'] as $key => $config) { $doc[$key] = []; }
        return $doc;
    }

    private function addArrayItemsToDocument(array &$document, array $assocRow)
    {
        foreach ($this->schema['array_fields'] as $key => $config) {
            if (isset($config['object']['type'])) {
                $header = $config['header'] ?? $key;
                if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                    $document[$key][] = $this->castValue($assocRow[$header], $config['object']['type']);
                }
            } else {
                $item = []; $hasData = false;
                foreach ($config['object'] as $subKey => $subConfig) {
                    $header = $subConfig['header'] ?? "{$key}.{$subKey}";
                    if (isset($assocRow[$header]) && $assocRow[$header] !== '' && !is_null($assocRow[$header])) {
                        $hasData = true;
                        $item[$subKey] = $this->castValue($assocRow[$header], $subConfig['type']);
                    }
                }
                if ($hasData) $document[$key][] = $item;
            }
        }
    }

    private function castValue($value, string $type)
    {
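        // Empty CSV cells arrive as ''; without this guard Carbon::parse('') would yield the current time.
        if (is_null($value) || $value === '') return null;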
        return match ($type) {
            'integer' => (int) $value,
            'float' => (float) $value,
            'boolean' => filter_var($value, FILTER_VALIDATE_BOOLEAN),
            'datetime' => $value instanceof \DateTimeInterface ? $value : Carbon::parse($value)->toDateTime(),
            default => (string) $value,
        };
    }
}
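
Because CSV cells always arrive as strings, it helps to see what each cast produces. The following is a minimal standalone sketch, not the driver's method: castCsvCell is a hypothetical name, DateTimeImmutable stands in for Carbon so the snippet runs without dependencies, and treating an empty cell as null is an assumption of this sketch.

```php
<?php
// Illustrative stand-in for a schema-driven cast of a single CSV cell.
function castCsvCell(?string $value, string $type): mixed
{
    // Assumption: an empty cell means "no value" rather than 0, false, or the current time.
    if ($value === null || $value === '') {
        return null;
    }

    return match ($type) {
        'integer'  => (int) $value,
        'float'    => (float) $value,
        // "1", "true", "on" and "yes" become true; any other string becomes false.
        'boolean'  => filter_var($value, FILTER_VALIDATE_BOOLEAN),
        'datetime' => new DateTimeImmutable($value),
        default    => $value,
    };
}

$examples = [
    'integer' => castCsvCell('42', 'integer'),   // int 42
    'float'   => castCsvCell('3.5', 'float'),    // float 3.5
    'yes'     => castCsvCell('yes', 'boolean'),  // true
    'no'      => castCsvCell('no', 'boolean'),   // false
    'empty'   => castCsvCell('', 'datetime'),    // null
];
```

One consequence worth noting: with FILTER_VALIDATE_BOOLEAN alone, an unrecognised string such as "maybe" silently becomes false, so stricter validation belongs in the schema's validator rules.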

4. Service Manager Update

The EtlManager must be updated to register the new driver.

File: app/Services/Etl/EtlManager.php
<?php
namespace App\Services\Etl;

use App\Services\Etl\Drivers\Exports\OpenSpoutSchemaExportDriver;
use App\Services\Etl\Drivers\Exports\StandardSchemaExportDriver;
use App\Services\Etl\Drivers\Imports\OpenSpoutCsvSchemaImportDriver; // <-- ADD THIS
use App\Services\Etl\Drivers\Imports\OpenSpoutSchemaImportDriver;
use App\Services\Etl\Drivers\Imports\StandardSchemaImportDriver;
use Illuminate\Support\Manager;

class EtlManager extends Manager
{
    // ... other methods ...
    public function createOpenSpoutSchemaExportDriver(){ /* ... */ }

    // ADD THIS METHOD
    public function createOpenSpoutCsvSchemaImportDriver(){
        return new OpenSpoutCsvSchemaImportDriver(app(SchemaService::class));
    }

    public function export(): ExportBuilder { /* ... */ }
}
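
The method name matters because Laravel's base Manager class derives the factory method from the driver key: it StudlyCases the key and wraps it in create...Driver. The sketch below reproduces that naming rule in plain PHP so the mapping can be checked by hand; driverFactoryMethod is a hypothetical helper, not part of Laravel or this service.

```php
<?php
// Hypothetical helper mirroring the key-to-method naming convention.
function driverFactoryMethod(string $driverKey): string
{
    // StudlyCase the key: "open_spout_csv_schema_import" becomes "OpenSpoutCsvSchemaImport".
    $studly = str_replace(' ', '', ucwords(str_replace(['-', '_'], ' ', $driverKey)));

    return 'create' . $studly . 'Driver';
}

$method = driverFactoryMethod('open_spout_csv_schema_import');
```

Here $method is 'createOpenSpoutCsvSchemaImportDriver', which matches the method added to EtlManager. A typo in either the key or the method name would leave the driver unresolvable.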

5. Usage Example

To use the new driver, simply call it by its key, open_spout_csv_schema_import.
use Etl;
use Illuminate\Http\Request;

public function importCsv(Request $request)
{
    $request->validate(['file' => 'required|mimes:csv,txt']);
    $filePath = $request->file('file')->getRealPath();

    $dataToInsert = Etl::driver('open_spout_csv_schema_import')
                       ->process($filePath, 'user_schema');

    // ... database logic ...
}