探秘负载均衡

前言

今天是国庆的第3天,在此恭祝小哥哥和小姐姐们⛱️节日快乐! 微信朋友圈🀄️全部是 旅途ING。

对于我这种社交尴尬者,⛱️度假最好的方式还是学习。

今日带来了 自己对于负载均衡(load balance)的学习。

度假最好的方式是学习

GRPC在负载均衡的设计: Load-balancing.md

为什么要负载均衡

在如今的互联网领域,对于数据的大爆发,高频的请求,对于提供服务的企业来说是不小的IT资源开销。为了解决资源中的负载分配,并且使得资源的利用率达到最大。出现了负载均衡,主要为了解决 高并发 和 高可用。

负载均衡的实现方式有:软件 和 硬件。这次我学习的主是软件方式。

负载均衡的算法

  • 轮询( Round Robin ) & 加权轮询( Weight Round Robin ) :
  • 随机 & 加权随机
  • 最少连接( Last Connections ) : 通过将流量引导到第一个可用服务器然后将该服务器移动到队列底部来轮换服务器,当服务器具有相同的规格并且没有很多持久连接时最有用。
  • 哈希( Hash ) eg: ip hash
  • 一致性哈希( Consistent Hash ) eg: request_url 一致性哈希
  • 最少响应时间 : 将流量定向到活动连接最少且平均响应时间最短的服务器。

常用的负载均衡手段

Load-balancing.md 已经介绍了以下几种

代理模式

代理模式需要可靠的并且能报告负载到负载均衡的系统的客户端,这种需要额外的资源来处理、并且负载均衡系统包含了每个RPC请求和响应的副本,增加了服务延迟

重客户端模式

这种方式、是绝大部分负载均衡的逻辑放置在客户端。( 可以从列表重选择服务器的负载均衡策略(循环、随机等),服务器列表要么配置的、要么由其他服务提供、举个例子:比如 kafka、pulsar、redis 等等集群模式的客户端 ) 这种模式需要已多种语言的方式提供负载均衡策略、这种增加了客户端代码的复杂性。

外部负载均衡模式 (重服务端 轻客户端)

客户端保持简单、并且可以非常简单可移植。实现用于服务器选择的著名算法。复杂的负载平衡算法由负载均衡器提供,客户端仅仅需要从负载均衡器提供的配置和服务器列表里面选择,并向它们发送客户端的请求。负载均衡器需要从这些服务器列表中选取出健康负载的服务器和剔出不可用服务器。在负载均衡器做出选举和其他决策的时候需要通知到客户端做更新。负载均衡器可以从后端服务中收集负载和健康信息。

负载均衡 与 SSL

我们都知道 SSL 是用于 Web服务器 和 浏览器 之间的建立加密链接的标准的安全技术方式,SSL 流量通常在负载均衡器上进行解密,当负载均衡器在传递请求之前解密流量或请求的时候,叫做 SSL 终止。负载均衡器使得 Web 服务器不需要花费额外的资源来进行解密操作。 但这样也带来了一定的安全风险,一般在同一个数据中心的,风险会低一些,这是一种解决方式;第二种的话就是 SSL pass-through,就是 负载均衡器 仅仅将加密的请求传递到 Web服务器,然后通过Web服务器进行解密,其他非法请求进行拦截。

负载均衡的好处

负载均衡是一个网络流量监控的👮‍的角色,负载均衡工作与 OSI (应、表、会、传、网、数、物),的 4 到 7 层

  • L4: 根据网络和传输层协议的数据(IP 和 TCP 端口)引导流量
  • L7: 将内容切换到负载均衡上去, 诸如 HTTP 标头, REQUEST URL,SSL 会话等等

回头看 负载均衡 在 微服务架构体系 里面的应用

往往在微服务架构体系里面,负载均衡 归属应用于 服务注册与发现 这一大环节内的。在引入微服务架构,往往伴随的是集群部署环境,这边就需要考虑到服务容错性、服务的自动注册、服务的自动发现。

负载均衡的均衡方式

  • Proxy eg: nginx or envoy
  • Client Side eg: grpc or dubbo

Dubbo 内部的 负载均衡 实现

Dubbo

