Syncing

Wherein we distribute the fortune teller service state across multiple devices using Syncbase.

Prerequisites:
All tutorials require Vanadium installation, and must run in a bash shell.
Further, please download this script then source it:

source ~/Downloads/scenario-f-setup.sh

This command wipes tutorial files and defines shell environment variables. If you start a new terminal, and just want to re-establish the environment, see instructions here.

If you would like to generate files from this tutorial without the copy/paste steps, download and source this script. Files will be created in the $V_TUT directory.

Introduction

In this tutorial, we will extend the Syncbase local persistence tutorial so that multiple fortune teller services can exchange data.

Syncbase uses collections as the unit of synchronization. Devices join groups called syncgroups, and each syncgroup manages one or more collections and ensures that each device in the syncgroup receives updates to its collections. In this tutorial we will add the fortune collection from the local persistence tutorial to a new syncgroup, and then add multiple fortune services to the same syncgroup in order to exchange data between them.

Modifying the Service

First, we need to modify the service to join a syncgroup, and make small changes to the Add and Get RPCs.

mkdir -p $V_TUT/src/fortune/service
 cat - <<EOF >$V_TUT/src/fortune/service/service.go
{#dim}{#dim-children}package service

import (
  "fortune/ifc"
  "math/rand"
  "strconv"
  "strings"
  "sync"

  "v.io/v23/context"
  "v.io/v23/rpc"
  "v.io/v23/security"
  "v.io/v23/security/access"
  wire "v.io/v23/services/syncbase"
  "v.io/v23/syncbase"
)

// Constant names of different Syncbase entities.
const (
  fortuneDatabaseName   = "fortuneDb"
  fortuneCollectionName = "fortuneCollection"{/dim-children}{/dim}
  fortuneSyncgroupName  = "fortuneSyncgroup"
{#dim}{#dim-children}  // A special key that specifies the number of fortunes.
  numFortunesKey = "numFortunes"
)

type impl struct {
  random *rand.Rand   // To pick a random fortune
  mu     sync.RWMutex // To safely enable concurrent use.

  syncbaseName       string // The Syncbase endpoint
  remoteSyncbaseName string // A remote endpoint to connect to

  sbs syncbase.Service    // Handle to the Syncbase service
  d   syncbase.Database   // Handle to the fortunes database
  c   syncbase.Collection // Handle to the fortunes collection
}

// Makes an implementation.
func Make(ctx *context.T, syncbaseName, remoteSyncbaseName string) ifc.FortuneServerMethods {
  impl := &impl{
    random:             rand.New(rand.NewSource(99)),
    syncbaseName:       syncbaseName,{/dim-children}{/dim}
    remoteSyncbaseName: remoteSyncbaseName,
{#dim}{#dim-children}  }

  if err := impl.initSyncbase(ctx); err != nil {
    panic(err)
  }
  return impl
}

// Initialize Syncbase by establishing a new service and creating a new database
// and a new collection.
func (f *impl) initSyncbase(ctx *context.T) error {

  // Create a new service and get its database.
  sbs := syncbase.NewService(f.syncbaseName)
  d := sbs.Database(ctx, fortuneDatabaseName, nil)
  if err := d.Create(ctx, nil); err != nil {
    return err
  }

  // Create the collection where we will store fortunes.
  c := d.Collection(ctx, fortuneCollectionName)
  if err := c.Create(ctx, nil); err != nil {
    return err
  }{/dim-children}{/dim}
  // Join a syncgroup if there is someone to connect to, otherwise create one.
  sg := d.Syncgroup(ctx, fortuneSyncgroupName)
  sgPerms := access.Permissions{}
  sgPerms.Add(security.BlessingPattern(sg.Id().Blessing),
    access.TagStrings(wire.AllSyncgroupTags...)...)

  sgSpec := wire.SyncgroupSpec{
    Description: "FortuneSyncgroup",
    Perms:       sgPerms,
    Collections: []wire.Id{c.Id()},
  }
  sgInfo := wire.SyncgroupMemberInfo{SyncPriority: 10}

  if f.remoteSyncbaseName != "" {
    if _, err := sg.Join(ctx, f.remoteSyncbaseName, nil, sgInfo); err != nil {
      return err
    }
  } else {
    if err := sg.Create(ctx, sgSpec, sgInfo); err != nil {
      return err
    }
  }
{#dim}{#dim-children}  f.sbs = sbs
  f.d = d
  f.c = c
  return nil
}

// Get RPC implementation. Returns a fortune retrieved from Syncbase.
func (f *impl) Get(ctx *context.T, _ rpc.ServerCall) (string, error) {
  f.mu.RLock()
  defer f.mu.RUnlock(){/dim-children}{/dim}
  counts := make([]int, 0)
  devices := make([]string, 0)
  it := f.c.Scan(ctx, syncbase.Prefix(numFortunesKey))
  for it.Advance() {
    var numFortunes int
    dev := strings.TrimPrefix(it.Key(), numFortunesKey)
    if err := it.Value(&numFortunes); err != nil {
      return "[error]", err
    }

    if numFortunes > 0 {
      counts = append(counts, numFortunes)
      devices = append(devices, dev)
    }
  }

  if len(counts) == 0 {
    return "[empty]", nil
  }

  index := f.random.Intn(len(counts))
  count := counts[index]
  dev := devices[index]

  // Get a random number in the range [0, numKeys) and convert it to a string;
  // this acts as the key in the fortunes collection.
  key := strconv.Itoa(f.random.Intn(count)) + dev
{#dim}{#dim-children}  var value string
  if err := f.c.Get(ctx, key, &value); err == nil {
    return value, nil
  } else {
    return "[error]", err
  }
}

// Add RPC implementation. Adds a new fortune by persisting it to Syncbase.
func (f *impl) Add(ctx *context.T, _ rpc.ServerCall, fortune string) error {
  f.mu.Lock()
  defer f.mu.Unlock(){/dim-children}{/dim}

  var numKeys int
  if err := f.c.Get(ctx, numFortunesKey+f.syncbaseName, &numKeys); err != nil {
    numKeys = 0
  }

  // Put the fortune into Syncbase.
  key := strconv.Itoa(numKeys)
  if err := f.c.Put(ctx, key+f.syncbaseName, &fortune); err != nil {
    return err
  }

  // Update the number of keys.
  return f.c.Put(ctx, numFortunesKey+f.syncbaseName, numKeys+1)

{#dim}{#dim-children}}{/dim-children}{/dim}


EOF

Code Walk

Our Make function now takes one new argument: a remoteSyncbaseName. This name points to another existing Syncbase instance that this service will use to join the syncgroup. If the name is an empty string, then the service is responsible for creating the syncgroup (and other services will use the service as their remote).

The new code in the initSyncbase function sets up permissions on the new syncgroup, adds the fortune collection to it, and then tries to join a syncgroup or creates one if no remote exists.

We have changed all our Get and Put calls to Syncbase by appending the (unique) Syncbase name to each key. This is because by default, Syncbase uses a last write wins policy to resolve conflicts to the same key. A conflict arises when two Syncbases attempt to update a value for the same key.

As an example of a potential conflict, consider updating the numFortunesKey value concurrently on two services. If both services update the value from 1 to 2, it means two fortunes were added (one by each service), and the true value should be 3. Instead, Syncbase will simply pick the latest value written (2 in this scenario), and our data will be inconsistent. Adding a unique identifier for each key prevents these conflicts since concurrent updates to the same key never occur.

This makes the Get RPC slightly more complex. We use the Scan method with a prefix to read the number of fortunes each Syncbase has added (each number is stored in a unique key, but Syncbase can perform scan on prefixes). We pick one of the Syncbases at random, and then randomly pick one of the fortunes from that Syncbase.

Server

The server requires a small change: we need to pass a remote Syncbase name to the service's Make function.

mkdir -p $V_TUT/src/fortune/server
 cat - <<EOF >$V_TUT/src/fortune/server/main.go
package main

{#dim}{#dim-children}import (
  "fmt"
  "flag"
  "fortune/ifc"
  "fortune/server/util"
  "fortune/service"
  "log"

  "v.io/v23"
  "v.io/v23/rpc"
  "v.io/x/ref/lib/signals"
  _ "v.io/x/ref/runtime/factories/generic"
)

var (
  serviceName = flag.String(
    "service-name", "",
    "Name for service in default mount table.")
  syncbaseName = flag.String(
    "sb-name", "",
    "Name of Syncbase service"){/dim-children}{/dim}
  remoteSyncbaseName = flag.String(
    "remote-sb-name", "",
    "Name of remote Syncbase service")
{#dim}{#dim-children})

func main() {
  ctx, shutdown := v23.Init()
  defer shutdown(){/dim-children}{/dim}
  fortune := ifc.FortuneServer(service.Make(ctx, *syncbaseName,  *remoteSyncbaseName))
{#dim}{#dim-children}  // If the dispatcher isn't nil, it's presumed to have
  // obtained its authorizer from util.MakeAuthorizer().
  dispatcher := util.MakeDispatcher()

  // Start serving.
  var err error
  var server rpc.Server
  if dispatcher == nil {
    // Use the default dispatcher.
    _, server, err = v23.WithNewServer(
      ctx, *serviceName, fortune, util.MakeAuthorizer())
  } else {
    _, server, err = v23.WithNewDispatchingServer(
      ctx, *serviceName, dispatcher)
  }
  if err != nil {
    log.Panic("Error serving service: ", err)
  }
  endpoint := server.Status().Endpoints[0]
  util.SaveEndpointToFile(endpoint)
  fmt.Printf("Listening at: %v\n", endpoint)

  // Wait forever.
  <-signals.ShutdownOnSignals(ctx)
}{/dim-children}{/dim}

EOF

Install the client and server:

go install fortune/server
go install fortune/client

Run Your Code

We will start a Syncbase instance like before:

syncbased \
  --v23.credentials=$V_TUT/cred/alice \
  --v23.tcp.address=127.0.0.1:0 > $V_TUT/endpoint1  2> /dev/null &
TUT_PID_SB1=$!
while [ ! -s $V_TUT/endpoint1 ]; do sleep 1; done

Then we will start a server:

rm -f $V_TUT/server1.txt $V_TUT/server2.txt
$V_TUT/bin/server \
  --v23.credentials=$V_TUT/cred/alice \
  --v23.tcp.address=127.0.0.1:0 \
  --endpoint-file-name=$V_TUT/server1.txt \
  --sb-name=`cat $V_TUT/endpoint1 | grep 'ENDPOINT=' | cut -d'=' -f2` &> /dev/null &
TUT_PID_SERVER1=$!

Now, we will start a second Syncbase. We will store the endpoint in a different file $V_TUT/endpoint2:

syncbased \
  --v23.tcp.address=127.0.0.1:0 \
  --v23.credentials=$V_TUT/cred/alice > $V_TUT/endpoint2 2> /dev/null &
TUT_PID_SB2=$!
while [ ! -s $V_TUT/endpoint2 ]; do sleep 1; done

Now start another server, and pass the first Syncbase endpoint as the remote Syncbase name:

$V_TUT/bin/server \
  --v23.credentials=$V_TUT/cred/alice \
  --v23.tcp.address=127.0.0.1:0 \
  --endpoint-file-name=$V_TUT/server2.txt \
  --sb-name=`cat $V_TUT/endpoint2 | grep 'ENDPOINT=' | cut -d'=' -f2` \
  --remote-sb-name=`cat $V_TUT/endpoint1 | grep 'ENDPOINT=' | cut -d'=' -f2` &> /dev/null &
TUT_PID_SERVER2=$!

We can now make RPC calls:

$V_TUT/bin/client \
  --v23.credentials=$V_TUT/cred/alice \
  --server=`cat $V_TUT/server1.txt` \
  --add "The greatest risk is not taking one."

$V_TUT/bin/client \
  --v23.credentials=$V_TUT/cred/alice \
  --server=`cat $V_TUT/server2.txt` \
  --add "A new challenge is near."

Now get the fortunes back:

$V_TUT/bin/client \
  --v23.credentials=$V_TUT/cred/alice \
  --server=`cat $V_TUT/server1.txt`

Calling the Get RPC above enough times should return both the first and second fortune eventually, even though the two fortunes were added to different services. Syncbases exchange data in the fortune collection using the syncgroup we set up.

Cleanup

To clean up, kill the servers, Syncbase instances, and remove any temporary files.

kill_tut_process TUT_PID_SERVER1
kill_tut_process TUT_PID_SERVER2
kill_tut_process TUT_PID_SB1
kill_tut_process TUT_PID_SB2
rm -f $V_TUT/server1.txt
rm -f $V_TUT/server2.txt

Summary

There is a lot more you can do with Syncbase. To dive deeper, see the Syncbase tutorial.