go-micro特性
Go Micro是一个流行的微服务架构,是一个插件化的基础框架,基于此可以构建微服务,Micro的设计哲学是可插拔的插件化架构。Go Micro 简单轻巧、易于上手、功能强大、扩展方便,是基于 Go 语言进行微服务架构时非常值得推荐的一个框架。
Go Micro有以下重要特性:
- 服务发现:自动服务注册和名称解析。服务发现是微服务开发的核心。
- 负载均衡:基于服务发现构建的客户端负载均衡。一旦我们获得了服务的任意数量实例的地址,我们现在需要一种方法来决定要路由到哪个节点。
- 消息编码:基于内容类型的动态消息编码。这包括默认的protobuf和json。
- 请求/响应:基于RPC的请求/响应,支持双向流。
- Async Messaging:PubSub是异步通信和事件驱动架构的重要设计思想。事件通知是微服务开发的核心模式。
- 可插拔接口:Go Micro为每个分布式系统抽象使用Go接口,因此,这些接口是可插拔的,并允许Go Micro与运行时无关,可以插入任何基础技术

go-micro通信流程
通信的角色一共4个:server,client,register和broker,他们的各种的作用在于:
- Server监听客户端的调用,和Broker推送过来的信息进行处理。并且Server端需要向Register注册自己的存在或消亡,这样Client才能知道自己的状态;
- Register服务的注册的发现,Client端从Register中得到Server的信息,然后每次调用都根据算法选择一个的Server进行通信,当然通信是要经过编码/解码,选择传输协议等一系列过程;
- 如果有需要通知所有的Server端可以使用Broker进行信息的推送,Broker 通过队列进行信息的接收和发布;

Go Micro 框架的基础架构如上图所示,由 8 个核心接口组成,每个接口都有默认实现。Go micro 由以下接口列表组成:
- 最顶层的 Service 接口是构建服务的主要组件,它把底层的各个包需要实现的接口,做了一次封装,包含了一系列用于初始化 Service 和 Client 的方法,使我们可以很简单的创建一个 RPC 服务;
- server – 用于处理请求和通知。服务器是编写服务的构建基块,内置服务器是 RPC 系统
- client – 用于高级别请求/响应和通知;
- broker – 异步消息传递,其实就是一个消息队列系统;
- config – 用于动态配置的。配置是一个接口, 用于从任意数量的源进行动态配置加载,其实就是一个配置进程/中心;
- codec – 用于消息编码的(序列化和反序列化)。编解码器用于编码和解码消息, 然后再通过导线传输消息. 数据可能是 json, protobuf, beson, msgpack 等。
- registry – 服务发现的注册表。注册表提供一种服务发现机制, 用于将名称解析为地址. 它可以由 consul, etcd zookeeper, dns, gossip 等支持. 服务应在启动时使用注册表进行注册, 并在关闭时取消注册。
- selector – 用于负载平衡。选择器是一个负载平衡抽象, 它建立在注册表上。 户在发出请求时利用选择器. 客户端将使用选择器而不是注册表, 因为它提供了内置的负载平衡机制.
- store – 用于数据存储。存储是一个简单的键值存储接口, 用于抽象掉轻量级数据存储,仅用于保存简单的状态信息,比如用户的验证状态。
- transport – 用于同步通信。传输是服务之间同步请求/响应通信的接口. 它类似于 golang 网络包, 但提供了一个更高级别的抽象, 允许我们切换通信机制

Go Micro的接口间的关系如上图所示,每个接口都支持业界流行的开源方案,具体如下:

