system column十三Tech
← 返回技术专栏
TECH

go-zero 微服务实战:深入理解 zRPC 框架的核心机制

zRPC 是 go-zero 的企业级 RPC 框架,底层基于 gRPC 并内置熔断、限流、负载均衡等治理能力。本文深度解析拦截器、P2C 负载均衡、服务发现等核心模块的实现原理。

Go微服务

在微服务架构中,RPC 框架是服务间通信的命脉。一个优秀的 RPC 框架不仅要性能出色,更要内置完善的服务治理能力。go-zero 的 zRPC 正是这样一个可直接用于生产的框架——它在 gRPC 之上封装了熔断、降载、监控、负载均衡等高级特性,让开发者开箱即用。在十三Tech 的微服务实践中,zRPC 是我们构建高可用服务集群的基石。本文将从源码层面深入剖析其核心模块的实现原理。

zRPC 概览

zRPC 是 go-zero 的 RPC 通信层,底层依赖 gRPC,并在此基础上扩展了以下核心能力:

  • 服务注册与发现:默认基于 etcd 实现,支持自定义扩展
  • 拦截器体系:内置熔断、降载、权限验证、Prometheus 指标收集等
  • 负载均衡:自定义 P2C 算法,兼顾性能与公平性
  • 自适应治理:根据服务实时状态自动调整流量策略

拦截器模块

gRPC 拦截器(Interceptor)是请求前后进行额外处理的核心机制,分为客户端拦截器和服务端拦截器两类。

拦截器签名

客户端拦截器:

type UnaryClientInterceptor func(
	ctx context.Context,
	method string,
	req, reply interface{},
	cc *ClientConn,
	invoker UnaryInvoker,
	opts ...CallOption,
) error
  • method:调用的 RPC 方法名
  • req / reply:请求和响应参数
  • cc:客户端连接对象
  • invoker:真正执行 RPC 方法的 handler,在拦截器内部被调用

服务端拦截器:

type UnaryServerInterceptor func(
	ctx context.Context,
	req interface{},
	info *UnaryServerInfo,
	handler UnaryHandler,
) (resp interface{}, err error)
  • req:请求参数
  • info:包含请求方法的元信息
  • handler:对服务端方法的包装,在拦截器中被调用执行

zRPC 内置了丰富的拦截器,下面我们重点剖析两个核心实现。

自适应熔断(Breaker)

当客户端向服务端发起请求时,如果服务端返回错误的比例超过阈值,客户端会主动熔断,丢弃部分请求以保护下游依赖,并在条件恢复后自动重连。

zRPC 的自适应熔断算法遵循 Google SRE 中的过载保护策略:

  • requests:总请求数量
  • accepts:正常请求数量
  • K:倍值(Google SRE 推荐值为 2)

通过调整 K 可以控制熔断的激进程度:K 越小,熔断越敏感;K 越大,熔断越保守。

熔断拦截器的实现:

func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	// target + 方法名 作为熔断器唯一标识
	breakerName := path.Join(cc.Target(), method)
	return breaker.DoWithAcceptable(breakerName, func() error {
		// 真正执行 RPC 调用
		return invoker(ctx, method, req, reply, cc, opts...)
	}, codes.Acceptable)
}

核心熔断判断逻辑:

func (b *googleBreaker) accept() error {
	// accepts 为正常请求数,total 为总请求数
	accepts, total := b.history()

	weightedAccepts := b.k * float64(accepts)

	// protection 默认为 5,k 默认 1.5
	dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))

	if dropRatio <= 0 {
		return nil
	}

	// 按概率决定是否丢弃请求
	if b.proba.TrueOnProba(dropRatio) {
		return ErrServiceUnavailable
	}

	return nil
}

请求执行与状态记录:

func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
	if err := b.accept(); err != nil {
		if fallback != nil {
			return fallback(err)
		}
		return err
	}

	defer func() {
		if e := recover(); e != nil {
			b.markFailure()
			panic(e)
		}
	}()

	// 执行 RPC 请求
	err := req()
	// 正常请求:total 和 accepts 均 +1
	if acceptable(err) {
		b.markSuccess()
	} else {
		// 异常请求:仅 total +1
		b.markFailure()
	}

	return err
}

Prometheus 指标收集

