Popularity
7.7
Stable
Activity
6.4
-
255
11
8

Monthly Downloads: 4,346
Programming language: Elixir
License: MIT License
Tags: Queue    
Latest version: v3.3.0

opq alternatives and similar packages

Based on the "Queue" category.
Alternatively, view opq alternatives based on common mentions on social networks and blogs.

Do you think we are missing an alternative of opq or a related project?

Add another 'Queue' Package

README

OPQ: One Pooled Queue

Build Status CodeBeat Coverage Hex.pm

Elixir Queue!

A simple, in-memory queue with worker pooling and rate limiting in Elixir. OPQ leverages Erlang's queue module and Elixir's GenStage.

Originally built to support Crawler.

Features

  • A fast, in-memory FIFO queue.
  • Worker pool.
  • Rate limit.
  • Timeouts.
  • Pause / resume / stop the queue.

Installation

def deps do
  [{:opq, "~> 3.2"}]
end

Usage

A simple example:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.enqueue(opq, fn -> IO.inspect("world") end)

Specify module, function and arguments:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, IO, :inspect, ["hello"])
OPQ.enqueue(opq, IO, :inspect, ["world"])

Specify a custom name for the queue:

OPQ.init(name: :items)

OPQ.enqueue(:items, fn -> IO.inspect("hello") end)
OPQ.enqueue(:items, fn -> IO.inspect("world") end)

Start as part of a supervision tree:

Note, when starting as part of a supervision tree, the :name option must be provided.

children = [
  {OPQ, name: :items}
]

Specify a custom worker to process items in the queue:

defmodule CustomWorker do
  def start_link(item) do
    Task.start_link(fn ->
      Agent.update(:bucket, &[item | &1])
    end)
  end
end

Agent.start_link(fn -> [] end, name: :bucket)

{:ok, opq} = OPQ.init(worker: CustomWorker)

OPQ.enqueue(opq, "hello")
OPQ.enqueue(opq, "world")

Agent.get(:bucket, & &1) # => ["world", "hello"]

Rate limit:

{:ok, opq} = OPQ.init(workers: 1, interval: 1000)

Task.async(fn ->
  OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
  OPQ.enqueue(opq, fn -> IO.inspect("world") end)
end)

If no interval is supplied, the ratelimiter will be bypassed.

Check the queue and number of available workers:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> Process.sleep(1000) end)

{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 9}

Process.sleep(1200)

{queue, available_workers} = OPQ.info(opq) # => {:normal, {[], []}, 10}

Stop the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end)
OPQ.stop(opq)
OPQ.enqueue(opq, fn -> IO.inspect("world") end) # => (EXIT) no process...

Pause and resume the queue:

{:ok, opq} = OPQ.init()

OPQ.enqueue(opq, fn -> IO.inspect("hello") end) # => "hello"
OPQ.pause(opq)
OPQ.info(opq) # => {:paused, {[], []}, 10}
OPQ.enqueue(opq, fn -> IO.inspect("world") end)
OPQ.resume(opq) # => "world"
OPQ.info(opq) # => {:normal, {[], []}, 10}

Configurations

Option Type Default Value Description
:name atom/module pid The name of the queue.
:worker module OPQ.Worker The worker that processes each item from the queue.
:workers integer 10 Maximum number of workers.
:interval integer 0 Rate limit control - number of milliseconds before asking for more items to process, defaults to 0 which is effectively no rate limit.
:timeout integer 5000 Number of milliseconds allowed to perform the work, it should always be set to higher than :interval.

Changelog

Please see [CHANGELOG.md](CHANGELOG.md).

License

Licensed under MIT.


*Note that all licence references and agreements mentioned in the opq README section above are relevant to that project's source code only.