此次查阅是基于 Apache Dubbo 3.0 版本进行的源码阅读与分析,其核心设计其实还有 route 篇幅有限,不做介绍。(后面会专门出一讲讲述 Apache Dubbo 3.0 的贴合云原生的服务架构模型设计)。

如图,实现了 随机、轮询、一致性哈希、最少活跃调用数

dubbo-go-3.0负载均衡代码目录

核心代码解析:

doubbo-go/cluster/loadbalance.go

// LoadBalance
// Extension - LoadBalance  定义了一个负载均衡的接口,其他代码省略
type LoadBalance interface {
Select([]protocol.Invoker, protocol.Invocation) protocol.Invoker
}
  • 权重随机 doubbo-go/cluster/loadbalance/random.go 看其实现的 Select 方法
func (lb *randomLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
	var length int
	// 判断 Invoker 数量,如果只有一个直接返回
	if length = len(invokers); length == 1 {
		return invokers[0]
	}
	// 遍历 Invokers 计算 TotalWeight 和 SameWeight
	// 如果 TotalWeight > 0 并且 SameWeight 为 false 则随机一个 offset,
	// 然后遍历 weights,应offset挨个减去 weight 后 小于0,则这个位置的 invoker 被选中,
	// 如果都没选中,则从 invokers 列表里面 随机返回一个 
	sameWeight := true
	weights := make([]int64, length)

	firstWeight := GetWeight(invokers[0], invocation)
	totalWeight := firstWeight
	weights[0] = firstWeight

	for i := 1; i < length; i++ {
		weight := GetWeight(invokers[i], invocation)
		weights[i] = weight

		totalWeight += weight
		if sameWeight && weight != firstWeight {
			sameWeight = false
		}
	}

	if totalWeight > 0 && !sameWeight {
		// If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
		offset := rand.Int63n(totalWeight)

		for i := 0; i < length; i++ {
			offset -= weights[i]
			if offset < 0 {
				return invokers[i]
			}
		}
	}
	// If all invokers have the same weight value or totalWeight=0, return evenly.
	return invokers[rand.Intn(length)]
}
  • 轮询
// Select gets invoker based on round robin load balancing strategy
func (lb *roundRobinLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
	count := len(invokers)
	if count == 0 {
		return nil
	}
	if count == 1 {
		return invokers[0]
	}
    // 组装KEY
	key := invokers[0].GetURL().Path + "." + invocation.MethodName()
	// 根据KEY从缓存中获取,没有就存起来
	cache, _ := methodWeightMap.LoadOrStore(key, &cachedInvokers{})
	cachedInvokers := cache.(*cachedInvokers)

	var (
		clean               = false
		totalWeight         = int64(0)
		maxCurrentWeight    = int64(math.MinInt64)
		now                 = time.Now()
		selectedInvoker     protocol.Invoker
		selectedWeightRobin *weightedRoundRobin
	)

    // 遍历 Invokers 
	for _, invoker := range invokers {
		weight := GetWeight(invoker, invocation)
		if weight < 0 {
			weight = 0
		}
        // 将 url 转换成身份标识
		identifier := invoker.GetURL().Key()
		// 从map中获取,没有就存起来
		loaded, found := cachedInvokers.LoadOrStore(identifier, &weightedRoundRobin{weight: weight})
		weightRobin := loaded.(*weightedRoundRobin)
		if !found {
			clean = true
		}

		if weightRobin.Weight() != weight {
			weightRobin.setWeight(weight)
		}

		currentWeight := weightRobin.increaseCurrent()
		weightRobin.lastUpdate = &now

        // 如果当前权重 > max 
		if currentWeight > maxCurrentWeight {
		    // max 设置成赋值当前权重
			maxCurrentWeight = currentWeight
			selectedInvoker = invoker
			selectedWeightRobin = weightRobin
		}
		totalWeight += weight
	}
    
    // 当 invoker 列表 与 缓存的Invoker(即 Map大小) 大小不一致的时候,就需要清理缓存中的数据
    // 超过 recyclePeriod 时间没有更新过的 做清理动作
	cleanIfRequired(clean, cachedInvokers, &now)

    // 如果选中的 invoker 的 weightedRoundRobin 不为空,
    // 则将 weightedRoundRobin 的 crurent 设置为 负的 totalWeight
    // 然后 返回 选中的 invoker
    // 这边为什么设置为负的: (是不是感觉设计的精妙)
    //    totalWeight 为所有的权重的和,这次选中了这个,这个变成了负的
    //    下轮肯定轮不到这个 invoker 因为他的权重是最小的,后面所有的都被选中的了一次,那么所有的全变成负的了
    //    下次再选举的时候,再从负的里面找到最大的,所以又一次被选中,这样又变成正数了。
	if selectedWeightRobin != nil {
		selectedWeightRobin.Current(totalWeight)
		return selectedInvoker
	}

    // 上面一轮下来啥都没有,那就选择第一个
	// should never happen
	return invokers[0]
}

