这篇文章主要讲解了“micro微服务基础组件的组织方式是什么”,文中的讲解内容简单清晰,易于学习与理解,下面请大家跟着小编的思路慢慢深入,一起来研究和学习“micro微服务基础组件的组织方式是什么”吧!
micro是go语言实现的一个微服务框架,该框架自身实现了为服务常见的几大要素,网关,代理,注册中心,消息传递,也支持可插拔扩展。本本通过micro中的一个核心对象展开去探讨这个项目是如何实现这些组件并将其组织在一起工作的。
Notice: go代码有时候比较繁琐,截取源码的时候会删除部分不影响思想的代码会标记为...
micro通过micro.NewService创建一个服务实例,所有的微服务实例(包括网关,代理等等)需要通过这个实例来与其他服务打交道。
type service struct {
opts Options
once sync.Once
}
代码很少,但是micro中所有微服务要素最后都会汇聚在这个类型上了,因为option包罗万象(^^).
type Options struct {
Broker broker.Broker
Cmd cmd.Cmd
Client client.Client
Server server.Server
Registry registry.Registry
Transport transport.Transport
// Before and After funcs
BeforeStart []func() error
BeforeStop []func() error
AfterStart []func() error
AfterStop []func() error
// Other options for implementations of the interface
// can be stored in a context
Context context.Context
}
这里可以看到微服务的常见组件了,当micro.Server初始化的时候会给这个对象设置上对应的组件,组件的设置方式包括默认值,cli指定,env读取。
func (s *service) Init(opts ...Option) {
// process options
for _, o := range opts {
o(&s.opts)
}
s.once.Do(func() {
// Initialise the command flags, overriding new service
_ = s.opts.Cmd.Init(
cmd.Broker(&s.opts.Broker),
cmd.Registry(&s.opts.Registry),
cmd.Transport(&s.opts.Transport),
cmd.Client(&s.opts.Client),
cmd.Server(&s.opts.Server),
)
})
}
Server是最终的行为实体,监听端口并提供业务服务,注册本地服务到注册中心。 Broker异步消息,mq等方法可以替换这个类型 Client服务调用客户端 Registry 注册中心 Cmd cli客户端 Transport 类似与socket,消息同步通信,服务监听等
目前支持的服务类型有rpc,grpc。服务中如果存在两种不同的rpc协议,消息投递的时候会进行协议转换。这里详细说下默认的rpc服务。 rpc服务基于HTTP POST协议,服务启动的时候会尝试连接Broker,然后注册本服务到注册中心,最后监听服务端口.简单提一句这里是如何做到协议转换,如果http过来的消息要投递到一个grpc协议服务上,需要在Content-Type设置对应目的服务协议类型application/grpc,SerConn这里读取在Content-Type在获取对应的Codec进行协议转换,最后投递到对应服务,服务的返回内容也同样要处理下再返回。
type Service interface {
Init(...Option)
Options() Options
Client() client.Client
Server() server.Server
Run() error
String() string
}
rpc server
func (s *rpcServer) Start() error {
...
ts, err := config.Transport.Listen(config.Address)
// swap address
...
// connect to the broker
if err := config.Broker.Connect(); err != nil {
return err
}
...
// use RegisterCheck func before register
if err = s.opts.RegisterCheck(s.opts.Context); err != nil {
log.Logf("Server %s-%s register check error: %s", config.Name, config.Id, err)
} else {
// announce self to the world
if err = s.Register(); err != nil {
log.Logf("Server %s-%s register error: %s", config.Name, config.Id, err)
}
}
...
go func() {
for {
// listen for connections
err := ts.Accept(s.ServeConn)
...
}
}()
..
return nil
}
协议转换
func (s *rpcServer) ServeConn(sock transport.Socket) {
...
for {
var msg transport.Message
if err := sock.Recv(&msg); err != nil {
return
}
...
// we use this Content-Type header to identify the codec needed
ct := msg.Header["Content-Type"]
// strip our headers
hdr := make(map[string]string)
for k, v := range msg.Header {
hdr[k] = v
}
// set local/remote ips
hdr["Local"] = sock.Local()
hdr["Remote"] = sock.Remote()
...
// TODO: needs better error handling
var err error
if cf, err = s.newCodec(ct); err != nil { //请求协议转换器
...返回错误
return
}
...
rcodec := newRpcCodec(&msg, sock, cf) //返回转换器
// internal request
request := &rpcRequest{
service: getHeader("Micro-Service", msg.Header),
method: getHeader("Micro-Method", msg.Header),
endpoint: getHeader("Micro-Endpoint", msg.Header),
contentType: ct,
codec: rcodec,
header: msg.Header,
body: msg.Body,
socket: sock,
stream: true,
}
// internal response
response := &rpcResponse{
header: make(map[string]string),
socket: sock,
codec: rcodec,
}
// set router
r := Router(s.router)
...
// serve the actual request using the request router
if err := r.ServeRequest(ctx, request, response); err != nil {
// write an error response
err = rcodec.Write(&codec.Message{
Header: msg.Header,
Error: err.Error(),
Type: codec.Error,
}, nil)
// could not write the error response
if err != nil {
log.Logf("rpc: unable to write error response: %v", err)
}
if s.wg != nil {
s.wg.Done()
}
return
}
...
}
}
注册中心包括服务发现和服务注册。micro中每个注册中心类型都要实现注册中心接口
type Registry interface {
Init(...Option) error
Options() Options
Register(*Service, ...RegisterOption) error
Deregister(*Service) error
GetService(string) ([]*Service, error)
ListServices() ([]*Service, error)
Watch(...WatchOption) (Watcher, error)
String() string
}
默认的注册中心是mdns,该程序会在本地监听一个组播地址接收网络中所有及其广播的信息,同时发送的信息也能被所有其他机器发现。每当程序启动时都会广播自己的服务信息,其他节点收到该信息后添加到自己的服务列表里面,服务关闭时会发出关闭信息。mdns自身不具备健康检查,熔断等功能,其出发点也仅是便于测试使用,因而不推荐在生产环境中使用。
查找服务需要根据url或者content信息来获取服务名称,在通过服务名称到注册中心查找,获取到服务后,随机一个节点投递
type Resolver interface {
Resolve(r *http.Request) (*Endpoint, error)
String() string
}
//默认的api resolve实例,除此之外还有host,path,grpc 三种resolve,可以根据需求在启动程序的时候指定或者设置在环境变量里面
//
func (r *Resolver) Resolve(req *http.Request) (*resolver.Endpoint, error) {
var name, method string
switch r.Options.Handler {
// internal handlers
case "meta", "api", "rpc", "micro":
///foo/bar/zool => go.micro.api.foo 方法:Bar.Zool
///foo/bar => go.micro.api.foo 方法:Foo.Bar
name, method = apiRoute(req.URL.Path)
default:
//如果handler是web会走到这里
///foo/bar/zool => go.micro.api.foo method bar/zool
// 1/foo/bar/ => go.micro.api.1.foo method bar
method = req.Method
name = proxyRoute(req.URL.Path)
}
return &resolver.Endpoint{
Name: name,
Method: method,
}, nil
}
代码中定义了一个plugin,然而这个plugin并非是引入组件的作用,其作用在于在请求的管道中加一层,这样比较容易添加新的逻辑进去。然而真正需要着重提到的是micro引入新组件的方式,micro service的几个重要成员都有其对应的接口规约,只要正确实现了接口就可以轻松的接入新的组件。这里用一个引入kubernetes作为注册中心的例子。
kubernetes组件位于github.com\micro\go-plugins中
go get -u github.com\micro\go-plugins\kubernetes
由于在go中无法实现java/c#中包动态加载功能,语言本身不提供全局类型,函数扫描。往往只能通过一些曲折的办法来实现。而micro是通过在入口文件中导入包,利用init函数在启动时将需要的功能组件写入到一个map里面。 在import中加入插件
_ "github.com/micro/go-plugins/registry/kubernetes"
micro api --registry=kubernetes --registry_address=yourAddress
工作原理: 在github.com\micro\go-micro\config\cmd\cmd.go中有一个map用于保存所有的注册中心创建函数
DefaultRegistries = map[string]func(...registry.Option) registry.Registry{
"consul": consul.NewRegistry,
"gossip": gossip.NewRegistry,
"mdns": mdns.NewRegistry,
"memory": rmem.NewRegistry,
}
kubernetes会在包的init中写入对应的创建函数
func init() {
cmd.DefaultRegistries["kubernetes"] = NewRegistry
}
生效的位置
if name := ctx.String("registry"); len(name) > 0 && (*c.opts.Registry).String() != name {
r, ok := c.opts.Registries[name]
if !ok {
return fmt.Errorf("Registry %s not found", name)
}
*c.opts.Registry = r()
serverOpts = append(serverOpts, server.Registry(*c.opts.Registry))
clientOpts = append(clientOpts, client.Registry(*c.opts.Registry))
if err := (*c.opts.Selector).Init(selector.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring registry: %v", err)
}
clientOpts = append(clientOpts, client.Selector(*c.opts.Selector))
if err := (*c.opts.Broker).Init(broker.Registry(*c.opts.Registry)); err != nil {
log.Fatalf("Error configuring broker: %v", err)
}
}
最后附上一张图说明下这个核心的对象的引用关系,组件引用那一块只是画了注册中心的,Broker,Server也都是类似的原理。
感谢各位的阅读,以上就是“micro微服务基础组件的组织方式是什么”的内容了,经过本文的学习后,相信大家对micro微服务基础组件的组织方式是什么这一问题有了更深刻的体会,具体使用情况还需要大家实践验证。这里是亿速云,小编将为大家推送更多相关知识点的文章,欢迎关注!
亿速云「云服务器」,即开即用、新一代英特尔至强铂金CPU、三副本存储NVMe SSD云盘,价格低至29元/月。点击查看>>
免责声明:本站发布的内容(图片、视频和文字)以原创、转载和分享为主,文章观点不代表本网站立场,如果涉及侵权请联系站长邮箱:is@yisu.com进行举报,并提供相关证据,一经查实,将立刻删除涉嫌侵权内容。
原文链接:https://my.oschina.net/hunjixin/blog/3076782