Distributed work-queues?

classic Classic list List threaded Threaded
5 messages Options
Reply | Threaded
Open this post in threaded view
|

Distributed work-queues?

Andrew Miklas
Hi all,

Has anyone written a work-queue implementation using Cassandra?

There's a section in the UseCase wiki page for "A distributed Priority  
Job Queue" which looks perfect, but unfortunately it hasn't been  
filled in yet.
http://wiki.apache.org/cassandra/UseCases#A_distributed_Priority_Job_Queue

I've been thinking about how best to do this, but every solution I've  
thought of seems to have some serious drawback.  The "range ghost"  
problem in particular creates some issues.  I'm assuming each job has  
a row within some column family, where the row's key is the time at  
which the job should be run.  To find the next job, you'd do a range  
query with a start a few hours in the past, and an end at the current  
time.  Once a job is completed, you delete the row.

The problem here is that you have to scan through deleted-but-not-yet-
GCed rows each time you run the query.  Is there a better way?

Preventing more than one worker from starting the same job seems like  
it would be a problem too.  You'd either need an external locking  
manager, or have to use some other protocol where workers write their  
ID into the row and then immediately read it back to confirm that they  
are the owner of the job.

Any ideas here?  Has anyone come up with a nice implementation?  Is  
Cassandra not well suited for queue-like tasks?



Thanks,


Andrew
Reply | Threaded
Open this post in threaded view
|

Re: Distributed work-queues?

Vram Kouramajian


We have implemented a distributed queue (similar to AWS SQS)  and a job queue in Cassandra.

Vram


On Sat, Jun 26, 2010 at 1:56 PM, Andrew Miklas <[hidden email]> wrote:
Hi all,

Has anyone written a work-queue implementation using Cassandra?

There's a section in the UseCase wiki page for "A distributed Priority Job Queue" which looks perfect, but unfortunately it hasn't been filled in yet.
http://wiki.apache.org/cassandra/UseCases#A_distributed_Priority_Job_Queue

I've been thinking about how best to do this, but every solution I've thought of seems to have some serious drawback.  The "range ghost" problem in particular creates some issues.  I'm assuming each job has a row within some column family, where the row's key is the time at which the job should be run.  To find the next job, you'd do a range query with a start a few hours in the past, and an end at the current time.  Once a job is completed, you delete the row.

The problem here is that you have to scan through deleted-but-not-yet-GCed rows each time you run the query.  Is there a better way?

Preventing more than one worker from starting the same job seems like it would be a problem too.  You'd either need an external locking manager, or have to use some other protocol where workers write their ID into the row and then immediately read it back to confirm that they are the owner of the job.

Any ideas here?  Has anyone come up with a nice implementation?  Is Cassandra not well suited for queue-like tasks?



Thanks,


Andrew

Reply | Threaded
Open this post in threaded view
|

Re: Distributed work-queues?

Jonathan Shook
In reply to this post by Andrew Miklas
Ideas:

Use a checkpoint that moves forward in time for each logical partition
of the workload.

Establish a way of dividing up jobs between clients that doesn't
require synchronization. One way of doing this would be to modulo the
key by the number of logical workers, allowing them to graze directly
on the job data. Doing it this way means that you have to make the
workers smart enough to checkpoint properly, handle exceptions, etc.
Jobs may be dispatched out-of-order in this scheme, so you would have
to decide how to handle explicit sequencing requirements. Some jobs
have idempotent results only when executed in the same order, and
keeping operations idempotent allows for simpler failure recovery. If
your workers are capable of absorbing the workload, then backlogging
won't hurt too much. Otherwise, you'll see strange ordering of things
in your application when they would otherwise need to look more
consistent.

You might find it easier to just take the hit of having a synchronized
dispatcher, but make it is a lean as possible.

Another way to break workload up is to have logical groupings of jobs
according to a natural boundary in your domain model, and to run a
synchronized dispatcher for each of those.

Using the "job" columns to keep track of who owns a job may not be the
best approach. You may have to do row scans on column data, which is a
Cassandra anti-pattern. Without an atomic check and modify operation,
there is no way to do it that avoids possible race conditions or extra
state management. This may be one of the strongest arguments for
putting such an operation into Cassandra.