服务监控是了解系统运行状态的重要手段。zRPC 通过 Prometheus 拦截器对 RPC 方法的耗时和错误码进行指标收集:

func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
		interface{}, error) {
		// 记录开始时间
		startTime := timex.Now()
		resp, err := handler(ctx, req)
		// 计算耗时并记录
		metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
		// 记录错误码分布
		metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
		return resp, err
	}
}

这里使用了 Prometheus 的 Histogram 记录延迟分布,Counter 记录错误码出现次数,为后续的告警和可视化提供数据基础。

服务发现与注册(Resolver)

zRPC 自定义了 resolver 模块来实现服务发现功能。在 gRPC 中,自定义 resolver 需要实现 resolver.Builder 接口:

type Builder interface {
	Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
	Scheme() string
}

Build 方法返回的 Resolver 定义如下:

type Resolver interface {
	ResolveNow(ResolveNowOptions)
	Close()
}

zRPC 内置了两种 resolver:direct(直连)和 discov(基于 etcd 的服务发现):

func RegisterResolver() {
	resolver.Register(&dirBuilder)
	resolver.Register(&disBuilder)
}

服务端注册流程

启动 zRPC 服务端时,会向 etcd 注册服务地址:

func (ags keepAliveServer) Start(fn RegisterFn) error {
	// 注册服务地址到 etcd
	if err := ags.registerEtcd(); err != nil {
		return err
	}
	// 启动 gRPC 服务
	return ags.Server.Start(fn)
}

客户端发现流程

启动 zRPC 客户端时,gRPC 内部会调用自定义 resolver 的 Build 方法。zRPC 在 Build 中通过 UpdateState 将服务地址注入 gRPC 客户端:

func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
	resolver.Resolver, error) {
	hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
		return r == EndpointSepChar
	})
	// 创建 etcd 订阅者
	sub, err := discov.NewSubscriber(hosts, target.Endpoint)
	if err != nil {
		return nil, err
	}

	update := func() {
		var addrs []resolver.Address
		for _, val := range subset(sub.Values(), subsetSize) {
			addrs = append(addrs, resolver.Address{
				Addr: val,
			})
		}
		// 向 gRPC 注册最新服务地址
		cc.UpdateState(resolver.State{
			Addresses: addrs,
		})
	}
	// 监听服务地址变化
	sub.AddListener(update)
	update()
	return &nopResolver{cc: cc}, nil
}

etcd 数据同步

discov 模块通过 load 方法从 etcd 获取指定服务的所有地址:

func (c *cluster) load(cli EtcdClient, key string) {
	var resp *clientv3.GetResponse
	for {
		ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
		// 从 etcd 获取指定服务的所有地址
		resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
		cancel()
		if err == nil {
			break
		}
		logx.Error(err)
		time.Sleep(coolDownInterval)
	}

	var kvs []KV
	c.lock.Lock()
	for _, ev := range resp.Kvs {
		kvs = append(kvs, KV{
			Key: string(ev.Key),
			Val: string(ev.Value),
		})
	}
	c.lock.Unlock()
	c.handleChanges(key, kvs)
}

并通过 watch 监听服务地址的实时变化:

func (c *cluster) watch(cli EtcdClient, key string) {
	rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
	for {
		select {
		case wresp, ok := <-rch:
			if !ok {
				logx.Error("etcd monitor chan has been closed")
				return
			}
			if wresp.Canceled {
				logx.Error("etcd monitor chan has been canceled")
				return
			}
			if wresp.Err() != nil {
				logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
				return
			}
			// 监听变化并通知更新
			c.handleWatchEvents(key, wresp.Events)
		case <-c.done:
			return
		}
	}
}

负载均衡模块(Balancer)

避免过载是负载均衡的核心目标之一。zRPC 没有采用简单的轮询或随机算法,而是自定义了 P2C(Power of Two Choices) 负载均衡算法。

P2C 算法思想

  1. 从可用节点中随机选取两个节点 A、B
  2. 比较两个节点的实时负载,选择负载更低的节点

节点状态追踪

P2C 对 gRPC 的 SubConn 进行了包装,每个连接维护自己的统计数据:

