PipeFlow

A modern, high-performance ETL pipeline library for .NET with a builder pattern, async/await support, and Entity Framework integration. Process CSV, JSON, Excel, SQL Server, PostgreSQL, and MongoDB data with minimal memory usage through streaming operations.

Installation

dotnet add package PipeFlowCore

Quick Start

// Modern builder pattern with lazy execution
var pipeline = PipeFlowBuilder
    .FromCsv("input.csv")
    .Filter(row => row["Status"] == "Active")
    .Build();

// Execute with async/await and CancellationToken
var result = await pipeline.ExecuteAsync(cancellationToken);

// Consistent From/To naming
await PipeFlowBuilder
    .FromSql(connectionString, "SELECT * FROM Orders")
    .ToExcelAsync("orders.xlsx", cancellationToken);

Supported Data Sources

PipeFlow can read from and write to:

  • CSV files
  • JSON files
  • Excel files (xlsx)
  • SQL Server databases
  • PostgreSQL databases (NEW)
  • MongoDB collections
  • REST APIs
  • In-memory collections (a short example follows this list)

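Most of these sources appear in the Examples section below. The simplest is an in-memory collection; here is a minimal sketch, assuming FromCollection accepts any IEnumerable and that Filter works over typed elements as the Entity Framework example suggests (the Order type is illustrative, not part of PipeFlow):

public record Order(int Id, string Customer, decimal Total);

var orders = new List<Order>
{
    new(1, "Acme", 250m),
    new(2, "Globex", 90m)
};

// Pipe an in-memory collection straight to a JSON file
await PipeFlowBuilder
    .FromCollection(orders)
    .Filter(o => o.Total > 100m)
    .ToJsonAsync("large_orders.json", cancellationToken);
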
Key Features

  • Builder Pattern: Build complex pipelines without immediate execution
  • Async/Await: Full async support with CancellationToken
  • Entity Framework Integration: Direct support for IQueryable and EF Core
  • Lazy Execution: Pipelines execute only when explicitly called
  • Consistent API: From/To pattern for all data sources
  • Memory Efficient: Streaming support for large datasets
  • Parallel Processing: Built-in support for parallel execution
  • Type Safe: Strong typing with generics support
  • Extensible: Easy to add custom data sources and destinations (see the sketch after this list)

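The extension API itself is not shown in this README, but because FromCollection accepts any enumerable, a custom source can be as small as an iterator method. A minimal sketch (ReadServerLog and the LogEntry type are hypothetical helpers, not part of PipeFlow):

public record LogEntry(DateTime Timestamp, string Level, string Message);

// Hypothetical custom source: any iterator method can act as one,
// streaming rows lazily just like the built-in file sources
static IEnumerable<LogEntry> ReadServerLog(string path)
{
    foreach (var line in File.ReadLines(path))
    {
        var parts = line.Split('\t');
        yield return new LogEntry(DateTime.Parse(parts[0]), parts[1], parts[2]);
    }
}

// Feed it into a pipeline like any built-in source
await PipeFlowBuilder
    .FromCollection(ReadServerLog("app.log"))
    .Filter(e => e.Level == "ERROR")
    .ToCsvAsync("errors.csv", cancellationToken);
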
Examples

CSV Processing with Builder Pattern

// Build pipeline (not executed yet)
var pipeline = PipeFlowBuilder
    .FromCsv("sales.csv")
    .Filter(row => row.GetValue<decimal>("Amount") > 1000)
    .Map(row => new {
        Product = row["ProductName"],
        Revenue = row.GetValue<decimal>("Amount") * row.GetValue<int>("Quantity")
    })
    .Build();

// Execute when ready
var result = await pipeline.ExecuteAsync();
if (result.Success)
{
    await PipeFlowBuilder
        .FromCollection(result.Data)
        .ToCsvAsync("high_value_sales.csv");
}

Database Operations with Async Support

// Export from SQL Server with async
await PipeFlowBuilder
    .FromSql(connectionString, "SELECT * FROM Products WHERE InStock = 1")
    .ToJsonAsync("products.json", cancellationToken);

// Import to SQL Server
await PipeFlowBuilder
    .FromExcel("inventory.xlsx")
    .ToSqlAsync(connectionString, "Inventory", options =>
    {
        options.UseBulkInsert = true;
        options.BatchSize = 1000;
    }, cancellationToken);

PostgreSQL Support

// Read from PostgreSQL
var pipeline = PipeFlowBuilder
    .FromPostgreSql(connectionString, "SELECT * FROM products WHERE price > @minPrice", 
        new { minPrice = 100 })
    .Filter(row => row["in_stock"] == true)
    .Build();

// Write to PostgreSQL with upsert
await PipeFlowBuilder
    .FromCsv("products.csv")
    .ToPostgreSqlAsync(connectionString, "products", options =>
    {
        options.CreateTableIfNotExists = true;
        options.OnConflictUpdate("product_id");
        options.UseBulkInsert = true;
    }, cancellationToken);

Entity Framework Integration

// Read from Entity Framework with paging
await PipeFlowBuilder
    .FromQueryable(context.Customers.Where(c => c.IsActive))
    .WithPaging(pageSize: 500)
    .Map(c => new SupplierDto
    {
        Name = c.CompanyName,
        Email = c.Email
    })
    .ToEntityFrameworkAsync(context, options =>
    {
        options.UpsertPredicate = s => x => x.Email == s.Email;
        options.BatchSize = 100;
        options.UseTransaction = true;
    }, cancellationToken);

MongoDB Integration

// Query MongoDB
PipeFlow.From.MongoDB("mongodb://localhost", "store", "products")
    .Where("category", "Electronics")
    .Sort("price", ascending: false)
    .WriteToCsv("electronics.csv");

// Update MongoDB from CSV
PipeFlow.From.Csv("product_updates.csv")
    .ToMongoDB("mongodb://localhost", "store", "products", writer => writer
        .WithUpsert("sku")
        .WithBatchSize(500));

REST API Support

// Fetch from API
PipeFlow.From.Api("https://api.example.com/data")
    .Filter(item => item["active"] == true)
    .WriteToJson("active_items.json");

// Send to API with new builder pattern
await PipeFlowBuilder
    .FromCsv("upload.csv")
    .ToApiAsync("https://api.example.com/import", options =>
    {
        options.AuthToken = "api-key";
        options.BatchSize = 100;
    }, cancellationToken);

Data Transformations

// Complex transformations
PipeFlow.From.Csv("raw_data.csv")
    .RemoveDuplicates("Id")
    .FillMissing("Email", "unknown@example.com")
    .Map(row => {
        row["Email"] = row["Email"].ToString().ToLower();
        row["FullName"] = $"{row["FirstName"]} {row["LastName"]}";
        return row;
    })
    .GroupBy(row => row["Department"])
    .Select(group => new {
        Department = group.Key,
        EmployeeCount = group.Count(),
        AverageSalary = group.Average(r => r.GetValue<decimal>("Salary"))
    })
    .WriteToJson("department_summary.json");

Parallel Processing

For better performance with large datasets:

var pipeline = PipeFlowBuilder
    .FromCsv("large_file.csv")
    .AsParallel(maxDegreeOfParallelism: 8)
    .Map(row => {
        row["Hash"] = ComputeHash(row["Data"]);
        return row;
    })
    .Build();

var result = await pipeline.ExecuteAsync(cancellationToken);
await PipeFlowBuilder
    .FromCollection(result.Data)
    .ToCsvAsync("processed.csv", cancellationToken);

Data Validation

var validator = new DataValidator()
    .Required("Id", "Email")
    .Email("Email")
    .Range("Age", min: 0, max: 120);

PipeFlow.From.Csv("users.csv")
    .Validate(validator)
    .OnInvalid(ErrorStrategy.LogAndSkip)
    .WriteToCsv("valid_users.csv");

Performance

PipeFlow uses streaming to process data efficiently. Memory usage remains constant regardless of file size.
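
For example, a filter-and-rewrite pass over a very large CSV touches one row at a time, so the file is never fully loaded (the column name here is illustrative):

// Rows stream through the pipeline one at a time;
// memory stays flat even for multi-gigabyte inputs
PipeFlow.From.Csv("huge_dataset.csv")
    .Filter(row => row.GetValue<decimal>("Amount") > 0)
    .WriteToCsv("clean_dataset.csv");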

Benchmark results (1M records):

  • CSV read/write: ~3 seconds
  • JSON processing: ~5 seconds
  • SQL operations: ~4 seconds
  • Parallel processing: ~1.5 seconds (8 cores)

Configuration

PipeFlowConfig.Configure(config => {
    config.DefaultCsvDelimiter = ',';
    config.BufferSize = 8192;
    config.ThrowOnMissingColumns = false;
});

Requirements

  • .NET 6.0 or later
  • SQL Server 2012+ (for SQL Server features)
  • PostgreSQL (for PostgreSQL features)
  • MongoDB 4.0+ (for MongoDB features)

Benchmarks

PipeFlow is optimized for throughput and constant memory use:

Operation             Records   Time    Memory
CSV Read              1M        ~3s     <50MB
Parallel Processing   1M        ~1.5s   <100MB
Streaming             10M       ~30s    <50MB

Contributing

We welcome contributions! Please see CONTRIBUTING.md for details.

Changelog

See CHANGELOG.md for version history and release notes.

License

This project is licensed under the MIT License - see the LICENSE file for details.

Author

Berkant (GitHub: Nonanti)