go-micro最重要的是service的创建,因为go-micro现在发布了v3版本,V2版本作者表示已经不再维护了,因此这里介绍V2和V3版本的go micro 如何创建一个服务,以及基本的服务管理指令。我们生产环境用的是V2版本,至于V3,看了下网上的资源以及项目中相关的特性介绍,感觉还是处于很初级的状态,如果直接用于生产环境感觉坑会不少,但是本着学习的目的还是想实践一下这个V3版本。另外,go micro的broker机制我们也给了实践例子,体验了go micro异步消息通信机制。
新建服务(基于micro v2)
先安装相关组件
- go gethttp://github.com/micro/micro/v3/cmd/protoc-gen-micro@master
- go gethttp://github.com/micro/micro/v2
我们先定义一个简单的protobuf协议文件greet.proto,定义的服务名称为Greet。
syntax="proto3";// 指定proto版本// 指定golang包名optiongo_package="pb/proto_demo";serviceGreeter{rpcGreet(Request)returns(Response){}}messageRequest{stringname=1;}messageResponse{stringmsg=1;}
安装之后,利用protoc-gen-go和protoc-gen-micro生成协议go文件。这次一共会生成两个协议文件:greet.pb.go和greet.pb.micro.go。
protoc --micro_out=. --go_out=. ./greet.proto
着重观察下生成的micro版的go协议文件greet.pb.micro.go
// Code generated by protoc-gen-micro. DO NOT EDIT.// source: greet.protopackageproto_demoimport(fmt"fmt"proto"github.com/golang/protobuf/proto"math"math")import(context"context"client"github.com/micro/go-micro/v2/client"server"github.com/micro/go-micro/v2/server"api"github.com/micro/micro/v3/service/api")// Reference imports to suppress errors if they are not otherwise used.var_=proto.Marshalvar_=fmt.Errorfvar_=math.Inf// This is a compile-time assertion to ensure that this generated file// is compatible with the proto package it is being compiled against.// A compilation error at this line likely means your copy of the// proto package needs to be updated.const_=proto.ProtoPackageIsVersion3// please upgrade the proto package// Reference imports to suppress errors if they are not otherwise used.var_api.Endpointvar_context.Contextvar_client.Optionvar_server.Option// Api Endpoints for Greeter servicefuncNewGreeterEndpoints()[]*api.Endpoint{return[]*api.Endpoint{}}// Client API for Greeter servicetypeGreeterServiceinterface{Greet(ctxcontext.Context,in*Request,opts...client.CallOption)(*Response,error)}typegreeterServicestruct{cclient.Clientnamestring}funcNewGreeterService(namestring,cclient.Client)GreeterService{return&greeterService{c:c,name:name,}}func(c*greeterService)Greet(ctxcontext.Context,in*Request,opts...client.CallOption)(*Response,error){req:=c.c.NewRequest(c.name,"Greeter.Greet",in)out:=new(Response)err:=c.c.Call(ctx,req,out,opts...)iferr!=nil{returnnil,err}returnout,nil}// Server API for Greeter servicetypeGreeterHandlerinterface{Greet(context.Context,*Request,*Response)error}funcRegisterGreeterHandler(sserver.Server,hdlrGreeterHandler,opts...server.HandlerOption)error{typegreeterinterface{Greet(ctxcontext.Context,in*Request,out*Response)error}typeGreeterstruct{greeter}h:=&greeterHandler{hdlr}returns.Handle(s.NewHandler(&Greeter{h},opts...))}typegreeterHandlerstruct{GreeterHandler}func(h*greeterHandler)Greet(ctxcontext.Context,in*Request,out*Response)error{returnh.GreeterHandler.Greet(ctx,in,out)}
这个文件里定义了服务结构体GreeterService,RPC函数实现Greet。
现在先编写服务端代码micro_server/main.go
packagemainimport("context""fmt"micro"github.com/micro/go-micro/v2"proto"web_demo/proto/pb/proto_demo")typeGreeterstruct{}func(g*Greeter)Greet(ctxcontext.Context,req*proto.Request,rsp*proto.Response)error{rsp.Msg="Greet "+req.Namereturnnil}funcmain(){// 创建一个新服务service:=micro.NewService(micro.Name("greeter"),)// 服务初始化service.Init()// 注册 handler处理函数proto.RegisterGreeterHandler(service.Server(),new(Greeter))// 启动服务iferr:=service.Run();err!=nil{fmt.Println(err)}}
编写客户端micro_client/main.go,客户端向服务器发起RPC调用。
packagemainimport("context""fmt"micro"github.com/micro/go-micro/v2"proto"web_demo/proto/pb/proto_demo")funcmain(){// 创建新服务service:=micro.NewService(micro.Name("greeter.client"))// 服务初始化service.Init()// 创建RPC的客户端实例greeter:=proto.NewGreeterService("greeter",service.Client())// 发起RPC调用rsp,err:=greeter.Greet(context.TODO(),&proto.Request{Name:"John"})iferr!=nil{fmt.Println(err)}// 打印返回值fmt.Println(rsp.Msg)}
先启动server
unshideMacBook-Pro:micro_server junshili$ go run main.go
2021-05-07 00:50:11 file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 00:50:11 file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:56014
2021-05-07 00:50:11 file=grpc/grpc.go:697 level=info Registry [mdns] Registering node: greeter-147bbaba-7f7e-46af-bb5e-81d311da0d1a
当然,我们也可以指定端口启动服务,这个参数传入的端口是优先于代码设置的
go run main.go --server_address :8088
我们可以从服务器启动时打印的这些日志得到一些信息:
- RPC通信框架用的是gRPC,这是go-mico默认的;
- 我们启动的这个服务的名字叫greeter;
- 服务监听的端口是56014;
- 服务发现用的组件是mdns,这是go-mirco默认的;
我们也可以指定端口号来启动服务
junshideMacBook-Pro:micro_server junshili$ go run main.go --server_address :8088
2021-05-07 01:38:25 file=v2@v2.9.1/service.go:200 level=info Starting [service] greeter
2021-05-07 01:38:25 file=grpc/grpc.go:864 level=info Server [grpc] Listening on [::]:8088
再启动client发起RPC调用
junshideMacBook-Pro:micro_client junshili$ go run main.go
Greet John
当我kill掉server时,会打印以下日志,表明:1.服务从服务发现组件注销注册了;2.Broker和该服务断开了连接。
2021-05-07 01:32:38 file=grpc/grpc.go:791 level=info Deregistering node: greeter-5e32cdd2-4099-4ee9-b2df-bdeee2cd5ff4
2021-05-07 01:32:38 file=grpc/grpc.go:959 level=info Broker [http] Disconnected from 127.0.0.1:0
注意,使用micro/v2时,protoc3生成micro.protoc文件可能会导致版本冲突,在go run main.go时会报错:
使用micro/v2时,protoc3生成micro.protoc文件导致的版本冲突
cannot use service.Server() (type
github.com/micro/go-micro/v2/server.Server) as type
github.com/micro/micro/v3/service/server
解决方案:可将生成的*.pb.micro.go文件中的v3依赖改为v2依赖即可
import (
context "context"
client "github.com/micro/go-micro/v2/client"
server "github.com/micro/go-micro/v2/server"
api "github.com/micro/micro/v3/service/api"
)
新建服务 (基于micro v3)
现在go micro已经推出了V3版本,因为V3和V2版本有较多的不同,且存在兼容性不足的问题。下面的所有例子都是基于V3的实践。
首先给出官方的上手文档:https://micro.mu/getting-started
首先安装/升级自己的micro版本至V3
go get github.com/micro/micro/v3
安装完之后,启动micro相关的服务进程
junshideMacBook-Pro:~ junshili$ micro server
2021-05-09 00:32:01 file=server/server.go:86 level=info Starting server
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering network
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering runtime
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering registry
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering config
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering store
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering broker
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering events
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering auth
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering proxy
2021-05-09 00:32:01 file=server/server.go:114 level=info Registering api
2021-05-09 00:32:01 file=server/server.go:201 level=info Starting server runtime
2021-05-09 00:32:01 file=service/service.go:195 level=info Starting [service] server
2021-05-09 00:32:01 file=grpc/grpc.go:939 level=info Server [grpc] Listening on [::]:10001
2021-05-09 00:32:01 file=grpc/grpc.go:769 level=info Registry [mdns] Registering node: server-fc07e464-99c4-406d-b608-b54ef3c9bdde
可以注意到,registry,auth等Micro组件都启动起来了。与V2版本不一样的是,接下来需要登录账号,做身份验证。这一步很重要,不然没有登录,后续操作micro会提示权限不足。username固定为admin,password固定为micro。
$ micro login
Enter username: admin
Enter password:
Successfully logged in.
查看micro框架下跑着哪些服务,可以看到这些服务都是micro框架自带的,在我们执行micro server时启动。
junshideMacBook-Pro:~ junshili$ micro services
api
auth
broker
config
events
network
proxy
registry
runtime
server
store
现在我们打算启动一个自己的服务,micro v3提供了一个非常好用的服务生成工具,可以帮我们直接生成服务模板,我们只需要在模板上增添自己的内容就可以了,生产效率大幅提升。使用micro new 服务名就可以生成模板代码。服务名不要使用下划线。
junshideMacBook-Pro:~ junshili$ micro new hellodemo
Creating service hellodemo
.
├── micro.mu
├── main.go
├── generate.go
├── handler
│ └── hellodemo.go
├── proto
│ └── hellodemo.proto
├── Dockerfile
├── Makefile
├── README.md
├── .gitignore
└── go.mod
进入项目目录,编译协议
junshideMacBook-Pro:hellodemo junshili$ make proto
protoc --proto_path=. --micro_out=. --go_out=:. proto/hellodemo.proto
在proto目录下自动生成了pb.go和.pb.micro.go的文件。
../hellodemo/
├── Dockerfile
├── Makefile
├── README.md
├── generate.go
├── go.mod
├── handler
│ └── hellodemo.go
├── main.go
├── micro.mu
└── proto
├── hellodemo.pb.go
├── hellodemo.pb.micro.go
└── hellodemo.proto
启动我们的服务
junshideMacBook-Pro:hellodemo junshili$ micro run .
查看我们的服务是否正常启动
junshideMacBook-Pro:hellodemo junshili$ micro services
api
auth
broker
config
events
hellodemo
network
proxy
registry
runtime
server
store
junshideMacBook-Pro:hellodemo junshili$ micro status
NAME VERSION SOURCE STATUS BUILD UPDATED METADATA
hellodemo latest /Users/junshili/hellodemo running n/a 1m58s ago owner=admin, group=micro
可以看到,我们的服务正常运行在micro框架之内。现在我们要基于这个服务模板做点自己的修改。
现在协议文件上增加rpc 函数和消息结构体。
serviceHellodemo{rpcCall(Request)returns(Response){}rpcStream(StreamingRequest)returns(streamStreamingResponse){}rpcPingPong(streamPing)returns(streamPong){}rpcGreet(GreetReq)returns(GreetRsp){}}messageGreetReq{stringname=1;int32age=2;}messageGreetRsp{stringmsg=1;int32status=2;}
再到在handler/hellodemo.go这个文件里,加一个自己的Greet的实现。
func(e*Hellodemo)Greet(ctxcontext.Context,req*hellodemo.GreetReq,rsp*hellodemo.GreetRsp)error{log.Info("Received Hellodemo.Greet request")rsp.Msg=fmt.Sprintf("Greet %s, your age is %d",req.Name,req.Age)rsp.Status=200returnnil}
我们的功能代码已经完成编写,准备把服务更新一下,因为服务已经在run了,所以我们直接使用micro update .
把服务更出去,类似于我们常用的热更,不停服更新。然后用micro logs hellodemo
看下输出日志是否正常。确认启动正常后,我们可以使用micro框架cli给该服务直接发送请求,测试服务可用性,而无须再写测试client。
先用help指令查看hellodemo对外提供了哪些可调用的方法。
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo --help
NAME:
micro hellodemo
VERSION:
latest
USAGE:
micro hellodemo [command]
COMMANDS:
call
greet
pingPong
stream
我们测试下call和greet方法。
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo call --name=James
{
"msg": "Hello James"
}
junshideMacBook-Pro:hellodemo junshili$ micro hellodemo greet --name=James --age=20
{
"msg": "Greet James, your age is 20",
"status": 200
}
我们同样可以通过http的方式访问请求RPC,中介是micro的api服务,请求会通过api服务,再调到我们制定的服务。默认API的address是127.0.0.1:8080。
junshideMacBook-Pro:helloworld junshili$ curl -XPOST --header "Content-Type: application/json" -d {"name":"Joe", "age":30} http://127.0.0.1:8080/hellodemo/greet
{"msg":"Greet Joe, your age is 30","status":200}
搭建Http服务
上面介绍了搭建RPC服务的流程,如果要搭建HTTP服务,其实跟上面流程一样,区别只在于在main.go中调用http相关相关处理即可,比如我们使用gin来实现我们的http服务。
第一步先micro new helloweb,创建服务模板,然后我们之间在main.go添加http处理的相关代码:
packagemainimport("net/http""github.com/gin-gonic/gin")funcHelloWeb(c*gin.Context){c.String(http.StatusOK,"Hello, Gon")}funcHiWeb(c*gin.Context){c.String(http.StatusOK,"Hi, Gon")}funcmain(){// 使用gin作为路由r:=gin.Default()r.GET("/hello",func(c*gin.Context){HelloWeb(c)})r.POST("/hi",func(c*gin.Context){HiWeb(c)})r.Run(":9999")// listen and serve on 0.0.0.0:9999}
请求和响应如下,如果想要构建更复杂的http项目,请参考我的上一篇文章:Go快速上手—Web服务器篇.
junshideMacBook-Pro:blog junshili$ curl -X POST http://127.0.0.1:9999/hi
Hi, Go
junshideMacBook-Pro:blog junshili$ curl http://127.0.0.1:9999/hello
Hello, Go
需要kill掉这个服务,需要使用
micro kill example
发布订阅(基于消息队列的异步通信)
发布订阅模式在后台服务中广泛使用,go micro框架中,Broker服务就是用于支持发布订阅模式(pub/sub)。发布订阅模式,我们一般使用消息队列作为中间件实现异步通信,从而做到系统解耦、削峰稳流。在go micro框架中,Broker就是负责消息队列这个功能,消息生产者负责生产消息,推送给Broker,消息消费者server向Broker订阅指定类型的消息(我们称为topic),即Broker注册自己的消息处理,一旦有消息到来,则调用响应的消息处理逻辑。Micro的Broker默认使用了Nats作为消息队列,同时他也支持业界常用的消息队列,比如Kafka,RabbitMQ等,因此我们可以根据我们的需求选择消息队列组件作为Broker的底层支持。
Micro内置的Pub/Sub模式很简单易用,用户只需要定义好publisher, subscriber以及消息内容,其他工作都将由框架实现。这里给出micor pub/sub的实践案例。
Broker、publisher、subscriber三者通信的消息结构体为Message,Message结构体的定义如下。
type Message struct {
Header map[string]string
Body []byte
}
消息订阅者的实现消息订阅如下:
- subscrber先连接上broker,broker.Connect()
- 对指定的topic进行注册,broker.Subscribe。特殊地,如果是点对点通信,那需要在,broker.Subscribe的第三个参数加上broker.Queue(topic)。如果是发布订阅模式则不需要。
- 实现消息处理函数,传入broker.Subscribe第二个参数。
packagemainimport("fmt""github.com/micro/micro/v3/service""github.com/micro/micro/v3/service/logger""github.com/micro/go-micro/broker")vartopic1="topic1"vartopic2="topic2"funchandleEvent(bbroker.Event)error{msg:=string(b.Message().Body)logger.Infof("[sub] recieve message: %s, header: %sn",msg,b.Message().Header)returnnil}// 点对点消息通信,消息不会复制,只会被一个消费者消费funcsubTopicQ(topicstring){_,err:=broker.Subscribe(topic,handleEvent,broker.Queue(topic))iferr!=nil{fmt.Println(err)}}// 发布订阅模式消息通信,消息会复制,可以被多个消费者消费funcsubTopic(topicstring){_,err:=broker.Subscribe(topic,handleEvent)iferr!=nil{fmt.Println(err)}}funcmain(){// Create servicesrv:=service.New(service.Name("hellosubscriber"),service.Version("latest"),)srv.Init()iferr:=broker.Connect();err!=nil{logger.Error("Broker Connect error: ",err)}subTopic(topic1)subTopicQ(topic2)// Run serviceiferr:=srv.Run();err!=nil{logger.Fatal(err)}}
消息发布的实现消息发布的步骤如下:
- publisher先连接上broker,broker.Connect()
- broker.Publish(topic, msg) 直接对指定topic发送消息。
packagemainimport("fmt""github.com/micro/micro/v3/service""github.com/micro/micro/v3/service/logger""github.com/micro/go-micro/broker""time""github.com/pborman/uuid")vartopic1="topic1"vartopic2="topic2"funchandleEvent(bbroker.Event)error{msg:=string(b.Message().Body)logger.Infof("[sub] recieve message: %s, header: %sn",msg,b.Message().Header)returnnil}// 发布消息funcpubTopic(topicstring){forrangetime.Tick(time.Second){msg:=&broker.Message{Header:map[string]string{"id":uuid.NewUUID().String(),},Body:[]byte(fmt.Sprintf("Messaging you all day on %s",topic)),}iferr:=broker.Publish(topic,msg);err!=nil{logger.Error("Broker Publish error: ",err)}else{logger.Infof("Broker Publish topic:%s msg: %s",topic,msg)}}}funcmain(){// Create servicesrv:=service.New(service.Name("hellopublisher"),service.Version("latest"),)srv.Init()iferr:=broker.Connect();err!=nil{logger.Error("Broker Connect error: ",err)}gopubTopic(topic1)gopubTopic(topic2)// Run serviceiferr:=srv.Run();err!=nil{logger.Fatal(err)}}
实践过程:
- 订阅服务,我起了两个,命名为sub1,sub2
- 发布服务,我起了pub,pub里两个协程向topic1和topic2发送消息
micro run --name sub1 .
micro run --name sub2 .
micro run --name pub .
unshideMacBook-Pro:hellosubcriber junshili$ micro status
NAME VERSION SOURCE STATUS BUILD UPDATED METADATA
pub latest /Users/junshili/hellopublisher running n/a 1m48s ago owner=admin, group=micro
sub1 latest /Users/junshili/hellosubcriber running n/a 13s ago owner=admin, group=micro
sub2 latest /Users/junshili/hellosubcriber running n/a 6s ago owner=admin, group=micro
junshideMacBook-Pro:hellosubcriber junshili$ micro stats --all custom
SERVICE hellopublisher
VERSION latest
NODE ADDRESS:PORT STARTED UPTIME MEMORY THREADS GC
hellopublisher-a5c3156e-cb26-40af-b899-092195a49fd4 192.168.1.101:54250 May 13 00:46:28 2m5s 2.88mb 43 10.670273ms
SERVICE hellosubscriber
VERSION latest
NODE ADDRESS:PORT STARTED UPTIME MEMORY THREADS GC
hellosubscriber-7035f6b4-ebd8-4dbf-bb10-eb49cf7ca34c 192.168.1.101:54578 May 13 00:48:13 21s 2.54mb 33 114.399µs
hellosubscriber-aa456e83-c035-43dc-9a71-a459c2c058c3 192.168.1.101:54572 May 13 00:48:13 21s 2.63mb 33 121.528µs
我们使用micro logs -f helloworld
或者micro logs helloworld
查看服务输出的日志。
先观察pub服务的日志,pub在向发送topic1和topic2消息
2021-05-13 00:46:29 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:99b30102-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:29 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:99b30184-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:30 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9a4af598-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:30 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9a4af692-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9ae409d6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:31 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9ae40ae4-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9b7ca894-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
2021-05-13 00:46:32 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9b7ca7ea-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic1 msg: &{map[id:9c152dc6-b341-11eb-b5c8-acde48001122] Messaging you all day on topic1}
2021-05-13 00:46:33 file=blob-940620946/main.go:34 level=info Broker Publish topic:topic2 msg: &{map[id:9c152a88-b341-11eb-b5c8-acde48001122] Messaging you all day on topic2}
观察sub1的日志
2021-05-13 00:50:14 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:1fcede3c-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:14 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2067d614-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:20ffd928-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:2198c3d6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22317f90-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:22ca25b0-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-924495561/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]
观察sub2的日志
2021-05-13 00:50:14 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:1fcedfb8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:15 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2067d876-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:16 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:20ffd6c6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:17 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2198c2e6-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:18 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22318634-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:19 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:22ca283a-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23628d1e-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:20 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23628e5e-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:21 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:23fb6cc8-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:21 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:23fb6d90-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:22 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic2, header: map[id:24940438-b342-11eb-b5c8-acde48001122]
2021-05-13 00:50:22 file=blob-694627220/main.go:15 level=info [sub] recieve message: Messaging you all day on topic1, header: map[id:2494015e-b342-11eb-b5c8-acde48001122]
对比sub1和sub2的日志可以看出,sub1和sub2都能收到topic1的消息,证实topic1可以被多个订阅者消费,符合发布订阅模式。至于topic2,只能被一个消费者消费,不存在一条消息被多个消费者消费的情况,对应的是点对点消息队列通信机制。因此,需要区分这两种通信机制的go micro写法。
go micro默认的消息队列组件是自己实现的,实际上其实就是一个map,topic是map的key,发布消息时就往map里存,然后broker调度给订阅了该topic的服务推送。生产环境请更换为kafka,rabbitmq这类专用消息队列。
https://github.com/micro/micro/blob/master/service/broker/memory/memory.go
typememoryBrokerstruct{optsbroker.Optionsaddrstringsync.RWMutexconnectedboolSubscribersmap[string][]*memorySubscriber}typememorySubscriberstruct{idstringtopicstringexitchanboolhandlerbroker.Handleroptsbroker.SubscribeOptions}
go micro broker支持的消息队列在以下链接可以获取:https://github.com/microhq/go-plugins/tree/master/broker
原创文章 Go快速上手-微服务框架go-micro,版权所有
如若转载,请注明出处:https://www.itxiaozhan.cn/20229447.html