type subConn struct {
	addr       resolver.Address
	conn       balancer.SubConn
	lag        uint64  // EWMA 计算的延迟
	inflight   int64   // 当前正在处理的请求数
	success    uint64  // 请求成功率(初始值 1000,低于 500 判定为不健康)
	requests   int64
	last       int64
	pick       int64
}
  • lag:基于 EWMA(指数加权移动平均)计算的节点延迟,近期观察值权重更高
  • inflight:当前正在处理的请求数,直接反映节点的拥塞程度
  • success:请求成功率,低于阈值时节点会被判定为不健康

节点选择逻辑

func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
	conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
	p.lock.Lock()
	defer p.lock.Unlock()

	var chosen *subConn
	switch len(p.conns) {
	case 0:
		return nil, nil, balancer.ErrNoSubConnAvailable
	case 1:
		chosen = p.choose(p.conns[0], nil)
	case 2:
		chosen = p.choose(p.conns[0], p.conns[1])
	default:
		var node1, node2 *subConn
		for i := 0; i < pickTimes; i++ {
			// 随机选取两个不同节点
			a := p.r.Intn(len(p.conns))
			b := p.r.Intn(len(p.conns) - 1)
			if b >= a {
				b++
			}
			node1 = p.conns[a]
			node2 = p.conns[b]
			// 校验节点健康状态
			if node1.healthy() && node2.healthy() {
				break
			}
		}
		chosen = p.choose(node1, node2)
	}

	atomic.AddInt64(&chosen.inflight, 1)
	atomic.AddInt64(&chosen.requests, 1)
	return chosen.conn, p.buildDoneFunc(chosen), nil
}

负载比较与选择

func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
	start := int64(timex.Now())
	if c2 == nil {
		atomic.StoreInt64(&c1.pick, start)
		return c1
	}

	// 选择负载较低的节点
	if c1.load() > c2.load() {
		c1, c2 = c2, c1
	}

	// forcePick 机制:避免某个节点长期得不到选择
	pick := atomic.LoadInt64(&c2.pick)
	if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
		return c2
	}
	atomic.StoreInt64(&c1.pick, start)
	return c1
}

// 节点负载 = sqrt(lag) * (inflight + 1)
func (c *subConn) load() int64 {
	lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
	load := lag * (atomic.LoadInt64(&c.inflight) + 1)
	if load == 0 {
		return penalty
	}
	return load
}

调用完成后的状态更新

func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
	start := int64(timex.Now())
	return func(info balancer.DoneInfo) {
		atomic.AddInt64(&c.inflight, -1)
		now := timex.Now()
		last := atomic.SwapInt64(&c.last, int64(now))
		td := int64(now) - last
		if td < 0 {
			td = 0
		}
		// 时间衰减系数
		w := math.Exp(float64(-td) / float64(decayTime))
		lag := int64(now) - start
		if lag < 0 {
			lag = 0
		}
		// 更新 EWMA 延迟
		olag := atomic.LoadUint64(&c.lag)
		if olag == 0 {
			w = 0
		}
		atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
		// 更新成功率
		success := initSuccess
		if info.Err != nil && !codes.Acceptable(info.Err) {
			success = 0
		}
		osucc := atomic.LoadUint64(&c.success)
		atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
	}
}

注册到 gRPC

自定义的 balancer 通过 init 函数注册到 gRPC 中:

func init() {
	balancer.Register(newBuilder())
}

func newBuilder() balancer.Builder {
	return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}

在创建客户端时通过 grpc.WithBalancerName 启用:

func NewClient(target string, opts ...ClientOption) (*client, error) {
	var cli client
	opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
	if err := cli.dial(target, opts...); err != nil {
		return nil, err
	}
	return &cli, nil
}

总结

zRPC 的设计充分体现了 go-zero "工具化、开箱即用" 的理念。它在 gRPC 的基础上,通过拦截器链实现了熔断、监控、链路追踪等服务治理能力,通过 P2C 算法实现了自适应负载均衡,通过 etcd resolver 实现了高可用的服务发现。在十三Tech 的生产环境中,这些内置能力让我们无需引入额外的 Sidecar 即可获得企业级的微服务治理体验。

理解这些底层机制,不仅有助于更好地使用 go-zero,也能为设计自己的微服务框架提供宝贵的设计思路。

参考文档