IMPORTANT, please read: for now this project does not accept pull requests, but feel free to open issues and discuss bugs and features. PRs will be accepted later, once a minimum level of stability is reached and I have the CLAs ready.
Disque, an in-memory, distributed job queue
Disque is an ongoing experiment to build a distributed, in-memory message broker. Its goal is to capture the essence of the "Redis as a jobs queue" use case, which is usually implemented using blocking list operations, and move it into an ad-hoc, self-contained, scalable, and fault-tolerant design, with simple-to-understand properties and guarantees, while still resembling Redis in terms of simplicity, performance, and implementation as a C non-blocking networked server.
Project history and status:
Disque started about four years ago as a fork of the Redis code base; however, I (Salvatore) immediately realized that it was not practical to merge changes between the two projects, and the project was put on hold.
Meanwhile, Redis gained support for modules. In theory Disque could have been implemented as a Redis module, but too many APIs were still missing.
In March 2018 I started implementing the Redis module APIs needed to really support a distributed system like Disque as a Redis module; two fundamental additions were a cluster API and timers. At the same time I started a first port, but there was still too much work to do and more APIs were needed on the Redis modules side.
When the Redis 6 roadmap was finalized, I announced that a Disque module with the same API and capabilities was going to be part of the Redis 6 release, the main difference being that it would be released under the AGPL license.
Now (December 2019) the Disque module is finally available as alpha quality code.
What is the state of the Disque module compared to Disque 1.0 RC1 in the original Disque repository?
This is still alpha code, even if it is moving forward fast, while the original Disque was a release candidate.
The module does not implement any persistence layer yet.
There is no graceful removal of nodes (the "leaving" node state): this flag is yet to be implemented.
There are many bugs and it may crash. It started working just a few days ago.
A few commands may be missing. Notably, there are no HELLO and INFO commands for Disque yet. Since both conflict with Redis command names, they will be renamed to DISQUE HELLO and DISQUE INFO.
The license is different: the Disque module is AGPL, while the original Disque project, as a Redis fork, was BSD.
All the missing features will be addressed in the coming weeks, in order to provide a module that is ready for beta testing, and later for production use.
WARNING: This is unstable code and may not be suitable for production use. The API is considered stable, apart from details that may change in the future; however, it's alpha code, so handle with care!
What is a message queue?
Hint: skip this section if you are familiar with message queues.
You know how humans use text messages to communicate, right? I could text my wife "please get the milk at the store", and she may reply "Ok, message received, I'll get two bottles on my way home".
A message queue is the same as human text messages, but for computer programs. For example, when a user subscribes, a web application may send the process that handles emails a message like "please send the confirmation email to tom@example.net".
Message systems like Disque allow processes to communicate using different queues. A process can send a message to a queue with a given name, and only processes that fetch messages from this queue will receive those messages. Moreover, multiple processes can listen for messages on a given queue, and multiple processes can send messages to the same queue.
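To make the idea concrete, here is a toy, single-process sketch of named queues shared by several producers and consumers. `ToyBroker`, `send` and `fetch` are hypothetical names, not Disque's API, and the sketch deliberately has none of the guarantees discussed below; it only shows that a message sent to one queue is seen only by consumers of that queue.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Hypothetical single-process toy: named FIFO queues, no replication, no ACKs."""

    def __init__(self):
        # Each queue name maps to its own FIFO of pending messages.
        self.queues = defaultdict(deque)

    def send(self, queue, message):
        # Any number of producers may append to the same named queue.
        self.queues[queue].append(message)

    def fetch(self, queue):
        # A consumer only sees messages from the queue it asks for;
        # returns None when that queue is empty.
        q = self.queues[queue]
        return q.popleft() if q else None


broker = ToyBroker()
broker.send("emails", "send the confirmation email to tom@example.net")
broker.send("reports", "build the monthly report")
print(broker.fetch("emails"))  # send the confirmation email to tom@example.net
print(broker.fetch("emails"))  # None: the "reports" message is not visible here
```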
The important part of a message queue is providing guarantees that messages are eventually delivered, even in the face of failures. So while in theory implementing a message queue is very easy, writing a robust and scalable one is harder than it may appear.
Give me the details!
Disque is a distributed and fault tolerant message broker, so it works as a middle layer among processes that want to exchange messages.
Producers add messages that are served to consumers. Since message queues are often used to process delayed jobs, Disque often uses the term "job" in the API and in the documentation; however, jobs are actually just messages in the form of strings, so Disque can be used for other use cases too. In this documentation "jobs" and "messages" are used interchangeably.
Job queues with a producer-consumer model are pretty common, so the devil is in the details. A few details about Disque are:
Disque is a synchronously replicated job queue. By default when a new job is added, it is replicated to W nodes before the client gets an acknowledgement about the job being added. W-1 nodes can fail and the message will still be delivered.
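The counting argument behind "W-1 nodes can fail" can be sketched with a simplified in-memory model. `Node`, `add_job` and `deliverable` are hypothetical names, and this is not how Disque picks target nodes or talks over its cluster bus; it only assumes that the client is acknowledged once W nodes store a copy, and that the job survives while any copy remains on an up node.

```python
class Node:
    """A cluster node that may be down (hypothetical model)."""

    def __init__(self, name):
        self.name = name
        self.up = True
        self.jobs = {}  # job id -> job body

    def store(self, job_id, body):
        if not self.up:
            return False  # a down node cannot accept a replica
        self.jobs[job_id] = body
        return True


def add_job(nodes, job_id, body, replicate):
    """Acknowledge the client only after `replicate` (W) nodes hold a copy."""
    holders = []
    for node in nodes:
        if node.store(job_id, body):
            holders.append(node)
            if len(holders) == replicate:
                return True
    # Too few reachable nodes: undo partial copies and report failure.
    for node in holders:
        del node.jobs[job_id]
    return False


def deliverable(nodes, job_id):
    """The job survives while at least one up node still has a copy."""
    return any(n.up and job_id in n.jobs for n in nodes)


cluster = [Node(f"n{i}") for i in range(5)]
ok = add_job(cluster, "job1", "hello", replicate=3)
print(ok)  # True
holders = [n for n in cluster if "job1" in n.jobs]
holders[0].up = holders[1].up = False  # W-1 = 2 replicas fail
print(deliverable(cluster, "job1"))  # True
```

Losing the third holder as well would make the job undeliverable, which is the limit the paragraph above describes.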
Disque supports both at-least-once and at-most-once delivery semantics. At-least-once delivery semantics is where most of the effort was spent in the design and implementation, while at-most-once semantics is a trivial result of using a retry time set to 0 (which means: never re-queue the message again) and a replication factor of 1 for the message (not strictly needed, but it is useless to have multiple copies of a message around if it will be delivered at most once). You can have at-least-once and at-most-once jobs at the same time in the same queues and nodes, since this is a per-message setting.
Disque's at-least-once delivery is designed to approximate single delivery when possible, even during certain kinds of failures. This means that while Disque can only guarantee a number of deliveries greater than or equal to one, it will try hard to avoid multiple deliveries whenever possible.
Disque is a distributed system where all nodes have the same role (i.e., it is multi-master). Producers and consumers can attach to whatever node they like, and producers and consumers of the same queue do not need to stay connected to the same node. Nodes will automatically exchange messages based on load and client requests.
Disque is Available (it is an eventually consistent AP system in CAP terms): producers and consumers can make progress as long as a single node is reachable.
Disque supports optional asynchronous commands that are low latency for the client but provide fewer guarantees. For example, a producer can add a job to a queue with a replication factor of 3, but may not want to wait to learn whether the contacted node was really able to replicate it to the specified number of nodes. The node will then replicate the message in the background, in a best-effort way.
Disque automatically re-queues messages that are not acknowledged as already processed by consumers, after a message-specific retry time. Consumers do not need to re-queue a message if it was not processed.
Disque uses explicit acknowledgements so that a consumer can signal a message as delivered (or, using different terminology, signal a job as already processed).
Disque queues only provide best-effort ordering. Each queue sorts messages based on the job creation time, which is obtained using the wall clock of the local node where the message was created (plus an incremental counter for messages created in the same millisecond), so messages created on the same node are normally delivered in the order they were created. This is not causal ordering, since correct ordering is violated in several cases: when messages are re-issued because they are not acknowledged, because of drift between the nodes' local clocks, and when messages are moved to other nodes for load balancing and federation (in this case you end up with queues holding jobs originated on different nodes with different wall clocks). However, all this also means that messages are normally not delivered in random order, and messages created first are usually delivered first.
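A sketch of the ordering key described above, assuming a hypothetical `JobClock` that each node uses to stamp new jobs with (local wall-clock milliseconds, same-millisecond counter). Jobs from one node sort in creation order, while a job from a node whose clock runs behind can jump ahead once it is moved into the same queue, which is exactly why ordering is only best effort.

```python
class JobClock:
    """Hypothetical per-node ordering key: (local wall-clock ms, same-ms counter)."""

    def __init__(self, now_ms):
        self.now_ms = now_ms  # callable returning this node's wall clock in ms
        self.last_ms = None
        self.seq = 0

    def next_key(self):
        ms = self.now_ms()
        if ms == self.last_ms:
            self.seq += 1  # same millisecond: break the tie with a counter
        else:
            self.last_ms, self.seq = ms, 0
        return (ms, self.seq)


# Node A's clock reads 1000, 1000, 1001; node B's clock runs a few ms behind.
node_a = JobClock(iter([1000, 1000, 1001]).__next__)
node_b = JobClock(iter([996]).__next__)
queue = [(node_a.next_key(), "a1"), (node_a.next_key(), "a2"), (node_a.next_key(), "a3")]
# b1 is created after a3, then moved into the same queue for load balancing.
queue.append((node_b.next_key(), "b1"))
queue.sort()
print([job for _, job in queue])  # ['b1', 'a1', 'a2', 'a3']
```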
Note that since Disque does not provide strict FIFO semantics, technically speaking it should not be called a message queue, and could be better identified as a message broker. However, I believe that at this point in the IT industry the term message queue is often used more loosely to identify a generic broker that may or may not be able to guarantee order in all cases. Given that we document the semantics very clearly, I grant myself the right to call Disque a message queue anyway.
Disque provides the user with fine-grained control over each job using three time-related parameters and one replication parameter. For each job, the user can control:
The replication factor (how many nodes have a copy).
The delay time (the minimum time Disque will wait before putting the message in a queue, making the message deliverable).
The retry time (how much time should elapse, since the last time the job was queued, without an acknowledgement of the job's delivery, before the job is re-queued for delivery).
The expire time (how much time should elapse for the job to be deleted regardless of whether it was successfully delivered, i.e. acknowledged, or not).
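How the three time parameters interact can be sketched as follows. `JobTimes` and its methods are hypothetical names, the use of seconds is an assumption, and the real server tracks these times internally; the sketch only encodes the rules listed above: first queueing after the delay, re-queueing after the retry time without an ACK, and deletion once the expire time is reached regardless of ACKs.

```python
from dataclasses import dataclass


@dataclass
class JobTimes:
    """Hypothetical per-job time parameters, all in seconds."""

    created: float
    delay: float   # minimum wait before the job is first queued
    retry: float   # re-queue after this long without an ACK (0 = never)
    expire: float  # delete this long after creation, ACKed or not

    def first_queue_at(self):
        return self.created + self.delay

    def expired(self, now):
        return now >= self.created + self.expire

    def should_requeue(self, now, last_queued, acked):
        # No re-queue if ACKed, retries disabled, or the job already expired.
        if acked or self.retry == 0 or self.expired(now):
            return False
        return now - last_queued >= self.retry


job = JobTimes(created=0, delay=5, retry=60, expire=3600)
print(job.first_queue_at())                                    # 5
print(job.should_requeue(now=70, last_queued=5, acked=False))  # True
print(job.should_requeue(now=70, last_queued=5, acked=True))   # False
print(job.expired(now=3600))                                   # True
```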
Finally, Disque supports optional disk persistence, which is not enabled by default, but that can be handy in single data center setups and during restarts.
Other minor features are:
Ability to block queues.
A few statistics about queue activity.
Stateless iterators for queues and jobs.
Commands to control the visibility of single jobs.
Easy resize of the cluster (adding nodes is trivial).
Graceful removal of nodes without losing job replicas.
ACKs and retries
Disque's implementation of at-least-once delivery semantics is designed to avoid multiple deliveries during certain classes of failures. It is not able to guarantee that no multiple deliveries will occur. However, there are many at-least-once workloads where duplicated deliveries are acceptable (or explicitly handled), but not desirable either. A trivial example is sending emails to users (it is not terrible if a user gets a duplicated email, but it is important to avoid it when possible), or doing idempotent operations that are expensive (all the cases where avoiding multiple deliveries is critical for performance).
In order to avoid multiple deliveries when possible, Disque uses client ACKs. When a consumer processes a message correctly, it should acknowledge this fact to Disque. ACKs are replicated to multiple nodes, and are garbage collected as soon as the system believes it is unlikely that other nodes in the cluster still have the job the ACK refers to active. Under memory pressure or under certain failure scenarios, ACKs are eventually discarded.
More explicitly:
A job is replicated to multiple nodes, but usually only queued on a single node. There is a difference between having a job in memory and queueing it for delivery.
A node that has a copy of a message will re-queue it if a certain amount of time (the retry time) has elapsed without it receiving the ACK for the message. Nodes run a best-effort protocol to avoid re-queueing the message multiple times.
ACKs are replicated and garbage collected across the cluster so that processed messages are eventually evicted (this happens ASAP if there are no failures or network partitions).
For example, if a node having a copy of a job gets partitioned away while the job is acknowledged by the consumer, it is likely that when it returns (in a reasonable amount of time, that is, before the retry time is reached) it will be informed about the ACK and will avoid re-queueing the message. Similarly, a job can be acknowledged during a partition to just a single available node, and when the partition heals the ACK will be propagated to the other nodes that may still have a copy of the message.
So an ACK is just a proof of delivery that is replicated and retained for some time in order to make multiple deliveries less likely to happen in practice.
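The partition example above can be reduced to a toy model. `Replica`, `ack` and `heal` are hypothetical names, and the model ignores timing details, garbage collection and the real SETACK/GOTACK exchange; it only shows that a node partitioned away during the ACK would re-queue the job once its retry time elapses, unless a retained ACK reaches it when the partition heals.

```python
class Replica:
    """One node's view of a single job copy (hypothetical model)."""

    def __init__(self, name):
        self.name = name
        self.acked = False
        self.reachable = True

    def would_requeue(self, retry_elapsed):
        # Re-queue only when the retry time elapsed and no ACK was ever seen.
        return retry_elapsed and not self.acked


def ack(replicas):
    """A consumer ACKs via one node; the ACK reaches every reachable replica."""
    for r in replicas:
        if r.reachable:
            r.acked = True


def heal(replicas):
    """When the partition heals, a retained ACK is propagated to everybody."""
    ack_retained = any(r.acked for r in replicas)
    for r in replicas:
        r.reachable = True
        if ack_retained:
            r.acked = True


nodes = [Replica("a"), Replica("b"), Replica("c")]
nodes[2].reachable = False  # c is partitioned away during the ACK
ack(nodes)
print([n.would_requeue(retry_elapsed=True) for n in nodes])  # [False, False, True]
heal(nodes)                 # c returns before its retry time and learns the ACK
print([n.would_requeue(retry_elapsed=True) for n in nodes])  # [False, False, False]
```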
As already mentioned, in order to control replication and retries, a Disque job has the following associated properties: number of replicas, delay, retry and expire.
If a job has a retry time set to 0, it will get queued exactly once (and in this case a replication factor greater than 1 is useless, and signaled as an error to the user), so it will get delivered either a single time or will never get delivered. While jobs can be persisted on disk for safety, queues aren't, so this behavior is guaranteed even when nodes restart after a crash, whatever the persistence configuration is. However when nodes are manually restarted by the sysadmin, for example for upgrades, queues are persisted correctly and reloaded at startup, since the store/load operation is atomic in this case, and there are no race conditions possible (it is not possible that a job was delivered to a client and is persisted on disk as queued at the same time).
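The rule that a retry time of 0 combined with a replication factor greater than 1 is signaled as an error can be sketched as a small validation step. `validate_job_options` is a hypothetical helper; the real server's option names and error text may differ.

```python
def validate_job_options(replicate, retry):
    """Hypothetical check: retry 0 with a replication factor > 1 is rejected."""
    if retry == 0 and replicate > 1:
        # Queued exactly once: extra copies could never trigger a re-delivery.
        raise ValueError("retry 0 (at-most-once) requires a replication factor of 1")
    return "at-most-once" if retry == 0 else "at-least-once"


print(validate_job_options(replicate=1, retry=0))   # at-most-once
print(validate_job_options(replicate=3, retry=60))  # at-least-once
try:
    validate_job_options(replicate=3, retry=0)
except ValueError as err:
    print(err)  # retry 0 (at-most-once) requires a replication factor of 1
```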
Fast acknowledges
Disque supports a faster way to acknowledge processed messages, via the FASTACK command. The normal acknowledge is very expensive in terms of messages exchanged between nodes. This is what happens during a normal acknowledge:
The client sends ACKJOB to one node.
The node sends a SETACK message to everybody it believes to have a copy.
The receivers of SETACK reply with GOTACK to confirm.
The node finally sends DELJOB to all the nodes.
Note: actual garbage collection is more complex in case of failures and is explained in the state machine later. The above is what happens 99% of the time.
If a message is replicated to 3 nodes, acknowledging requires 1+2+2+2 messages, so that the ACK is retained even if some nodes cannot be reached when the message is acknowledged. This makes multiple deliveries of the message less likely.
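Generalizing the count to a job held by N nodes gives 1 + 3(N - 1) messages in the failure-free case, which matches 1+2+2+2 = 7 for N = 3. The hypothetical helper below makes the arithmetic explicit, assuming (as the 3-node figure implies) that the node receiving ACKJOB holds one of the copies itself.

```python
def ackjob_messages(replicas):
    """Hypothetical helper: failure-free message count for a normal ACK."""
    others = replicas - 1   # holders other than the node receiving ACKJOB
    ackjob = 1              # client -> node
    setack = others         # node -> each other holder
    gotack = others         # each other holder -> node
    deljob = others         # node -> each other holder
    return ackjob + setack + gotack + deljob


print(ackjob_messages(3))  # 7, i.e. 1+2+2+2
print(ackjob_messages(5))  # 13
```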
However, the alternative fast ack, while less reliable, is much faster and involves exchanging fewer messages. This is how a fast acknowledge works:
The client sends FASTACK to one node.
The node evicts the job and sends a best-effort DELJOB to all the nodes that may have a copy, or to the whole cluster if the node was not aware of the job.
If during a fast acknowledge a node having a copy of the message is not reachable, for example because of a network partition, the node will deliver the message again, since it has a non-acknowledged copy of the message and there is nobody able to inform it the message has been acknowledged when the partition heals.
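The fast acknowledge steps, and the redelivery risk during a partition, can be simulated with a small hypothetical model. `fastack` and its parameters are illustrative names, not Disque internals: the acking node evicts its copy, then sends a single best-effort DELJOB to the nodes it believes hold a copy (or to the whole cluster if it does not know), with no reply and no retry.

```python
def fastack(job_id, me, stores, known_holders, cluster, reachable):
    """Simulate a FASTACK received by node `me` (hypothetical model).

    stores:        dict node name -> set of job ids that node holds
    known_holders: nodes `me` believes hold a copy (empty if unknown)
    reachable:     nodes `me` can currently talk to
    Returns the nodes that still hold an un-ACKed copy afterwards.
    """
    stores[me].discard(job_id)  # evict the local copy right away
    targets = set(known_holders or cluster) - {me}
    for node in targets:
        if node in reachable:  # best effort: one DELJOB, no reply, no retry
            stores[node].discard(job_id)
    return sorted(n for n, jobs in stores.items() if job_id in jobs)


cluster = ["a", "b", "c", "d"]
stores = {n: set() for n in cluster}
for n in ("a", "b", "c"):
    stores[n].add("job1")
# The FASTACK reaches node a while c is partitioned away.
left = fastack("job1", "a", stores, {"a", "b", "c"}, cluster, reachable={"b", "d"})
print(left)  # ['c']: c keeps an un-ACKed copy and will deliver job1 again
```

With three known holders this costs 1 FASTACK plus 2 DELJOB messages, against 7 for a normal ACKJOB, but nothing is left behind to tell node c about the acknowledgement.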
If the network you are using is pretty reliable, you are very concerned with performance, and multiple deliveries are a non-issue in the context of your application, then FASTACK is probably the way to go.
Dead letter queue
Many message queues implement a feature called dead letter queue. It is a special queue used in order to accumu