You can set up your job name/keying such that every job result is
logically ordered to come immediately after the job definition. Row
key range scans would still be close to optimal, but would carry a
marker for jobs which had been completed, This would allow clients to
self-checkpoint, as long as result insertions are atomic row-wise. (I
think they are). Another worker could clean up rows which were
subsequently consumed (results no longer needed) after some gap in
time.  The client can avoid lots of tombstones by only looking where
there should be additional work. (checkpoint time). Pick a character
that is not natural for your keys and make it a delimiter. Require
that all keys in the job CF be aggregate and fully-qualified.

Clients might be able to remove jobs rows that allow for it after
completion, but jobs which were dispatched to multiple works may end
up with orphaned result rows to be cleaned up.

.. just some drive-by ramblings ..

Jonathan



On Sat, Jun 26, 2010 at 3:56 PM, Andrew Miklas <[hidden email]> wrote:

> Hi all,
>
> Has anyone written a work-queue implementation using Cassandra?
>
> There's a section in the UseCase wiki page for "A distributed Priority Job
> Queue" which looks perfect, but unfortunately it hasn't been filled in yet.
> http://wiki.apache.org/cassandra/UseCases#A_distributed_Priority_Job_Queue
>
> I've been thinking about how best to do this, but every solution I've
> thought of seems to have some serious drawback.  The "range ghost" problem
> in particular creates some issues.  I'm assuming each job has a row within
> some column family, where the row's key is the time at which the job should
> be run.  To find the next job, you'd do a range query with a start a few
> hours in the past, and an end at the current time.  Once a job is completed,
> you delete the row.
>
> The problem here is that you have to scan through deleted-but-not-yet-GCed
> rows each time you run the query.  Is there a better way?
>
> Preventing more than one worker from starting the same job seems like it
> would be a problem too.  You'd either need an external locking manager, or
> have to use some other protocol where workers write their ID into the row
> and then immediately read it back to confirm that they are the owner of the
> job.
>
> Any ideas here?  Has anyone come up with a nice implementation?  Is
> Cassandra not well suited for queue-like tasks?
>
>
>
> Thanks,
>
>
> Andrew
>
Reply | Threaded
Open this post in threaded view
|

Re: Distributed work-queues?

Jeremy Davis-3
The approach that I took incorporates some of the ideas listed here...
Basically each message in the queue was assigned a sequence number (needs to be unique and increasing per queue), and then read out in sequence number order.
The Message CF is logically one row per queue, with each column being the message (Name=sequenceNum, val=data). In reality it is bucketed to get around the row size and smaller rows seem to be better.
 

Another CF was used to keep track of the Highest Completed Sequence Number (per queue) and was updated after committing products made from the messages. When products were not idempotent, they were also stored with their corresponding highest completed sequence number. In this way you create a bit of journaling and should always move forward.

The problem was creating the sequence numbers. I didn't want to rely on system time, because I was afraid that in a cluster the times would be out of skew. Even with NTP, etc, and I would end up with some messages out of order (though rarely). This may not be a problem for some/most.

I didn't want to rely on a system like zookeeper (or pick your favorite source), because to store a message I didn't want to hit the sequence source service and then store.
A DB High/Low type algorithm would work fine as long as you managed the High correctly and didn't have multiple High's running for the same Queue at the same time.


So I used ZK to assign responsibility for certain queues to certain machines (load balance issue here). When a machine took the queue range, it would get a new sequence number (64bit High) from ZK or Clock or DB etc, and then every additional message processed would get a local 32 bit counter(Low)... Not entirely satisfied with this scheme yet.



As far as scheduling the jobs.

When a message is inserted, it also updates an 'Activity' CF (actually updates happen every few messages rand()%5==0 etc). The Activity CF maps from QueueId to a single activity column (last update time and sequenceNumber) (Actually there is some bucketing/caching for efficiency) This can all be cached/asynch/lazy, because repeat work is already handled.

Workers (which are already assigned a range of queue responsibility by ZK) scan the activity table (in queue ID order) for any entry. When the job is completed the activity entry is deleted. If more activity happened in the mean time, the delete will be ignored because the TS is !=.  If Cassandra provided more complex queries, you could create more complex work scheduling behavior.

