Features
Oban’s primary goals are reliability, consistency and observability.
Oban is a powerful and flexible library that can handle a wide range of background job use cases, and it is well-suited for systems of any size. It provides a simple and consistent API for scheduling and performing jobs, and it is built to be fault-tolerant and easy to monitor.
Oban is fundamentally different from other background job processing tools because it retains job data for historic metrics and inspection. You can leave your application running indefinitely without worrying about jobs being lost or orphaned due to crashes.
Advantages Over Other Tools
Fewer Dependencies — If you are running a web app there is a very good chance that you’re running on top of a RDBMS. Running your job queue within a SQL database minimizes system dependencies and simplifies data backups.
Transactional Control — Enqueue a job along with other database changes, ensuring that everything is committed or rolled back atomically.
Database Backups — Jobs are stored inside of your primary database, which means they are backed up together with the data that they relate to.
Advanced Features
Isolated Queues — Jobs are stored in a single table but are executed in distinct queues. Each queue runs in isolation, ensuring that a job in a single slow queue can’t back up other faster queues.
Queue Control — Queues can be started, stopped, paused, resumed and scaled independently at runtime locally or across all running nodes (even in environments like Heroku, without distributed Erlang).
Resilient Queues — Failing queries won’t crash the entire supervision tree, instead a backoff mechanism will safely retry them again in the future.
Job Canceling — Jobs can be canceled in the middle of execution regardless of which node they are running on. This stops the job at once and flags it as
cancelled
.Triggered Execution — Insert triggers ensure that jobs are dispatched on all connected nodes as soon as they are inserted into the database.
Unique Jobs — Duplicate work can be avoided through unique job controls. Uniqueness can be enforced at the argument, queue, worker and even sub-argument level for any period of time.
Scheduled Jobs — Jobs can be scheduled at any time in the future, down to the second.
Periodic (CRON) Jobs — Automatically enqueue jobs on a cron-like schedule. Duplicate jobs are never enqueued, no matter how many nodes you’re running.
Job Priority — Prioritize jobs within a queue to run ahead of others with ten levels of granularity.
Historic Metrics — After a job is processed the row isn’t deleted. Instead, the job is retained in the database to provide metrics. This allows users to inspect historic jobs and to see aggregate data at the job, queue or argument level.
Node Metrics — Every queue records metrics to the database during runtime. These are used to monitor queue health across nodes and may be used for analytics.
Graceful Shutdown — Queue shutdown is delayed so that slow jobs can finish executing before shutdown. When shutdown starts queues are paused and stop executing new jobs. Any jobs left running after the shutdown grace period may be rescued later.
Telemetry Integration — Job life-cycle events are emitted via Telemetry integration. This enables simple logging, error reporting and health checkups without plug-ins.
Requirements
Oban requires Elixir 1.13+, Erlang 23+, and PostgreSQL 12.0+ or SQLite3 3.37.0+.
Configuring Queues
Queues are specified as a keyword list where the key is the name of the queue and the value is the maximum number of concurrent jobs. The following configuration would start four queues with concurrency ranging from 5 to 50:
config :my_app, Oban,
queues: [default: 10, mailers: 20, events: 50, media: 5],
repo: MyApp.Repo
You may also use an expanded form to configure queues with individual overrides:
queues: [
default: 10,
events: [limit: 50, paused: true]
]
The events
queue will now start in a paused state, which means it won’t
process anything until Oban.resume_queue/2
is called to start it.
There isn’t a limit to the number of queues or how many jobs may execute concurrently in each queue. Some additional guidelines:
Caveats & Guidelines
Each queue will run as many jobs as possible concurrently, up to the configured limit. Make sure your system has enough resources (i.e. database connections) to handle the concurrent load.
Queue limits are local (per-node), not global (per-cluster). For example, running a queue with a local limit of one on three separate nodes is effectively a global limit of three. If you require a global limit you must restrict the number of nodes running a particular queue.
Only jobs in the configured queues will execute. Jobs in any other queue will stay in the database untouched.
Be careful how many concurrent jobs make expensive system calls (i.e. FFMpeg, ImageMagick). The BEAM ensures that the system stays responsive under load, but those guarantees don’t apply when using ports or shelling out commands.
Defining Workers
Worker modules do the work of processing a job. At a minimum they must define a
perform/1
function, which is called with an %Oban.Job{}
struct.
Note that the args
field of the job struct will always have string keys, regardless of the key
type when the job was enqueued. The args
are stored as JSON and the serialization process
automatically stringifies all keys. Also, because args
are always encoded as JSON, you must
ensure that all values are serializable, otherwise you’ll have encoding errors when inserting jobs.
Define a worker to process jobs in the events
queue:
defmodule MyApp.Business do
use Oban.Worker, queue: :events
@impl Oban.Worker
def perform(%Oban.Job{args: %{"id" => id} = args}) do
model = MyApp.Repo.get(MyApp.Business.Man, id)
case args do
%{"in_the" => "business"} ->
IO.inspect(model)
%{"vote_for" => vote} ->
IO.inspect([vote, model])
_ ->
IO.inspect(model)
end
:ok
end
end
The use
macro also accepts options to customize max_attempts
, priority
, tags
, unique
,
and replace
options:
defmodule MyApp.LazyBusiness do
use Oban.Worker,
queue: :events,
priority: 3,
max_attempts: 3,
tags: ["business"],
unique: true,
replace: [scheduled: [:scheduled_at]]
@impl Oban.Worker
def perform(_job) do
# do business slowly
:ok
end
end
Like all use
macros, options are defined at compile time. Avoid using Application.get_env/2
to
define worker options. Instead, pass dynamic options at runtime by passing them to
MyWorker.new/2
:
MyApp.MyWorker.new(args, queue: dynamic_queue)
Successful jobs should return :ok
or an {:ok, value}
tuple. The value returned from
perform/1
is used to control whether the job is treated as a success, a failure, cancelled or
deferred for retrying later.
See the Oban.Worker
docs for more details on failure conditions and Oban.Telemetry
for details
on job reporting.
Enqueueing Jobs
Jobs are simply Ecto structs and are enqueued by inserting them into the
database. For convenience and consistency all workers provide a new/2
function that converts an args map into a job changeset suitable for insertion:
%{id: 1, in_the: "business", of_doing: "business"}
|> MyApp.Business.new()
|> Oban.insert()
The worker’s defaults may be overridden by passing options:
%{id: 1, vote_for: "none of the above"}
|> MyApp.Business.new(queue: :special, max_attempts: 5)
|> Oban.insert()
Unique jobs can be configured in the worker, or when the job is built:
%{email: "brewster@example.com"}
|> MyApp.Mailer.new(unique: false)
|> Oban.insert()
Job priority can be specified using an integer from 0 to 9, with 0 being the default and highest priority:
%{id: 1}
|> MyApp.Backfiller.new(priority: 2)
|> Oban.insert()
Any number of tags can be added to a job dynamically, at the time it is inserted:
id = 1
%{id: id}
|> MyApp.OnboardMailer.new(tags: ["mailer", "record-#{id}"])
|> Oban.insert()
Multiple jobs can be inserted in a single transaction:
Ecto.Multi.new()
|> Oban.insert(:b_job, MyApp.Business.new(%{id: 1}))
|> Oban.insert(:m_job, MyApp.Mailer.new(%{email: "brewser@example.com"}))
|> Repo.transaction()
Occasionally you may need to insert a job for a worker that exists in another
application. In that case you can use Oban.Job.new/2
to build the changeset
manually:
%{id: 1, user_id: 2}
|> Oban.Job.new(queue: :default, worker: OtherApp.Worker)
|> Oban.insert()
Oban.insert/2,4
is the preferred way of inserting jobs as it provides some of
Oban’s advanced features (i.e., unique jobs). However, you can use your
application’s Repo.insert/2
function if necessary.
See Oban.Job.new/2
for a full list of job options.
Scheduling Jobs
Jobs may be scheduled down to the second any time in the future:
%{id: 1}
|> MyApp.Business.new(schedule_in: 5)
|> Oban.insert()
Jobs may also be scheduled at a specific datetime in the future:
%{id: 1}
|> MyApp.Business.new(scheduled_at: ~U[2020-12-25 19:00:56.0Z])
|> Oban.insert()
Scheduling is always in UTC. You’ll have to shift timestamps in other zones to UTC before scheduling:
%{id: 1}
|> MyApp.Business.new(scheduled_at: DateTime.shift_zone!(datetime, "Etc/UTC"))
|> Oban.insert()
Caveats & Guidelines
Usually, scheduled job management operates in global
mode and notifies queues
of available jobs via PubSub to minimize database load. However, when PubSub
isn’t available, staging switches to a local
mode where each queue polls
independently.
Local mode is less efficient and will only happen if you’re running in an
environment where neither Postgres
nor PG
notifications work. That situation
should be rare and limited to the following conditions:
- Running with a connection pooler, i.e.,
pg_bouncer
, in transaction mode. - Running without clustering, i.e., without Distributed Erlang
If both of those criteria apply and PubSub notifications won’t work, then
staging will switch to polling in local
mode.
Prioritizing Jobs
Normally, all available jobs within a queue are executed in the order they were scheduled. You can
override the normal behavior and prioritize or de-prioritize a job by assigning a numerical
priority
.
Priorities from 0-9 are allowed, where 0 is the highest priority and 9 is the lowest.
The default priority is 0, unless specified all jobs have an equally high priority.
All jobs with a higher priority will execute before any jobs with a lower priority. Within a particular priority jobs are executed in their scheduled order.
Caveats & Guidelines
The default priority is defined in the jobs table. The least intrusive way to change it for all jobs is to change the column default:
alter table("oban_jobs") do
modify :priority, :integer, default: 1, from: {:integer, default: 0}
end
Unique Jobs
The unique jobs feature lets you specify constraints to prevent enqueueing duplicate jobs. Uniqueness is based on a combination of job attribute based on the following options:
:period
— The number of seconds until a job is no longer considered duplicate. You should always specify a period, otherwise Oban will default to 60 seconds.:infinity
can be used to indicate the job be considered a duplicate as long as jobs are retained.:fields
— The fields to compare when evaluating uniqueness. The available fields are:args
,:queue
,:worker
, and:meta
. By default, fields is set to[:worker, :queue, :args]
.:keys
— A specific subset of the:args
or:meta
to consider when comparing against historic jobs. This allows a job with multiple key/value pairs in the args to be compared using only a subset of them.:states
— The job states that are checked for duplicates. The available states are:available
,:scheduled
,:executing
,:retryable
,:completed
,:cancelled
and:discarded
. By default all states except for:discarded
and:cancelled
are checked, which prevents duplicates even if the previous job has been completed.:timestamp
— Which timestamp to check the period against. The available timestamps are:inserted_at
or:scheduled_at
, and it defaults to:inserted_at
for legacy reasons.
The simplest form of uniqueness will configure uniqueness for as long as a matching job exists in the database, regardless of state:
use Oban.Worker, unique: true
Configure the worker to be unique only for 60 seconds:
use Oban.Worker, unique: [period: 60]
Check the :scheduled_at
timestamp instead of :inserted_at
for uniqueness:
use Oban.Worker, unique: [period: 120, timestamp: :scheduled_at]
Only consider the :url
key rather than the entire args
:
use Oban.Worker, unique: [keys: [:url]]
Use Oban.Job.states/0
to specify uniqueness across all states, including cancelled
and
discarded
:
use Oban.Worker, unique: [period: :infinity, states: Oban.Job.states()]
Detecting Unique Conflicts
When unique settings match an existing job, the return value of Oban.insert/2
is still {:ok,
job}
. However, you can detect a unique conflict by checking the jobs’ :conflict?
field. If
there was an existing job, the field is true
; otherwise it is false
.
You can use the :conflict?
field to customize responses after insert:
case Oban.insert(changeset) do
{:ok, %Job{id: nil, conflict?: true}} ->
{:error, :failed_to_acquire_lock}
{:ok, %Job{conflict?: true}} ->
{:error, :job_already_exists}
result ->
result
end
Note that conflicts are only detected for jobs enqueued through Oban.insert/2,3
. Jobs enqueued
through Oban.insert_all/2
do not use per-job unique configuration.
Replacing Values
In addition to detecting unique conflicts, passing options to replace
can update any job field
when there is a conflict. Any of the following fields can be replaced per state: args
,
max_attempts
, meta
, priority
, queue
, scheduled_at
, tags
, worker
.
For example, to change the priority
and increase max_attempts
when there is a conflict with a
job in a scheduled
state:
BusinessWorker.new(
args,
max_attempts: 5,
priority: 0,
replace: [scheduled: [:max_attempts, :priority]]
)
Another example is bumping the scheduled time on conflict. Either scheduled_at
or schedule_in
values will work, but the replace option is always scheduled_at
.
UrgentWorker.new(args, schedule_in: 1, replace: [scheduled: [:scheduled_at]])
NOTE: If you use this feature to replace a field (e.g. args
) in the
executing
state by doing something like: UniqueWorker.new(new_args, replace:
[executing: [:args]])
Oban will update the args
, but the job will continue
executing with the original value.
Strong Guarantees
Unique jobs are guaranteed through transactional locks and database queries: they do not rely on unique constraints in the database. This makes uniqueness entirely configurable by application code, without the need for database migrations.
Testing
Find testing setup, helpers, and strategies in the testing guide.
Pruning Historic Jobs
Job stats and queue introspection are built on keeping job rows in the database
after they have completed. This allows administrators to review completed jobs
and build informative aggregates, at the expense of storage and an unbounded
table size. To prevent the oban_jobs
table from growing indefinitely, Oban
provides active pruning of completed
, cancelled
and discarded
jobs.
By default, the Pruner
plugin retains jobs for 60 seconds. You can configure a
longer retention period by providing a max_age
in seconds to the Pruner
plugin.
# Set the max_age for 5 minutes
config :my_app, Oban,
plugins: [{Oban.Plugins.Pruner, max_age: 300}]
...
Caveats & Guidelines
Pruning is best-effort and performed out-of-band. This means that all limits are soft; jobs beyond a specified age may not be pruned immediately after jobs complete.
Pruning is only applied to jobs that are
completed
,cancelled
ordiscarded
. It’ll never delete a new job, a scheduled job or a job that failed and will be retried.
Periodic Jobs
Oban’s Cron
plugin registers workers a cron-like schedule and enqueues jobs
automatically. Periodic jobs are declared as a list of {cron, worker}
or
{cron, worker, options}
tuples:
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", MyApp.MinuteWorker},
{"0 * * * *", MyApp.HourlyWorker, args: %{custom: "arg"}},
{"0 0 * * *", MyApp.DailyWorker, max_attempts: 1},
{"0 12 * * MON", MyApp.MondayWorker, queue: :scheduled, tags: ["mondays"]},
{"@daily", MyApp.AnotherDailyWorker}
]}
]
The crontab would insert jobs as follows:
MyApp.MinuteWorker
— Inserted once every minuteMyApp.HourlyWorker
— Inserted at the first minute of every hour with custom argsMyApp.DailyWorker
— Inserted at midnight every day with no retriesMyApp.MondayWorker
— Inserted at noon every Monday in the “scheduled” queueMyApp.AnotherDailyWorker
— Inserted at midnight every day with no retries
The crontab format respects all standard rules and has one minute resolution. Jobs are considered unique for most of each minute, which prevents duplicate jobs with multiple nodes and across node restarts.
Like other jobs, recurring jobs will use the :queue
specified by the worker
module (or :default
if one is not specified).
Caveats & Guidelines
All schedules are evaluated as UTC unless a different timezone is provided. See
Oban.Plugins.Cron
for information about configuring a timezone.Workers can be used for regular and scheduled jobs so long as they accept different arguments.
Duplicate jobs are prevented through transactional locks and unique constraints. Workers that are used for regular and scheduled jobs must not specify
unique
options less than60s
.Long running jobs may execute simultaneously if the scheduling interval is shorter than it takes to execute the job. You can prevent overlap by passing custom
unique
opts in the crontab config:
custom_args = %{scheduled: true}
unique_opts = [
period: 60 * 60 * 24,
states: [:available, :scheduled, :executing]
]
config :my_app, Oban,
repo: MyApp.Repo,
plugins: [
{Oban.Plugins.Cron,
crontab: [
{"* * * * *", MyApp.SlowWorker, args: custom_args, unique: unique_opts}
]}
]
Error Handling
When a job returns an error value, raises an error, or exits during execution the
details are recorded within the errors
array on the job. When the number of
execution attempts is below the configured max_attempts
limit, the job will
automatically be retried in the future.
The retry delay has an exponential backoff, meaning the job’s second attempt will be after 16s, third after 31s, fourth after 1m 36s, etc.
See the Oban.Worker
documentation on “Customizing Backoff” for alternative
backoff strategies.
Error Details
Execution errors are stored as a formatted exception along with metadata about when the failure occurred and which attempt caused it. Each error is stored with the following keys:
at
The UTC timestamp when the error occurred atattempt
The attempt number when the error occurrederror
A formatted error message and stacktrace
See the Instrumentation docs for an example of integrating with external error reporting systems.
Limiting Retries
By default, jobs are retried up to 20 times. The number of retries is controlled
by the max_attempts
value, which can be set at the Worker or Job level. For
example, to instruct a worker to discard jobs after three failures:
use Oban.Worker, queue: :limited, max_attempts: 3
Limiting Execution Time
By default, individual jobs may execute indefinitely. If this is undesirable you
may define a timeout in milliseconds with the timeout/1
callback on your
worker module.
For example, to limit a worker’s execution time to 30 seconds:
def MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(_job) do
something_that_may_take_a_long_time()
:ok
end
@impl Oban.Worker
def timeout(_job), do: :timer.seconds(30)
end
The timeout/1
function accepts an Oban.Job
struct, so you can customize the
timeout using any job attributes.
Define the timeout
value through job args:
def timeout(%_{args: %{"timeout" => timeout}}), do: timeout
Define the timeout
based on the number of attempts:
def timeout(%_{attempt: attempt}), do: attempt * :timer.seconds(5)
Instrumentation, Error Reporting, and Logging
Oban provides integration with Telemetry, a dispatching library for
metrics. It is easy to report Oban metrics to any backend by attaching to
:oban
events.
Here is an example of a sample unstructured log handler:
defmodule MyApp.ObanLogger do
require Logger
def handle_event([:oban, :job, :start], measure, meta, _) do
Logger.warning("[Oban] :started #{meta.worker} at #{measure.system_time}")
end
def handle_event([:oban, :job, event], measure, meta, _) do
Logger.warning("[Oban] #{event} #{meta.worker} ran in #{measure.duration}")
end
end
Attach the handler to success and failure events in application.ex
:
events = [[:oban, :job, :start], [:oban, :job, :stop], [:oban, :job, :exception]]
:telemetry.attach_many("oban-logger", events, &MyApp.ObanLogger.handle_event/4, [])
The Oban.Telemetry
module provides a robust structured logger that handles all
of Oban’s telemetry events. As in the example above, attach it within your
application.ex
module:
:ok = Oban.Telemetry.attach_default_logger()
For more details on the default structured logger and information on event
metadata see docs for the Oban.Telemetry
module.
Instance and Database Isolation
You can run multiple Oban instances with different prefixes on the same system and have them entirely isolated, provided you give each supervisor a distinct id.
Here we configure our application to start three Oban supervisors using the “public”, “special” and “private” prefixes, respectively:
def start(_type, _args) do
children = [
Repo,
Endpoint,
{Oban, name: ObanA, repo: Repo},
{Oban, name: ObanB, repo: Repo, prefix: "special"},
{Oban, name: ObanC, repo: Repo, prefix: "private"}
]
Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end
Umbrella Apps
If you need to run Oban from an umbrella application where more than one of
the child apps need to interact with Oban, you may need to set the :name
for
each child application that configures Oban.
For example, your umbrella contains two apps: MyAppA
and MyAppB
. MyAppA
is
responsible for inserting jobs, while only MyAppB
actually runs any queues.
Configure Oban with a custom name for MyAppA
:
config :my_app_a, Oban,
name: MyAppA.Oban,
repo: MyApp.Repo
Then configure Oban for MyAppB
with a different name:
config :my_app_b, Oban,
name: MyAppB.Oban,
repo: MyApp.Repo,
queues: [default: 10]
Now, use the configured name when calling functions like Oban.insert/2
,
Oban.insert_all/2
, Oban.drain_queue/2
, etc., to reference the correct Oban
process for the current application.
Oban.insert(MyAppA.Oban, MyWorker.new(%{}))
Oban.insert_all(MyAppB.Oban, multi, :multiname, [MyWorker.new(%{})])
Oban.drain_queue(MyAppB.Oban, queue: :default)
Ecto Multi-tenancy
If you followed the Ecto guide on setting up multi-tenancy with foreign keys, you need to add an
exception for queries originating from Oban. All of Oban’s queries have the custom option oban:
true
to help you identify them in prepare_query/3
or other instrumentation:
# Sample code, only relevant if you followed the Ecto guide on multi tenancy with foreign keys.
defmodule MyApp.Repo do
use Ecto.Repo, otp_app: :my_app
require Ecto.Query
@impl true
def prepare_query(_operation, query, opts) do
cond do
opts[:skip_org_id] || opts[:schema_migration] || opts[:oban] ->
{query, opts}
org_id = opts[:org_id] ->
{Ecto.Query.where(query, org_id: ^org_id), opts}
true ->
raise "expected org_id or skip_org_id to be set"
end
end
end
Community
There are a few places to connect and communicate with other Oban users:
- Ask questions and discuss #oban on the Elixir Forum
- Request an invitation and join the #oban channel on Slack
- Learn about bug reports and upcoming features in the issue tracker
- Follow @sorentwo (Twitter)