Go快速上手-微服务框架go-micro

go-micro特性

Go Micro是一个流行的微服务架构,是一个插件化的基础框架,基于此可以构建微服务,Micro的设计哲学是可插拔的插件化架构。Go Micro 简单轻巧、易于上手、功能强大、扩展方便,是基于 Go 语言进行微服务架构时非常值得推荐的一个框架。

Go Micro有以下重要特性:

  • 服务发现:自动服务注册和名称解析。服务发现是微服务开发的核心。
  • 负载均衡:基于服务发现构建的客户端负载均衡。一旦我们获得了服务的任意数量实例的地址,我们现在需要一种方法来决定要路由到哪个节点。
  • 消息编码:基于内容类型的动态消息编码。这包括默认的protobuf和json。
  • 请求/响应:基于RPC的请求/响应,支持双向流。
  • Async Messaging:PubSub是异步通信和事件驱动架构的重要设计思想。事件通知是微服务开发的核心模式。
  • 可插拔接口:Go Micro为每个分布式系统抽象使用Go接口,因此,这些接口是可插拔的,并允许Go Micro与运行时无关,可以插入任何基础技术
Go快速上手-微服务框架go-micro插图1

go-micro通信流程

通信的角色一共4个:server,client,register和broker,他们的各种的作用在于:

  1. Server监听客户端的调用,和Broker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态;
  2. Register服务的注册的发现,Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程;
  3. 如果有需要通知所有的Server端可以使用Broker进行信息的推送,Broker 通过队列进行信息的接收和发布;
Go快速上手-微服务框架go-micro插图3

Go Micro 框架的基础架构如上图所示,由 8 个核心接口组成,每个接口都有默认实现。Go micro 由以下接口列表组成:

  • 最顶层的 Service 接口是构建服务的主要组件,它把底层的各个包需要实现的接口,做了一次封装,包含了一系列用于初始化 Service 和 Client 的方法,使我们可以很简单的创建一个 RPC 服务;
  • server – 用于处理请求和通知。服务器是编写服务的构建基块,内置服务器是 RPC 系统
  • client – 用于高级别请求/响应和通知;
  • broker – 异步消息传递,其实就是一个消息队列系统;
  • config – 用于动态配置的。配置是一个接口, 用于从任意数量的源进行动态配置加载,其实就是一个配置进程/中心;
  • codec – 用于消息编码的(序列化和反序列化)。编解码器用于编码和解码消息, 然后再通过导线传输消息. 数据可能是 json, protobuf, beson, msgpack 等。
  • registry – 服务发现的注册表。注册表提供一种服务发现机制, 用于将名称解析为地址. 它可以由 consul, etcd zookeeper, dns, gossip 等支持. 服务应在启动时使用注册表进行注册, 并在关闭时取消注册。
  • selector – 用于负载平衡。选择器是一个负载平衡抽象, 它建立在注册表上。 户在发出请求时利用选择器. 客户端将使用选择器而不是注册表, 因为它提供了内置的负载平衡机制.
  • store – 用于数据存储。存储是一个简单的键值存储接口, 用于抽象掉轻量级数据存储,仅用于保存简单的状态信息,比如用户的验证状态。
  • transport – 用于同步通信。传输是服务之间同步请求/响应通信的接口. 它类似于 golang 网络包, 但提供了一个更高级别的抽象, 允许我们切换通信机制
Go快速上手-微服务框架go-micro插图5

Go Micro的接口间的关系如上图所示,每个接口都支持业界流行的开源方案,具体如下:

Go快速上手-微服务框架go-micro插图7

go-micro最重要的是service的创建,因为go-micro现在发布了v3版本,V2版本作者表示已经不再维护了,因此这里介绍V2和V3版本的go micro 如何创建一个服务,以及基本的服务管理指令。我们生产环境用的是V2版本,至于V3,看了下网上的资源以及项目中相关的特性介绍,感觉还是处于很初级的状态,如果直接用于生产环境感觉坑会不少,但是本着学习的目的还是想实践一下这个V3版本。另外,go micro的broker机制我们也给了实践例子,体验了go micro异步消息通信机制。

新建服务(基于micro v2)

先安装相关组件

  1. go gethttp://github.com/micro/micro/v3/cmd/protoc-gen-micro@master
  2. go gethttp://github.com/micro/micro/v2

