`Repo.stream/2` processes database results without loading all rows

almirsarajcic


Exporting a million database rows? Repo.all/2 loads everything into memory first. Repo.stream/2 uses database cursors to fetch rows in batches, keeping memory constant regardless of result size.

defmodule Reports do
  import Ecto.Query

  def export_all_orders(output_path) do
    query = from(o in Order, order_by: o.id)

    Repo.transaction(
      fn ->
        query
        |> Repo.stream(max_rows: 1000)
        |> Stream.map(&format_csv_row/1)
        |> Stream.into(File.stream!(output_path))
        |> Stream.run()
      end,
      # A full export can outlive the default 15-second transaction timeout.
      timeout: :infinity
    )
  end

  defp format_csv_row(order) do
    "#{order.id},#{order.total},#{order.status}\n"
  end
end

Critical requirement: Repo.stream/2 must be enumerated inside a transaction. The database cursor only lives within the transaction boundary.
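Note that building the stream outside a transaction does not fail immediately, because the stream is lazy; the error only surfaces when something enumerates it. A minimal sketch (the exact error text depends on the adapter):

```elixir
# Building the stream succeeds - it is lazy and touches nothing yet.
stream = Repo.stream(from(o in Order))

# Enumerating it outside a transaction raises at runtime,
# because there is no open transaction to host the cursor:
#
#   Enum.to_list(stream)  # => raises (no transaction for the cursor)

# The same enumeration inside Repo.transaction/2 works:
Repo.transaction(fn -> Enum.to_list(stream) end)
```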

Memory comparison

# Repo.all - loads 1M rows into memory at once
orders = Repo.all(from o in Order)  # 1M orders = ~2GB RAM
Enum.each(orders, &process/1)

# Repo.stream - processes 1000 rows at a time
Repo.transaction(fn ->
  from(o in Order)
  |> Repo.stream(max_rows: 1000)    # Only 1000 rows in memory
  |> Enum.each(&process/1)
end)

The :max_rows option controls the batch size: larger batches mean fewer database round trips but more memory per fetch. The default is 500.
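The same lazy pipeline shape works with any enumerable, which makes the memory behavior easy to see without a database. A plain-Elixir sketch (no Ecto involved; the range stands in for a large result set):

```elixir
# Simulate 1M rows fetched in batches of 1000, like max_rows: 1000.
batches =
  1..1_000_000
  |> Stream.chunk_every(1000)   # lazy: describes batches, builds none yet
  |> Stream.map(&Enum.sum/1)    # lazy: one batch is processed at a time

# Nothing has executed so far - `batches` is just a lazy stream.
# Forcing it walks batch by batch instead of materializing all rows:
IO.puts(Enum.count(batches))    # => 1000
```

This is the core of why Repo.stream/2 keeps memory flat: each Stream step pulls one batch through the whole pipeline before the next batch is fetched.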

Real-world patterns

CSV export with progress tracking:

def export_users_csv(output_path) do
  file = File.open!(output_path, [:write, :utf8])
  IO.write(file, "id,email,name,created_at\n")

  Repo.transaction(
    fn ->
      from(u in User, order_by: u.id)
      |> Repo.stream(max_rows: 5000)
      |> Stream.with_index(1)
      |> Stream.each(fn {user, index} ->
        if rem(index, 10_000) == 0 do
          IO.puts("Processed #{index} users...")
        end

        row = "#{user.id},#{user.email},#{user.name},#{user.inserted_at}\n"
        IO.write(file, row)
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )

  File.close(file)
end
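One caveat with the version above: if the transaction raises mid-stream, File.close/1 is never reached and the file handle leaks. A hedged variant (same pipeline, just wrapped in try/after) guarantees the close:

```elixir
def export_users_csv(output_path) do
  file = File.open!(output_path, [:write, :utf8])

  try do
    IO.write(file, "id,email,name,created_at\n")

    Repo.transaction(
      fn ->
        from(u in User, order_by: u.id)
        |> Repo.stream(max_rows: 5000)
        |> Stream.each(fn user ->
          IO.write(file, "#{user.id},#{user.email},#{user.name},#{user.inserted_at}\n")
        end)
        |> Stream.run()
      end,
      timeout: :infinity
    )
  after
    # Runs whether the transaction succeeded or raised.
    File.close(file)
  end
end
```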

Batch updates without loading all records:

def mark_old_orders_archived(cutoff_date) do
  query = from(o in Order, where: o.inserted_at < ^cutoff_date)

  Repo.transaction(
    fn ->
      query
      |> Repo.stream()
      |> Stream.chunk_every(100)
      |> Stream.each(fn orders ->
        ids = Enum.map(orders, & &1.id)

        from(o in Order, where: o.id in ^ids)
        |> Repo.update_all(set: [status: :archived])
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end
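Since the update only needs primary keys, streaming full Order structs is wasteful. A leaner sketch (assuming `id` is the primary key) selects just the ids, shrinking each fetched batch considerably:

```elixir
def mark_old_orders_archived(cutoff_date) do
  Repo.transaction(
    fn ->
      # select: o.id streams bare ids instead of full structs.
      from(o in Order, where: o.inserted_at < ^cutoff_date, select: o.id)
      |> Repo.stream()
      |> Stream.chunk_every(100)
      |> Stream.each(fn ids ->
        from(o in Order, where: o.id in ^ids)
        |> Repo.update_all(set: [status: :archived])
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end
```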

Data migration between tables:

def migrate_legacy_records do
  Repo.transaction(
    fn ->
      from(l in LegacyRecord, where: is_nil(l.migrated_at))
      |> Repo.stream(max_rows: 500)
      |> Stream.map(&transform_record/1)
      |> Stream.chunk_every(100)
      |> Stream.each(fn records ->
        Repo.insert_all(NewRecord, records)
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end

Transaction timeout

Long-running streams need extended timeouts:

# Default timeout is 15 seconds - will fail for large exports
Repo.transaction(fn ->
  Repo.stream(query) |> Enum.each(&process/1)
end)

# Set timeout for long operations
Repo.transaction(
  fn ->
    Repo.stream(query) |> Enum.each(&process/1)
  end,
  timeout: :infinity
)

When to use Repo.stream/2

  • Data exports (CSV, JSON, reports)
  • Batch processing large datasets
  • Data migrations between tables
  • Cleanup jobs on historical data
  • Any query returning 10K+ rows

Don’t use for:

  • Normal application queries (use Repo.all/2)
  • Queries with small result sets
  • Real-time features (cursors hold database connections)

Repo.stream/2 is essential for any batch operation that would otherwise exhaust memory with Repo.all/2.
