Reorder email marketing slides to follow data flow

Torey Heinz committed Feb 22, 2026
commit f2ab132148475d45725746f1489b50008d8b0cd6
Showing 1 changed file with 71 additions and 71 deletions
slides/03-real-world.html +71 -71
@@ @@ -88,6 +88,77 @@
</aside>
</section>
+ <section>
+ <h2>Scheduling 500k Jobs</h2>
+ <p class="muted small">Stream records in chunks &mdash; constant memory usage</p>
+
+ <pre><code class="language-elixir" data-trim>
+ defmodule Marketing.CampaignSchedulerWorker do
+ use Oban.Worker, queue: :default, max_attempts: 1
+ @chunk_size 2_000
+
+ def perform(%Oban.Job{args: args}) do
+ campaign = Campaigns.get_campaign_with_site(args["campaign_id"])
+
+ Repo.transaction(fn ->
+ Repo.stream(campaign_member_ids_query(campaign), max_rows: @chunk_size)
+ |> Stream.chunk_every(@chunk_size)
+ |> Enum.each(&schedule_batch(&1, campaign, args["live_mode"]))
+ end, timeout: :infinity)
+ end
+
+ defp schedule_batch(member_ids, campaign, live_mode) do
+ Enum.map(member_ids, fn member_id ->
+ %{campaign_id: campaign.id, member_id: member_id, live_mode: live_mode}
+ |> CampaignMailerWorker.new(scheduled_at: campaign.scheduled_at)
+ end)
+ |> Oban.insert_all()
+ end
+ end
+ </code></pre>
+
+ <p class="fragment" style="font-size: 0.75em; color: #555;">
+ <code>Repo.stream</code> + <code>Stream.chunk_every</code> = process 500k records using <strong>constant memory</strong>.
+ </p>
+
+ <aside class="notes">
+ You can't load 500k records into memory at once. Repo.stream gives us a lazy stream from the database. Stream.chunk_every breaks it into chunks of 2,000. Each chunk becomes 2,000 individual email jobs inserted into Oban. Constant memory usage regardless of how many records we process.
+ </aside>
+ </section>
+
+ <section>
+ <h2>The Mailer Worker</h2>
+ <p class="muted small">Pattern matching handles every possible failure</p>
+
+ <pre class="small-code"><code class="language-elixir" data-trim>
+ def perform(%Oban.Job{args: args} = job) do
+ campaign_id = Map.get(args, "campaign_id")
+ member_id = Map.get(args, "member_id")
+
+ with {:ok, member} <- get_member_with_site(member_id),
+ {:ok, _} <- validate_email(member.email),
+ {:ok, campaign} <- get_campaign_with_site(campaign_id),
+ {:ok, campaign_email} <- reserve_campaign_email(campaign, member),
+ {:ok} <- ensure_campaign_email_not_sent(campaign_email),
+ {:ok, _} <- EmailRateLimiter.ready?() do
+ send_campaign_email(campaign, member, live_mode)
+ else
+ {:not_ready, :exceeded_rate} -> {:snooze, 1}
+ {:not_ready, :exceeded_daily} -> {:snooze, min(backoff(job), 3_600)}
+ {:already_sent} -> {:discard, :already_sent}
+ {:invalid_email, reason} -> {:discard, {:invalid_email, reason}}
+ {:error, %{code: "InvalidParameterValue"} = result} ->
+ {:discard, {:invalid_param, result}}
+ {:error, result} -> {:error, result}
+ end
+ end
+ </code></pre>
+
+ <aside class="notes">
+ Each email goes through a pipeline with the `with` chain. Get the member, validate the email, get the campaign, reserve the record, check it hasn't been sent, check the rate limiter. If everything passes, send. The else block reads like a specification: rate exceeded? Snooze 1 second. Daily limit? Backoff up to an hour. Already sent? Discard. Every outcome is handled. No silent failures.
+ </aside>
+ </section>
+
<section>
<h2>The Rate Limiter</h2>
<p class="muted small">A GenServer using a token bucket algorithm</p>
@@ @@ -165,77 +236,6 @@ end
</aside>
</section>
- <section>
- <h2>The Mailer Worker</h2>
- <p class="muted small">Pattern matching handles every possible failure</p>
-
- <pre class="small-code"><code class="language-elixir" data-trim>
- def perform(%Oban.Job{args: args} = job) do
- campaign_id = Map.get(args, "campaign_id")
- member_id = Map.get(args, "member_id")
-
- with {:ok, member} <- get_member_with_site(member_id),
- {:ok, _} <- validate_email(member.email),
- {:ok, campaign} <- get_campaign_with_site(campaign_id),
- {:ok, campaign_email} <- reserve_campaign_email(campaign, member),
- {:ok} <- ensure_campaign_email_not_sent(campaign_email),
- {:ok, _} <- EmailRateLimiter.ready?() do
- send_campaign_email(campaign, member, live_mode)
- else
- {:not_ready, :exceeded_rate} -> {:snooze, 1}
- {:not_ready, :exceeded_daily} -> {:snooze, min(backoff(job), 3_600)}
- {:already_sent} -> {:discard, :already_sent}
- {:invalid_email, reason} -> {:discard, {:invalid_email, reason}}
- {:error, %{code: "InvalidParameterValue"} = result} ->
- {:discard, {:invalid_param, result}}
- {:error, result} -> {:error, result}
- end
- end
- </code></pre>
-
- <aside class="notes">
- Each email goes through a pipeline with the `with` chain. Get the member, validate the email, get the campaign, reserve the record, check it hasn't been sent, check the rate limiter. If everything passes, send. The else block reads like a specification: rate exceeded? Snooze 1 second. Daily limit? Backoff up to an hour. Already sent? Discard. Every outcome is handled. No silent failures.
- </aside>
- </section>
-
- <section>
- <h2>Scheduling 500k Jobs</h2>
- <p class="muted small">Stream records in chunks &mdash; constant memory usage</p>
-
- <pre><code class="language-elixir" data-trim>
- defmodule Marketing.CampaignSchedulerWorker do
- use Oban.Worker, queue: :default, max_attempts: 1
- @chunk_size 2_000
-
- def perform(%Oban.Job{args: args}) do
- campaign = Campaigns.get_campaign_with_site(args["campaign_id"])
-
- Repo.transaction(fn ->
- Repo.stream(campaign_member_ids_query(campaign), max_rows: @chunk_size)
- |> Stream.chunk_every(@chunk_size)
- |> Enum.each(&schedule_batch(&1, campaign, args["live_mode"]))
- end, timeout: :infinity)
- end
-
- defp schedule_batch(member_ids, campaign, live_mode) do
- Enum.map(member_ids, fn member_id ->
- %{campaign_id: campaign.id, member_id: member_id, live_mode: live_mode}
- |> CampaignMailerWorker.new(scheduled_at: campaign.scheduled_at)
- end)
- |> Oban.insert_all()
- end
- end
- </code></pre>
-
- <p class="fragment" style="font-size: 0.75em; color: #555;">
- <code>Repo.stream</code> + <code>Stream.chunk_every</code> = process 500k records using <strong>constant memory</strong>.
- </p>
-
- <aside class="notes">
- You can't load 500k records into memory at once. Repo.stream gives us a lazy stream from the database. Stream.chunk_every breaks it into chunks of 2,000. Each chunk becomes 2,000 individual email jobs inserted into Oban. Constant memory usage regardless of how many records we process.
- </aside>
- </section>
-
<!-- =============================================
SECTION 3.2: Domain Registry
============================================= -->