我们先定义一个简单的protobuf协议文件greet.proto,定义的服务名称为Greet。

syntax="proto3";// 指定proto版本// 指定golang包名optiongo_package="pb/proto_demo";serviceGreeter{rpcGreet(Request)returns(Response){}}messageRequest{stringname=1;}messageResponse{stringmsg=1;}

安装之后,利用protoc-gen-go和protoc-gen-micro生成协议go文件。这次一共会生成两个协议文件:greet.pb.go和greet.pb.micro.go。

protoc --micro_out=. --go_out=. ./greet.proto

着重观察下生成的micro版的go协议文件greet.pb.micro.go

// Code generated by protoc-gen-micro. DO NOT EDIT.// source: greet.protopackageproto_demoimport(fmt"fmt"proto"github.com/golang/protobuf/proto"math"math")import(context"context"client"github.com/micro/go-micro/v2/client"server"github.com/micro/go-micro/v2/server"api"github.com/micro/micro/v3/service/api")// Reference imports to suppress errors if they are not otherwise used.var_=proto.Marshalvar_=fmt.Errorfvar_=math.Inf// This is a compile-time assertion to ensure that this generated file// is compatible with the proto package it is being compiled against.// A compilation error at this line likely means your copy of the// proto package needs to be updated.const_=proto.ProtoPackageIsVersion3// please upgrade the proto package// Reference imports to suppress errors if they are not otherwise used.var_api.Endpointvar_context.Contextvar_client.Optionvar_server.Option// Api Endpoints for Greeter servicefuncNewGreeterEndpoints()[]*api.Endpoint{return[]*api.Endpoint{}}// Client API for Greeter servicetypeGreeterServiceinterface{Greet(ctxcontext.Context,in*Request,opts...client.CallOption)(*Response,error)}typegreeterServicestruct{cclient.Clientnamestring}funcNewGreeterService(namestring,cclient.Client)GreeterService{return&greeterService{c:c,name:name,}}func(c*greeterService)Greet(ctxcontext.Context,in*Request,opts...client.CallOption)(*Response,error){req:=c.c.NewRequest(c.name,"Greeter.Greet",in)out:=new(Response)err:=c.c.Call(ctx,req,out,opts...)iferr!=nil{returnnil,err}returnout,nil}// Server API for Greeter servicetypeGreeterHandlerinterface{Greet(context.Context,*Request,*Response)error}funcRegisterGreeterHandler(sserver.Server,hdlrGreeterHandler,opts...server.HandlerOption)error{typegreeterinterface{Greet(ctxcontext.Context,in*Request,out*Response)error}typeGreeterstruct{greeter}h:=&greeterHandler{hdlr}returns.Handle(s.NewHandler(&Greeter{h},opts...))}typegreeterHandlerstruct{GreeterHandler}func(h*greeterHandler)Greet(ctxcontext.Context,in*Request,out*Response)error{returnh.GreeterHandler.Greet(ctx,in,out)}

这个文件里定义了服务结构体GreeterService,RPC函数实现Greet。

现在先编写服务端代码micro_server/main.go

