grpc负载均衡RoundRobin源码解读

grpc client端创建连接时可以用WithBalancer来指定负载均衡组件,这里研究下grpc自带的RoundRobin(轮询调度)的实现。源码在google.golang.org/grpc/balancer.go中。

roundRobin结构体定义如下:

type roundRobin struct {
	r      naming.Resolver
	w      naming.Watcher
	addrs  []*addrInfo // all the addresses the client should potentially connect
	mu     sync.Mutex
	addrCh chan []Address // the channel to notify gRPC internals the list of addresses the client should connect to.
	next   int            // index of the next address to return for Get()
	waitCh chan struct{}  // the channel to block when there is no connected address available
	done   bool           // The Balancer is closed.
}
  • r是命名解析器,可以定义自己的命名解析器,如etcd命名解析器。如果r为nil,那么Dial中参数target将直接作为可请求地址添加到addrs中。
  • w是命名解析器Resolve方法返回的watcher,该watcher可以监听命名解析器发来的地址信息变化,通知roundRobin对addrs中的地址进行动态的增删。
  • addrs是从命名解析器获取地址信息数组,数组中每个地址不仅有地址信息,还有grpc与该地址是否已经创建了ready状态的连接。
  • addrCh是地址数组的channel,该channel会在每次命名解析器发来地址信息变化后,将所有addrs通知到grpc内部的lbWatcher,lbWatcher是统一管理地址连接状态的协程,负责新地址的连接与被删除地址的关闭操作。
  • next是roundRobin的Index,即轮询调度遍历到addrs数组中的哪个位置了。
  • waitCh是当addrs中地址为空时,grpc调用Get()方法希望获取到一个到target的连接,如果设置了grpc的failfast为false,那么Get()方法会阻塞在此channel上,直到有ready的连接。

roundRobin启动:

func (rr *roundRobin) Start(target string, config BalancerConfig) error {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	if rr.done {
		return ErrClientConnClosing
	}
	if rr.r == nil {
		// 如果没有解析器,那么直接将target加入addrs地址数组
		rr.addrs = append(rr.addrs, &addrInfo{addr: Address{Addr: target}})
		return nil
	}
	// Resolve接口会返回一个watcher,watcher可以监听解析器的地址变化
	w, err := rr.r.Resolve(target)
	if err != nil {
		return err
	}
	rr.w = w
	// 创建一个channel,当watcher监听到地址变化时,通知grpc内部lbWatcher去连接该地址
	rr.addrCh = make(chan []Address, 1)
	// go 出去监听watcher,监听地址变化。
	go func() {
		for {
			if err := rr.watchAddrUpdates(); err != nil {
				return
			}
		}
	}()
	return nil
}

监听命名解析器的地址变化:

func (rr *roundRobin) watchAddrUpdates() error {
	// watcher的next方法会阻塞,直至有地址变化信息过来,updates即为变化信息
	updates, err := rr.w.Next()
	if err != nil {
		return err
	}
	// 对于addrs地址数组的操作,显然是要加锁的,因为有多个goroutine在同时操作
	rr.mu.Lock()
	defer rr.mu.Unlock()
	for _, update := range updates {
		addr := Address{
			Addr:     update.Addr,
			Metadata: update.Metadata,
		}
		switch update.Op {
		case naming.Add:
		//对于新增类型的地址,注意这里不会重复添加。
			var exist bool
			for _, v := range rr.addrs {
				if addr == v.addr {
					exist = true
					break
				}
			}
			if exist {
				continue
			}
			rr.addrs = append(rr.addrs, &addrInfo{addr: addr})
		case naming.Delete:
		//对于删除的地址,直接在addrs中删除就行了
			for i, v := range rr.addrs {
				if addr == v.addr {
					copy(rr.addrs[i:], rr.addrs[i+1:])
					rr.addrs = rr.addrs[:len(rr.addrs)-1]
					break
				}
			}
		default:
			grpclog.Errorln("Unknown update.Op ", update.Op)
		}
	}
	// 这里复制了整个addrs地址数组,然后丢到addrCh channel中通知grpc内部lbWatcher,
	// lbWatcher会关闭删除的地址,连接新增的地址。
	// 连接ready后会有专门的goroutine调用Up方法修改addrs中地址的状态。
	open := make([]Address, len(rr.addrs))
	for i, v := range rr.addrs {
		open[i] = v.addr
	}
	if rr.done {
		return ErrClientConnClosing
	}
	select {
	case <-rr.addrCh:
	default:
	}
	rr.addrCh <- open
	return nil
}

Up方法:
up方法是grpc内部负载均衡watcher调用的,该watcher会读全局的连接状态改变队列,如果是ready状态的连接,会调用up方法来改变addrs地址数组中该地址的状态为已连接

func (rr *roundRobin) Up(addr Address) func(error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	var cnt int
	//将地址数组中的addr置为已连接状态,这样这个地址就可以被client使用了。
	for _, a := range rr.addrs {
		if a.addr == addr {
			if a.connected {
				return nil
			}
			a.connected = true
		}
		if a.connected {
			cnt++
		}
	}
	// 当有一个可用地址时,之前可能是0个,可能要很多client阻塞在获取连接地址上,这里通知所有的client有可用连接啦。
	// 为什么只等于1时通知?因为可用地址数量>1时,client是不会阻塞的。
	if cnt == 1 && rr.waitCh != nil {
		close(rr.waitCh)
		rr.waitCh = nil
	}
	//返回禁用该地址的方法
	return func(err error) {
		rr.down(addr, err)
	}
}

down方法:
down方法就简单了, 直接找到addr置为不可用就行了。

