`Repo.stream/2` processes database results without loading all rows
almirsarajcic
Exporting a million database rows? `Repo.all/2` loads everything into memory first. `Repo.stream/2` uses database cursors to fetch rows in batches, keeping memory constant regardless of result size.
defmodule Reports do
  import Ecto.Query

  def export_all_orders(output_path) do
    query = from(o in Order, order_by: o.id)

    Repo.transaction(
      fn ->
        query
        |> Repo.stream(max_rows: 1000)
        |> Stream.map(&format_csv_row/1)
        |> Stream.into(File.stream!(output_path))
        |> Stream.run()
      end,
      # Large exports outlast the default 15-second transaction timeout
      timeout: :infinity
    )
  end

  defp format_csv_row(order) do
    "#{order.id},#{order.total},#{order.status}\n"
  end
end
Critical requirement: a `Repo.stream/2` stream must be enumerated inside a transaction. The database cursor only lives within the transaction boundary.
Memory comparison
# Repo.all - loads 1M rows into memory at once
orders = Repo.all(from o in Order) # 1M orders = ~2GB RAM
Enum.each(orders, &process/1)

# Repo.stream - processes 1000 rows at a time
Repo.transaction(fn ->
  from(o in Order)
  |> Repo.stream(max_rows: 1000) # Only 1000 rows in memory
  |> Enum.each(&process/1)
end)
The `:max_rows` option controls batch size. Larger batches mean fewer database round trips but more memory usage. The default is 500.
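To see the trade-off, here's a pure-Elixir sketch (no Ecto or database needed) where a range stands in for the streamed rows and `Stream.chunk_every/2` stands in for the cursor's batched fetches:

```elixir
# Emulate cursor fetches: each chunk corresponds to one round trip.
max_rows = 1000
rows = 1..10_000

batches =
  rows
  |> Stream.chunk_every(max_rows)
  |> Enum.count()

IO.puts("#{batches} round trips for #{Enum.count(rows)} rows")
# With max_rows: 1000, 10_000 rows take 10 fetches; max_rows: 100 would take 100.
```

Halving `max_rows` doubles the round trips but also halves the rows held in memory at any moment.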
Real-world patterns
CSV export with progress tracking:
def export_users_csv(output_path) do
  file = File.open!(output_path, [:write, :utf8])
  IO.write(file, "id,email,name,created_at\n")

  Repo.transaction(
    fn ->
      from(u in User, order_by: u.id)
      |> Repo.stream(max_rows: 5000)
      |> Stream.with_index(1)
      |> Stream.each(fn {user, index} ->
        if rem(index, 10_000) == 0 do
          IO.puts("Processed #{index} users...")
        end

        row = "#{user.id},#{user.email},#{user.name},#{user.inserted_at}\n"
        IO.write(file, row)
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )

  File.close(file)
end
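The progress-tracking part of the pipeline works on any enumerable, so it can be sketched in pure Elixir (no database; a range stands in for the streamed users):

```elixir
# Stream.with_index/2 numbers elements lazily as they flow through,
# and rem/2 picks out every 10,000th index as a progress milestone.
milestones =
  1..25_000
  |> Stream.with_index(1)
  |> Stream.filter(fn {_row, index} -> rem(index, 10_000) == 0 end)
  |> Enum.map(fn {_row, index} -> index end)

IO.inspect(milestones)
# [10000, 20000]
```

Because the indexing is lazy, no extra pass over the data is needed to count rows.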
Batch updates without loading all records:
def mark_old_orders_archived(cutoff_date) do
  query = from(o in Order, where: o.inserted_at < ^cutoff_date)

  Repo.transaction(
    fn ->
      query
      |> Repo.stream()
      |> Stream.chunk_every(100)
      |> Stream.each(fn orders ->
        ids = Enum.map(orders, & &1.id)

        from(o in Order, where: o.id in ^ids)
        |> Repo.update_all(set: [status: :archived])
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end
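The chunking step above is plain `Stream.chunk_every/2`, which can be sketched without a database (maps with hypothetical `id` and `status` fields stand in for the streamed orders):

```elixir
# 250 streamed records get grouped into batches of at most 100,
# and each batch's ids are collected for one UPDATE ... WHERE id IN (...).
records = Enum.map(1..250, fn id -> %{id: id, status: :pending} end)

id_batches =
  records
  |> Stream.chunk_every(100)
  |> Enum.map(fn orders -> Enum.map(orders, & &1.id) end)

IO.inspect(Enum.map(id_batches, &length/1))
# [100, 100, 50]
```

The final partial batch is handled automatically: `chunk_every/2` emits whatever remains, so no record is dropped.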
Data migration between tables:
def migrate_legacy_records do
  Repo.transaction(
    fn ->
      from(l in LegacyRecord, where: is_nil(l.migrated_at))
      |> Repo.stream(max_rows: 500)
      |> Stream.map(&transform_record/1)
      |> Stream.chunk_every(100)
      |> Stream.each(fn records ->
        Repo.insert_all(NewRecord, records)
      end)
      |> Stream.run()
    end,
    timeout: :infinity
  )
end
Transaction timeout
Long-running streams need extended timeouts:
# Default timeout is 15 seconds - will fail for large exports
Repo.transaction(fn ->
  Repo.stream(query) |> Enum.each(&process/1)
end)

# Set timeout for long operations
Repo.transaction(
  fn ->
    Repo.stream(query) |> Enum.each(&process/1)
  end,
  timeout: :infinity
)
When to use Repo.stream/2
- Data exports (CSV, JSON, reports)
- Batch processing large datasets
- Data migrations between tables
- Cleanup jobs on historical data
- Any query returning 10K+ rows
Don’t use for:
- Normal application queries (use `Repo.all/2`)
- Queries with small result sets
- Real-time features (cursors hold database connections)
`Repo.stream/2` is essential for any batch operation that would otherwise exhaust memory with `Repo.all/2`.