packagemainimport("context""fmt"micro"github.com/micro/go-micro/v2"proto"web_demo/proto/pb/proto_demo")typeGreeterstruct{}func(g*Greeter)Greet(ctxcontext.Context,req*proto.Request,rsp*proto.Response)error{rsp.Msg="Greet "+req.Namereturnnil}funcmain(){// 创建一个新服务service:=micro.NewService(micro.Name("greeter"),)// 服务初始化service.Init()// 注册 handler处理函数proto.RegisterGreeterHandler(service.Server(),new(Greeter))// 启动服务iferr:=service.Run();err!=nil{fmt.Println(err)}}

编写客户端micro_client/main.go,客户端向服务器发起RPC调用。

packagemainimport("context""fmt"micro"github.com/micro/go-micro/v2"proto"web_demo/proto/pb/proto_demo")funcmain(){// 创建新服务service:=micro.NewService(micro.Name("greeter.client"))// 服务初始化service.Init()// 创建RPC的客户端实例greeter:=proto.NewGreeterService("greeter",service.Client())// 发起RPC调用rsp,err:=greeter.Greet(context.TODO(),&proto.Request{Name:"John"})iferr!=nil{fmt.Println(err)}// 打印返回值fmt.Println(rsp.Msg)}

先启动server

unshideMacBook-Pro:micro_server junshili$ go run main.go 
2021-05-07 00:50:11  file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 00:50:11  file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:56014
2021-05-07 00:50:11  file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: greeter-147bbaba-7f7e-46af-bb5e-81d311da0d1a

当然,我们也可以指定端口启动服务,这个参数传入的端口是优先于代码设置的

go run main.go --server_address :8088

我们可以从服务器启动时打印的这些日志得到一些信息:

  1. RPC通信框架用的是gRPC,这是go-mico默认的;
  2. 我们启动的这个服务的名字叫greeter;
  3. 服务监听的端口是56014;
  4. 服务发现用的组件是mdns,这是go-mirco默认的;

我们也可以指定端口号来启动服务

junshideMacBook-Pro:micro_server junshili$ go run main.go --server_address :8088
2021-05-07 01:38:25  file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 01:38:25  file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:8088

再启动client发起RPC调用

junshideMacBook-Pro:micro_client junshili$ go run main.go 
Greet John

当我kill掉server时,会打印以下日志,表明:1.服务从服务发现组件注销注册了;2.Broker和该服务断开了连接。

2021-05-07 01:32:38  file=grpc/grpc.go:791 level=info Deregistering node: greeter-5e32cdd2-4099-4ee9-b2df-bdeee2cd5ff4
2021-05-07 01:32:38  file=grpc/grpc.go:959 level=info Broker [http] Disconnected from 127.0.0.1:0

注意,使用micro/v2时,protoc3生成micro.protoc文件可能会导致版本冲突,在go run main.go时会报错:

使用micro/v2时,protoc3生成micro.protoc文件导致的版本冲突
cannot use service.Server() (type
github.com/micro/go-micro/v2/server.Server) as type
github.com/micro/micro/v3/service/server

解决方案:可将生成的*.pb.micro.go文件中的v3依赖改为v2依赖即可

import (     
     context "context"   
    client "github.com/micro/go-micro/v2/client"
     server "github.com/micro/go-micro/v2/server"
    api "github.com/micro/micro/v3/service/api" 
 )

新建服务 (基于micro v3)

现在go micro已经推出了V3版本,因为V3和V2版本有较多的不同,且存在兼容性不足的问题。下面的所有例子都是基于V3的实践。

首先给出官方的上手文档:https://micro.mu/getting-started

首先安装/升级自己的micro版本至V3

go get github.com/micro/micro/v3

安装完之后,启动micro相关的服务进程

junshideMacBook-Pro:~ junshili$ micro server
2021-05-09 00:32:01  file=server/server.go:86 level=info Starting server
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering network
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering runtime
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering registry
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering config
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering store
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering broker
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering events
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering auth
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering proxy
2021-05-09 00:32:01  file=server/server.go:114 level=info Registering api
2021-05-09 00:32:01  file=server/server.go:201 level=info Starting server runtime
2021-05-09 00:32:01  file=service/service.go:195 level=info Starting [service] server
2021-05-09 00:32:01  file=grpc/grpc.go:939 level=info Server [grpc] Listening on [::]:10001
2021-05-09 00:32:01  file=grpc/grpc.go:769 level=info Registry [mdns] Registering node: server-fc07e464-99c4-406d-b608-b54ef3c9bdde

可以注意到,registry,auth等Micro组件都启动起来了。与V2版本不一样的是,接下来需要登录账号,做身份验证。这一步很重要,不然没有登录,后续操作micro会提示权限不足。username固定为admin,password固定为micro。

$ micro login
Enter username: admin
Enter password:
Successfully logged in.

查看micro框架下跑着哪些服务,可以看到这些服务都是micro框架自带的,在我们执行micro server时启动。

junshideMacBook-Pro:~ junshili$ micro services
api
auth
broker
config
events
network
proxy
registry
runtime
server
store

现在我们打算启动一个自己的服务,micro v3提供了一个非常好用的服务生成工具,可以帮我们直接生成服务模板,我们只需要在模板上增添自己的内容就可以了,生产效率大幅提升。使用micro new 服务名就可以生成模板代码。服务名不要使用下划线

junshideMacBook-Pro:~ junshili$ micro new hellodemo
Creating service hellodemo

.
├── micro.mu
├── main.go
├── generate.go
├── handler
│   └── hellodemo.go
├── proto
│   └── hellodemo.proto
├── Dockerfile
├── Makefile
├── README.md
├── .gitignore
└── go.mod

进入项目目录,编译协议

junshideMacBook-Pro:hellodemo junshili$ make proto
protoc --proto_path=. --micro_out=. --go_out=:. proto/hellodemo.proto

在proto目录下自动生成了pb.go和.pb.micro.go的文件。

../hellodemo/
├── Dockerfile
├── Makefile
├── README.md
├── generate.go
├── go.mod
├── handler
│   └── hellodemo.go
├── main.go
├── micro.mu
└── proto
    ├── hellodemo.pb.go
    ├── hellodemo.pb.micro.go
    └── hellodemo.proto

启动我们的服务

junshideMacBook-Pro:hellodemo junshili$ micro run .

查看我们的服务是否正常启动

junshideMacBook-Pro:hellodemo junshili$ micro services
api
auth
broker
config
events
hellodemo
network
proxy
registry
runtime
server
store

junshideMacBook-Pro:hellodemo junshili$ micro status
NAME        VERSION SOURCE              STATUS  BUILD   UPDATED     METADATA
hellodemo    latest  /Users/junshili/hellodemo   running n/a 1m58s ago   owner=admin, group=micro

可以看到,我们的服务正常运行在micro框架之内。现在我们要基于这个服务模板做点自己的修改。

现在协议文件上增加rpc 函数和消息结构体。

serviceHellodemo{rpcCall(Request)returns(Response){}rpcStream(StreamingRequest)returns(streamStreamingResponse){}rpcPingPong(streamPing)returns(streamPong){}rpcGreet(GreetReq)returns(GreetRsp){}}messageGreetReq{stringname=1;int32age=2;}messageGreetRsp{stringmsg=1;int32status=2;}

再到在handler/hellodemo.go这个文件里,加一个自己的Greet的实现。

func(e*Hellodemo)Greet(ctxcontext.Context,req*hellodemo.GreetReq,rsp*hellodemo.GreetRsp)error{log.Info("Received Hellodemo.Greet request")rsp.Msg=fmt.Sprintf("Greet %s, your age is %d",req.Name,req.Age)rsp.Status=200returnnil}

我们的功能代码已经完成编写,准备把服务更新一下,因为服务已经在run了,所以我们直接使用micro update .把服务更出去,类似于我们常用的热更,不停服更新。然后用micro logs hellodemo看下输出日志是否正常。确认启动正常后,我们可以使用micro框架cli给该服务直接发送请求,测试服务可用性,而无须再写测试client。

先用help指令查看hellodemo对外提供了哪些可调用的方法。

junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  --help
NAME:
    micro hellodemo

VERSION:
    latest

USAGE:
    micro hellodemo [command]

COMMANDS:
    call
    greet
    pingPong
    stream

我们测试下call和greet方法。

junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  call  --name=James
{
    "msg": "Hello James"
}
junshideMacBook-Pro:hellodemo junshili$ micro  hellodemo  greet --name=James --age=20
{
    "msg": "Greet James, your age is 20",
    "status": 200
}

我们同样可以通过http的方式访问请求RPC,中介是micro的api服务,请求会通过api服务,再调到我们制定的服务。默认API的address是127.0.0.1:8080。

junshideMacBook-Pro:helloworld junshili$ curl -XPOST --header "Content-Type: application/json" -d {"name":"Joe", "age":30} http://127.0.0.1:8080/hellodemo/greet
{"msg":"Greet Joe, your age is 30","status":200}

搭建Http服务

上面介绍了搭建RPC服务的流程,如果要搭建HTTP服务,其实跟上面流程一样,区别只在于在main.go中调用http相关相关处理即可,比如我们使用gin来实现我们的http服务。

第一步先micro new helloweb,创建服务模板,然后我们之间在main.go添加http处理的相关代码:

packagemainimport("net/http""github.com/gin-gonic/gin")funcHelloWeb(c*gin.Context){c.String(http.StatusOK,"Hello, Gon")}funcHiWeb(c*gin.Context){c.String(http.StatusOK,"Hi, Gon")}funcmain(){// 使用gin作为路由r:=gin.Default()r.GET("/hello",func(c*gin.Context){HelloWeb(c)})r.POST("/hi",func(c*gin.Context){HiWeb(c)})r.Run(":9999")//  listen and serve on 0.0.0.0:9999}

请求和响应如下,如果想要构建更复杂的http项目,请参考我的上一篇文章:Go快速上手—Web服务器篇.

junshideMacBook-Pro:blog junshili$ curl -X POST http://127.0.0.1:9999/hi
Hi, Go
junshideMacBook-Pro:blog junshili$ curl http://127.0.0.1:9999/hello
Hello, Go

需要kill掉这个服务,需要使用

micro kill example

发布订阅(基于消息队列的异步通信)

发布订阅模式在后台服务中广泛使用,go micro框架中,Broker服务就是用于支持发布订阅模式(pub/sub)。发布订阅模式,我们一般使用消息队列作为中间件实现异步通信,从而做到系统解耦、削峰稳流。在go micro框架中,Broker就是负责消息队列这个功能,消息生产者负责生产消息,推送给Broker,消息消费者server向Broker订阅指定类型的消息(我们称为topic),即Broker注册自己的消息处理,一旦有消息到来,则调用响应的消息处理逻辑。Micro的Broker默认使用了Nats作为消息队列,同时他也支持业界常用的消息队列,比如Kafka,RabbitMQ等,因此我们可以根据我们的需求选择消息队列组件作为Broker的底层支持。

Micro内置的Pub/Sub模式很简单易用,用户只需要定义好publisher, subscriber以及消息内容,其他工作都将由框架实现。这里给出micor pub/sub的实践案例。

Broker、publisher、subscriber三者通信的消息结构体为Message,Message结构体的定义如下。

type Message struct {
    Header map[string]string
    Body   []byte
}

消息订阅者的实现消息订阅如下:

  1. subscrber先连接上broker,broker.Connect()
  2. 对指定的topic进行注册,broker.Subscribe。特殊地,如果是点对点通信,那需要在,broker.Subscribe的第三个参数加上broker.Queue(topic)。如果是发布订阅模式则不需要。
  3. 实现消息处理函数,传入broker.Subscribe第二个参数。
packagemainimport("fmt""github.com/micro/micro/v3/service""github.com/micro/micro/v3/service/logger""github.com/micro/go-micro/broker")vartopic1="topic1"vartopic2="topic2"funchandleEvent(bbroker.Event)error{msg:=string(b.Message().Body)logger.Infof("[sub] recieve message: %s, header: %sn",msg,b.Message().Header)returnnil}// 点对点消息通信,消息不会复制,只会被一个消费者消费funcsubTopicQ(topicstring){_,err:=broker.Subscribe(topic,handleEvent,broker.Queue(topic))iferr!=nil{fmt.Println(err)}}// 发布订阅模式消息通信,消息会复制,可以被多个消费者消费funcsubTopic(topicstring){_,err:=broker.Subscribe(topic,handleEvent)iferr!=nil{fmt.Println(err)}}funcmain(){// Create servicesrv:=service.New(service.Name("hellosubscriber"),service.Version("latest"),)srv.Init()iferr:=broker.Connect();err!=nil{logger.Error("Broker Connect error: ",err)}subTopic(topic1)subTopicQ(topic2)// Run serviceiferr:=srv.Run();err!=nil{logger.Fatal(err)}}

消息发布的实现消息发布的步骤如下:

  1. publisher先连接上broker,broker.Connect()
  2. broker.Publish(topic, msg) 直接对指定topic发送消息。
packagemainimport("fmt""github.com/micro/micro/v3/service""github.com/micro/micro/v3/service/logger""github.com/micro/go-micro/broker""time""github.com/pborman/uuid")vartopic1="topic1"vartopic2="topic2"funchandleEvent(bbroker.Event)error{msg:=string(b.Message().Body)logger.Infof("[sub] recieve message: %s, header: %sn",msg,b.Message().Header)returnnil}// 发布消息funcpubTopic(topicstring){forrangetime.Tick(time.Second){msg:=&broker.Message{Header:map[string]string{"id":uuid.NewUUID().String(),},Body:[]byte(fmt.Sprintf("Messaging you all day on %s",topic)),}iferr:=broker.Publish(topic,msg);err!=nil{logger.Error("Broker Publish error: ",err)}else{logger.Infof("Broker Publish topic:%s msg: %s",topic,msg)}}}funcmain(){// Create servicesrv:=service.New(service.Name("hellopublisher"),service.Version("latest"),)srv.Init()iferr:=broker.Connect();err!=nil{logger.Error("Broker Connect error: ",err)}gopubTopic(topic1)gopubTopic(topic2)// Run serviceiferr:=srv.Run();err!=nil{logger.Fatal(err)}}

实践过程:

  1. 订阅服务,我起了两个,命名为sub1,sub2
  2. 发布服务,我起了pub,pub里两个协程向topic1和topic2发送消息
micro run --name sub1 .
micro run --name sub2 .
micro run --name pub .

unshideMacBook-Pro:hellosubcriber junshili$ micro status 
NAME    VERSION SOURCE              STATUS  BUILD   UPDATED     METADATA
pub    latest  /Users/junshili/hellopublisher  running n/a 1m48s ago   owner=admin, group=micro
sub1    latest  /Users/junshili/hellosubcriber  running n/a 13s ago     owner=admin, group=micro
sub2    latest  /Users/junshili/hellosubcriber  running n/a 6s ago      owner=admin, group=micro




junshideMacBook-Pro:hellosubcriber junshili$ micro stats --all custom
SERVICE    hellopublisher

VERSION    latest

NODE                                                   ADDRESS:PORT           STARTED            UPTIME    MEMORY    THREADS    GC
hellopublisher-a5c3156e-cb26-40af-b899-092195a49fd4    192.168.1.101:54250    May 13 00:46:28    2m5s      2.88mb    43         10.670273ms

SERVICE    hellosubscriber

VERSION    latest

NODE                                                    ADDRESS:PORT           STARTED            UPTIME    MEMORY    THREADS    GC
hellosubscriber-7035f6b4-ebd8-4dbf-bb10-eb49cf7ca34c    192.168.1.101:54578    May 13 00:48:13    21s       2.54mb    33         114.399µs

hellosubscriber-aa456e83-c035-43dc-9a71-a459c2c058c3    192.168.1.101:54572    May 13 00:48:13    21s    2.63mb    33    121.528µs

我们使用micro logs -f helloworld或者micro logs helloworld查看服务输出的日志。

先观察pub服务的日志,pub在向发送topic1和topic2消息

2021-05-13 00:46:29  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:99b30102-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:29  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:99b30184-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:30  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9a4af598-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:30  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9a4af692-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9ae409d6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9ae40ae4-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9b7ca894-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9b7ca7ea-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9c152dc6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33  file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9c152a88-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}

观察sub1的日志

2021-05-13 00:50:14  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:1fcede3c-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:14  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2067d614-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:20ffd928-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2198c3d6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22317f90-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22ca25b0-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]

观察sub2的日志

2021-05-13 00:50:14  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:15  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:16  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:17  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:18  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:19  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23628d1e-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:20  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:21  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23fb6cc8-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:21  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23fb6d90-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:22  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:24940438-b342-11eb-b5c8-acde48001122]

