c语言编程笔录

首页 >   > 笔记大全

笔记大全

golang实现mq

更新时间:2023-11-12

什么是MQ?

MQ(Message Queue)是一种为分布式应用设计的中间件。它通过异步和并发执行,解耦不同的组件,提高应用程序的性能和可靠性。MQ可以基于内存、文件系统、数据库等不同的存储介质来实现消息的持久化和传输。常见的MQ有Kafka、RabbitMQ和ActiveMQ等。

如何使用Golang实现MQ?

Golang是一种高效、安全、并发的编程语言。使用Golang实现MQ可以利用其高性能和并发特性,实现轻松的水平扩展,以及高吞吐量的消息传输。下面我们将主要介绍如何使用Golang实现基于内存的MQ。


type Message struct {
    Topic   string
    Payload string
}

type MessageQueue struct {
    queue map[string][]string
    lock  sync.RWMutex
}

func NewMessageQueue() *MessageQueue {
    return &MessageQueue{
        queue: make(map[string][]string),
    }
}

实现一个简单的消息队列,我们需要定义两个结构体。一个是Message,表示每个消息的主题和负载。另一个是MessageQueue,它维护了一个字符串切片的字典,用于存储不同主题对应的消息队列。 NewMessageQueue函数用于初始化一个空的MessageQueue实例

实现消息的生产者和消费者


type Producer struct {
    queue *MessageQueue
}

func (p *Producer) Produce(topic string, payload string) error {
    p.queue.lock.Lock()
    defer p.queue.lock.Unlock()

    p.queue.queue[topic] = append(p.queue.queue[topic], payload)
    return nil
}

type Consumer struct {
    queue    *MessageQueue
    topic    string
    consumer string
}

func (c *Consumer) Consume() (string, error) {
    c.queue.lock.Lock()
    defer c.queue.lock.Unlock()

    if len(c.queue.queue[c.topic]) == 0 {
        return "", nil
    }
    payload := c.queue.queue[c.topic][0]
    c.queue.queue[c.topic] = c.queue.queue[c.topic][1:]
    return payload, nil
}

接下来我们需要实现生产者和消费者。Producer结构体中的Produce函数往指定主题的消息队列中追加新的负载。Consumer结构体通过Consume函数从指定主题的消息队列中获取最先加入队列的消息。如果队列中没有消息,Consume函数会直接返回。

使用MQ


func main() {
    mq := NewMessageQueue()
    producer := &Producer{mq}
    consumer1 := &Consumer{mq, "topic1", "consumer1"}
    consumer2 := &Consumer{mq, "topic1", "consumer2"}
    numMessages := 10

    // 生产者生成消息
    for i := 0; i < numMessages; i++ {
        err := producer.Produce("topic1", fmt.Sprintf("Message %d", i))
        if err != nil {
            log.Fatal(err)
        }
    }

    // 消费者读取消息
    for i := 0; i < numMessages; i++ {
        payload, err := consumer1.Consume()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Consumer 1 got message: %s\n", payload)

        payload, err = consumer2.Consume()
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Consumer 2 got message: %s\n", payload)
    }
}

最后,我们在main函数中创建一个新的MessageQueue、生产者和两个消费者。生产者生成10个带有"Message i"负载的消息并将它们放入名为"topic1"的队列中。两个消费者都尝试读取这个队列,每个消息都会被两个消费者都读取到。