Expand speaker notes for Scheduling 500k Jobs slide

Torey Heinz committed Feb 22, 2026
commit 0da77c84f0db2d022ed5245f47ca7c7f2b830d39
Showing 2 changed files with 25 additions and 23 deletions
scratch.md +5 -0
@@ @@ -151,3 +151,8 @@ During my intro I want make sure to give a shout out to my employer Vianet Manag
**3.3 Internal Admin App**
- one of the values here is that instead of building admin tools into the main app, you can keep the main app focused on delivering business value.
+
+
+ Scheduling 500k Jobs
+ The Mailer Worker
+ The Rate Limiter
slides/03-real-world.html +20 -23
@@ @@ -94,35 +94,33 @@
<pre><code class="language-elixir" data-trim>
defmodule Marketing.CampaignSchedulerWorker do
- use Oban.Worker, queue: :default, max_attempts: 1
+ use Oban.Worker, queue: :default
@chunk_size 2_000
def perform(%Oban.Job{args: args}) do
- campaign = Campaigns.get_campaign_with_site(args["campaign_id"])
+ member_ids = Campaigns.member_ids(args["campaign_id"])
Repo.transaction(fn ->
- Repo.stream(campaign_member_ids_query(campaign), max_rows: @chunk_size)
+ Repo.stream(member_ids(campaign), max_rows: @chunk_size)
|> Stream.chunk_every(@chunk_size)
- |> Enum.each(&schedule_batch(&1, campaign, args["live_mode"]))
+ |> Enum.each(&schedule_batch(&1, campaign))
end, timeout: :infinity)
end
- defp schedule_batch(member_ids, campaign, live_mode) do
+ defp schedule_batch(member_ids, campaign) do
Enum.map(member_ids, fn member_id ->
- %{campaign_id: campaign.id, member_id: member_id, live_mode: live_mode}
- |> CampaignMailerWorker.new(scheduled_at: campaign.scheduled_at)
+ %{campaign_id: campaign.id, member_id: member_id}
+ |> CampaignMailerWorker.new()
end)
|> Oban.insert_all()
end
end
</code></pre>
- <p class="fragment" style="font-size: 0.75em; color: #555;">
- <code>Repo.stream</code> + <code>Stream.chunk_every</code> = process 500k records using <strong>constant memory</strong>.
- </p>
-
<aside class="notes">
- You can't load 500k records into memory at once. Repo.stream gives us a lazy stream from the database. Stream.chunk_every breaks it into chunks of 2,000. Each chunk becomes 2,000 individual email jobs inserted into Oban. Constant memory usage regardless of how many records we process.
+ You can't load 500k records into memory at once. Repo.stream gives us a lazy stream from the database, backed by a database cursor that fetches rows in batches as we consume them. The Repo.transaction wrapper is required because the cursor needs the database connection to stay open for the entire iteration. Without it, the connection goes back to the pool and the cursor is lost.<br><br>
+ Stream.chunk_every breaks the stream into chunks of 2,000. Each chunk becomes 2,000 individual email jobs inserted into Oban. Constant memory usage regardless of how many records we process.<br><br>
+ The ampersand in &schedule_batch is Elixir's capture operator — it's shorthand for an anonymous function. For the JS folks, it's like writing (chunk) => scheduleBatch(chunk, campaign). The &1 just means "the first argument."
</aside>
</section>
@@ @@ -135,20 +133,19 @@ def perform(%Oban.Job{args: args} = job) do
campaign_id = Map.get(args, "campaign_id")
member_id = Map.get(args, "member_id")
- with {:ok, member} <- get_member_with_site(member_id),
- {:ok, _} <- validate_email(member.email),
- {:ok, campaign} <- get_campaign_with_site(campaign_id),
- {:ok, campaign_email} <- reserve_campaign_email(campaign, member),
- {:ok} <- ensure_campaign_email_not_sent(campaign_email),
- {:ok, _} <- EmailRateLimiter.ready?() do
- send_campaign_email(campaign, member, live_mode)
+ with(
+ {:ok, member} <- get_member_with_site(member_id),
+ {:ok, _} <- validate_email(member.email),
+ {:ok, campaign} <- get_campaign_with_site(campaign_id),
+ {:ok, campaign_email} <- reserve_campaign_email(campaign, member),
+ {:ok, _} <- EmailRateLimiter.ready?()
+ ) do
+ send_campaign_email(campaign, member)
else
{:not_ready, :exceeded_rate} -> {:snooze, 1}
{:not_ready, :exceeded_daily} -> {:snooze, min(backoff(job), 3_600)}
{:already_sent} -> {:discard, :already_sent}
- {:invalid_email, reason} -> {:discard, {:invalid_email, reason}}
- {:error, %{code: "InvalidParameterValue"} = result} ->
- {:discard, {:invalid_param, result}}
+ {:invalid_email} -> {:discard, :invalid_email}
{:error, result} -> {:error, result}
end
end
@@ @@ -167,7 +164,7 @@ end
defmodule Marketing.EmailRateLimiter do
use GenServer
- defstruct tokens: 0.0, capacity: 50.0, rate: 25.0,
+ defstruct tokens: 0.0, rate: 100.0,
daily_remaining: 750_000, last_refill_ms: nil
# Public API — other code just calls this