//如果addr1已经被连接上了,但是resolver通知删除了,grpc内部如何处理关闭的逻辑?
func (rr *roundRobin) down(addr Address, err error) {
	rr.mu.Lock()
	defer rr.mu.Unlock()
	for _, a := range rr.addrs {
		if addr == a.addr {
			a.connected = false
			break
		}
	}
}

Get()方法:
client 需要获取一个可用的地址,如果addrs为空,或者addrs不为空,但是地址都不可用(没连接),Get()方法会返回错误。但是如果设置了failfast = false,Get()方法会阻塞在waitCh channel上,直至Up方法给到通知,然后轮询调度可用的地址。

func (rr *roundRobin) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
	var ch chan struct{}
	rr.mu.Lock()
	if rr.done {
		rr.mu.Unlock()
		err = ErrClientConnClosing
		return
	}

	if len(rr.addrs) > 0 {
		// addrs的长度可能变化,如果next值超出了,就置为0,从头开始调度。
		if rr.next >= len(rr.addrs) {
			rr.next = 0
		}
		next := rr.next
		//遍历整个addrs数组,直到选出一个可用的地址
		for {
			a := rr.addrs[next]
			// next值加一,当然是循环的,到len(addrs)后,变为0
			next = (next + 1) % len(rr.addrs)
			if a.connected {
				addr = a.addr
				rr.next = next
				rr.mu.Unlock()
				return
			}
			if next == rr.next {
				// 遍历完一圈了,还没找到,走下面逻辑
				break
			}
		}
	}
	if !opts.BlockingWait { //如果是非阻塞模式,如果没有可用地址,那么报错
		if len(rr.addrs) == 0 {
			rr.mu.Unlock()
			err = status.Errorf(codes.Unavailable, "there is no address available")
			return
		}
		// Returns the next addr on rr.addrs for failfast RPCs.
		addr = rr.addrs[rr.next].addr
		rr.next++
		rr.mu.Unlock()
		return
	}
	// Wait on rr.waitCh for non-failfast RPCs.
	// 如果是阻塞模式,那么需要阻塞在waitCh上,直到Up方法给通知
	if rr.waitCh == nil {
		ch = make(chan struct{})
		rr.waitCh = ch
	} else {
		ch = rr.waitCh
	}
	rr.mu.Unlock()
	for {
		select {
		case <-ctx.Done():
			err = ctx.Err()
			return
		case <-ch:
			rr.mu.Lock()
			if rr.done {
				rr.mu.Unlock()
				err = ErrClientConnClosing
				return
			}

			if len(rr.addrs) > 0 {
				if rr.next >= len(rr.addrs) {
					rr.next = 0
				}
				next := rr.next
				for {
					a := rr.addrs[next]
					next = (next + 1) % len(rr.addrs)
					if a.connected {
						addr = a.addr
						rr.next = next
						rr.mu.Unlock()
						return
					}
					if next == rr.next {
						// 遍历完一圈了,还没找到,可能刚Up的地址被down掉了,重新等待。
						break
					}
				}
			}
			// The newly added addr got removed by Down() again.
			if rr.waitCh == nil {
				ch = make(chan struct{})
				rr.waitCh = ch
			} else {
				ch = rr.waitCh
			}
			rr.mu.Unlock()
		}
	}
}

lbWatcher:
lbWatcher会监听地址变化信息,roundroubin每次有地址变化时,会将所有的地址通知给lbWatcher,lbWatcher本身维护了地址连接的map表,会找出新添加的地址和需要删除的地址,然后做连接、关闭操作,再调用roundRobin的Up/Down方法通知连接的状态。

func (bw *balancerWrapper) lbWatcher() {
	notifyCh := bw.balancer.Notify()
	if notifyCh == nil {
		// 没有定义解析器,直接连接这个地址。
		a := resolver.Address{
			Addr: bw.targetAddr,
			Type: resolver.Backend,
		}
		sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
		if err != nil {
			grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
		} else {
			bw.mu.Lock()
			bw.conns[a] = sc
			bw.connSt[sc] = &scState{
				addr: Address{Addr: bw.targetAddr},
				s:    connectivity.Idle,
			}
			bw.mu.Unlock()
			sc.Connect()
		}
		return
	}

	for addrs := range notifyCh {
			var newAddrs []resolver.Address
			for _, a := range addrs {
				newAddr := resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}
				newAddrs = append(newAddrs, newAddr)
			}
			var (
				add []resolver.Address // Addresses need to setup connections.
				del []balancer.SubConn // Connections need to tear down.
			)
			resAddrs := make(map[resolver.Address]Address)
			for _, a := range addrs {
				resAddrs[resolver.Address{
					Addr:       a.Addr,
					Type:       resolver.Backend, // All addresses from balancer are all backends.
					ServerName: "",
					Metadata:   a.Metadata,
				}] = a
			}
			bw.mu.Lock()
			// 新添加的地址,需要去新建连接
			for a := range resAddrs {
				if _, ok := bw.conns[a]; !ok {
					add = append(add, a)
				}
			}
			// 要被删除的地址,需要去关闭连接
			for a, c := range bw.conns {
				if _, ok := resAddrs[a]; !ok {
					del = append(del, c)
					delete(bw.conns, a)
					// Keep the state of this sc in bw.connSt until its state becomes Shutdown.
				}
			}
			bw.mu.Unlock()
			for _, a := range add {
				sc, err := bw.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
				if err != nil {
					grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
				} else {
					bw.mu.Lock()
					bw.conns[a] = sc
					bw.connSt[sc] = &scState{
						addr: resAddrs[a],
						s:    connectivity.Idle,
					}
					bw.mu.Unlock()
					sc.Connect()//  这一步真正做了连接的操作。
				}
			}
			for _, c := range del {
				bw.cc.RemoveSubConn(c)
			}
		}
	}
}