golang实现mq
更新时间:2023-11-12什么是MQ?
MQ(Message Queue)是一种为分布式应用设计的中间件。它通过异步和并发执行,解耦不同的组件,提高应用程序的性能和可靠性。MQ可以基于内存、文件系统、数据库等不同的存储介质来实现消息的持久化和传输。常见的MQ有Kafka、RabbitMQ和ActiveMQ等。
如何使用Golang实现MQ?
Golang是一种高效、安全、并发的编程语言。使用Golang实现MQ可以利用其高性能和并发特性,实现轻松的水平扩展,以及高吞吐量的消息传输。下面我们将主要介绍如何使用Golang实现基于内存的MQ。
type Message struct {
Topic string
Payload string
}
type MessageQueue struct {
queue map[string][]string
lock sync.RWMutex
}
func NewMessageQueue() *MessageQueue {
return &MessageQueue{
queue: make(map[string][]string),
}
}
实现一个简单的消息队列,我们需要定义两个结构体。一个是Message,表示每个消息的主题和负载。另一个是MessageQueue,它维护了一个字符串切片的字典,用于存储不同主题对应的消息队列。 NewMessageQueue函数用于初始化一个空的MessageQueue实例
实现消息的生产者和消费者
type Producer struct {
queue *MessageQueue
}
func (p *Producer) Produce(topic string, payload string) error {
p.queue.lock.Lock()
defer p.queue.lock.Unlock()
p.queue.queue[topic] = append(p.queue.queue[topic], payload)
return nil
}
type Consumer struct {
queue *MessageQueue
topic string
consumer string
}
func (c *Consumer) Consume() (string, error) {
c.queue.lock.Lock()
defer c.queue.lock.Unlock()
if len(c.queue.queue[c.topic]) == 0 {
return "", nil
}
payload := c.queue.queue[c.topic][0]
c.queue.queue[c.topic] = c.queue.queue[c.topic][1:]
return payload, nil
}
接下来我们需要实现生产者和消费者。Producer结构体中的Produce函数往指定主题的消息队列中追加新的负载。Consumer结构体通过Consume函数从指定主题的消息队列中获取最先加入队列的消息。如果队列中没有消息,Consume函数会直接返回。
使用MQ
func main() {
mq := NewMessageQueue()
producer := &Producer{mq}
consumer1 := &Consumer{mq, "topic1", "consumer1"}
consumer2 := &Consumer{mq, "topic1", "consumer2"}
numMessages := 10
// 生产者生成消息
for i := 0; i < numMessages; i++ {
err := producer.Produce("topic1", fmt.Sprintf("Message %d", i))
if err != nil {
log.Fatal(err)
}
}
// 消费者读取消息
for i := 0; i < numMessages; i++ {
payload, err := consumer1.Consume()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Consumer 1 got message: %s\n", payload)
payload, err = consumer2.Consume()
if err != nil {
log.Fatal(err)
}
fmt.Printf("Consumer 2 got message: %s\n", payload)
}
}
最后,我们在main函数中创建一个新的MessageQueue、生产者和两个消费者。生产者生成10个带有"Message i"负载的消息并将它们放入名为"topic1"的队列中。两个消费者都尝试读取这个队列,每个消息都会被两个消费者都读取到。