blob: 62cf5845db582e0d5a7583483afb82d709261e89 [file] [log] [blame]
Madan Jampani9b37d572014-11-12 11:53:24 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onlab.onos.store.service.impl;
18
19import java.io.IOException;
20import java.util.HashMap;
21import java.util.Map;
22import java.util.Objects;
23import java.util.concurrent.TimeUnit;
24import java.util.concurrent.atomic.AtomicBoolean;
25
26import net.jodah.expiringmap.ExpiringMap;
27import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
28import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
29import net.kuujo.copycat.cluster.Member;
30import net.kuujo.copycat.event.EventHandler;
31import net.kuujo.copycat.event.LeaderElectEvent;
32
Madan Jampanidef2c652014-11-12 13:50:10 -080033import org.onlab.onos.cluster.ControllerNode;
Madan Jampani9b37d572014-11-12 11:53:24 -080034import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
35import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani9b37d572014-11-12 11:53:24 -080036import org.onlab.onos.store.service.DatabaseService;
37import org.onlab.onos.store.service.VersionedValue;
Madan Jampanidef2c652014-11-12 13:50:10 -080038import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
39import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
Madan Jampani9b37d572014-11-12 11:53:24 -080040import org.slf4j.Logger;
41import org.slf4j.LoggerFactory;
42
43/**
44 * Plugs into the database update stream and track the TTL of entries added to
45 * the database. For tables with pre-configured finite TTL, this class has
46 * mechanisms for expiring (deleting) old, expired entries from the database.
47 */
48public class DatabaseEntryExpirationTracker implements
49 DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
50
51 private final Logger log = LoggerFactory.getLogger(getClass());
52
Madan Jampanidef2c652014-11-12 13:50:10 -080053 private final DatabaseService databaseService;
54 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani9b37d572014-11-12 11:53:24 -080055
56 private final Member localMember;
Madan Jampanidef2c652014-11-12 13:50:10 -080057 private final ControllerNode localNode;
Madan Jampani9b37d572014-11-12 11:53:24 -080058 private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
59
Madan Jampanif5d263b2014-11-13 10:04:40 -080060 private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
Madan Jampani9b37d572014-11-12 11:53:24 -080061
Madan Jampanif5d263b2014-11-13 10:04:40 -080062 private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
Madan Jampani9b37d572014-11-12 11:53:24 -080063
Madan Jampanidef2c652014-11-12 13:50:10 -080064 DatabaseEntryExpirationTracker(
65 Member localMember,
66 ControllerNode localNode,
67 ClusterCommunicationService clusterCommunicator,
68 DatabaseService databaseService) {
Madan Jampani9b37d572014-11-12 11:53:24 -080069 this.localMember = localMember;
Madan Jampanidef2c652014-11-12 13:50:10 -080070 this.localNode = localNode;
71 this.clusterCommunicator = clusterCommunicator;
72 this.databaseService = databaseService;
Madan Jampani9b37d572014-11-12 11:53:24 -080073 }
74
75 @Override
76 public void tableModified(TableModificationEvent event) {
Madan Jampanif5d263b2014-11-13 10:04:40 -080077 log.debug("Received a table modification event {}", event);
78
Madan Jampanidef2c652014-11-12 13:50:10 -080079 if (!tableEntryExpirationMap.containsKey(event.tableName())) {
80 return;
81 }
82
Madan Jampanif5d263b2014-11-13 10:04:40 -080083 Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
Madan Jampani9b37d572014-11-12 11:53:24 -080084 DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Madan Jampanif5d263b2014-11-13 10:04:40 -080085 Long eventVersion = event.value().version();
Madan Jampani9b37d572014-11-12 11:53:24 -080086
87 switch (event.type()) {
88 case ROW_DELETED:
Madan Jampanif5d263b2014-11-13 10:04:40 -080089 map.remove(row, eventVersion);
Madan Jampani9b37d572014-11-12 11:53:24 -080090 if (isLocalMemberLeader.get()) {
91 try {
Madan Jampanif5d263b2014-11-13 10:04:40 -080092 // FIXME: The broadcast message should be sent to self.
Madan Jampanidef2c652014-11-12 13:50:10 -080093 clusterCommunicator.broadcast(new ClusterMessage(
Madan Jampanif5d263b2014-11-13 10:04:40 -080094 localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
Madan Jampani9b37d572014-11-12 11:53:24 -080095 DatabaseStateMachine.SERIALIZER.encode(event)));
96 } catch (IOException e) {
Madan Jampanif5d263b2014-11-13 10:04:40 -080097 log.error("Failed to broadcast a database row deleted event.", e);
Madan Jampani9b37d572014-11-12 11:53:24 -080098 }
99 }
100 break;
101 case ROW_ADDED:
102 case ROW_UPDATED:
Madan Jampanif5d263b2014-11-13 10:04:40 -0800103 // To account for potential reordering of notifications,
104 // check to make sure we are replacing an old version with a new version
105 Long currentVersion = map.get(row);
106 if (currentVersion == null || currentVersion < eventVersion) {
107 map.put(row, eventVersion);
108 }
Madan Jampani9b37d572014-11-12 11:53:24 -0800109 break;
110 default:
111 break;
112 }
113 }
114
115 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800116 public void tableCreated(TableMetadata metadata) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800117 log.debug("Received a table created event {}", metadata);
Madan Jampanidef2c652014-11-12 13:50:10 -0800118 if (metadata.expireOldEntries()) {
119 tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
Madan Jampanif5d263b2014-11-13 10:04:40 -0800120 .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
Madan Jampani9b37d572014-11-12 11:53:24 -0800121 .expirationListener(expirationObserver)
Madan Jampanif5d263b2014-11-13 10:04:40 -0800122 // TODO: make the expiration policy configurable.
123 // Do we need to support expiration based on last access time?
Madan Jampani9b37d572014-11-12 11:53:24 -0800124 .expirationPolicy(ExpirationPolicy.CREATED).build());
125 }
126 }
127
128 @Override
129 public void tableDeleted(String tableName) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800130 log.debug("Received a table deleted event for table ({})", tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800131 tableEntryExpirationMap.remove(tableName);
132 }
133
134 private class ExpirationObserver implements
Madan Jampanif5d263b2014-11-13 10:04:40 -0800135 ExpirationListener<DatabaseRow, Long> {
Madan Jampani9b37d572014-11-12 11:53:24 -0800136 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800137 public void expired(DatabaseRow row, Long version) {
138 Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800139 try {
140 if (isLocalMemberLeader.get()) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800141 if (!databaseService.removeIfVersionMatches(row.tableName,
142 row.key, version)) {
143 log.info("Entry in database was updated right before its expiration.");
144 } else {
145 log.info("Successfully expired old entry with key ({}) from table ({})",
146 row.key, row.tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800147 }
148 } else {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800149 // Only the current leader will expire keys from database.
150 // Everyone else function as standby just in case they need to take over
Madan Jampani9b37d572014-11-12 11:53:24 -0800151 if (map != null) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800152 map.putIfAbsent(row, version);
Madan Jampani9b37d572014-11-12 11:53:24 -0800153 }
154 }
155
156 } catch (Exception e) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800157 log.warn("Failed to delete entry from the database after ttl "
158 + "expiration. Operation will be retried.", e);
159 map.putIfAbsent(row, version);
Madan Jampani9b37d572014-11-12 11:53:24 -0800160 }
161 }
162 }
163
164 @Override
165 public void handle(LeaderElectEvent event) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800166 isLocalMemberLeader.set(localMember.equals(event.leader()));
Madan Jampani9b37d572014-11-12 11:53:24 -0800167 }
168
169 /**
170 * Wrapper class for a database row identifier.
171 */
172 private class DatabaseRow {
173
174 String tableName;
175 String key;
176
177 public DatabaseRow(String tableName, String key) {
178 this.tableName = tableName;
179 this.key = key;
180 }
181
182 @Override
183 public boolean equals(Object obj) {
184 if (this == obj) {
185 return true;
186 }
187 if (!(obj instanceof DatabaseRow)) {
188 return false;
189 }
190 DatabaseRow that = (DatabaseRow) obj;
191
192 return Objects.equals(this.tableName, that.tableName)
193 && Objects.equals(this.key, that.key);
194 }
195
196 @Override
197 public int hashCode() {
198 return Objects.hash(tableName, key);
199 }
200 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800201
202 @Override
203 public void snapshotInstalled(State state) {
204 if (!tableEntryExpirationMap.isEmpty()) {
205 return;
206 }
207 for (String tableName : state.getTableNames()) {
208
209 TableMetadata metadata = state.getTableMetadata(tableName);
210 if (!metadata.expireOldEntries()) {
211 continue;
212 }
213
Madan Jampanif5d263b2014-11-13 10:04:40 -0800214 Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
Madan Jampanidef2c652014-11-12 13:50:10 -0800215 .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
216 .expirationListener(expirationObserver)
217 .expirationPolicy(ExpirationPolicy.CREATED).build();
218 for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800219 tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
Madan Jampanidef2c652014-11-12 13:50:10 -0800220 }
221
222 tableEntryExpirationMap.put(tableName, tableExpirationMap);
223 }
224 }
Madan Jampani9b37d572014-11-12 11:53:24 -0800225}