DB plugins

Datasync plugin

Datasync defines the interfaces for the abstraction of data synchronization between app plugins and different backend data sources such as data stores, message buses, or RPC-connected clients.

Data synchronization is about multiple data sets that need to be synchronized whenever a particular event is published. The event can be published by:

  • database (when particular data was changed);
  • message bus (such as consuming messages from Kafka topics);
  • or by RPC clients (using gRPC or REST calls).

The data synchronization APIs are centered around watching and publishing data change events. These events are processed asynchronously.

The data handled by one plugin can have references to the data of another plugin. Therefore, a proper time/order sequence of data resynchronization between plugins needs to be maintained. The datasync plugin initiates a full data resync in the same order as the other plugins have been registered in Init().

Note

In the discussions that follow, the term agent is used to describe a server. A KV data store holds data/configuration for multiple agents (servers). Mechanisms are introduced to propagate data/configuration changes from the KV data store to the different agents. In addition, these changes may require a resynchronization.

Watch data API

Watch data API (see Watch data API Functions figure below) is used by app plugins to:

  • Subscribe to channels for data changes using Watch(), while being “abstracted away” from the particular message source such as etcd.
  • Process a full Data RESYNC (startup & fault recovery scenarios). Feedback is provided to the user of this API (e.g. success or error) via callback.
  • Process Incremental Data CHANGE. This is an optimized variant of RESYNC, where only the minimal set of changes (deltas) needed to reach a synchronized state is propagated to plugins. Again, feedback to the user of the API (e.g. successful configuration or an error) is returned via callback.

[Figure: Watch data API Functions]

This API defines two types of events that a plugin must be able to process:

  • Full Data RESYNC (resynchronization) event triggers a resync of the entire configuration. This event is used after an agent start/restart, or in a fault recovery scenario (e.g. when the agent's connectivity to an external data source is lost and restored).
  • Incremental Data CHANGE event triggers incremental processing of configuration changes. Each data change event contains both the previous and the new/current value. Data synchronization switches to this optimized mode only after a successful Full Data RESYNC.
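
To make the flow concrete, below is a minimal sketch of an event loop a plugin might run to consume both event types. It assumes the cn-infra datasync channel-based API; handleResync and handleChange are hypothetical application-level handlers, not part of the library.

import "github.com/ligato/cn-infra/datasync"

// watchEvents consumes events delivered on the channels previously
// registered with Watch().
func watchEvents(resyncChan chan datasync.ResyncEvent, changeChan chan datasync.ChangeEvent) {
    for {
        select {
        case resyncEv := <-resyncChan:
            // Full Data RESYNC: rebuild the entire configuration.
            err := handleResync(resyncEv) // hypothetical handler
            resyncEv.Done(err)            // feedback via callback
        case changeEv := <-changeChan:
            // Incremental Data CHANGE: apply only the delta.
            err := handleChange(changeEv) // hypothetical handler
            changeEv.Done(err)
        }
    }
}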

Publish data API

Publish data API (see Publish data API Functions figure below) is used by app plugins to asynchronously publish events with data change values and still remain abstracted away from the target data store, message bus or RPC client(s).

[Figure: Publish data API Functions]
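
For illustration, a minimal sketch of publishing a change, assuming the datasync.KeyProtoValWriter interface; the publisher instance is injected into the app plugin and bound to a concrete backend.

import "github.com/golang/protobuf/proto"
import "github.com/ligato/cn-infra/datasync"

// publishChange publishes a changed value under the given key. The concrete
// target (data store, message bus, RPC clients) stays hidden behind the
// datasync.KeyProtoValWriter interface.
func publishChange(publisher datasync.KeyProtoValWriter, key string, value proto.Message) error {
    return publisher.Put(key, value)
}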

Data Broker

The Data Broker abstraction (see Broker and Watcher APIs Functions figure below) is based on two APIs:

  • Broker - used by app plugins to pull (i.e. read) data from a data store or push (i.e. write) data into the data store. Data can be retrieved for a specific key or by running a query. Data can be written for a specific key. Multiple writes can be executed in a transaction.
  • Watcher - used by app plugins to WATCH data on a specified key. Watching means to monitor for data changes and receive a notification as soon as a change occurs.

[Figure: Broker and Watcher APIs Functions]

The Broker & Watcher APIs abstract common database operations implemented by different data stores such as etcd, Redis and Cassandra. Still, there are major differences between key-value-based and SQL-based data stores. Therefore, the Broker & Watcher Go interfaces are defined separately in each package; while the method names for a given operation are the same, the method arguments differ.

Key-value data store

The keyval package defines the client API to access a key-value data store. It is comprised of two sub-APIs:

  • Broker supports reading and manipulation of key-value pairs.
  • Watcher provides functions for monitoring of changes in a data store.

Both interfaces are available with arguments of type []byte (raw data) and proto.Message (protobuf-formatted data).

The keyval package also provides a skeleton for a key-value plugin. A particular data store is selected in the NewSkeleton constructor using an argument of type CoreBrokerWatcher. The skeleton handles the plugin’s life-cycle and provides unified access to data stores implementing the KvPlugin interface.

etcd plugin

The etcd plugin provides access to an etcd key-value data store.

Configuration

  • Location of the etcd configuration file can be defined either by the command line flag etcd-config or by setting the ETCD_CONFIG environment variable. Examples:
vpp-agent -etcd-config=/opt/vpp-agent/dev/etcd.conf
export ETCD_CONFIG=/opt/vpp-agent/dev/etcd.conf

Status Check

  • If injected, the etcd plugin will use the Status Check plugin to periodically issue a GET request to check connection status. The etcd connection state affects the global status of the agent. If the agent cannot establish a connection with etcd, both the readiness and the liveness probe from the probe plugin will return a negative result (accessible only via a REST API in such cases).

Compacting

You can compact etcd in two ways:

  • using the API, by calling plugin.Compact(), which compacts the database to the current revision (as sketched below);
  • using the config file, by setting the auto-compact option to the interval at which etcd should be compacted.
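
A hedged sketch of the API variant (the exact return values of Compact() are an assumption):

// compact etcd up to the current revision (return values are an assumption)
toRev, err := plugin.Compact()
if err != nil {
    log.Errorf("etcd compaction failed: %v", err)
}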

Reconnect resynchronization

  • If the connection to etcd is interrupted, a resync can be called automatically after reconnection. This option is disabled by default but can be enabled in the etcd.conf file.

Set resync-after-reconnect to true to enable the feature.
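
For illustration, an etcd.conf fragment combining the options mentioned above (the endpoint address and interval are example values):

endpoints:
  - "172.17.0.1:2379"
auto-compact: 1h              # compact the database every hour
resync-after-reconnect: true  # run a full resync once the connection is restored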

Redis

The code snippets below provide examples to help you get started. For simplicity, error handling is omitted.

You need to import the following dependencies:

import "github.com/ligato/cn-infra/db/keyval/kvproto"
import "github.com/ligato/cn-infra/db/keyval/redis"
import "github.com/ligato/cn-infra/utils/config"
import "github.com/ligato/cn-infra/logging/logrus"

Define the client configuration based on your Redis installation:

  • Single Node: var cfg redis.NodeConfig
  • Sentinel Enabled Cluster: var cfg redis.SentinelConfig
  • Redis Cluster: var cfg redis.ClusterConfig

You can initialize any of the above configuration instances in memory, or load the settings from a file using

err = config.ParseConfigFromYamlFile(configFile, &cfg)

You can also load any of the three configuration files using

var cfg interface{}
cfg, err := redis.LoadConfig(configFile)

Create connection from configuration

client, err := redis.CreateClient(cfg)
db, err := redis.NewBytesConnection(client, logrus.DefaultLogger())

Create Brokers / Watchers from connection

// create a broker and a watcher that share the same connection pool.
bytesBroker := db.NewBroker("some-prefix")
bytesWatcher := db.NewWatcher("some-prefix")

// create broker/watcher that share the same connection pools,
// capable of processing protocol-buffer generated data.
wrapper := kvproto.NewProtoWrapper(db)
protoBroker := wrapper.NewBroker("some-prefix")
protoWatcher := wrapper.NewWatcher("some-prefix")

Perform CRUD operations

// put
err = db.Put("some-key", []byte("some-value"))
err = db.Put("some-temp-key", []byte("valid for 20 seconds"),
             datasync.WithTTL(20*time.Second))

// get
value, found, revision, err := db.GetValue("some-key")
if found {
   ...
}

// Note: flight.Info implements proto.Message.
f := flight.Info{
    Airline:  "UA",
    Number:   1573,
    Priority: 1,
}
err = protoBroker.Put("some-key-prefix", &f)
f2 := flight.Info{}
found, revision, err = protoBroker.GetValue("some-key-prefix", &f2)

// list
keyPrefix := "some"
it, err := db.ListValues(keyPrefix)
for {
    kv, done := it.GetNext()
    if done {
        break
    }
    key := kv.GetKey()
    value := kv.GetValue()
    // process key and value here
}

// delete
found, err := db.Delete("some-key")
// or, delete all keys matching the prefix "some-key".
found, err := db.Delete("some-key", datasync.WithPrefix())

// transaction
var txn keyval.BytesTxn = db.NewTxn()
txn.Put("key101", []byte("val 101")).Put("key102", []byte("val 102"))
txn.Put("key103", []byte("val 103")).Put("key104", []byte("val 104"))
err := txn.Commit()

Subscribe to key space events

watchChan := make(chan keyval.BytesWatchResp, 10)
err = db.Watch(watchChan, "some-key")
for {
    select {
    case r := <-watchChan:
        switch r.GetChangeType() {
        case datasync.Put:
            log.Infof("KeyValProtoWatcher received %v: %s=%s", r.GetChangeType(),
                r.GetKey(), string(r.GetValue()))
        case datasync.Delete:
            ...
        }
    ...
    }
}

Note

You must configure Redis so that it publishes key space events.

config SET notify-keyspace-events KA

Resiliency

Connection/read/write time-outs, failover, reconnection and recovery are validated by running the airport example against a Redis Sentinel Cluster. Redis nodes are paused selectively to simulate a server going down:

$ docker-compose ps
             Name                          Command             State         Ports
-----------------------------------------------------------------------------------------
dockerredissentinel_master_1     docker-entrypoint.sh redis …   Paused   6379/tcp
dockerredissentinel_slave_1      docker-entrypoint.sh redis …   Up       6379/tcp
dockerredissentinel_slave_2      docker-entrypoint.sh redis …   Up       6379/tcp
dockerredissentinel_sentinel_1   sentinel-entrypoint.sh         Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_2   sentinel-entrypoint.sh         Up       26379/tcp, 6379/tcp
dockerredissentinel_sentinel_3   sentinel-entrypoint.sh         Up       26379/tcp, 6379/tcp

Redis is the implementation of the key-value Data Broker client API for the Redis key-value data store. The entity BytesConnectionRedis provides access to both the CRUD and event subscription APIs.

    +-----+ ---------(Broker)---------> +------------------------+ ---CRUD---> +-------+
    | app |                             |  BytesConnectionRedis  |             | Redis |
    +-----+ <--(KeyValProtoWatcher)---- +------------------------+ <--events-- +-------+

Consul plugin

The Consul plugin provides access to a Consul key-value data store.

Configuration

  • Location of the Consul configuration file can be defined either by the command line flag consul-config or by setting the CONSUL_CONFIG environment variable.
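
For example (the paths are illustrative, mirroring the etcd plugin above):

vpp-agent -consul-config=/opt/vpp-agent/dev/consul.conf
export CONSUL_CONFIG=/opt/vpp-agent/dev/consul.conf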

Status Check

  • If injected, the Consul plugin will use the Status Check plugin to periodically issue a GET request to check connection status. The Consul connection state affects the global status of the agent. If the agent cannot establish a connection with Consul, both the readiness and the liveness probe from the probe plugin will return a negative result (accessible only via a REST API in such cases).

Reconnect resynchronization

  • If the connection to the Consul data store is interrupted, a resync can be called automatically after reconnection. This option is disabled by default but can be enabled in the consul.conf file. Set resync-after-reconnect to true to enable the feature.

FileDB

The fileDB plugin uses the operating system's file system as a key-value data store. The plugin watches pre-defined files or directories, reads configuration from them, and sends response events according to the changes.

All configuration is resynced at startup (as with a standard key-value data store). Configuration files can then be added, updated, moved, renamed, or removed; the plugin makes all of the necessary changes.

Important

As a data store, fileDB is read-only from the plugin's perspective. Changes from within the plugin are not allowed.

Configuration

All files/directories used as a data store must be defined in the configuration file. The location of the configuration file can be defined either by the command line flag filedb-config or by setting the FILEDB_CONFIG environment variable.
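
For example (the paths are illustrative, mirroring the etcd plugin above):

vpp-agent -filedb-config=/opt/vpp-agent/dev/filedb.conf
export FILEDB_CONFIG=/opt/vpp-agent/dev/filedb.conf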

Supported formats

  • JSON (*.json)
  • YAML (*.yaml)

Data structure

Currently, only JSON- and YAML-formatted data is supported. The JSON format is as follows:

{
    "data": [
        {
            "key": "<key>",
            "value": {
                <proto-modelled data>
            }
        },
        {
            "key": "<key>",
            "value": {
                <proto-modelled data>
            }
        },
        ...
    ]
}

For YAML:

---
data:
    -
        key: '<key>'
        value: '<modelled data>'

The key must contain the instance prefix with the microservice label, so that the plugin knows which portions of the configuration are applicable to it (e.g. a key for an agent with microservice label agent1 would start with /vnf-agent/agent1/; the label here is illustrative). All configuration data is stored internally in a local database, which makes it possible to compare events and respond with the correct previous value for a given key.

Data state propagation

Data types supporting status propagation (e.g. interfaces or bridge domains) can store their state in the filesystem. The configuration file contains a field called status-path, which must be set in order to store status. Status data will be stored in JSON or YAML format.
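
An illustrative sketch of such a configuration; status-path is described above, while the name of the watched-paths field is an assumption for the sake of the example:

# filedb.conf (illustrative sketch)
configured-files-paths:       # field name is an assumption
  - /opt/vpp-agent/dev/config-data/
status-path: /opt/vpp-agent/dev/status-data.json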