2021-05-13 00:50:22  file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2494015e-b342-11eb-b5c8-acde48001122]

对比sub1和sub2的日志可以看出,sub1和sub2都能收到topic1的消息,证实topic1可以被多个订阅者消费,符合发布订阅模式。至于topic2,只能被一个消费者消费,不存在一条消息被多个消费者消费的情况,对应的是点对点消息队列通信机制。因此,需要区分这两种通信机制的go micro写法。

go micro默认的消息队列组件是自己实现的,实际上其实就是一个map,topic是map的key,发布消息时就往map里存,然后broker调度给订阅了该topic的服务推送。生产环境请更换为kafka,rabbitmq这类专用消息队列。

https://github.com/micro/micro/blob/master/service/broker/memory/memory.go

typememoryBrokerstruct{optsbroker.Optionsaddrstringsync.RWMutexconnectedboolSubscribersmap[string][]*memorySubscriber}typememorySubscriberstruct{idstringtopicstringexitchanboolhandlerbroker.Handleroptsbroker.SubscribeOptions}

go micro broker支持的消息队列在以下链接可以获取:https://github.com/microhq/go-plugins/tree/master/broker

原创文章 Go快速上手-微服务框架go-micro,版权所有
如若转载,请注明出处:https://www.itxiaozhan.cn/20229447.html

发表评论

您的电子邮箱地址不会被公开。 必填项已用*标注