Merge "Support for expiring Database entries"
diff --git a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
index 40d12ff..fa73546 100644
--- a/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
+++ b/core/api/src/main/java/org/onlab/onos/store/service/DatabaseAdminService.java
@@ -20,6 +20,16 @@
     public boolean createTable(String name);
 
     /**
+     * Creates a new table whose entries are tracked by last update time and expired once they are too old.
+     * Table creation is idempotent: attempting to create a table
+     * that already exists is a no-op.
+     * @param name table name.
+     * @param ttlMillis time in milliseconds, measured from an entry's last update, after which it is expired.
+     * @return true if the table was created by this call, false otherwise.
+     */
+    public boolean createTable(String name, int ttlMillis);
+
+    /**
      * Lists all the tables in the database.
      * @return list of table names.
      */
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
index 77ff062..0021eb7 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseClient.java
@@ -35,6 +35,28 @@
         }
     }
 
+    /**
+     * Submits a "createTable" command for a table whose entries carry a TTL.
+     *
+     * @param tableName name of the table to create
+     * @param ttlMillis entry time-to-live in milliseconds
+     * @return the result reported by the "createTable" command
+     * @throws DatabaseException if the command fails or the wait is interrupted
+     */
+    public boolean createTable(String tableName, int ttlMillis) {
+
+        CompletableFuture<Boolean> future = copycat.submit("createTable", tableName, ttlMillis);
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag so callers can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new DatabaseException(e);
+        } catch (ExecutionException e) {
+            throw new DatabaseException(e);
+        }
+    }
+
     public void dropTable(String tableName) {
 
         CompletableFuture<Void> future = copycat.submit("dropTable", tableName);
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
index c9775ca..157d3a6 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseEntryExpirationTracker.java
@@ -30,12 +30,14 @@
 import net.kuujo.copycat.event.EventHandler;
 import net.kuujo.copycat.event.LeaderElectEvent;
 
-import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.cluster.messaging.ClusterMessage;
 import org.onlab.onos.store.cluster.messaging.MessageSubject;
 import org.onlab.onos.store.service.DatabaseService;
 import org.onlab.onos.store.service.VersionedValue;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -52,23 +54,34 @@
     public static final MessageSubject DATABASE_UPDATES = new MessageSubject(
             "database-update-event");
 
-    private DatabaseService databaseService;
-    private ClusterService cluster;
-    private ClusterCommunicationService clusterCommunicator;
+    private final DatabaseService databaseService;
+    private final ClusterCommunicationService clusterCommunicator;
 
     private final Member localMember;
+    private final ControllerNode localNode;
     private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
 
     private final Map<String, Map<DatabaseRow, VersionedValue>> tableEntryExpirationMap = new HashMap<>();
 
     private final ExpirationListener<DatabaseRow, VersionedValue> expirationObserver = new ExpirationObserver();
 
-    DatabaseEntryExpirationTracker(Member localMember) {
+    DatabaseEntryExpirationTracker(
+            Member localMember,
+            ControllerNode localNode,
+            ClusterCommunicationService clusterCommunicator,
+            DatabaseService databaseService) {
         this.localMember = localMember;
+        this.localNode = localNode;
+        this.clusterCommunicator = clusterCommunicator;
+        this.databaseService = databaseService;
     }
 
     @Override
     public void tableModified(TableModificationEvent event) {
+        if (!tableEntryExpirationMap.containsKey(event.tableName())) {
+            return;
+        }
+
         DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
         Map<DatabaseRow, VersionedValue> map = tableEntryExpirationMap
                 .get(event.tableName());
@@ -77,8 +90,8 @@
         case ROW_DELETED:
             if (isLocalMemberLeader.get()) {
                 try {
-                    clusterCommunicator.broadcast(new ClusterMessage(cluster
-                            .getLocalNode().id(), DATABASE_UPDATES,
+                    clusterCommunicator.broadcast(new ClusterMessage(
+                            localNode.id(), DATABASE_UPDATES,
                             DatabaseStateMachine.SERIALIZER.encode(event)));
                 } catch (IOException e) {
                     log.error(
@@ -97,12 +110,10 @@
     }
 
     @Override
-    public void tableCreated(String tableName, int expirationTimeMillis) {
-        // make this explicit instead of relying on a negative value
-        // to indicate no expiration.
-        if (expirationTimeMillis > 0) {
-            tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
-                    .expiration(expirationTimeMillis, TimeUnit.SECONDS)
+    public void tableCreated(TableMetadata metadata) {
+        if (metadata.expireOldEntries()) {
+            tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
+                    .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
                     .expirationListener(expirationObserver)
                     // FIXME: make the expiration policy configurable.
                     .expirationPolicy(ExpirationPolicy.CREATED).build());
@@ -188,4 +199,28 @@
             return Objects.hash(tableName, key);
         }
     }
+
+    @Override
+    public void snapshotInstalled(State state) {
+        // A tracker that already follows some tables keeps its current view.
+        if (!tableEntryExpirationMap.isEmpty()) {
+            return;
+        }
+
+        for (String tableName : state.getTableNames()) {
+            TableMetadata metadata = state.getTableMetadata(tableName);
+            if (metadata.expireOldEntries()) {
+                // Fresh expiring map configured with this table's TTL.
+                Map<DatabaseRow, VersionedValue> expiringRows = ExpiringMap.builder()
+                        .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
+                        .expirationListener(expirationObserver)
+                        .expirationPolicy(ExpirationPolicy.CREATED).build();
+
+                // Seed it with every row the snapshot holds for the table.
+                state.getTable(tableName).forEach((key, value) ->
+                        expiringRows.put(new DatabaseRow(tableName, key), value));
+                tableEntryExpirationMap.put(tableName, expiringRows);
+            }
+        }
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
index b2fe19f..e233e5a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseManager.java
@@ -14,7 +14,6 @@
 import java.util.concurrent.TimeUnit;
 
 import net.kuujo.copycat.Copycat;
-import net.kuujo.copycat.StateMachine;
 import net.kuujo.copycat.cluster.ClusterConfig;
 import net.kuujo.copycat.cluster.Member;
 import net.kuujo.copycat.cluster.TcpCluster;
@@ -34,6 +33,7 @@
 import org.onlab.onos.cluster.ControllerNode;
 import org.onlab.onos.cluster.DefaultControllerNode;
 import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
 import org.onlab.onos.store.service.BatchReadRequest;
 import org.onlab.onos.store.service.BatchReadResult;
 import org.onlab.onos.store.service.BatchWriteRequest;
@@ -65,6 +65,9 @@
     protected ClusterService clusterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterCommunicationService clusterCommunicator;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DatabaseProtocolService copycatMessagingProtocol;
 
     // FIXME: point to appropriate path
@@ -158,7 +161,13 @@
         log.info("Starting cluster: {}", cluster);
 
 
-        StateMachine stateMachine = new DatabaseStateMachine();
+        DatabaseStateMachine stateMachine = new DatabaseStateMachine();
+        stateMachine.addEventListener(
+                new DatabaseEntryExpirationTracker(
+                    clusterConfig.getLocalMember(),
+                    clusterService.getLocalNode(),
+                    clusterCommunicator,
+                    this));
         Log consensusLog = new MapDBLog(LOG_FILE_PREFIX + localNode.id(),
                                         ClusterMessagingProtocol.SERIALIZER);
 
@@ -183,6 +192,11 @@
     }
 
     @Override
+    public boolean createTable(String name, int ttlMillis) {
+        return client.createTable(name, ttlMillis);
+    }
+
+    @Override
     public void dropTable(String name) {
         client.dropTable(name);
     }
@@ -418,4 +432,4 @@
         }
         return null;
     }
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
index 62a06b4..3b0d874 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseStateMachine.java
@@ -30,9 +30,10 @@
 import org.onlab.util.KryoNamespace;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 import com.google.common.io.ByteStreams;
 
 /**
@@ -57,6 +58,7 @@
             serializerPool = KryoNamespace.newBuilder()
                     .register(VersionedValue.class)
                     .register(State.class)
+                    .register(TableMetadata.class)
                     .register(BatchReadRequest.class)
                     .register(BatchWriteRequest.class)
                     .register(ReadStatus.class)
@@ -69,7 +71,7 @@
         }
     };
 
-    private final List<DatabaseUpdateEventListener> listeners = Lists.newLinkedList();
+    private final Set<DatabaseUpdateEventListener> listeners = Sets.newIdentityHashSet();
 
     // durable internal state of the database.
     private State state = new State();
@@ -78,34 +80,31 @@
 
     @Command
     public boolean createTable(String tableName) {
-        Map<String, VersionedValue> existingTable =
-                state.getTables().putIfAbsent(tableName, Maps.newHashMap());
-        if (existingTable == null) {
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.tableCreated(tableName, Integer.MAX_VALUE);
-            }
-            return true;
-        }
-        return false;
+        TableMetadata metadata = new TableMetadata(tableName);
+        return createTable(metadata);
     }
 
     @Command
-    public boolean createTable(String tableName, int expirationTimeMillis) {
-        Map<String, VersionedValue> existingTable =
-                state.getTables().putIfAbsent(tableName, Maps.newHashMap());
-        if (existingTable == null) {
-            for (DatabaseUpdateEventListener listener : listeners) {
-                listener.tableCreated(tableName, expirationTimeMillis);
-            }
-            return true;
+    public boolean createTable(String tableName, int ttlMillis) {
+        TableMetadata metadata = new TableMetadata(tableName, ttlMillis);
+        return createTable(metadata);
+    }
+
+    private boolean createTable(TableMetadata metadata) {
+        Map<String, VersionedValue> existingTable = state.getTable(metadata.tableName());
+        if (existingTable != null) {
+            return false;
         }
-        return false;
+        state.createTable(metadata);
+        for (DatabaseUpdateEventListener listener : listeners) {
+            listener.tableCreated(metadata);
+        }
+        return true;
     }
 
     @Command
     public boolean dropTable(String tableName) {
-        Map<String, VersionedValue> table = state.getTables().remove(tableName);
-        if (table != null) {
+        if (state.removeTable(tableName)) {
             for (DatabaseUpdateEventListener listener : listeners) {
                 listener.tableDeleted(tableName);
             }
@@ -116,8 +115,8 @@
 
     @Command
     public boolean dropAllTables() {
-        Set<String> tableNames = state.getTables().keySet();
-        state.getTables().clear();
+        Set<String> tableNames = state.getTableNames();
+        state.removeAllTables();
         for (DatabaseUpdateEventListener listener : listeners) {
             for (String tableName : tableNames) {
                 listener.tableDeleted(tableName);
@@ -127,15 +126,15 @@
     }
 
     @Query
-    public List<String> listTables() {
-        return ImmutableList.copyOf(state.getTables().keySet());
+    public Set<String> listTables() {
+        return ImmutableSet.copyOf(state.getTableNames());
     }
 
     @Query
     public List<ReadResult> read(BatchReadRequest batchRequest) {
         List<ReadResult> results = new ArrayList<>(batchRequest.batchSize());
         for (ReadRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
             if (table == null) {
                 results.add(new ReadResult(ReadStatus.NO_SUCH_TABLE, request.tableName(), request.key(), null));
                 continue;
@@ -186,7 +185,7 @@
         boolean abort = false;
         List<WriteStatus> validationResults = new ArrayList<>(batchRequest.batchSize());
         for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
             if (table == null) {
                 validationResults.add(WriteStatus.NO_SUCH_TABLE);
                 abort = true;
@@ -218,7 +217,7 @@
 
         // apply changes
         for (WriteRequest request : batchRequest.getAsList()) {
-            Map<String, VersionedValue> table = state.getTables().get(request.tableName());
+            Map<String, VersionedValue> table = state.getTable(request.tableName());
 
             TableModificationEvent tableModificationEvent = null;
             // FIXME: If this method could be called by multiple thread,
@@ -274,19 +273,78 @@
         return results;
     }
 
-    public class State {
+    public static class State {
 
-        private final Map<String, Map<String, VersionedValue>> tables =
-                Maps.newHashMap();
+        private final Map<String, TableMetadata> tableMetadata = Maps.newHashMap();
+        private final Map<String, Map<String, VersionedValue>> tableData = Maps.newHashMap();
         private long versionCounter = 1;
 
-        Map<String, Map<String, VersionedValue>> getTables() {
-            return tables;
+        public Map<String, VersionedValue> getTable(String tableName) {
+            return tableData.get(tableName);
+        }
+
+        void createTable(TableMetadata metadata) {
+            tableMetadata.put(metadata.tableName, metadata);
+            tableData.put(metadata.tableName, Maps.newHashMap());
+        }
+
+        TableMetadata getTableMetadata(String tableName) {
+            return tableMetadata.get(tableName);
         }
 
         long nextVersion() {
             return versionCounter++;
         }
+
+        Set<String> getTableNames() {
+            return ImmutableSet.copyOf(tableMetadata.keySet());
+        }
+
+
+        boolean removeTable(String tableName) {
+            if (!tableMetadata.containsKey(tableName)) {
+                return false;
+            }
+            tableMetadata.remove(tableName);
+            tableData.remove(tableName);
+            return true;
+        }
+
+        void removeAllTables() {
+            tableMetadata.clear();
+            tableData.clear();
+        }
+    }
+
+    /** Describes a table: its name, whether old entries expire, and the TTL in milliseconds. */
+    public static class TableMetadata {
+        private final String tableName;
+        private final boolean expireOldEntries;
+        private final int ttlMillis;
+
+        public TableMetadata(String tableName) {
+            this.tableName = tableName;
+            this.expireOldEntries = false; // expiration is disabled for this table
+            this.ttlMillis = Integer.MAX_VALUE;
+        }
+
+        public TableMetadata(String tableName, int ttlMillis) {
+            this.tableName = tableName;
+            this.expireOldEntries = true; // entries are subject to TTL-based expiration
+            this.ttlMillis = ttlMillis;
+        }
+
+        public String tableName() {
+            return tableName;
+        }
+
+        public boolean expireOldEntries() {
+            return expireOldEntries;
+        }
+
+        public int ttlMillis() {
+            return ttlMillis;
+        }
+    }
 
     @Override
@@ -319,13 +377,30 @@
             } else {
                 this.state = SERIALIZER.decode(data);
             }
+
+            // FIXME: synchronize.
+            for (DatabaseUpdateEventListener listener : listeners) {
+                listener.snapshotInstalled(state);
+            }
         } catch (Exception e) {
             log.error("Failed to install from snapshot", e);
             throw new SnapshotException(e);
         }
     }
 
+    /**
+     * Adds specified DatabaseUpdateEventListener.
+     * @param listener listener to add
+     */
     public void addEventListener(DatabaseUpdateEventListener listener) {
         listeners.add(listener);
     }
+
+    /**
+     * Removes specified DatabaseUpdateEventListener.
+     * @param listener listener to remove
+     */
+    public void removeEventListener(DatabaseUpdateEventListener listener) {
+        listeners.remove(listener);
+    }
 }
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
index 1dc0e9d..6eb8703 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/service/impl/DatabaseUpdateEventListener.java
@@ -16,6 +16,8 @@
 
 package org.onlab.onos.store.service.impl;
 
+import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
+
 /**
  * Interface of database update event listeners.
  */
@@ -29,14 +31,19 @@
 
     /**
      * Notifies listeners of a table created event.
-     * @param tableName name of the table created
-     * @param expirationTimeMillis TTL for entries added to the table (measured since last update time)
+     * @param metadata metadata for the created table.
      */
-    public void tableCreated(String tableName, int expirationTimeMillis);
+    public void tableCreated(TableMetadata metadata);
 
     /**
      * Notifies listeners of a table deleted event.
      * @param tableName name of the table deleted
      */
     public void tableDeleted(String tableName);
-}
\ No newline at end of file
+
+    /**
+     * Notifies listeners of a snapshot installation event.
+     * @param snapshotState installed snapshot state.
+     */
+    public void snapshotInstalled(DatabaseStateMachine.State snapshotState);
+}