Monthly Downloads: 2,182
Programming language: Elixir
License: MIT License
Tags: Queue    
Latest version: v2.4.0

GenRMQ

GenRMQ is a set of behaviours for creating RabbitMQ consumers and publishers. Internally it uses the AMQP Elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher code, which usually includes:

  • creating connection / channel and keeping it in a state
  • creating and binding queue
  • handling reconnections / consumer cancellations

The project currently provides the following functionality:

  • GenRMQ.Consumer - a behaviour for implementing RabbitMQ consumers
  • GenRMQ.Publisher - a behaviour for implementing RabbitMQ publishers
  • GenRMQ.Processor - a behaviour for implementing RabbitMQ message processors
  • GenRMQ.RabbitCase - test utilities for RabbitMQ (example usage)

Installation

def deps do
  [{:gen_rmq, "~> 2.4.0"}]
end

Migrations

Please check how to migrate to gen_rmq 1.0.0 from previous versions.

Examples

More thorough examples for using GenRMQ.Consumer and GenRMQ.Publisher can be found in the examples directory.

Consumer

defmodule Consumer do
  @behaviour GenRMQ.Consumer

  def init() do
    [
      queue: "gen_rmq_in_queue",
      exchange: "gen_rmq_exchange",
      routing_key: "#",
      prefetch_count: "10",
      uri: "amqp://guest:guest@localhost:5672",
      retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
    ]
  end

  def consumer_tag() do
    "test_tag"
  end

  def handle_message(message) do
    ...
  end
end

GenRMQ.Consumer.start_link(Consumer, name: Consumer)

This will result in:

  • durable gen_rmq_exchange.deadletter exchange created or redeclared
  • durable gen_rmq_in_queue_error queue created or redeclared. It will be bound to gen_rmq_exchange.deadletter
  • durable topic gen_rmq_exchange exchange created or redeclared
  • durable gen_rmq_in_queue queue created or redeclared. It will be bound to gen_rmq_exchange exchange and has a deadletter exchange set to gen_rmq_exchange.deadletter
  • every handle_message callback will be executed in a separate process. This can be disabled by setting concurrency: false in the init callback
  • on a failed RabbitMQ connection it will wait for a bit and then reconnect

There are many options to control the consumer setup details; see the GenRMQ.Consumer.init/0 callback docs for all available settings.
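
The retry_delay_function in the consumer example sleeps for a linearly growing time. As a minimal sketch of an alternative, the module below computes a capped exponential delay. BackoffDelay, its function names, the 1 second base and the 30 second cap are all hypothetical and not part of gen_rmq; it also assumes the attempt number passed to the function starts at 1.

```elixir
defmodule BackoffDelay do
  @moduledoc "Hypothetical helper for illustration; not part of gen_rmq."

  # Assumed values, chosen only for this sketch.
  @base_ms 1_000
  @max_ms 30_000

  # Delay for a 1-based attempt number: 1s, 2s, 4s, ... capped at 30s.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    # Bitwise.bsl(1, n) is 2^n computed on integers.
    min(@base_ms * Bitwise.bsl(1, attempt - 1), @max_ms)
  end

  # Blocks the calling process for the computed delay, mirroring the
  # shape of the anonymous function used in the consumer example.
  def wait(attempt), do: :timer.sleep(delay_ms(attempt))
end
```

Under these assumptions it could be plugged into the init keyword list as retry_delay_function: &BackoffDelay.wait/1. The cap keeps a long outage from pushing the wait between reconnection attempts beyond 30 seconds.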

Publisher

defmodule Publisher do
  @behaviour GenRMQ.Publisher

  def init() do
    [
      exchange: "gen_rmq_exchange",
      uri: "amqp://guest:guest@localhost:5672"
    ]
  end
end

GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))

Telemetry

GenRMQ emits Telemetry events for both consumers and publishers. It currently exposes the following events:

  • [:gen_rmq, :publisher, :connection, :start] - Dispatched by a GenRMQ publisher when a connection to RabbitMQ is started

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{exchange: String.t}
  • [:gen_rmq, :publisher, :connection, :stop] - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been established

    • Measurement: %{time: System.monotonic_time, duration: native_time}
    • Metadata: %{exchange: String.t}
  • [:gen_rmq, :publisher, :connection, :down] - Dispatched by a GenRMQ publisher when a connection to RabbitMQ has been lost

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{module: atom, reason: atom}
  • [:gen_rmq, :publisher, :message, :start] - Dispatched by a GenRMQ publisher when a message is about to be published to RabbitMQ

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{exchange: String.t, message: String.t}
  • [:gen_rmq, :publisher, :message, :stop] - Dispatched by a GenRMQ publisher when a message has been published to RabbitMQ

    • Measurement: %{time: System.monotonic_time, duration: native_time}
    • Metadata: %{exchange: String.t, message: String.t}
  • [:gen_rmq, :publisher, :message, :error] - Dispatched by a GenRMQ publisher when a message failed to be published to RabbitMQ

    • Measurement: %{time: System.monotonic_time, duration: native_time}
    • Metadata: %{exchange: String.t, message: String.t, kind: atom, reason: atom}
  • [:gen_rmq, :consumer, :message, :ack] - Dispatched by a GenRMQ consumer when a message has been acknowledged

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{message: String.t}
  • [:gen_rmq, :consumer, :message, :reject] - Dispatched by a GenRMQ consumer when a message has been rejected

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{message: String.t, requeue: boolean}
  • [:gen_rmq, :consumer, :message, :start] - Dispatched by a GenRMQ consumer when the processing of a message has begun

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{message: String.t, module: atom}
  • [:gen_rmq, :consumer, :message, :stop] - Dispatched by a GenRMQ consumer when the processing of a message has completed

    • Measurement: %{time: System.monotonic_time, duration: native_time}
    • Metadata: %{message: String.t, module: atom}
  • [:gen_rmq, :consumer, :connection, :start] - Dispatched by a GenRMQ consumer when a connection to RabbitMQ is started

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}
  • [:gen_rmq, :consumer, :connection, :stop] - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been established

    • Measurement: %{time: System.monotonic_time, duration: native_time}
    • Metadata: %{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t}
  • [:gen_rmq, :consumer, :connection, :error] - Dispatched by a GenRMQ consumer when a connection to RabbitMQ could not be made

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{module: atom, attempt: integer, queue: String.t, exchange: String.t, routing_key: String.t, error: any}
  • [:gen_rmq, :consumer, :connection, :down] - Dispatched by a GenRMQ consumer when a connection to RabbitMQ has been lost

    • Measurement: %{time: System.monotonic_time}
    • Metadata: %{module: atom, reason: atom}
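
To illustrate how the events above might be consumed, here is a minimal sketch of a handler that logs the duration of the two message :stop events. The module GenRMQTelemetryLogger and its function names are hypothetical and not part of gen_rmq; the sketch only relies on the measurement and metadata keys listed above and on the 4-arity callback shape that the :telemetry library expects from handlers.

```elixir
defmodule GenRMQTelemetryLogger do
  @moduledoc "Hypothetical helper for illustration; not part of gen_rmq."

  require Logger

  # Two of the listed events that carry a :duration measurement.
  @events [
    [:gen_rmq, :publisher, :message, :stop],
    [:gen_rmq, :consumer, :message, :stop]
  ]

  def events, do: @events

  # Handler callback: event name, measurements, metadata, handler config.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Builds the log line. Kept free of side effects so it can be
  # checked without a running broker.
  def describe(event, %{duration: duration}, metadata) do
    # Durations are reported in native time units; convert to milliseconds.
    ms = System.convert_time_unit(duration, :native, :millisecond)
    # Consumer events carry :module, publisher events carry :exchange.
    source = Map.get(metadata, :module) || Map.get(metadata, :exchange)
    "#{Enum.join(event, ".")} source=#{inspect(source)} duration_ms=#{ms}"
  end
end
```

With the :telemetry dependency available, the handler could be registered once at application start with :telemetry.attach_many("gen-rmq-durations", GenRMQTelemetryLogger.events(), &GenRMQTelemetryLogger.handle_event/4, nil), where the handler id string is an arbitrary name chosen for this example.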

Running tests

You need docker-compose installed.

$ make test

How to contribute

We happily accept contributions in the form of GitHub PRs, as well as bug reports, comments/suggestions, or usage questions submitted as GitHub issues.

Notes on project maturity

This library was developed as a Meltwater internal project starting in January 2018. Over the following two months it was used in at least three Meltwater production services. Are you using gen_rmq in production? Please let us know, we are curious!

License

The MIT License (MIT)

Copyright (c) 2018 - 2019 Meltwater Inc. http://underthehood.meltwater.com/

