Not finished. Sorry

Caveats

  • Examples are in go for reasons
  • Probably implementable in other languages\protocols
  • Everything runs on Kubernetes
  • Code is incomplete
  • Bugs are included free of charge

TL;DR

Frontend thing depends on a background process.

One Frontend gopher talks to one backend gopher

But there aren’t usually 1 to 1 relationships

Many front end gophers to many backend gophers
Kubernetes logo with gophers

Kubernetes Service

kind: Service
apiVersion: v1
metadata:
    name: rng-clusterip
spec:
    selector:
        provides: rng
    ports:
        - protocol: TCP
        name: grpclb
        port: 8081
        targetPort: 8081

And dialing that service

conn, err := grpc.Dial(
    "rng-clusterip",
    grpc.WithInsecure()
)
  • Pros:
    • Built in
    • Simple and effective
    • Automatic DNS
    • Virtual IP for acccess
    • TCP or UDP stream based,
      Works for any application
  • Cons:
    • Deterministicish round robin
    • TCP or UDP stream based,
      Only works on the TCP protocol

Bugger

gRPC Client Side Balancing

3ish chunks of grpc balancing code

  • Resolver: Finds
  • Balancer: Maintains connections
  • Picker: Picks which connection to send the request over

Pickers

Round robin doesn’t work for everything

type PickerBuilder interface {
    Build(
        readySCs map[resolver.Address]balancer.SubConn
    ) balancer.Picker
}

type Picker interface {
    Pick(
        ctx context.Context, opts PickOptions
    ) (conn SubConn, done func(DoneInfo), err error)
}
const baseName = "consistentHash"

func NewBuilder(key interface{}) balancer.Builder {
  name := baseName + fmt.Sprintf("%v", key)
  return base.NewBalancerBuilder(
    name,
    &hashPicker{
      Name: name, key: key
    })
}
    type contextValue string
    key = contextValue("userid")
    b := consistentHashBalancer.NewBuilder(key)
    balancer.Register(b)

    conn, err := grpc.Dial(
      "dns:///rng-headless", 
      grpc.WithBalancerName(b.Name())
    )
func (h *hashPicker) Build(
    readySCs map[resolver.Address]balancer.SubConn
    ) balancer.Picker {

  r := newRing(100)
  for address, sc := range readySCs {
    r.addNode(address, sc)
  }
  return &hashPicker{
    key:      h.key,
    subConns: r,
  }
}
func (p *hashPicker) Pick(
    ctx context.Context,
    opts balancer.PickOptions,
) (balancer.SubConn, func(balancer.DoneInfo), error) {

  if p.subConns.size <= 0 { return...  }
  value := ctx.Value(p.key)

  s, ok := value.(string)
  if !ok { return nil, nil, "derp"}

  sc := p.subConns.get(s)
  return *sc.subConn, nil, nil
}

Now let’s Find all the pods

Resolver

kind: Service
apiVersion: v1
metadata:
  name: rng-headless
spec:
  clusterIP: None
  selector:
    provides: rng
  ports:
  - protocol: TCP
    name: grpclb
    port: 8081
    targetPort: 8081
dig -tSRV _grpclb._tcp.rng-headless

_grpclb._tcp.rng-headless.BLAH. 30
IN SRV    10 25 8081 6563663761663230.rng-headless.BLAH.
_grpclb._tcp.rng-headless.BLAH. 30
IN SRV    10 25 8081 3633363663623365.rng-headless.BLAH.
...

10 25 8081 36…5.rng-headless.BLAH.

conn, err := grpc.Dial("dns:///rng-headless",
    grpc.WithInsecure(),
    grpc.WithBalancerName(roundrobin.Name),
)
Scheme://Authority/Endpoint

type Target struct {
    Scheme    string
    Authority string
    Endpoint  string
}

And that’s it

Or Not

  • Downsides
    • DNS resolution only happens on connection change or every 30 minutes
    • Large environments will spend more time on connection management than using them.

Replacing the resolver

Need something to build the resolver

type Builder interface {
    Build(
        target Target, cc ClientConn, opts BuildOption,
    ) (Resolver, error)
    Scheme() string
}

and interact with it once it’s running

type Resolver interface {
    ResolveNow(struct{})
    Close()
}
type Resolver struct {
    scheme string
    cc     resolver.ClientConn
    source resolve.ResolveClient
}
type Resolver struct {
  scheme    string
  cc        resolver.ClientConn
  source    resolve.ResolveClient
}
func (r *Resolver) Build(stuff) (
    resolver.Resolver, error,
  ) {
  r.cc = cc

  conn, _ := grpc.Dial(target.Authority)
  r.source = resolve.NewResolveClient(conn)
  c, _ := r.source.ResolveStream(
    context.Background(),
    &resolve.Source{Name: target.Endpoint },
  )
  go r.stream(c)
  return r, nil
}
for {
  rawAddresses, _ := stream.Recv()

  var backends []resolver.Address
  for _, rawAddress := range rawAddresses.Name {
    backends = append(backends, resolver.Address{
      Addr:       rawAddress,
    })
  }
  r.cc.NewAddress(backends)
}

Here be unfinished dragons

func subset(
  backends []resolver.Address,
  clientID, subsetSize int,
) []resolver.Address {

  subsetCount := len(backends) / subsetSize
  round := int64(clientID / subsetCount)
  subsetID := clientID % subsetCount
// shuffle the slice of addresses
r := rand.New(rand.NewSource(round))
shuffledBackends := make([]resolver.Address, len(backends))
for i, o := range r.Perm(len(backends)) {
  shuffledBackends[i] = backends[o]
}

start := subsetID * subsetSize
return shuffledBackends[start : start+subsetSize]

Connection distribution

Questions?

Acknowledgements

@kcollasarundell