blob: 42de7350cc883b0ac7f29217f5db598aebc70e5a [file] [log] [blame]
Madan Jampani9b37d572014-11-12 11:53:24 -08001/*
2 * Copyright 2014 Open Networking Laboratory
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.onlab.onos.store.service.impl;
18
Madan Jampania88d1f52014-11-14 16:45:24 -080019import static org.onlab.util.Tools.namedThreads;
20
Madan Jampani9b37d572014-11-12 11:53:24 -080021import java.io.IOException;
22import java.util.HashMap;
23import java.util.Map;
24import java.util.Objects;
Madan Jampania88d1f52014-11-14 16:45:24 -080025import java.util.concurrent.ExecutorService;
26import java.util.concurrent.Executors;
Madan Jampani9b37d572014-11-12 11:53:24 -080027import java.util.concurrent.TimeUnit;
28import java.util.concurrent.atomic.AtomicBoolean;
29
30import net.jodah.expiringmap.ExpiringMap;
31import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
32import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
33import net.kuujo.copycat.cluster.Member;
34import net.kuujo.copycat.event.EventHandler;
35import net.kuujo.copycat.event.LeaderElectEvent;
36
Madan Jampanidef2c652014-11-12 13:50:10 -080037import org.onlab.onos.cluster.ControllerNode;
Madan Jampani9b37d572014-11-12 11:53:24 -080038import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
39import org.onlab.onos.store.cluster.messaging.ClusterMessage;
Madan Jampani9b37d572014-11-12 11:53:24 -080040import org.onlab.onos.store.service.DatabaseService;
41import org.onlab.onos.store.service.VersionedValue;
Madan Jampanidef2c652014-11-12 13:50:10 -080042import org.onlab.onos.store.service.impl.DatabaseStateMachine.State;
43import org.onlab.onos.store.service.impl.DatabaseStateMachine.TableMetadata;
Madan Jampani9b37d572014-11-12 11:53:24 -080044import org.slf4j.Logger;
45import org.slf4j.LoggerFactory;
46
Madan Jampania88d1f52014-11-14 16:45:24 -080047import com.google.common.base.MoreObjects;
48
Madan Jampani9b37d572014-11-12 11:53:24 -080049/**
50 * Plugs into the database update stream and track the TTL of entries added to
51 * the database. For tables with pre-configured finite TTL, this class has
52 * mechanisms for expiring (deleting) old, expired entries from the database.
53 */
54public class DatabaseEntryExpirationTracker implements
55 DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
56
Madan Jampania88d1f52014-11-14 16:45:24 -080057 private static final ExecutorService THREAD_POOL =
58 Executors.newCachedThreadPool(namedThreads("database-stale-entry-expirer-%d"));
59
Madan Jampani9b37d572014-11-12 11:53:24 -080060 private final Logger log = LoggerFactory.getLogger(getClass());
61
Madan Jampanidef2c652014-11-12 13:50:10 -080062 private final DatabaseService databaseService;
63 private final ClusterCommunicationService clusterCommunicator;
Madan Jampani9b37d572014-11-12 11:53:24 -080064
65 private final Member localMember;
Madan Jampanidef2c652014-11-12 13:50:10 -080066 private final ControllerNode localNode;
Madan Jampani9b37d572014-11-12 11:53:24 -080067 private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
68
Madan Jampanif5d263b2014-11-13 10:04:40 -080069 private final Map<String, Map<DatabaseRow, Long>> tableEntryExpirationMap = new HashMap<>();
Madan Jampani9b37d572014-11-12 11:53:24 -080070
Madan Jampanif5d263b2014-11-13 10:04:40 -080071 private final ExpirationListener<DatabaseRow, Long> expirationObserver = new ExpirationObserver();
Madan Jampani9b37d572014-11-12 11:53:24 -080072
Madan Jampanidef2c652014-11-12 13:50:10 -080073 DatabaseEntryExpirationTracker(
74 Member localMember,
75 ControllerNode localNode,
76 ClusterCommunicationService clusterCommunicator,
77 DatabaseService databaseService) {
Madan Jampani9b37d572014-11-12 11:53:24 -080078 this.localMember = localMember;
Madan Jampanidef2c652014-11-12 13:50:10 -080079 this.localNode = localNode;
80 this.clusterCommunicator = clusterCommunicator;
81 this.databaseService = databaseService;
Madan Jampani9b37d572014-11-12 11:53:24 -080082 }
83
84 @Override
85 public void tableModified(TableModificationEvent event) {
Madan Jampania88d1f52014-11-14 16:45:24 -080086 log.debug("{}: Received {}", localNode.id(), event);
Madan Jampanif5d263b2014-11-13 10:04:40 -080087
Madan Jampanidef2c652014-11-12 13:50:10 -080088 if (!tableEntryExpirationMap.containsKey(event.tableName())) {
89 return;
90 }
91
Madan Jampanif5d263b2014-11-13 10:04:40 -080092 Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(event.tableName());
Madan Jampani9b37d572014-11-12 11:53:24 -080093 DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
Madan Jampanif5d263b2014-11-13 10:04:40 -080094 Long eventVersion = event.value().version();
Madan Jampani9b37d572014-11-12 11:53:24 -080095
96 switch (event.type()) {
97 case ROW_DELETED:
Madan Jampanif5d263b2014-11-13 10:04:40 -080098 map.remove(row, eventVersion);
Madan Jampani9b37d572014-11-12 11:53:24 -080099 if (isLocalMemberLeader.get()) {
100 try {
Madan Jampania88d1f52014-11-14 16:45:24 -0800101 log.debug("Broadcasting {} to the entire cluster", event);
102 clusterCommunicator.broadcastIncludeSelf(new ClusterMessage(
Madan Jampanif5d263b2014-11-13 10:04:40 -0800103 localNode.id(), DatabaseStateMachine.DATABASE_UPDATE_EVENTS,
Yuta HIGUCHI91768e32014-11-22 05:06:35 -0800104 ClusterMessagingProtocol.DB_SERIALIZER.encode(event)));
Madan Jampani9b37d572014-11-12 11:53:24 -0800105 } catch (IOException e) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800106 log.error("Failed to broadcast a database row deleted event.", e);
Madan Jampani9b37d572014-11-12 11:53:24 -0800107 }
108 }
109 break;
110 case ROW_ADDED:
111 case ROW_UPDATED:
Madan Jampanif5d263b2014-11-13 10:04:40 -0800112 // To account for potential reordering of notifications,
113 // check to make sure we are replacing an old version with a new version
114 Long currentVersion = map.get(row);
115 if (currentVersion == null || currentVersion < eventVersion) {
116 map.put(row, eventVersion);
117 }
Madan Jampani9b37d572014-11-12 11:53:24 -0800118 break;
119 default:
120 break;
121 }
122 }
123
124 @Override
Madan Jampanidef2c652014-11-12 13:50:10 -0800125 public void tableCreated(TableMetadata metadata) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800126 log.debug("Received a table created event {}", metadata);
Madan Jampanidef2c652014-11-12 13:50:10 -0800127 if (metadata.expireOldEntries()) {
128 tableEntryExpirationMap.put(metadata.tableName(), ExpiringMap.builder()
Madan Jampanif5d263b2014-11-13 10:04:40 -0800129 .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
Madan Jampani9b37d572014-11-12 11:53:24 -0800130 .expirationListener(expirationObserver)
Madan Jampani9b37d572014-11-12 11:53:24 -0800131 .expirationPolicy(ExpirationPolicy.CREATED).build());
132 }
133 }
134
135 @Override
136 public void tableDeleted(String tableName) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800137 log.debug("Received a table deleted event for table ({})", tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800138 tableEntryExpirationMap.remove(tableName);
139 }
140
141 private class ExpirationObserver implements
Madan Jampanif5d263b2014-11-13 10:04:40 -0800142 ExpirationListener<DatabaseRow, Long> {
Madan Jampani9b37d572014-11-12 11:53:24 -0800143 @Override
Madan Jampanif5d263b2014-11-13 10:04:40 -0800144 public void expired(DatabaseRow row, Long version) {
Madan Jampania88d1f52014-11-14 16:45:24 -0800145 THREAD_POOL.submit(new ExpirationTask(row, version));
146 }
147 }
148
149 private class ExpirationTask implements Runnable {
150
151 private final DatabaseRow row;
152 private final Long version;
153
154 public ExpirationTask(DatabaseRow row, Long version) {
155 this.row = row;
156 this.version = version;
157 }
158
159 @Override
160 public void run() {
Yuta HIGUCHI47b2f552014-11-28 20:13:15 -0800161 log.trace("Received an expiration event for {}, version: {}", row, version);
Madan Jampanif5d263b2014-11-13 10:04:40 -0800162 Map<DatabaseRow, Long> map = tableEntryExpirationMap.get(row.tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800163 try {
164 if (isLocalMemberLeader.get()) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800165 if (!databaseService.removeIfVersionMatches(row.tableName,
166 row.key, version)) {
167 log.info("Entry in database was updated right before its expiration.");
168 } else {
Madan Jampania88d1f52014-11-14 16:45:24 -0800169 log.debug("Successfully expired old entry with key ({}) from table ({})",
Madan Jampanif5d263b2014-11-13 10:04:40 -0800170 row.key, row.tableName);
Madan Jampani9b37d572014-11-12 11:53:24 -0800171 }
172 } else {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800173 // Only the current leader will expire keys from database.
174 // Everyone else function as standby just in case they need to take over
Madan Jampani9b37d572014-11-12 11:53:24 -0800175 if (map != null) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800176 map.putIfAbsent(row, version);
Madan Jampani9b37d572014-11-12 11:53:24 -0800177 }
178 }
179
180 } catch (Exception e) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800181 log.warn("Failed to delete entry from the database after ttl "
182 + "expiration. Operation will be retried.", e);
183 map.putIfAbsent(row, version);
Madan Jampani9b37d572014-11-12 11:53:24 -0800184 }
185 }
186 }
187
188 @Override
189 public void handle(LeaderElectEvent event) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800190 isLocalMemberLeader.set(localMember.equals(event.leader()));
Madan Jampania88d1f52014-11-14 16:45:24 -0800191 if (isLocalMemberLeader.get()) {
192 log.info("{} is now the leader of Raft cluster", localNode.id());
193 }
Madan Jampani9b37d572014-11-12 11:53:24 -0800194 }
195
196 /**
197 * Wrapper class for a database row identifier.
198 */
199 private class DatabaseRow {
200
201 String tableName;
202 String key;
203
204 public DatabaseRow(String tableName, String key) {
205 this.tableName = tableName;
206 this.key = key;
207 }
208
209 @Override
Madan Jampania88d1f52014-11-14 16:45:24 -0800210 public String toString() {
211 return MoreObjects.toStringHelper(getClass())
212 .add("tableName", tableName)
213 .add("key", key)
214 .toString();
215 }
216
217 @Override
Madan Jampani9b37d572014-11-12 11:53:24 -0800218 public boolean equals(Object obj) {
219 if (this == obj) {
220 return true;
221 }
222 if (!(obj instanceof DatabaseRow)) {
223 return false;
224 }
225 DatabaseRow that = (DatabaseRow) obj;
226
227 return Objects.equals(this.tableName, that.tableName)
228 && Objects.equals(this.key, that.key);
229 }
230
231 @Override
232 public int hashCode() {
233 return Objects.hash(tableName, key);
234 }
235 }
Madan Jampanidef2c652014-11-12 13:50:10 -0800236
237 @Override
238 public void snapshotInstalled(State state) {
239 if (!tableEntryExpirationMap.isEmpty()) {
240 return;
241 }
Madan Jampania88d1f52014-11-14 16:45:24 -0800242 log.debug("Received a snapshot installed notification");
Madan Jampanidef2c652014-11-12 13:50:10 -0800243 for (String tableName : state.getTableNames()) {
244
245 TableMetadata metadata = state.getTableMetadata(tableName);
246 if (!metadata.expireOldEntries()) {
247 continue;
248 }
249
Madan Jampanif5d263b2014-11-13 10:04:40 -0800250 Map<DatabaseRow, Long> tableExpirationMap = ExpiringMap.builder()
Madan Jampanidef2c652014-11-12 13:50:10 -0800251 .expiration(metadata.ttlMillis(), TimeUnit.MILLISECONDS)
252 .expirationListener(expirationObserver)
253 .expirationPolicy(ExpirationPolicy.CREATED).build();
254 for (Map.Entry<String, VersionedValue> entry : state.getTable(tableName).entrySet()) {
Madan Jampanif5d263b2014-11-13 10:04:40 -0800255 tableExpirationMap.put(new DatabaseRow(tableName, entry.getKey()), entry.getValue().version());
Madan Jampanidef2c652014-11-12 13:50:10 -0800256 }
257
258 tableEntryExpirationMap.put(tableName, tableExpirationMap);
259 }
260 }
Madan Jampani9b37d572014-11-12 11:53:24 -0800261}