func cleanIfRequired(clean bool, invokers *cachedInvokers, now *time.Time) {
	if clean && atomic.CompareAndSwapInt32(&state, COMPLETE, UPDATING) {
		defer atomic.CompareAndSwapInt32(&state, UPDATING, COMPLETE)
		invokers.Range(func(identify, robin interface{}) bool {
			weightedRoundRobin := robin.(*weightedRoundRobin)
			elapsed := now.Sub(*weightedRoundRobin.lastUpdate).Nanoseconds()
			if elapsed > recyclePeriod {
				invokers.Delete(identify)
			}
			return true
		})
	}
}

// Record the weight of the invoker
type weightedRoundRobin struct {
    // 权重
	weight     int64
	current    int64
	// 最后修改时间
	lastUpdate *time.Time
}

func (robin *weightedRoundRobin) Weight() int64 {
	return atomic.LoadInt64(&robin.weight)
}

func (robin *weightedRoundRobin) setWeight(weight int64) {
	robin.weight = weight
	robin.current = 0
}

func (robin *weightedRoundRobin) increaseCurrent() int64 {
	return atomic.AddInt64(&robin.current, robin.weight)
}

func (robin *weightedRoundRobin) Current(delta int64) {
	atomic.AddInt64(&robin.current, -1*delta)
}

// 缓存, key = 
type cachedInvokers struct {
	sync.Map /*[string]weightedRoundRobin*/
}
  • 一致性哈希
// Select gets invoker based on load balancing strategy
func (lb *consistentHashLoadBalance) Select(invokers []protocol.Invoker, invocation protocol.Invocation) protocol.Invoker {
	methodName := invocation.MethodName()
	key := invokers[0].GetURL().ServiceKey() + "." + methodName

    // 遍历整个 invoker 列表, 挨个执行 json.Marshal 操作,然后将 bytes[] 添加到bs中
    // 通过 crc32.ChecksumIEEE(bs) 计算 hashCode, 对比 selectors 中的 key 是否一致,一致选中,
    // 不一致则通过 newConsistentHashSelector 重新设置一个 
    
	// hash the invokers
	bs := make([]byte, 0)
	for _, invoker := range invokers {
		b, err := json.Marshal(invoker)
		if err != nil {
			return nil
		}
		bs = append(bs, b...)
	}
	hashCode := crc32.ChecksumIEEE(bs)
	selector, ok := selectors[key]
	if !ok || selector.hashCode != hashCode {
		selectors[key] = newConsistentHashSelector(invokers, methodName, hashCode)
		selector = selectors[key]
	}
	return selector.Select(invocation)
}

// 定于了 hashCode、replicaNum、virtualInvokers 等等参数
// consistentHashSelector implementation of Selector:get invoker based on load balancing strategy
type consistentHashSelector struct {
	hashCode        uint32
	replicaNum      int
	virtualInvokers map[uint32]protocol.Invoker
	keys            gxsort.Uint32Slice
	argumentIndex   []int
}
// ..... 后面的代码省略

GRPC 内部的 负载均衡 实现

GRPC

GRPC 定义了 负载均衡的工作流程 和 接口

grpc-load-balancing

  • 客户端服务在启动的时候,首先根据 Name Resolver 去解析,解析成是 LB 地址 还是 IP列表地址。(这个IP列表,标记这个这个是服务器地址还是LB地址,对于请求的客户端的负载均衡策略和服务配置等)
  • 客户端根据返回,实例化请求策略( LB 的话 使用 grpclb 策略,否则 客户端使用服务配置的请求的负载均衡策略)
  • 负载均衡策略为每个服务器地址创建一个 Channel
  • 当有请求的时候,负载均衡策略决定使用哪个子通道。