There is no priority in this scheme.
 
Original Messages are left undeleted for now. But if you wanted to delete them I would use some of the coming bulk deletion/truncate features, like drop the entire CF.

-JD




On Sat, Jun 26, 2010 at 3:31 PM, Jonathan Shook <[hidden email]> wrote:
Ideas:

Use a checkpoint that moves forward in time for each logical partition
of the workload.

Establish a way of dividing up jobs between clients that doesn't
require synchronization. One way of doing this would be to modulo the
key by the number of logical workers, allowing them to graze directly
on the job data. Doing it this way means that you have to make the
workers smart enough to checkpoint properly, handle exceptions, etc.
Jobs may be dispatched out-of-order in this scheme, so you would have
to decide how to handle explicit sequencing requirements. Some jobs
have idempotent results only when executed in the same order, and
keeping operations idempotent allows for simpler failure recovery. If
your workers are capable of absorbing the workload, then backlogging
won't hurt too much. Otherwise, you'll see strange ordering of things
in your application when they would otherwise need to look more
consistent.

You might find it easier to just take the hit of having a synchronized
dispatcher, but make it is a lean as possible.

Another way to break workload up is to have logical groupings of jobs
according to a natural boundary in your domain model, and to run a
synchronized dispatcher for each of those.

Using the "job" columns to keep track of who owns a job may not be the
best approach. You may have to do row scans on column data, which is a
Cassandra anti-pattern. Without an atomic check and modify operation,
there is no way to do it that avoids possible race conditions or extra
state management. This may be one of the strongest arguments for
putting such an operation into Cassandra.

You can set up your job name/keying such that every job result is
logically ordered to come immediately after the job definition. Row
key range scans would still be close to optimal, but would carry a
marker for jobs which had been completed, This would allow clients to
self-checkpoint, as long as result insertions are atomic row-wise. (I
think they are). Another worker could clean up rows which were
subsequently consumed (results no longer needed) after some gap in
time.  The client can avoid lots of tombstones by only looking where
there should be additional work. (checkpoint time). Pick a character
that is not natural for your keys and make it a delimiter. Require
that all keys in the job CF be aggregate and fully-qualified.

Clients might be able to remove jobs rows that allow for it after
completion, but jobs which were dispatched to multiple works may end
up with orphaned result rows to be cleaned up.

.. just some drive-by ramblings ..

Jonathan



On Sat, Jun 26, 2010 at 3:56 PM, Andrew Miklas <[hidden email]> wrote:
> Hi all,
>
> Has anyone written a work-queue implementation using Cassandra?
>
> There's a section in the UseCase wiki page for "A distributed Priority Job
> Queue" which looks perfect, but unfortunately it hasn't been filled in yet.
> http://wiki.apache.org/cassandra/UseCases#A_distributed_Priority_Job_Queue
>
> I've been thinking about how best to do this, but every solution I've
> thought of seems to have some serious drawback.  The "range ghost" problem
> in particular creates some issues.  I'm assuming each job has a row within
> some column family, where the row's key is the time at which the job should
> be run.  To find the next job, you'd do a range query with a start a few
> hours in the past, and an end at the current time.  Once a job is completed,
> you delete the row.
>
> The problem here is that you have to scan through deleted-but-not-yet-GCed
> rows each time you run the query.  Is there a better way?
>
> Preventing more than one worker from starting the same job seems like it
> would be a problem too.  You'd either need an external locking manager, or
> have to use some other protocol where workers write their ID into the row
> and then immediately read it back to confirm that they are the owner of the
> job.
>
> Any ideas here?  Has anyone come up with a nice implementation?  Is
> Cassandra not well suited for queue-like tasks?
>
>
>
> Thanks,
>
>
> Andrew
>

Reply | Threaded
Open this post in threaded view
|

Re: Distributed work-queues?

devshorts
This post has NOT been accepted by the mailing list yet.
I wanted to chime in and share that there is a Cassandra queue docker project called cassieq https://github.com/paradoxical-io/cassieq which uses bucketizing and reader pointers to give efficient queue based semantics. Disclaimer, I'm a contributor on the project, but felt it was worth sharing here