内容提要
通过本文您将 get 如下知识:
-
微服务为什么引入服务注册发现 -
服务注册中心设计原理
-
Golang 代码实现服务注册中心
为什么引入服务注册发现
(服务域名配置模式)
(服务注册发现模式)
注册中心实现原理
设计思想
首先进行功能需求分析,作为服务注册中心,要实现如下基本功能:
-
服务注册:接受来自服务提交的注册信息,并保存起来 -
服务下线:接受服务的主动下线请求,并将服务从注册信息表中删除
-
服务获取:调用方从注册中心拉取服务信息
-
服务续约:服务健康检查,服务通过心跳保持(主动续约)告知注册中心服务可用
-
服务剔除:注册中心将长时间不续约的服务实例从注册信息表中删除
构造注册表
type Registry struct {
apps map[string]*Application
lock sync.RWMutex
}
-
apps 记录应用服务 Application 的信息,使用 map 结构,key 为应用服务的唯一标识,值为应用服务结构类型 -
lock 读写锁,保障并发读写安全
type Application struct {
appid string
instances map[string]*Instance
latestTimestamp int64
lock sync.RWMutex
}
-
appid 记录应用服务唯一标识 -
lock 读写锁,保障并发读写安全
-
latestTimestamp 记录更新时间
-
instances 记录服务实例 Instance 的信息,使用 map 结构,key 为实例的 hostname (唯一标识),值为实例结构类型
type Instance struct {
Env string `json:"env"`
AppId string `json:"appid"`
Hostname string `json:"hostname"`
Addrs []string `json:"addrs"`
Version string `json:"version"`
Status uint32 `json:"status"`
RegTimestamp int64 `json:"reg_timestamp"`
UpTimestamp int64 `json:"up_timestamp"`
RenewTimestamp int64 `json:"renew_timestamp"`
DirtyTimestamp int64 `json:"dirty_timestamp"`
LatestTimestamp int64 `json:"latest_timestamp"`
}
-
Env 服务环境标识,如 online、dev、test -
AppId 应用服务的唯一标识
-
Hostname 服务实例的唯一标识
-
Addrs 服务实例的地址,可以是 http 或 rpc 地址,多个地址可以维护数组
-
Version 服务实例版本
-
Status 服务实例状态,用于控制上下线
-
xxTimestamp 依次记录服务实例注册时间戳,上线时间戳,最近续约时间戳,脏时间戳(后面解释),最后更新时间戳
服务注册
功能目标:接受来自服务提交的注册信息,并保存到注册表中。先初始化注册表 NewRegistry() ,根据提交信息构建实例 NewInstance(),然后进行注册写入。
func NewRegistry() *Registry {
registry := &Registry{
apps: make(map[string]*Application),
}
return registry
}
func NewInstance(req *RequestRegister) *Instance {
now := time.Now().UnixNano()
instance := &Instance{
Env: req.Env,
AppId: req.AppId,
Hostname: req.Hostname,
Addrs: req.Addrs,
Version: req.Version,
Status: req.Status,
RegTimestamp: now,
UpTimestamp: now,
RenewTimestamp: now,
DirtyTimestamp: now,
LatestTimestamp: now,
}
return instance
}
r := NewRegistry()
instance := NewInstance(&req)
r.Register(instance, req.LatestTimestamp)
注册时,先从 apps 中查找是否已注册过,根据唯一标识 key = appid + env 确定。如果没有注册过,先新建应用 app,然后将 instance 加入到 app 中,最后 app 放入注册表中。这里分别使用了读锁和写锁,保障数据安全同时,尽量减少锁时间和锁抢占影响。
func (r *Registry) Register(instance *Instance, latestTimestamp int64) (*Application, *errcode.Error) {
key := getKey(instance.AppId, instance.Env)
r.lock.RLock()
app, ok := r.apps[key]
r.lock.RUnlock()
if !ok { //new app
app = NewApplication(instance.AppId)
}
//add instance
_, isNew := app.AddInstance(instance, latestTimestamp)
if isNew { //todo }
//add into registry apps
r.lock.Lock()
r.apps[key] = app
r.lock.Unlock()
return app, nil
}
新建应用服务 app,初始化 instances
func NewApplication(appid string) *Application {
return &Application{
appid: appid,
instances: make(map[string]*Instance),
}
}
将服务主机实例 instance 加入应用 app 中,注意判断是否已存在,存在根据脏时间戳 DirtyTimestamp 比对,是否进行替换,添加实例信息,更新最新时间 latestTimestamp ,并返回实例。
func (app *Application) AddInstance(in *Instance, latestTimestamp int64) (*Instance, bool) {
app.lock.Lock()
defer app.lock.Unlock()
appIns, ok := app.instances[in.Hostname]
if ok { //exist
in.UpTimestamp = appIns.UpTimestamp
//dirtytimestamp
if in.DirtyTimestamp < appIns.DirtyTimestamp {
log.Println("register exist dirty timestamp")
in = appIns
}
}
//add or update instances
app.instances[in.Hostname] = in
app.upLatestTimestamp(latestTimestamp)
returnIns := new(Instance)
*returnIns = *in
return returnIns, !ok
}
返回 !ok (isNew)表明,本次服务注册时,实例为新增还是替换,用来维护服务健康信息(后面会再次提到)。
var req = &model.RequestRegister{AppId: "com.xx.testapp", Hostname: "myhost", Addrs: []string{"http://testapp.xx.com/myhost"}, Status: 1}
func TestRegister(t *testing.T) {
r := model.NewRegistry()
instance := model.NewInstance(req)
app, _ := r.Register(instance, req.LatestTimestamp)
t.Log(app)
}
服务发现
功能目标:查找已注册的服务获取信息,可以指定条件查找,也可以全量查找。这里以指定过滤条件 appid 、env 和 status 为例。
r := model.NewRegistry()
fetchData, err := r.Fetch(req.Env, req.AppId, req.Status, 0)
根据 appid 和 env 组合成 key,然后从注册表的 apps 中获取应用 app,然后通过 app 获取服务实例 GetInstance()
func (r *Registry) Fetch(env, appid string, status uint32, latestTime int64) (*FetchData, *errcode.Error) {
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
return app.GetInstance(status, latestTime)
}
func (r *Registry) getApplication(appid, env string) (*Application, bool) {
key := getKey(appid, env)
r.lock.RLock()
app, ok := r.apps[key]
r.lock.RUnlock()
return app, ok
}
根据 app 获取所有应用实例,并用 status 过滤,这里对返回结果 instances 中的 Addr 进行了拷贝返回一个新的切片。
func (app *Application) GetInstance(status uint32, latestTime int64) (*FetchData, *errcode.Error) {
app.lock.RLock()
defer app.lock.RUnlock()
if latestTime >= app.latestTimestamp {
return nil, errcode.NotModified
}
fetchData := FetchData{
Instances: make([]*Instance, 0),
LatestTimestamp: app.latestTimestamp,
}
var exists bool
for _, instance := range app.instances {
if status&instance.Status > 0 {
exists = true
newInstance := copyInstance(instance)
fetchData.Instances = append(fetchData.Instances, newInstance)
}
}
if !exists {
return nil, errcode.NotFound
}
return &fetchData, nil
}
//deep copy
func copyInstance(src *Instance) *Instance {
dst := new(Instance)
*dst = *src
//copy addrs
dst.Addrs = make([]string, len(src.Addrs))
for i, addr := range src.Addrs {
dst.Addrs[i] = addr
}
return dst
}
编写测试用例,先注册再获取,看到可以正常获取到信息。
服务下线
功能目标:接受服务的下线请求,并将服务从注册信息列表中删除。通过传入 env, appid, hostname 三要素信息进行对应服务实例的取消。
r := model.NewRegistry()
r.Cancel(req.Env, req.AppId, req.Hostname, 0)
根据 appid 和 env 找到对象的 app,然后删除 app 中对应的 hostname。如果 hostname 后 app.instances 为空,那么将 app 从注册表中清除。
func (r *Registry) Cancel(env, appid, hostname string, latestTimestamp int64) (*Instance, *errcode.Error) {
log.Println("action cancel...")
//find app
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
instance, ok, insLen := app.Cancel(hostname, latestTimestamp)
if !ok {
return nil, errcode.NotFound
}
//if instances is empty, delete app from apps
if insLen == 0 {
r.lock.Lock()
delete(r.apps, getKey(appid, env))
r.lock.Unlock()
}
return instance, nil
}
func (app *Application) Cancel(hostname string, latestTimestamp int64) (*Instance, bool, int) {
newInstance := new(Instance)
app.lock.Lock()
defer app.lock.Unlock()
appIn, ok := app.instances[hostname]
if !ok {
return nil, ok, 0
}
//delete hostname
delete(app.instances, hostname)
appIn.LatestTimestamp = latestTimestamp
app.upLatestTimestamp(latestTimestamp)
*newInstance = *appIn
return newInstance, true, len(app.instances)
}
编写测试用例先注册,再取消,然后获取信息,发现 404 not found。
服务续约
功能目标:实现服务的健康检查机制,服务注册后,如果没有取消,那么就应该在注册表中,可以随时查到,如果某个服务实例挂了,能否自动的从注册表中删除,保障注册表中的服务实例都是正常的。
通常有两种方式做法:注册中心(服务端)主动探活,通过请求指定接口得到正常响应来确认;服务实例(客户端)主动上报,调用续约接口进行续约,续约设有时效 TTL (time to live)。两种方式各有优缺点,大家可以思考一下,不同的注册中心也采用了不同的方式,这里选型第二种方案。
r := model.NewRegistry()
r.Renew(req.Env, req.AppId, req.Hostname)
根据 appid 和 env 找到对象的 app,再根据 hostname 找到对应主机实例,更新其 RenewTimestamp 为当前时间。
func (r *Registry) Renew(env, appid, hostname string) (*Instance, *errcode.Error) {
app, ok := r.getApplication(appid, env)
if !ok {
return nil, errcode.NotFound
}
in, ok := app.Renew(hostname)
if !ok {
return nil, errcode.NotFound
}
return in, nil
}
func (app *Application) Renew(hostname string) (*Instance, bool) {
app.lock.Lock()
defer app.lock.Unlock()
appIn, ok := app.instances[hostname]
if !ok {
return nil, ok
}
appIn.RenewTimestamp = time.Now().UnixNano()
return copyInstance(appIn), true
}
服务剔除
功能目标:既然有服务定期续约,那么对应的如果服务没有续约呢?服务如果下线可以使用 Cancel 进行取消,但如果服务因为网络故障或挂了导致不能提供服务,那么可以通过检查它是否按时续约来判断,把 TTL 达到阈值的服务实例剔除(Cancel),实现服务的被动下线。
func NewRegistry() *Registry {
++ go r.evictTask()
}
func (r *Registry) evictTask() {
ticker := time.Tick(configs.CheckEvictInterval)
for {
select {
case <-ticker:
r.evict()
}
}
}
遍历注册表的所有 apps,然后再遍历其中的 instances,如果当前时间减去实例上一次续约时间 instance.RenewTimestamp 达到阈值(默认 90 秒),那么将其加入过期队列中。这里并没有直接将过期队列所有实例都取消,考虑 GC 以及 本地时间漂移的因素,设定了一个剔除的上限 evictionLimit,随机剔除一些过期实例。
func (r *Registry) evict() {
now := time.Now().UnixNano()
var expiredInstances []*Instance
apps := r.getAllApplications()
var registryLen int
for _, app := range apps {
registryLen += app.GetInstanceLen()
allInstances := app.GetAllInstances()
for _, instance := range allInstances {
if now-instance.RenewTimestamp > int64(configs.InstanceExpireDuration) {
expiredInstances = append(expiredInstances, instance)
}
}
}
evictionLimit := registryLen - int(float64(registryLen)*configs.SelfProtectThreshold)
expiredLen := len(expiredInstances)
if expiredLen > evictionLimit {
expiredLen = evictionLimit
}
if expiredLen == 0 {
return
}
for i := 0; i < expiredLen; i++ {
j := i + rand.Intn(len(expiredInstances)-i)
expiredInstances[i], expiredInstances[j] = expiredInstances[j], expiredInstances[i]
expiredInstance := expiredInstances[i]
r.Cancel(expiredInstance.Env, expiredInstance.AppId, expiredInstance.Hostname, now)
}
}
剔除过期时,采用了 Knuth-Shuffle 算法,也叫公平洗牌算法来实现随机剔除。当然如果 expiredLen <= evictionLimit,随机剔除的意义不大,如果前者大于后者,随机剔除能最大程度保障,剔除的实例均匀分散到所有应用实例中,降低某服务被全部清空的风险。公平洗牌算法实现也比较简单,循环遍历过期列表,将当前数与特定随机数交换,和我们打牌时两两交换洗牌过程类似,它实现了 O(n) 的时间复杂度,由 Knuth 发明。
自我保护
功能目标:既然服务会定期剔除超时未续约的服务,那么假设一种情况,网络一段时间发生了异常,所有服务都没成功续约,这时注册中心是否将所有服务全部剔除?当然不行!所以,我们需要一个自我保护的机制防止此类事情的发生。
type Guard struct {
renewCount int64
lastRenewCount int64
needRenewCount int64
threshold int64
lock sync.RWMutex
}
-
renewCount 记录所有服务续约次数,每执行一次 renew 加 1 -
lastRenewCount 记录上一次检查周期(默认 60 秒)服务续约统计次数
-
needRenewCount 记录一个周期总计需要的续约数,按一次续约 30 秒,一周期 60 秒,一个实例就需要 2 次,所以服务注册时 + 2,服务取消时 - 2
-
threshold 通过 needRenewCount 和阈值比例 (0.85)确定触发自我保护的值
func (gd *Guard) incrNeed() {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount += int64(configs.CheckEvictInterval / configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) decrNeed() {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount -= int64(configs.CheckEvictInterval / configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) setNeed(count int64) {
gd.lock.Lock()
defer gd.lock.Unlock()
gd.needRenewCount = count * int64(configs.CheckEvictInterval/configs.RenewInterval)
gd.threshold = int64(float64(gd.needRenewCount) * configs.SelfProtectThreshold)
}
func (gd *Guard) incrCount() {
atomic.AddInt64(&gd.renewCount, 1)
}
type Registry struct {
++ gd *Guard
}
func NewRegistry() *Registry {
r := &Registry{
++ gd: new(Guard),
}
}
func (r *Registry) Register(...) {
if isNew {
++ r.gd.incrNeed()
}
}
func (r *Registry) Cancel(...) {
++ r.gd.decrNeed()
}
func (r *Registry) Renew(...) {
++ r.gd.incrCount()
}
在服务剔除前进行上一周期计数统计,并判断是否达到自我保护开启状态。
func (gd *Guard) storeLastCount() {
atomic.StoreInt64(&gd.lastRenewCount, atomic.SwapInt64(&gd.needRenewCount, 0))
}
func (gd *Guard) selfProtectStatus() bool {
return atomic.LoadInt64(&gd.lastRenewCount) < atomic.LoadInt64(&gd.threshold)
}
func (r *Registry) evictTask() {
case <-ticker:
++ r.gd.storeLastCount()
r.evict()
}
}
func (r *Registry) evict() {
delta := now - instance.RenewTimestamp
++ if !protectStatus && delta > int64(configs.InstanceExpireDuration) ||
delta > int64(configs.InstanceMaxExpireDuration) {
expiredInstances = append(expiredInstances, instance)
}
}
func (r *Registry) evictTask() {
resetTicker := time.Tick(configs.ResetGuardNeedCountInterval)
for {
select {
case <-resetTicker:
var count int64
for _, app := range r.getAllApplications() {
count += int64(app.GetInstanceLen())
}
r.gd.setNeed(count)
}
}
}
注册中心对外提供服务
目前注册中心基本功能已实现,需要对外提供服务了,我们采用 gin 来实现一个 web 服务,接受 http 请求进行服务的注册、查找、续约、下线操作,这样保障注册中心可以方便的接受来自任何语言客户端请求。
func main() {
//init config
c := flag.String("c", "", "config file path")
flag.Parse()
config, err := configs.LoadConfig(*c)
if err != nil {
log.Println("load config error:", err)
return
}
//global discovery
global.Discovery = model.NewDiscovery(config)
//init router and start server
router := api.InitRouter()
srv := &http.Server{
Addr: config.HttpServer,
Handler: router,
}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
log.Fatalf("listen:%s\n", err)
}
}()
}
增加一个 discovery 结构,并开启一个全局变量 global.Discovery ,该结构中维护注册表 Registry,然后就可以根据注册表实现各种操作了。
type Discovery struct {
config *configs.GlobalConfig
protected bool
Registry *Registry
}
func NewDiscovery(config *configs.GlobalConfig) *Discovery {
dis := &Discovery{
protected: false,
config: config,
Registry: NewRegistry(), //init registry
}
return dis
}
//init discovery
var Discovery *model.Discovery
router.POST("api/register", handler.RegisterHandler)
func RegisterHandler(c *gin.Context) {
var req model.RequestRegister
if e := c.ShouldBindJSON(&req); e != nil {
err := errcode.ParamError
c.JSON(http.StatusOK, gin.H{
"code": err.Code(),
"message": err.Error(),
})
return
}
//bind instance
instance := model.NewInstance(&req)
if instance.Status == 0 || instance.Status > 2 {
err := errcode.ParamError
c.JSON(http.StatusOK, gin.H{
"code": err.Code(),
"message": err.Error(),
})
return
}
//dirtytime
if req.DirtyTimestamp > 0 {
instance.DirtyTimestamp = req.DirtyTimestamp
}
global.Discovery.Registry.Register(instance, req.LatestTimestamp)
c.JSON(http.StatusOK, gin.H{
"code": 200,
"message": "",
"data": "",
})
}
func main() {
//...
//graceful restart
quit := make(chan os.Signal)
signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, syscall.SIGQUIT)
<-quit
log.Println("shutdown discovery server...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
log.Fatal("server shutdown error:", err)
}
select {
case <-ctx.Done():
log.Println("timeout of 5 seconds")
}
log.Println("server exiting")
}
实现效果如图所示:
工程实践
-
使用 go module 管理依赖的三方包 (gin 和 yaml)
-
api 存放 http 服务路由以及对应处理函数
-
cmd 存放编译入口 main 文件
-
configs 存放全局配置和全局常量
-
global 存放全局结构变量
-
model 存放注册表结构模型及主要逻辑
总结与问题
注册中心功能实现
至此,一个单机版的注册中心就可以工作了,但生产环境单点肯定是不能容忍的,因此有必要实现一个注册中心集群。那么是否部署多个注册中心实例就可以了,当然 .... 不行!这只能保障有多个注册中心节点,而每个节点中维护自己的注册表,那么就需要进行注册表数据同步。多节点数据同步又会涉及著名的一致性问题,这时 Paxos、Raft、ZAB、Gossip 等算法名词涌现,而我们将使用 P2P(Peer to Peer)对等网络协议来实现。关于集群设计与实现我们将在后续文章中展开。
相关代码:https://github.com/skyhackvip/service_discovery
参考阅读
技术原创及架构实践文章,欢迎通过公众号菜单「联系我们」进行投稿。
文章评论