What's the best way of implementing a message queue table in MySQL?


Solution 1

I've built a few message queuing systems and I'm not certain what type of message you're referring to, but in the case of dequeuing (is that a word?) I've done the same thing you've done. Your method looks simple, clean and solid. Not that my work is the best, but it's proven very effective for large-scale monitoring across many sites (error logging, mass email marketing campaigns, social networking notices).

My vote: no worries!

Solution 2

Your dequeue could be more concise. Rather than relying on the transaction rollback, you could do it in one atomic statement without an explicit transaction:

UPDATE jobs SET process_id = ? WHERE process_id IS NULL ORDER BY ID ASC LIMIT 1;

Then you can pull jobs with (brackets [] mean optional, depending on your particulars):

SELECT * FROM jobs WHERE process_id = ? [ORDER BY ID LIMIT 1];
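
Put together, a minimal dequeue sketch (the @worker session variable here is just an illustration; any unique consumer identifier works):

SET @worker := CONNECTION_ID();
UPDATE jobs SET process_id = @worker
 WHERE process_id IS NULL
 ORDER BY id ASC LIMIT 1;
SELECT * FROM jobs WHERE process_id = @worker; -- the claimed job, if any
DELETE FROM jobs WHERE process_id = @worker;   -- once processing succeeds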

Solution 3

Brian Aker talked about a queue engine a while ago. There's also been talk of a combined SELECT/DELETE syntax that would delete rows and return them in one statement.

If you're not worried about throughput, you can always use SELECT GET_LOCK() as a mutex. For example:

SELECT GET_LOCK('READQUEUE', 10); -- second argument is a timeout in seconds
SELECT * FROM jobs ORDER BY id ASC LIMIT 1;
DELETE FROM jobs WHERE id = ?;    -- the id returned by the SELECT
SELECT RELEASE_LOCK('READQUEUE');

And if you want to get really fancy, wrap it in a stored procedure.
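
One possible shape for that (a sketch only; the dequeue_job name is made up here):

DELIMITER //
CREATE PROCEDURE dequeue_job()
BEGIN
  -- serialise consumers on the same named lock as above
  DO GET_LOCK('READQUEUE', 10);
  -- return the oldest job to the caller as a result set
  SELECT * FROM jobs ORDER BY id ASC LIMIT 1;
  -- delete that same row; safe because we still hold the lock
  DELETE FROM jobs ORDER BY id ASC LIMIT 1;
  DO RELEASE_LOCK('READQUEUE');
END //
DELIMITER ;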

Solution 4

In MySQL 8 you can use the new NOWAIT and SKIP LOCKED keywords to avoid the complexity of special locking mechanisms:

START TRANSACTION;
SELECT id, message FROM jobs
 WHERE process_id IS NULL
 ORDER BY id ASC LIMIT 1
 FOR UPDATE SKIP LOCKED;
UPDATE jobs
 SET process_id = ?
 WHERE id = ?;
COMMIT;
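
NOWAIT, by contrast, makes the SELECT fail immediately with an error instead of skipping the locked row; a sketch of the same claim query:

SELECT id, message FROM jobs
 WHERE process_id IS NULL
 ORDER BY id ASC LIMIT 1
 FOR UPDATE NOWAIT; -- errors at once if the candidate row is locked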

Traditionally this was hard to achieve without hacks: unusual special tables or columns, unreliable solutions, or lost concurrency.

SKIP LOCKED may cause performance issues with extremely large numbers of consumers.

This still does not, however, handle automatically marking the job complete (or releasing it) on transaction rollback. For that you may need savepoints, though even those might not cover all cases: what you would really want is an action that executes on transaction failure, but as part of the transaction!
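
One variant worth noting (a sketch, not from the answer above): hold the row lock for the whole job, so a crash or rollback leaves the row unclaimed for the next consumer and no process_id bookkeeping is needed:

START TRANSACTION;
SELECT id, message FROM jobs
 ORDER BY id ASC LIMIT 1
 FOR UPDATE SKIP LOCKED;
-- ... process the job here, while the row lock is held ...
DELETE FROM jobs WHERE id = ?; -- the id we just read
COMMIT; -- a rollback instead would leave the job for another consumer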

In the future there may be more features that help optimise cases like this, such as an UPDATE that can also return the matched rows. It's worth keeping apprised of new features and capabilities in the change log.

Solution 5

Here is a solution I used, which works without tracking a process_id for the current thread or locking the table.

SELECT * FROM jobs ORDER BY ID ASC LIMIT 0,1;

Get the result in a $row array, and execute:

DELETE FROM jobs WHERE ID=$row['ID'];

Then check the affected rows (mysql_affected_rows). If a row was affected, process the job in the $row array. If 0 rows were affected, some other process is already processing the selected job; repeat the above steps until there are no rows left.

I've tested this with a 'jobs' table holding 100k rows while spawning 20 concurrent processes that do the above, and no race conditions occurred. You can also modify the queries to mark a row with a processing flag first, and delete it only after you have actually processed the job:

while (time() - $startTime < $timeout)
{
    // grab the oldest unclaimed job
    SELECT * FROM jobs WHERE processing IS NULL ORDER BY ID ASC LIMIT 0,1;
    if (count($row) == 0) break;               // queue is empty
    // try to claim it; the IS NULL condition makes the race check explicit
    UPDATE jobs SET processing = 1 WHERE ID = $row['ID'] AND processing IS NULL;
    if (mysql_affected_rows() == 0) continue;  // another worker claimed it first
    // process your job here
    DELETE FROM jobs WHERE ID = $row['ID'];
}

Needless to say, you should use a proper message queue (ActiveMQ, RabbitMQ, etc.) for this kind of work when you can. We had to resort to this solution, though, because our host regularly breaks things when updating software, so the less there is to break, the better.




Comments

  • taw
    taw about 4 years

    It's probably the tenth time I'm implementing something like this, and I've never been 100% happy with the solutions I came up with.

    The reason a MySQL table is attractive instead of a "proper" messaging system is primarily that most applications already use a relational database for other things (which tends to be MySQL for most of what I've been doing), while very few applications use a messaging system. Also, relational databases have very strong ACID properties, while messaging systems often don't.

    The first idea is to use:

    create table jobs(
      id bigint not null auto_increment primary key,
      message text not null,
      process_id varbinary(255) null default null,
      key jobs_key(process_id)
    );
    

    And then enqueue looks like this:

    insert into jobs(message) values('blah blah');
    

    And dequeue looks like this:

    begin;
    select * from jobs where process_id is null order by id asc limit 1;
    update jobs set process_id = ? where id = ?; -- whatever i just got
    commit;
    -- return (id, message) to application, cleanup after done
    

    Table and enqueue look nice, but dequeue kinda bothers me. How likely is it to roll back? Or to get blocked? What keys should I use to make it O(1)-ish?

    Or is there any better solution than what I'm doing?

    • dkretz
      dkretz over 15 years
      I would try to do something like: UPDATE jobs SET process_id = id_arg WHERE id = (SELECT MIN(id) FROM jobs WHERE process_id IS NULL); then SELECT fields FROM jobs WHERE process_id = id_arg
    • mpeters
      mpeters almost 10 years
      Your dequeue design suffers from a race condition. You either need to turn your SELECT into a "SELECT ... FOR UPDATE" or you need to do the UPDATE first (as suggested by @pawstrong) since an UPDATE is atomic.
    • NeverEndingQueue
      NeverEndingQueue about 6 years
      In RabbitMQ, a task will get repeated if the connection between a worker and the broker dies. How do you do that with MySQL? Shall we add a ping field and update it during task execution? In theory there could be a max timeout for a pending task, but then if you have long-running tasks (several hours), tasks won't get repeated immediately.
  • angularsen
    angularsen over 8 years
    Hangfire.io is also a great alternative.
  • Kevin
    Kevin almost 6 years
    Excellent! How can I do the same thing with PostgreSQL?
  • jgmjgm
    jgmjgm over 4 years
    That method has performance and concurrency issues. You may or may not notice them: you may not realise jobs are being processed twice every now and again if, for example, they're just saving log lines. You may also hit deadlocks with that method. It's optimistic locking, so it falls back on versioning and conflict resolution: two concurrent queries can get the same job, and then one deadlocks or conflicts on the update, depending on the consistency level; alternatively, process_id is clobbered. It works a lot of the time, and with certain implementations, but not all the time.