在微服务架构中,RPC 框架是服务间通信的命脉。一个优秀的 RPC 框架不仅要性能出色,更要内置完善的服务治理能力。go-zero 的 zRPC 正是这样一个可直接用于生产的框架——它在 gRPC 之上封装了熔断、降载、监控、负载均衡等高级特性,让开发者开箱即用。在十三Tech 的微服务实践中,zRPC 是我们构建高可用服务集群的基石。本文将从源码层面深入剖析其核心模块的实现原理。
zRPC 概览
zRPC 是 go-zero 的 RPC 通信层,底层依赖 gRPC,并在此基础上扩展了以下核心能力:
- 服务注册与发现:默认基于 etcd 实现,支持自定义扩展
- 拦截器体系:内置熔断、降载、权限验证、Prometheus 指标收集等
- 负载均衡:自定义 P2C 算法,兼顾性能与公平性
- 自适应治理:根据服务实时状态自动调整流量策略
拦截器模块
gRPC 拦截器(Interceptor)是请求前后进行额外处理的核心机制,分为客户端拦截器和服务端拦截器两类。
拦截器签名
客户端拦截器:
type UnaryClientInterceptor func(
ctx context.Context,
method string,
req, reply interface{},
cc *ClientConn,
invoker UnaryInvoker,
opts ...CallOption,
) error
method:调用的 RPC 方法名req/reply:请求和响应参数cc:客户端连接对象invoker:真正执行 RPC 方法的 handler,在拦截器内部被调用
服务端拦截器:
type UnaryServerInterceptor func(
ctx context.Context,
req interface{},
info *UnaryServerInfo,
handler UnaryHandler,
) (resp interface{}, err error)
req:请求参数info:包含请求方法的元信息handler:对服务端方法的包装,在拦截器中被调用执行
zRPC 内置了丰富的拦截器,下面我们重点剖析两个核心实现。
自适应熔断(Breaker)
当客户端向服务端发起请求时,如果服务端返回错误的比例超过阈值,客户端会主动熔断,丢弃部分请求以保护下游依赖,并在条件恢复后自动重连。
zRPC 的自适应熔断算法遵循 Google SRE 中的过载保护策略:
requests:总请求数量accepts:正常请求数量K:倍值(Google SRE 推荐值为 2)
通过调整 K 可以控制熔断的激进程度:K 越小,熔断越敏感;K 越大,熔断越保守。
熔断拦截器的实现:
func BreakerInterceptor(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
// target + 方法名 作为熔断器唯一标识
breakerName := path.Join(cc.Target(), method)
return breaker.DoWithAcceptable(breakerName, func() error {
// 真正执行 RPC 调用
return invoker(ctx, method, req, reply, cc, opts...)
}, codes.Acceptable)
}
核心熔断判断逻辑:
func (b *googleBreaker) accept() error {
// accepts 为正常请求数,total 为总请求数
accepts, total := b.history()
weightedAccepts := b.k * float64(accepts)
// protection 默认为 5,k 默认 1.5
dropRatio := math.Max(0, (float64(total-protection)-weightedAccepts)/float64(total+1))
if dropRatio <= 0 {
return nil
}
// 按概率决定是否丢弃请求
if b.proba.TrueOnProba(dropRatio) {
return ErrServiceUnavailable
}
return nil
}
请求执行与状态记录:
func (b *googleBreaker) doReq(req func() error, fallback func(err error) error, acceptable Acceptable) error {
if err := b.accept(); err != nil {
if fallback != nil {
return fallback(err)
}
return err
}
defer func() {
if e := recover(); e != nil {
b.markFailure()
panic(e)
}
}()
// 执行 RPC 请求
err := req()
// 正常请求:total 和 accepts 均 +1
if acceptable(err) {
b.markSuccess()
} else {
// 异常请求:仅 total +1
b.markFailure()
}
return err
}
Prometheus 指标收集
服务监控是了解系统运行状态的重要手段。zRPC 通过 Prometheus 拦截器对 RPC 方法的耗时和错误码进行指标收集:
func UnaryPrometheusInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (
interface{}, error) {
// 记录开始时间
startTime := timex.Now()
resp, err := handler(ctx, req)
// 计算耗时并记录
metricServerReqDur.Observe(int64(timex.Since(startTime)/time.Millisecond), info.FullMethod)
// 记录错误码分布
metricServerReqCodeTotal.Inc(info.FullMethod, strconv.Itoa(int(status.Code(err))))
return resp, err
}
}
这里使用了 Prometheus 的 Histogram 记录延迟分布,Counter 记录错误码出现次数,为后续的告警和可视化提供数据基础。
服务发现与注册(Resolver)
zRPC 自定义了 resolver 模块来实现服务发现功能。在 gRPC 中,自定义 resolver 需要实现 resolver.Builder 接口:
type Builder interface {
Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
Scheme() string
}
Build 方法返回的 Resolver 定义如下:
type Resolver interface {
ResolveNow(ResolveNowOptions)
Close()
}
zRPC 内置了两种 resolver:direct(直连)和 discov(基于 etcd 的服务发现):
func RegisterResolver() {
resolver.Register(&dirBuilder)
resolver.Register(&disBuilder)
}
服务端注册流程
启动 zRPC 服务端时,会向 etcd 注册服务地址:
func (ags keepAliveServer) Start(fn RegisterFn) error {
// 注册服务地址到 etcd
if err := ags.registerEtcd(); err != nil {
return err
}
// 启动 gRPC 服务
return ags.Server.Start(fn)
}
客户端发现流程
启动 zRPC 客户端时,gRPC 内部会调用自定义 resolver 的 Build 方法。zRPC 在 Build 中通过 UpdateState 将服务地址注入 gRPC 客户端:
func (d *discovBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (
resolver.Resolver, error) {
hosts := strings.FieldsFunc(target.Authority, func(r rune) bool {
return r == EndpointSepChar
})
// 创建 etcd 订阅者
sub, err := discov.NewSubscriber(hosts, target.Endpoint)
if err != nil {
return nil, err
}
update := func() {
var addrs []resolver.Address
for _, val := range subset(sub.Values(), subsetSize) {
addrs = append(addrs, resolver.Address{
Addr: val,
})
}
// 向 gRPC 注册最新服务地址
cc.UpdateState(resolver.State{
Addresses: addrs,
})
}
// 监听服务地址变化
sub.AddListener(update)
update()
return &nopResolver{cc: cc}, nil
}
etcd 数据同步
discov 模块通过 load 方法从 etcd 获取指定服务的所有地址:
func (c *cluster) load(cli EtcdClient, key string) {
var resp *clientv3.GetResponse
for {
ctx, cancel := context.WithTimeout(c.context(cli), RequestTimeout)
// 从 etcd 获取指定服务的所有地址
resp, err = cli.Get(ctx, makeKeyPrefix(key), clientv3.WithPrefix())
cancel()
if err == nil {
break
}
logx.Error(err)
time.Sleep(coolDownInterval)
}
var kvs []KV
c.lock.Lock()
for _, ev := range resp.Kvs {
kvs = append(kvs, KV{
Key: string(ev.Key),
Val: string(ev.Value),
})
}
c.lock.Unlock()
c.handleChanges(key, kvs)
}
并通过 watch 监听服务地址的实时变化:
func (c *cluster) watch(cli EtcdClient, key string) {
rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix())
for {
select {
case wresp, ok := <-rch:
if !ok {
logx.Error("etcd monitor chan has been closed")
return
}
if wresp.Canceled {
logx.Error("etcd monitor chan has been canceled")
return
}
if wresp.Err() != nil {
logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err()))
return
}
// 监听变化并通知更新
c.handleWatchEvents(key, wresp.Events)
case <-c.done:
return
}
}
}
负载均衡模块(Balancer)
避免过载是负载均衡的核心目标之一。zRPC 没有采用简单的轮询或随机算法,而是自定义了 P2C(Power of Two Choices) 负载均衡算法。
P2C 算法思想
- 从可用节点中随机选取两个节点 A、B
- 比较两个节点的实时负载,选择负载更低的节点
节点状态追踪
P2C 对 gRPC 的 SubConn 进行了包装,每个连接维护自己的统计数据:
type subConn struct {
addr resolver.Address
conn balancer.SubConn
lag uint64 // EWMA 计算的延迟
inflight int64 // 当前正在处理的请求数
success uint64 // 请求成功率(初始值 1000,低于 500 判定为不健康)
requests int64
last int64
pick int64
}
lag:基于 EWMA(指数加权移动平均)计算的节点延迟,近期观察值权重更高inflight:当前正在处理的请求数,直接反映节点的拥塞程度success:请求成功率,低于阈值时节点会被判定为不健康
节点选择逻辑
func (p *p2cPicker) Pick(ctx context.Context, info balancer.PickInfo) (
conn balancer.SubConn, done func(balancer.DoneInfo), err error) {
p.lock.Lock()
defer p.lock.Unlock()
var chosen *subConn
switch len(p.conns) {
case 0:
return nil, nil, balancer.ErrNoSubConnAvailable
case 1:
chosen = p.choose(p.conns[0], nil)
case 2:
chosen = p.choose(p.conns[0], p.conns[1])
default:
var node1, node2 *subConn
for i := 0; i < pickTimes; i++ {
// 随机选取两个不同节点
a := p.r.Intn(len(p.conns))
b := p.r.Intn(len(p.conns) - 1)
if b >= a {
b++
}
node1 = p.conns[a]
node2 = p.conns[b]
// 校验节点健康状态
if node1.healthy() && node2.healthy() {
break
}
}
chosen = p.choose(node1, node2)
}
atomic.AddInt64(&chosen.inflight, 1)
atomic.AddInt64(&chosen.requests, 1)
return chosen.conn, p.buildDoneFunc(chosen), nil
}
负载比较与选择
func (p *p2cPicker) choose(c1, c2 *subConn) *subConn {
start := int64(timex.Now())
if c2 == nil {
atomic.StoreInt64(&c1.pick, start)
return c1
}
// 选择负载较低的节点
if c1.load() > c2.load() {
c1, c2 = c2, c1
}
// forcePick 机制:避免某个节点长期得不到选择
pick := atomic.LoadInt64(&c2.pick)
if start-pick > forcePick && atomic.CompareAndSwapInt64(&c2.pick, pick, start) {
return c2
}
atomic.StoreInt64(&c1.pick, start)
return c1
}
// 节点负载 = sqrt(lag) * (inflight + 1)
func (c *subConn) load() int64 {
lag := int64(math.Sqrt(float64(atomic.LoadUint64(&c.lag) + 1)))
load := lag * (atomic.LoadInt64(&c.inflight) + 1)
if load == 0 {
return penalty
}
return load
}
调用完成后的状态更新
func (p *p2cPicker) buildDoneFunc(c *subConn) func(info balancer.DoneInfo) {
start := int64(timex.Now())
return func(info balancer.DoneInfo) {
atomic.AddInt64(&c.inflight, -1)
now := timex.Now()
last := atomic.SwapInt64(&c.last, int64(now))
td := int64(now) - last
if td < 0 {
td = 0
}
// 时间衰减系数
w := math.Exp(float64(-td) / float64(decayTime))
lag := int64(now) - start
if lag < 0 {
lag = 0
}
// 更新 EWMA 延迟
olag := atomic.LoadUint64(&c.lag)
if olag == 0 {
w = 0
}
atomic.StoreUint64(&c.lag, uint64(float64(olag)*w+float64(lag)*(1-w)))
// 更新成功率
success := initSuccess
if info.Err != nil && !codes.Acceptable(info.Err) {
success = 0
}
osucc := atomic.LoadUint64(&c.success)
atomic.StoreUint64(&c.success, uint64(float64(osucc)*w+float64(success)*(1-w)))
}
}
注册到 gRPC
自定义的 balancer 通过 init 函数注册到 gRPC 中:
func init() {
balancer.Register(newBuilder())
}
func newBuilder() balancer.Builder {
return base.NewBalancerBuilder(Name, new(p2cPickerBuilder))
}
在创建客户端时通过 grpc.WithBalancerName 启用:
func NewClient(target string, opts ...ClientOption) (*client, error) {
var cli client
opts = append(opts, WithDialOption(grpc.WithBalancerName(p2c.Name)))
if err := cli.dial(target, opts...); err != nil {
return nil, err
}
return &cli, nil
}
总结
zRPC 的设计充分体现了 go-zero "工具化、开箱即用" 的理念。它在 gRPC 的基础上,通过拦截器链实现了熔断、监控、链路追踪等服务治理能力,通过 P2C 算法实现了自适应负载均衡,通过 etcd resolver 实现了高可用的服务发现。在十三Tech 的生产环境中,这些内置能力让我们无需引入额外的 Sidecar 即可获得企业级的微服务治理体验。
理解这些底层机制,不仅有助于更好地使用 go-zero,也能为设计自己的微服务框架提供宝贵的设计思路。