25 January 2018

Exploring Kubernetes Service Discovery and loadbalancing

Kubernetes has some great helpers that allow us to drop a service in and satisfy most requirements for service discovery and loadbalancing. Service discovery out of the box is a DNS request answered by Kube-DNS, a pod (soon to be replaced by CoreDNS) running dnsmasq alongside a custom integration that watches events straight off the Kubernetes API. Loadbalancing is provided out of the box by kube-proxy, which balances TCP streams inside the cluster. There is an IPVS-based implementation coming, but that’s outside the scope of this intro. These two helpers provide a well rounded solution that fits many use cases, but in some of them the limits of these generic solutions become visible. This post will mainly go into swapping out kube-proxy for other methods of discovering and load-balancing our requests. A later post will show how a system can get away from using Kube-DNS entirely.

Let’s start with a quick kube 101, beginning with a deployment (see the repo) that creates several backend pods. These pods provide a service that other deployments consume; in this case it’s a super boring non-cryptographic provider of random numbers. The quickest way to expose it is with a Service, such as this one:

kind: Service
apiVersion: v1
metadata:
  name: rng-clusterip
spec:
  selector:
    provides: rng
  ports:
  - protocol: TCP
    name: grpclb
    port: 8081
    targetPort: 8081

When this new service pops into existence, the apiserver allocates a clusterIP 1 and the endpoints controller (part of the kube-controller-manager) maintains the matching Endpoints object that lets kube-proxy and kube-dns find the backing pods.

kube-proxy running on each and every node sees the new service entry and ensures that new connections made from that host go through a decision process that ends up with the TCP stream going to a valid backend pod. Finally (for this post’s concerns) Kube-dns adds the rather spiffy rng-clusterip.$namespace.svc.cluster.local dns entry pointing to the clusterIP.

That’s it, nice and understandable. Let’s make something to consume it. We’re using gRPC here, so let’s go through a quick gRPC setup.

package main

import (
  "context"
  "log"

  "github.com/kcollasarundell/balancing-on-k8s/rng"
  "google.golang.org/grpc"
)

func main() {
  // Set up a quick gRPC connection to the service port exposed above.
  // (You'll want to remove WithInsecure in
  // prod and add some others (WithBlock perhaps))
  conn, err := grpc.Dial("rng-clusterip:8081", grpc.WithInsecure())
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer conn.Close()

  // Turn it into an rng client
  // (See https://github.com/kcollasarundell/balancing-on-k8s/rng)
  c := rng.NewRngClient(conn)

  // name is just a placeholder label for the rng source.
  const name = "example"
  r, err := c.Rng(context.Background(), &rng.Source{Name: name})
  if err != nil {
    log.Fatalf("Wat: %v", err)
  }
  log.Printf("request %d", r.GetRN())
}

You now have a gRPC connection that connects to the rng-clusterip service. It just works. There are some downsides though. Kube-proxy can only balance TCP streams, which means long lived connections carrying multiple requests (http with pipelining, http2 with its many streams, etc.) only ever get routed to a single backend, possibly leaving work unbalanced.

We can work around these limitations by removing kube-proxy from our dependencies. Of course there is no such thing as a perfect solution, so this will add different compromises and complexity.

Removing kube-proxy

We can remove kube-proxy by creating a headless service. A headless service by itself won’t help us much. It still has the problem of long lived connections not being balanced, and now there’s the added “benefit” that when the pods providing the service change, the client has to rely on its DNS library to notice.

It’s not DNS. It can’t be DNS. It was DNS.

We can solve this with either a sidekick 3 or clientside loadbalancing. By pushing the loadbalancing decision into the pod and using more of the information available, our services can make more nuanced decisions about what they depend on. Before going into clientside loadbalancing decisions let’s first get rid of that ClusterIP. (Not really; in this case we create a second service so that both can be deployed.)

kind: Service
apiVersion: v1
metadata:
  name: rng-headless # Different name allows coexistence
spec:
  clusterIP: None  # <--- this line is important
  selector:
    provides: rng
  ports:
  - protocol: TCP
    name: grpclb
    port: 8081
    targetPort: 8081

Kube-dns handles this a bit differently from the clusterIP service. Instead of a single A record, it creates an A record under the rng-headless name for each of the backing pods. It also creates a full SRV (service) record for each backing pod, including port and protocol information for each named port. These SRV records are pretty damn awesome: the gRPC client side balancer can use them directly. You may have noted the port name above (grpclb); this allows the balancer resolver to identify the appropriate port to connect to and the DNS name to look up. We don’t need to use this, it just makes life easier.
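To see what kube-dns actually hands back for the headless service, a quick lookup from a pod inside the cluster shows the difference between the single clusterIP answer and the per-pod records. A minimal sketch using the standard library resolver (the default namespace is an assumption; adjust both names to match your deployment):

package main

import (
	"fmt"
	"log"
	"net"
)

func main() {
	// A records: one IP per backing pod for the headless service,
	// rather than a single clusterIP.
	ips, err := net.LookupHost("rng-headless.default.svc.cluster.local")
	if err != nil {
		log.Fatalf("A lookup failed: %v", err)
	}
	fmt.Println("pod IPs:", ips)

	// SRV records: _grpclb._tcp.rng-headless... returns a target host
	// and the named port (8081 here) for each backing pod.
	_, srvs, err := net.LookupSRV("grpclb", "tcp", "rng-headless.default.svc.cluster.local")
	if err != nil {
		log.Fatalf("SRV lookup failed: %v", err)
	}
	for _, srv := range srvs {
		fmt.Printf("%s:%d\n", srv.Target, srv.Port)
	}
}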

Now let’s take the gRPC connection from the clusterIP configuration and enable a dns based balancer. It’s super easy: just a single dial option, grpc.WithBalancerName(roundrobin.Name) 4, and a slight change to the address.

  // The dns:/// prefix is used to indicate which resolver to use.
  // This will be important later (I promise).
  // The port is included here because the dns resolver otherwise
  // defaults to port 443.
  conn, err := grpc.Dial("dns:///rng-headless:8081",
    grpc.WithInsecure(),
    // This tells the gRPC library to use the round robin balancer.
    // We can register other balancers and use them instead
    // of roundrobin later
    grpc.WithBalancerName(roundrobin.Name),
  )
  if err != nil {
    log.Fatalf("did not connect: %v", err)
  }
  defer conn.Close()

We could leave it here. Instead of each process opening a single TCP connection, it now opens a TCP connection to every available backend. The gRPC resolver will periodically refresh the DNS entries and feed them into the gRPC balancer, which adds and removes connections as backends come and go. Super shiny, and pretty much a solution that works for many, many cases.

Unfortunately there are some downsides to this configuration. This balancer is unsuited to sticky data: roundrobining requests across a cache service will result in some delightfully annoying side effects.

The default gRPC dns resolver used here also doesn’t make decisions about what to include; it just grabs everything. With this the balancer will open a connection to each and every available backend. When there are only tens of clients and backends running, or everything is in a single region/zone, this isn’t too bad, but the growth in connections and the background load of maintaining them all can quickly get silly. Users may also find the latency for detecting changes in the DNS resolver limiting. By default the resolver only refreshes every 30 minutes 5, which might be a bit too long to wait when scaling up the backend service.

These two problems are both reasonably easy to address. We can swap the balancer out for one that uses a consistent hash to decide on traffic routing. A later post will swap out the resolver for one that connects directly to the kubernetes api and makes decisions about which pods to pass to the balancer.

Consistent hashing is pretty shiny, and while this post goes over how to do it, the footnotes have links to more info 6 7 8. Building a consistent hash is only part of the solution; the code also has to satisfy the balancer.Builder and balancer.Picker interfaces to integrate with the rest of the balancer code.

First up, let’s make a package, add our imports and pick a base name.

// Package consistentHashBalancer is all about
// building balancers that use a consistent hash
// keyed off a specific context.Value.
package consistentHashBalancer

import (
  "context"
  "fmt"
  "sort"
  "sync"

  "google.golang.org/grpc/balancer"
  "google.golang.org/grpc/balancer/base"
  "google.golang.org/grpc/grpclog"
  "google.golang.org/grpc/resolver"
)

// baseName is the prefix for the name of each consistentHash balancer;
// NewBuilder appends the key to it.
const baseName = "consistentHash"

Next let’s create the builder. The gRPC balancer works with two major components (that are considered in this set of posts): the balancer.Builder and the resolver. The resolver watches for changes in the backend addresses and produces a slice of all suitable backends, which the base package turns into ready SubConns and hands to our builder via the Build method.


// NewBuilder creates a new consistentHash balancer builder
// that keys off a context value stored under key.
// It requires a two step calling process (register, then dial)
// for each key used:
//   b := consistentHashBalancer.NewBuilder("keyName")
//   balancer.Register(b)
//   grpc.WithBalancerName(b.Name())
func NewBuilder(key interface{}) balancer.Builder {
  name := baseName + fmt.Sprintf("%v", key)
  return base.NewBalancerBuilder(name, &hashBalancerBuilder{Name: name, key: key})
}

type hashBalancerBuilder struct {
  Name string
  key  interface{}
}

Build gets a map of ready SubConns and returns a picker.


func (h *hashBalancerBuilder) Build(readySCs map[resolver.Address]balancer.SubConn) balancer.Picker {
  grpclog.Infof("consistentHashBuilder: newPicker called with key: %v and readySCs: %v", h.key, readySCs)
  // It starts by creating a new hash ring with a replica count of 100.
  // Replicas are discussed below and in more detail in the footnotes.
  r := newRing(100)
  // Then for each ready SubConn it adds nodes to the hash ring.
  for address, sc := range readySCs {
    r.addNode(address, sc)
  }
  // Once this is complete it can return the hashPicker
  return &hashPicker{
    key:      h.key,
    subConns: r,
  }
}

Before going into the hashPicker let’s go through how the hashing works.

A node is the basic structure of our hash ring. It has an id (the connection’s address), a hashID (the address and replica number hashed together and stored as a uint32) and a pointer to a SubConn.

type node struct {
	id      string
	hashID  uint32
	subConn *balancer.SubConn
}

A ring is the core component of a consistent hash. It has a slice of pointers to nodes, a count of its current size, a lock for science and the number of replicas that will be added for each new node. With only a single replica per backend, the nodes on the ring will be spread by unequal amounts due to the differences generated by the hashing algorithm. This unequal distribution and limited number of nodes results in two problems. First there will be an unequal distribution of work to the backends. Then there is the problem of node failures. In the event of a node failure, all connections that went to that backend get distributed to the next node along; since each backend only occupied a single point on the ring, they all land on the same new destination. With many replicas added for each node, the spacing between nodes becomes smaller and smoother. In the event of a node failure the greater number of replicas also spreads the orphaned work across many different nodes, smoothing out the jump in new work.

type ring struct {
	nodes    nodes
	replicas int
	size     int
	sync.Mutex
}

// newRing does exactly what it says on the box.
func newRing(replicas int) *ring {
	return &ring{nodes: nodes{}, replicas: replicas}
}
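
The nodes type that backs the ring isn’t shown in the post. For sort.Sort and sort.Search to work it only needs to be a slice of node pointers ordered by hashID, satisfying sort.Interface. A minimal sketch of what it could look like (the repo may differ):

// nodes is the slice of ring entries, kept ordered by hashID.
type nodes []*node

func (n nodes) Len() int           { return len(n) }
func (n nodes) Swap(i, j int)      { n[i], n[j] = n[j], n[i] }
func (n nodes) Less(i, j int) bool { return n[i].hashID < n[j].hashID }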

r.addNode is where ring construction happens. For each Address and SubConn pair it adds a number of nodes to the ring (hardcoded to 100 here). The hashID function hashes the identifying information of each node (address+replica) into a uint32. This uint represents where on the ring the node should sit and is used when the slice is sorted.

// addNode inserts `replicas` entries for the given backend and keeps the
// ring sorted by hashID.
func (r *ring) addNode(address resolver.Address, subConn balancer.SubConn) {
	r.Lock()
	defer r.Unlock()

	r.size++

	for replica := 0; replica < r.replicas; replica++ {
		node := &node{
			id:      address.Addr,
			hashID:  hashID(address.Addr + fmt.Sprintf("%d", replica)),
			subConn: &subConn,
		}
		r.nodes = append(r.nodes, node)
	}
	sort.Sort(r.nodes)
}
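
The hashID helper also isn’t shown here; any hash with a reasonably even distribution that maps a string onto a uint32 will do. A sketch using FNV-1a from the standard library (an assumption — the repo may use something else):

import "hash/fnv"

// hashID maps an identifier (address+replica above, or the request key
// in get below) onto a position on the ring.
func hashID(id string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(id))
	return h.Sum32()
}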

Once all the connections are in the sorted slice and attached to the hashPicker, it is returned to the caller and can start being used. The Picker is a small struct that only needs two things.

type hashPicker struct {
  // subConns is the ring of subConns at the time this picker was
  // created. The ring is immutable. Each Pick() will do a get on the ring
  // to find the appropriate backend based on the context.Value for key
  subConns *ring
  // key is used to find the appropriate field in the context to hash
  key interface{}
}

When the Pick method is called, after some small sanity checks it gets the value from the request context, casts it to a string and passes it to the get method, which does most of the hard work here.


func (p *hashPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
  if p.subConns.size <= 0 {
    return nil, nil, balancer.ErrNoSubConnAvailable
  }
  value := ctx.Value(p.key)
  s, ok := value.(string)
  if !ok {
    return nil, nil, fmt.Errorf("invalid request, context does not contain a value for key %v", p.key)
  }

  sc := p.subConns.get(s)
  return *sc.subConn, nil, nil
}

The get method expects a string and returns a pointer to a node. It uses the same hashID function as the earlier addNode, but instead of adding to the ring it finds where on the slice the value should sit. This is done with the sort.Search function, which identifies the appropriate index and thus the appropriate SubConn to return. If the index is past the end of the slice it wraps around and returns the first SubConn 6

func (r *ring) get(id string) *node {
	// Hash the incoming key once, then find the first node on the ring
	// whose hashID is at or past it.
	h := hashID(id)
	s := func(i int) bool {
		return r.nodes[i].hashID >= h
	}

	i := sort.Search(r.nodes.Len(), s)

	// Falling past the end of the slice means wrapping around to the
	// start of the ring.
	if i >= r.nodes.Len() {
		i = 0
	}
	return r.nodes[i]
}
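
To put all of this to use, a client registers the builder, dials with its name and puts the keyed value into each outgoing request context. A rough sketch assuming the balancer package lives alongside the rng package in the repo (the import path and the "userID" key are assumptions):

package main

import (
	"context"
	"log"

	// The import path for the balancer package is an assumption.
	"github.com/kcollasarundell/balancing-on-k8s/consistentHashBalancer"
	"github.com/kcollasarundell/balancing-on-k8s/rng"
	"google.golang.org/grpc"
	"google.golang.org/grpc/balancer"
)

func main() {
	// The key ("userID" here) is arbitrary; it just has to match the
	// value placed into the request context below.
	b := consistentHashBalancer.NewBuilder("userID")
	balancer.Register(b)

	conn, err := grpc.Dial("dns:///rng-headless:8081",
		grpc.WithInsecure(),
		grpc.WithBalancerName(b.Name()),
	)
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()
	c := rng.NewRngClient(conn)

	// Requests carrying the same value for the key consistently land on
	// the same backend pod.
	ctx := context.WithValue(context.Background(), "userID", "user-1234")
	r, err := c.Rng(ctx, &rng.Source{Name: "sticky"})
	if err != nil {
		log.Fatalf("Wat: %v", err)
	}
	log.Printf("request %d", r.GetRN())
}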

And it’s done. While a ClusterIP service provides a bulletproof first implementation, the capabilities and benefits available by stepping past these helpers can easily justify the added complexity covered above. These patterns give a team the ability to implement a client side loadbalancing system, not just in kubernetes (it was covered because I was lazy and already had minikube) but anywhere that can return a SRV record collection of backend services (or multiple A records, but seriously, just go all the way to SRV records). There are still limits to address. The resolver only refreshes every 30 minutes or on connection changes, so scaling up a back end service won’t translate to more capacity for some time 5. The service may also need to make more intelligent decisions about routing the connections. The next post (which needs the demo code written before I can start to write the post) should go into swapping out the resolver and adding these decisions.


  1. A virtual ip that gets routed by iptables ↩︎

  2. This looks shiny af (see https://buoyant.io/2017/12/05/introducing-conduit/ ) ↩︎

  3. A sidekick is a process running inside the same pod and available in the same network namespace. Service meshes such as istio, linkerd or conduit 2 have some great sidekicks that do loadbalancing. Going into these is a bit beyond this post, so it will come up some later time this year (hopefully). ↩︎

  4. import "google.golang.org/grpc/balancer/roundrobin" ↩︎

  5. Refreshes also happen when connection state changes so if the dns balancer is limiting the system, removing one of the older pods can force a refresh. ↩︎

  6. Consistent hashing by Tom White ↩︎

  7. wikipedia ↩︎

  8. Akamai paper ↩︎