Play redis simple message queue

A simple message queue based on redis is written in go language
Source address
Using demo

redis's list is very flexible. You can add elements from the left or the right, and also read data from any end

Adding and getting data is also very simple
LPUSH inserts data from the left
RPUSH big right insert data
LPOP takes a data from the left
RPOP takes a data from the right

127.0.0.1:6379> LPUSH list1 a
(integer) 1
127.0.0.1:6379> RPUSH list1 b
(integer) 2
127.0.0.1:6379> LPOP list1
"a"
127.0.0.1:6379> RPOP list1
"b"

Or use BLPOP BRPOP to read data. The difference is that when fetching data, if there is no data, it will wait for the specified time,
If there is data written during this period, it will be read and returned, and if there is no data, it will return null
Read in a window 1

127.0.0.1:6379> BLPOP list1 10
1) "list1"
2) "a"

Write in another window 2

127.0.0.1:6379> RPUSH list1 a b c
(integer) 3

Open another window 3 for reading. When reading the second time, the list is empty, so wait for 1 second and return to empty.

127.0.0.1:6379> BRPOP list1 1
1) "list1"
2) "c"

127.0.0.1:6379> BRPOP list1 1
(nil)
(1.04s)

Implementation of simple message queue

If we only add elements from one side and take them out from the other, it's not a message queue. But I guess you have a question: when consuming data, will the same message be consumed by multiple consumer s at the same time?

Of course not. Because redis is a single thread, there will be no concurrency problem when fetching data from the list. But this is a simple message queue. We need to write our own code to deal with the unsuccessful consumption

Let me talk about the whole idea of using list to implement a simple message queue

Implementation of comsumer

The main purpose of consumer is to read data from the list. You can use LPOP or BLPOP,
In this case, if the useblocp of options is true, BLPOP will be used.

type consumer struct {
    once            sync.Once
    redisCmd        redis.Cmdable
    ctx             context.Context
    topicName       string
    handler         Handler
    rateLimitPeriod time.Duration
    options         ConsumerOptions
    _               struct{}
}

type ConsumerOptions struct {
    RateLimitPeriod time.Duration
    UseBLPop        bool
}

Take a look at the code to create consumer. The last opts parameter is optional configuration

type Consumer = *consumer

func NewSimpleMQConsumer(ctx context.Context, redisCmd redis.Cmdable, topicName string, opts ...ConsumerOption) Consumer {
    consumer := &consumer{
        redisCmd:  redisCmd,
        ctx:       ctx,
        topicName: topicName,
    }
    for _, o := range opts {
        o(&consumer.options)
    }
    if consumer.options.RateLimitPeriod == 0 {
        consumer.options.RateLimitPeriod = time.Microsecond * 200
    }
    return consumer
}

After reading the data, how to deal with it? The caller can deal with it according to his own business logic
There's a small interface caller that implements it according to its own logic

type Handler interface {
    HandleMessage(msg *Message)
}

The logic of reading data is implemented by a gorouting

func (s *consumer) startGetMessage() {
    go func() {
        ticker := time.NewTicker(s.options.RateLimitPeriod)
        defer func() {
            log.Println("stop get message.")
            ticker.Stop()
        }()
        for {
            select {
            case <-s.ctx.Done():
                log.Printf("context Done msg: %#v \n", s.ctx.Err())
                return
            case <-ticker.C:
                var revBody []byte
                var err error
                if !s.options.UseBLPop {
                    revBody, err = s.redisCmd.LPop(s.topicName).Bytes()
                } else {
                    revs := s.redisCmd.BLPop(time.Second, s.topicName)
                    err = revs.Err()
                    revValues := revs.Val()
                    if len(revValues) >= 2 {
                        revBody = []byte(revValues[1])
                    }
                }
                if err == redis.Nil {
                    continue
                }
                if err != nil {
                    log.Printf("LPOP error: %#v \n", err)
                    continue
                }

                if len(revBody) == 0 {
                    continue
                }
                msg := &Message{}
                json.Unmarshal(revBody, msg)
                if s.handler != nil {
                    s.handler.HandleMessage(msg)
                }
            }
        }
    }()
}

The implementation of Producer

Producer is still very simple to push data to reids

type Producer struct {
    redisCmd redis.Cmdable
    _        struct{}
}

func NewProducer(cmd redis.Cmdable) *Producer {
    return &Producer{redisCmd: cmd}
}

func (p *Producer) Publish(topicName string, body []byte) error {
    msg := NewMessage("", body)
    sendData, _ := json.Marshal(msg)
    return p.redisCmd.RPush(topicName, string(sendData)).Err()
}

Tags: Database Redis JSON Go

Posted on Mon, 13 Apr 2020 22:41:28 -0700 by taskagent