c语言编程笔录

首页 >   > 笔记大全

笔记大全

Go语言中的消息队列和分布式系统

更新时间:2023-07-13

第一段:消息队列的概念和应用场景

消息队列是一种解耦的通信机制,被广泛应用于异步处理、削峰填谷、任务分发等场景。在Go语言中,使用RabbitMQ作为消息队列是一个常见的选择。以下是一个简单的RabbitMQ示例:

package main

import (
	"fmt"
	"log"

	"github.com/streadway/amqp"
)

func main() {
	conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	ch, err := conn.Channel()
	if err != nil {
		log.Fatal(err)
	}
	defer ch.Close()

	q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
	if err != nil {
		log.Fatal(err)
	}

	forever := make(chan bool)

	go func() {
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	fmt.Println("Waiting for messages. To exit press CTRL+C")
	<-forever
}

这个示例中,我们定义了一个名为“hello”的队列,并通过“ch.Consume”在后台接收队列中的消息。

第二段:分布式系统的概念和实践

分布式系统是由多个独立计算机组成的系统,并通过网络互相通信和传输数据。在Go语言中,可以使用ETCD等分布式存储来协调不同的组件和服务。以下是一个简单的ETCD示例:

package main

import (
	"context"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379", "localhost:22379", "localhost:32379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	_, err = cli.Put(ctx, "sample_key", "sample_value")
	cancel()
	if err != nil {
		log.Fatal(err)
	}

	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
	getResp, err := cli.Get(ctx, "sample_key")
	cancel()

	if err != nil {
		log.Fatal(err)
	}

	for _, kv := range getResp.Kvs {
		log.Printf("%s : %s\n", string(kv.Key), string(kv.Value))
	}
}

这个示例中,我们测试了ETCD的GetObject和PutObject方法,将一个键-值对写入ETCD中并读取之后输出到控制台。

第三段:消息队列和分布式系统的结合实践

将消息队列和分布式系统结合起来可以实现更为复杂的场景,例如分布式任务的协调。以下是一个简单的Go语言示例,展示如何使用RabbitMQ和ETCD协调工作进程:

// 生产者
func NewTask() {
    task := "do_something"
    err := WriteTaskToQueue(task)
    if err != nil {
        log.Fatal(err)
    }
}

func WriteTaskToQueue(task string) error {
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
    if err != nil {
        return err
    }
    defer conn.Close()

    ch, err := conn.Channel()
    if err != nil {
        return err
    }
    defer ch.Close()

    q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
    if err != nil {
        return err
    }

    message := amqp.Publishing{
        DeliveryMode: amqp.Persistent,
        ContentType: "text/plain",
        Body: []byte(task),
    }

    ch.Publish("", q.Name, false, false, message)
    return nil
}

// 消费者

func StartWorker() {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints: []string{"localhost:2379", "localhost:22379", "localhost:32379"},
        DialTimeout: 5*time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cli.Close()

    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()

    _, err = cli.Grant(ctx, 10)
    if err != nil {
        log.Fatal(err)
    }

    ch, err := conn.Channel()
    if err != nil {
        return err
    }
    defer ch.Close()

    q, err := ch.QueueDeclare("task_queue", true, false, false, false, nil)
    if err != nil {
        return err
    }

    msgs, err := ch.Consume(q.Name, "", false, false, false, false, nil)
    if err != nil {
        return err
    }

    for msg := range msgs {
        task := string(msg.Body)
        log.Printf("Received task: %s", task)

        // 处理任务...

        // 完成任务
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        resp, err := cli.Txn(ctx).
            If(clientv3.Compare(clientv3.Version("/tasks/"+task), "=", 1)).
            Then(clientv3.OpDelete("/tasks/"+task)).
            Commit()
        cancel()

        if err != nil {
            log.Fatal(err)
        }
        if !resp.Succeeded {
            log.Printf("Task was already completed by another worker.")
        }

        msg.Ack(false)
    }
}

该示例中,生产者将任务写入队列,消费者从队列中读取任务并在ETCD中临时记录该任务,以保证任务在分布式环境下只会被执行一次。

第四段:总结

消息队列和分布式系统是大型应用程序中不可或缺的一部分。在Go语言中,使用RabbitMQ和ETCD等工具可以轻松实现这些机制。需要强调的是,在使用这些机制时,需要仔细考虑各个组件之间的交互方式以及网络延迟等因素。