OPQ: One Pooled Queue
Elixir Queue!
A simple, in-memory queue with worker pooling and rate limiting in Elixir. OPQ leverages Erlang's queue module and Elixir's GenStage.
Originally built to support Crawler.
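To illustrate the FIFO behaviour that Erlang's queue module provides, here is a minimal sketch that uses :queue directly. It shows only the standard-library data structure, not OPQ's internals.

```elixir
# Build a FIFO queue with Erlang's :queue module (standard library only).
queue = :queue.new()
queue = :queue.in("hello", queue)
queue = :queue.in("world", queue)

# Items come out in the same order they went in.
{{:value, first}, queue} = :queue.out(queue)
{{:value, second}, queue} = :queue.out(queue)

IO.inspect({first, second})         # => {"hello", "world"}
IO.inspect(:queue.is_empty(queue))  # => true
```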
Features
- A fast, in-memory FIFO queue.
- Worker pool.
- Rate limit.
- Timeouts.
- Pause / resume / stop the queue.
Installation
def deps do
  [{:opq, "~> 3.2"}]
end
Usage
A simple example:
{:ok, opq} = OPQ.init()
OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
Specify module, function and arguments:
{:ok, opq} = OPQ.init()
OPQ.enqueue(opq, IO, :inspect, ["hello"])
OPQ.enqueue(opq, IO, :inspect, ["world"])
Specify a custom name for the queue:
OPQ.init(name: :items)
OPQ.enqueue(:items, fn -> IO.inspect("hello") end)
OPQ.enqueue(:items, fn -> IO.inspect("world") end)
Start as part of a supervision tree:
Note: when starting as part of a supervision tree, the :name option must be provided.
children = [
  {OPQ, name: :items}
]
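A minimal sketch of how the child list above might be started and used, assuming a plain Supervisor with the :one_for_one strategy. The supervisor name MyApp.Supervisor is hypothetical, and Mix.install is used only so the snippet can run as a standalone script; inside an application, the dependency would come from deps instead.

```elixir
# Only needed when running this as a standalone script.
Mix.install([{:opq, "~> 3.2"}])

children = [
  {OPQ, name: :items}
]

# MyApp.Supervisor is a hypothetical name used for illustration.
{:ok, _sup} =
  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)

# The supervised queue is addressed by the name given in the child spec.
OPQ.enqueue(:items, fn -> IO.inspect("hello") end)

# Give the worker a moment to run before the script exits.
Process.sleep(100)
```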
Specify a custom worker to process items in the queue:
defmodule CustomWorker do
  def start_link(item) do
    Task.start_link(fn ->
      Agent.update(:bucket, &[item | &1])
    end)
  end
end
Agent.start_link(fn -> [] end, name: :bucket)
{:ok, opq} = OPQ.init(worker: CustomWorker)
OPQ.enqueue(opq, "hello")
OPQ.enqueue(opq, "world")
Agent.get(:bucket, & &1) # => ["world", "hello"]
Rate limit:
{:ok, opq} = OPQ.init(workers: 1, interval: 1000)
Task.async(fn ->
  OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
  OPQ.enqueue(opq, fn -> IO.inspect("world") end)
end)
If no interval is supplied, the rate limiter is bypassed.
Check the status, the queue and the number of available workers:
{:ok, opq} = OPQ.init()
OPQ.enqueue(opq, fn -> Process.sleep(1000) end)
{status, queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 9}
Process.sleep(1200)
{status, queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 10}
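The middle element of the tuple returned by OPQ.info/1 looks like an Erlang :queue value ({[], []} is how an empty one prints). Assuming that holds, the standard :queue functions can be used to inspect it. The sketch below uses a locally built queue as a stand-in rather than a live OPQ process.

```elixir
# An empty Erlang queue has the same shape as the middle element shown above.
empty = :queue.new()
IO.inspect(empty)                    # => {[], []}

# Stand-in for a queue holding two pending items (not a live OPQ queue).
pending = :queue.from_list([:job_a, :job_b])

IO.inspect(:queue.len(pending))      # => 2
IO.inspect(:queue.to_list(pending))  # => [:job_a, :job_b]
```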
Stop the queue:
{:ok, opq} = OPQ.init()
OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.stop(opq)
OPQ.enqueue(opq, fn -> IO.inspect("world") end) # => (EXIT) no process...
Pause and resume the queue:
{:ok, opq} = OPQ.init()
OPQ.enqueue(opq, fn -> IO.inspect("hello") end) # => "hello"
OPQ.pause(opq)
OPQ.info(opq) # => {:paused, {[], []}, 10}
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
OPQ.resume(opq) # => "world"
OPQ.info(opq) # => {:normal, {[], []}, 10}
Configurations
Option | Type | Default Value | Description |
---|---|---|---|
:name | atom/module | pid | The name of the queue. |
:worker | module | OPQ.Worker | The worker that processes each item from the queue. |
:workers | integer | 10 | Maximum number of workers. |
:interval | integer | 0 | Rate limit control: number of milliseconds to wait before asking for more items to process. Defaults to 0, which is effectively no rate limit. |
:timeout | integer | 5000 | Number of milliseconds allowed to perform the work. It should always be set higher than :interval. |
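As a sketch of how these options can be combined, the snippet below starts a named queue with two workers, a 500 ms interval and a timeout above that interval. The queue name :crawl_queue and the specific values are illustrative assumptions, and Mix.install is used only so the snippet can run as a standalone script.

```elixir
# Only needed when running this as a standalone script.
Mix.install([{:opq, "~> 3.2"}])

OPQ.init(
  name: :crawl_queue, # hypothetical queue name
  workers: 2,         # at most two workers at a time
  interval: 500,      # wait 500 ms before asking for more items
  timeout: 5_000      # kept higher than :interval, as the table advises
)

OPQ.enqueue(:crawl_queue, fn -> IO.inspect("hello") end)
OPQ.enqueue(:crawl_queue, fn -> IO.inspect("world") end)

# Give the rate-limited workers time to finish before the script exits.
Process.sleep(2_000)
```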
Changelog
Please see [CHANGELOG.md](CHANGELOG.md).
License
Licensed under MIT.