grpc-go负载均衡代码目录

grpc-go/balancer/base/base.go

// PickerBuilder creates balancer.Picker.
type PickerBuilder interface {
	// Build returns a picker that will be used by gRPC to pick a SubConn.
	Build(info PickerBuildInfo) balancer.Picker
}

grpc-go/balancer/balancer.go

type Picker interface {
	// Pick returns the connection to use for this RPC and related information.
	//
	// Pick should not block.  If the balancer needs to do I/O or any blocking
	// or time-consuming work to service this call, it should return
	// ErrNoSubConnAvailable, and the Pick call will be repeated by gRPC when
	// the Picker is updated (using ClientConn.UpdateState).
	//
	// If an error is returned:
	//
	// - If the error is ErrNoSubConnAvailable, gRPC will block until a new
	//   Picker is provided by the balancer (using ClientConn.UpdateState).
	//
	// - If the error is a status error (implemented by the grpc/status
	//   package), gRPC will terminate the RPC with the code and message
	//   provided.
	//
	// - For all other errors, wait for ready RPCs will wait, but non-wait for
	//   ready RPCs will be terminated with this error's Error() string and
	//   status code Unavailable.
	Pick(info PickInfo) (PickResult, error)
}

grpc-go/balancer/base/balancer.go

func (p *errPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
	return balancer.PickResult{}, p.err
}
  • 轮询 grpc-go/balancer/roundrobin/roundrobin.go
type rrPickerBuilder struct{}

// Builder 操作,当GRPC 有节点更新的时候
func (*rrPickerBuilder) Build(info base.PickerBuildInfo) balancer.Picker {
	logger.Infof("roundrobinPicker: Build called with info: %v", info)
	if len(info.ReadySCs) == 0 {
		return base.NewErrPicker(balancer.ErrNoSubConnAvailable)
	}
	scs := make([]balancer.SubConn, 0, len(info.ReadySCs))
	for sc := range info.ReadySCs {
		scs = append(scs, sc)
	}
	return &rrPicker{
		subConns: scs,
		// Start at a random index, as the same RR balancer rebuilds a new
		// picker when SubConn states change, and we don't want to apply excess
		// load to the first server in the list.
		next: grpcrand.Intn(len(scs)),
	}
}

type rrPicker struct {
	// subConns is the snapshot of the roundrobin balancer when this picker was
	// created. The slice is immutable. Each Get() will do a round robin
	// selection from it and return the selected SubConn.
	subConns []balancer.SubConn

	mu   sync.Mutex
	next int
}

// 开始选择了操作了 挑选节点
func (p *rrPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
	p.mu.Lock()
	sc := p.subConns[p.next]
	p.next = (p.next + 1) % len(p.subConns)
	p.mu.Unlock()
	return balancer.PickResult{SubConn: sc}, nil
}
  • 加权轮询 grpc-go/balancer/weightedroundrobin/weightedroundrobin.go

// attributeKey is the type used as the key to store AddrInfo in the Attributes
// field of resolver.Address.
type attributeKey struct{}

// AddrInfo will be stored inside Address metadata in order to use weighted
// roundrobin balancer.
type AddrInfo struct {
	Weight uint32
}

// SetAddrInfo returns a copy of addr in which the Attributes field is updated
// with addrInfo.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func SetAddrInfo(addr resolver.Address, addrInfo AddrInfo) resolver.Address {
	addr.Attributes = addr.Attributes.WithValues(attributeKey{}, addrInfo)
	return addr
}

// GetAddrInfo returns the AddrInfo stored in the Attributes fields of addr.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a
// later release.
func GetAddrInfo(addr resolver.Address) AddrInfo {
	v := addr.Attributes.Value(attributeKey{})
	ai, _ := v.(AddrInfo)
	return ai
}

Envoy 负载均衡

Envoy

  • 轮询
  • 加权最少请求
  • 环哈希(一致性哈希)
  • Maglev
  • 随机
  • IP哈希
  • 优先级划分

这边预留思考探讨了~~~ 向读者去了解一下 Envoy 如何实现这些的负载均衡方式的?

参考文档

comments powered by Disqus