Support for expiring Database entries

Registering database entry expiration tracker with DatabaseStateMachine

Support for publishing database state machine snapshot installation events.
Expiry tracker will listen to these events to bootstrap its local state.

Change-Id: I8bf22c8d7bab38624341350ccc083c5ca2fcb117
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
index 40d12ff..fa73546 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
@@ -20,6 +20,16 @@
     public boolean createTable(String name);
 
     /**
+     * Creates a new table where last update time will be used to track and expire old entries.
+     * Table creation is idempotent. Attempting to create a table
+     * that already exists will be a noop.
+     * @param name table name.
+     * @param ttlMillis total duration in millis since last update time when entries will be expired.
+     * @return true if the table was created by this call, false otherwise.
+     */
+    public boolean createTable(String name, int ttlMillis);
+
+    /**
      * Lists all the tables in the database.
      * @return list of table names.
      */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 77ff062..0021eb7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -35,6 +35,16 @@
         }
     }
 
+    public boolean createTable(String tableName, int ttlMillis) {
+
+        CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
+        try {
+            return future.get();
+        } catch (InterruptedException | ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
     public void dropTable(String tableName) {
 
         CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index c9775ca..157d3a6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -30,12 +30,14 @@
 import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 
-import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,23 +54,34 @@
     public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
             "database-update-event");
 
-    private DatabaseService databaseService;
-    private ClusterService cluster;
-    private ClusterCommunicationService clusterCommunicator;
+    private final DatabaseService databaseService;
+    private final ClusterCommunicationService clusterCommunicator;
 
     private final Member localMember;
+    private final ControllerNode localNode;
     private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
 
     private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
 
     private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
 
-    DatabaseEntryExpirationTracker(Member localMember) {
+    DatabaseEntryExpirationTracker(
+            Member localMember,
+            ControllerNode localNode,
+            ClusterCommunicationService clusterCommunicator,
+            DatabaseService databaseService) {
         this.localMember = localMember;
+        this.localNode = localNode;
+        this.clusterCommunicator = clusterCommunicator;
+        this.databaseService = databaseService;
     }
 
     @Override
     public void tableModified(TableModificationEvent event) {
+        if (!tableEntryExpirationMap.containsKey(event.tableName())) {
+            return;
+        }
+
         DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
         Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
                 .get(event.tableName());
@@ -77,8 +90,8 @@
         case ROW_DELETED:
             if (isLocalMemberLeader.get()) {
                 try {
-                    clusterCommunicator.broadcast(new ClusterMessage(cluster
-                            .getLocalNode().id(), DATABASE_UPDATES,
+                    clusterCommunicator.broadcast(new ClusterMessage(
+                            localNode.id(), DATABASE_UPDATES,
                             DatabaseStateMachine.SERIALIZER.encode(event)));
                 } catch (IOException e) {
                     log.error(
@@ -97,12 +110,10 @@
     }
 
     @Override
-    public void tableCreated(String tableName, int expirationTimeMillis) {
-        // make this explicit instead of relying on a negative value
-        // to indicate no expiration.
-        if (expirationTimeMillis > 0) {
-            tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
-                    .expiration(expirationTimeMillis, TimeUnit.SECONDS)
+    public void tableCreated(TableMetadata metadata) {
+        if (metadata.expireOldEntries()) {
+            tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
+                    .expiration(metadata.ttlMillis(), TimeUnit.SECONDS)
                     .expirationListener(expirationObserver)
                     // FIXME: make the expiration policy configurable.
                     .expirationPolicy(ExpirationPolicy.CREATED).build());
@@ -188,4 +199,28 @@
             return Objects.hash(tableName, key);
         }
     }
+
+    @Override
+    public void snapshotInstalled(State state) {
+        if (!tableEntryExpirationMap.isEmpty()) {
+            return;
+        }
+        for (String tableName : state.getTableNames()) {
+
+            TableMetadata metadata = state.getTableMetadata(tableName);
+            if (!metadata.expireOldEntries()) {
+                continue;
+            }
+
+            Map<DatabaseRow, VersionedValue> tableExpirationMap = ExpiringMap.builder()
+                    .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
+                    .expirationListener(expirationObserver)
+                    .expirationPolicy(ExpirationPolicy.CREATED).build();
+            for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
+                tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue());
+            }
+
+            tableEntryExpirationMap.put(tableName, tableExpirationMap);
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index b2fe19f..e233e5a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -14,7 +14,6 @@
 import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
-import net.kuujo.copycat.StateMachine;
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.cluster.TcpCluster;
@@ -34,6 +33,7 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchReadResult;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -65,6 +65,9 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DatabaseProtocolService copycatMessagingProtocol;
 
     // FIXME: point to appropriate path
@@ -158,7 +161,13 @@
         log.info("Starting cluster: {}", cluster);
 
 
-        StateMachine stateMachine = new DatabaseStateMachine();
+        DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+        stateMachine.addEventListener(
+                new DatabaseEntryExpirationTracker(
+                    clusterConfig.getLocalMember(),
+                    clusterService.getLocalNode(),
+                    clusterCommunicator,
+                    this));
         Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
                                         ClusterMessagingProtocol.SERIALIZER);
 
@@ -183,6 +192,11 @@
     }
 
     @Override
+    public boolean createTable(String name, int ttlMillis) {
+        return client.createTable(name, ttlMillis);
+    }
+
+    @Override
     public void dropTable(String name) {
         client.dropTable(name);
     }
@@ -418,4 +432,4 @@
         }
         return null;
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 62a06b4..3b0d874 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -30,9 +30,10 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 
 /**
@@ -57,6 +58,7 @@
             serializerPool = KryoNamespace.newBuilder()
                     .register(VersionedValue.class)
                     .register(State.class)
+                    .register(TableMetadata.class)
                     .register(BatchReadRequest.class)
                     .register(BatchWriteRequest.class)
                     .register(ReadStatus.class)
@@ -69,7 +71,7 @@
         }
     };
 
-    private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+    private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
 
     // durable internal state of the database.
     private State state = new State();
@@ -78,34 +80,31 @@
 
     @Command
     public boolean createTable(String tableName) {
-        Map<String, VersionedValue> existingTable =
-                state.getTables().putIfAbsent(tableName, Maps.newHashMap());
-        if (existingTable == null) {
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.tableCreated(tableName, Integer.MAX_VALUE);
-            }
-            return true;
-        }
-        return false;
+        TableMetadata metadata = new TableMetadata(tableName);
+        return createTable(metadata);
     }
 
     @Command
-    public boolean createTable(String tableName, int expirationTimeMillis) {
-        Map<String, VersionedValue> existingTable =
-                state.getTables().putIfAbsent(tableName, Maps.newHashMap());
-        if (existingTable == null) {
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.tableCreated(tableName, expirationTimeMillis);
-            }
-            return true;
+    public boolean createTable(String tableName, int ttlMillis) {
+        TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
+        return createTable(metadata);
+    }
+
+    private boolean createTable(TableMetadata metadata) {
+        Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
+        if (existingTable != null) {
+            return false;
         }
-        return false;
+        state.createTable(metadata);
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableCreated(metadata);
+        }
+        return true;
     }
 
     @Command
     public boolean dropTable(String tableName) {
-        Map<String, VersionedValue> table = state.getTables().remove(tableName);
-        if (table != null) {
+        if (state.removeTable(tableName)) {
             for (DatabaseUpdateEventListener listener : listeners) {
                 listener.tableDeleted(tableName);
             }
@@ -116,8 +115,8 @@
 
     @Command
     public boolean dropAllTables() {
-        Set<String> tableNames = state.getTables().keySet();
-        state.getTables().clear();
+        Set<String> tableNames = state.getTableNames();
+        state.removeAllTables();
         for (DatabaseUpdateEventListener listener : listeners) {
             for (String tableName : tableNames) {
                 listener.tableDeleted(tableName);
@@ -127,15 +126,15 @@
     }
 
     @Query
-    public List<String> listTables() {
-        return ImmutableList.copyOf(state.getTables().keySet());
+    public Set<String> listTables() {
+        return ImmutableSet.copyOf(state.getTableNames());
     }
 
     @Query
     public List<ReadResult> read(BatchReadRequest batchRequest) {
         List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
         for (ReadRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
             if (table == null) {
                 results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
                 continue;
@@ -186,7 +185,7 @@
         boolean abort = false;
         List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
         for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
             if (table == null) {
                 validationResults.add(WriteStatus.NO_SUCH_TABLE);
                 abort = true;
@@ -218,7 +217,7 @@
 
         // apply changes
         for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
 
             TableModificationEvent tableModificationEvent = null;
             // FIXME: If this method could be called by multiple thread,
@@ -274,19 +273,78 @@
         return results;
     }
 
-    public class State {
+    public static class State {
 
-        private final Map<String, Map<String, VersionedValue>> tables =
-                Maps.newHashMap();
+        private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
+        private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
         private long versionCounter = 1;
 
-        Map<String, Map<String, VersionedValue>> getTables() {
-            return tables;
+        public Map<String, VersionedValue> getTable(String tableName) {
+            return tableData.get(tableName);
+        }
+
+        void createTable(TableMetadata metadata) {
+            tableMetadata.put(metadata.tableName, metadata);
+            tableData.put(metadata.tableName, Maps.newHashMap());
+        }
+
+        TableMetadata getTableMetadata(String tableName) {
+            return tableMetadata.get(tableName);
         }
 
         long nextVersion() {
             return versionCounter++;
         }
+
+        Set<String> getTableNames() {
+            return ImmutableSet.copyOf(tableMetadata.keySet());
+        }
+
+
+        boolean removeTable(String tableName) {
+            if (!tableMetadata.containsKey(tableName)) {
+                return false;
+            }
+            tableMetadata.remove(tableName);
+            tableData.remove(tableName);
+            return true;
+        }
+
+        void removeAllTables() {
+            tableMetadata.clear();
+            tableData.clear();
+        }
+    }
+
+    public static class TableMetadata {
+        private final String tableName;
+        private final boolean expireOldEntries;
+        private final int ttlMillis;
+
+        public TableMetadata(String tableName) {
+            this.tableName = tableName;
+            this.expireOldEntries = false;
+            this.ttlMillis = Integer.MAX_VALUE;
+
+        }
+
+        public TableMetadata(String tableName, int ttlMillis) {
+            this.tableName = tableName;
+            this.expireOldEntries = true;
+            this.ttlMillis = ttlMillis;
+        }
+
+        public String tableName() {
+            return tableName;
+        }
+
+        public boolean expireOldEntries() {
+            return expireOldEntries;
+        }
+
+        public int ttlMillis() {
+            return ttlMillis;
+        }
     }
 
     @Override
@@ -319,13 +377,30 @@
             } else {
                 this.state = SERIALIZER.decode(data);
             }
+
+            // FIXME: synchronize.
+            for (DatabaseUpdateEventListener listener : listeners) {
+                listener.snapshotInstalled(state);
+            }
         } catch (Exception e) {
             log.error("Failed to install from snapshot", e);
             throw new SnapshotException(e);
         }
     }
 
+    /**
+     * Adds specified DatabaseUpdateEventListener.
+     * @param listener listener to add
+     */
     public void addEventListener(DatabaseUpdateEventListener listener) {
         listeners.add(listener);
     }
+
+    /**
+     * Removes specified DatabaseUpdateEventListener.
+     * @param listener listener to remove
+     */
+    public void removeEventListener(DatabaseUpdateEventListener listener) {
+        listeners.remove(listener);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
index 1dc0e9d..6eb8703 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -16,6 +16,8 @@
 
 package org.onlab.onos.store.service.impl;
 
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
+
 /**
  * Interface of database update event listeners.
  */
@@ -29,14 +31,19 @@
 
     /**
      * Notifies listeners of a table created event.
-     * @param tableName name of the table created
-     * @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
+     * @param metadata metadata for the created table.
      */
-    public void tableCreated(String tableName, int expirationTimeMillis);
+    public void tableCreated(TableMetadata metadata);
 
     /**
      * Notifies listeners of a table deleted event.
      * @param tableName name of the table deleted
      */
     public void tableDeleted(String tableName);
-}
\ No newline at end of file
+
+    /**
+     * Notifies listeners of a snapshot installation event.
+     * @param snapshotState installed snapshot state.
+     */
+    public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
+}