Redis实现消息队列

2019-01-18 16:09:57 消息队列 3407 0
本文配套源码

https://github.com/HaleyLeoZhang/message_queue

注意:当前时间戳,在执行以下操作前,就已经获取到了,后面的几步只是复用这个值

redis队列数据说明

队列数据 = 队列唯一编号+队列内容+重试次数

{"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}

延迟队列

插入延迟任务
zadd 延迟队列名 执行分数(延迟秒数+当前时间戳[整型]) redis队列数据)
示例
"ZADD" "queues:echo_log_job:delayed" "1489802332" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
取出该执行的所有延迟任务
zrangebyscore 延迟队列名  最小分值(负无穷)  最大分值(当前时间戳)
rpush 队列名  队列数据
示例
"zrangebyscore" "queues:echo_log_job:delayed" "-inf" "1489802333"
清空到刚刚取出来的那些任务

负无穷到当前时间戳的分值

"zremrangebyrank" "queues:echo_log_job:delayed" "0" 刚刚取出来的任务数量-1(因为下标是从0开始算)
示例
"zremrangebyrank" "queues:echo_log_job:delayed" "0" "0"
插入刚刚取出来的那些任务到对应队列
rpush 队列名  队列数据
示例
 "rpush" "queues:echo_log_job" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}

消费部分

从对应队列中取出一个任务
lpop 队列名
示例
"lpop" "queues:echo_log_job"
存入待确认执行成功数据

获取当前执行的时间戳+超时时间,zadd插入到对应队列的reserved数据结构中,用以任务失败后,重新放入队列

示例
"zadd" "queues:echo_log_job:reserved" "1489802343" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}
如果消息确认完成,则删除这个队列的zset中的对应键
zrem "queues:echo_log_job:reserved" {"job":{"id":"FC3LJXSJWX4DI6RIWOFEHAJYDJ7T0OIU","attemps":0,"data":"O:28:\"HaleyLeoZhang\\Job\\EchoLogJob\":4:{s:10:\"\u0000*\u0000payload\";N;s:13:\"\u0000*\u0000delay_time\";i:3;s:13:\"\u0000*\u0000queue_name\";N;s:6:\"object\";O:8:\"stdClass\":3:{s:2:\"id\";i:1;s:4:\"name\";s:13:\"HaleyLeoZhang\";s:4:\"time\";i:1548040630;}}"}}

监听部分

程序端,for循环拉取,如果任务很多,考虑CPU占用
可以在处理完一次数据后,指定等待时间挂起进程再拉取新的任务

应用场景

购物秒杀

- TODO

通讯录或者用户足迹上传

- TODO

区块链机器人玩家

- TODO

引申

下篇文章云天河将会讲解 RabbitMQ 的一些运作原理,及其与 Redis 实现的消息队列的优缺点,此外会引入下 KafkaRabbitMQ 各自适用的场景

注:若无特殊说明,文章均为云天河原创,请尊重作者劳动成果,转载前请一定要注明出处