/*
 * Copyright 2014 Open Networking Laboratory
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.onlab.onos.store.service.impl;
18
19import java.io.IOException;
20import java.util.HashMap;
21import java.util.Map;
22import java.util.Objects;
23import java.util.concurrent.TimeUnit;
24import java.util.concurrent.atomic.AtomicBoolean;
25
26import net.jodah.expiringmap.ExpiringMap;
27import net.jodah.expiringmap.ExpiringMap.ExpirationListener;
28import net.jodah.expiringmap.ExpiringMap.ExpirationPolicy;
29import net.kuujo.copycat.cluster.Member;
30import net.kuujo.copycat.event.EventHandler;
31import net.kuujo.copycat.event.LeaderElectEvent;
32
33import org.onlab.onos.cluster.ClusterService;
34import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
35import org.onlab.onos.store.cluster.messaging.ClusterMessage;
36import org.onlab.onos.store.cluster.messaging.MessageSubject;
37import org.onlab.onos.store.service.DatabaseService;
38import org.slf4j.Logger;
39import org.slf4j.LoggerFactory;
40
41public class DatabaseUpdateEventHandler implements DatabaseUpdateEventListener, EventHandler<LeaderElectEvent> {
42
43 private final Logger log = LoggerFactory.getLogger(getClass());
44
45 public final static MessageSubject DATABASE_UPDATES = new MessageSubject("database-update-event");
46
47 private DatabaseService databaseService;
48 private ClusterService cluster;
49 private ClusterCommunicationService clusterCommunicator;
50
51 private final Member localMember;
52 private final AtomicBoolean isLocalMemberLeader = new AtomicBoolean(false);
53 private final Map<String, Map<DatabaseRow, Void>> tableEntryExpirationMap = new HashMap<>();
54 private final ExpirationListener<DatabaseRow, Void> expirationObserver = new ExpirationObserver();
55
56 DatabaseUpdateEventHandler(Member localMember) {
57 this.localMember = localMember;
58 }
59
60 @Override
61 public void tableModified(TableModificationEvent event) {
62 DatabaseRow row = new DatabaseRow(event.tableName(), event.key());
63 Map<DatabaseRow, Void> map = tableEntryExpirationMap.get(event.tableName());
64
65 switch (event.type()) {
66 case ROW_DELETED:
67 if (isLocalMemberLeader.get()) {
68 try {
69 clusterCommunicator.broadcast(
70 new ClusterMessage(
71 cluster.getLocalNode().id(),
72 DATABASE_UPDATES,
73 DatabaseStateMachine.SERIALIZER.encode(event)));
74 } catch (IOException e) {
75 log.error("Failed to broadcast a database table modification event.", e);
76 }
77 }
78 break;
79 case ROW_ADDED:
80 case ROW_UPDATED:
81 map.put(row, null);
82 break;
83 default:
84 break;
85 }
86 }
87
88 @Override
89 public void tableCreated(String tableName, int expirationTimeMillis) {
90 // make this explicit instead of relying on a negative value
91 // to indicate no expiration.
92 if (expirationTimeMillis > 0) {
93 tableEntryExpirationMap.put(tableName, ExpiringMap.builder()
94 .expiration(expirationTimeMillis, TimeUnit.SECONDS)
95 .expirationListener(expirationObserver)
96 // FIXME: make the expiration policy configurable.
97 .expirationPolicy(ExpirationPolicy.CREATED)
98 .build());
99 }
100 }
101
102 @Override
103 public void tableDeleted(String tableName) {
104 tableEntryExpirationMap.remove(tableName);
105 }
106
107 private class ExpirationObserver implements ExpirationListener<DatabaseRow, Void> {
108 @Override
109 public void expired(DatabaseRow key, Void value) {
110 try {
111 // TODO: The safety of this check needs to be verified.
112 // Couple of issues:
113 // 1. It is very likely that only one member should attempt deletion of the entry from database.
114 // 2. A potential race condition exists where the entry expires, but before its can be deleted
115 // from the database, a new entry is added or existing entry is updated.
116 // That means ttl and expiration should be for a given version.
117 if (isLocalMemberLeader.get()) {
118 databaseService.remove(key.tableName, key.key);
119 }
120 } catch (Exception e) {
121 log.warn("Failed to delete entry from the database after ttl expiration. Will retry eviction", e);
122 tableEntryExpirationMap.get(key.tableName).put(new DatabaseRow(key.tableName, key.key), null);
123 }
124 }
125 }
126
127 @Override
128 public void handle(LeaderElectEvent event) {
129 if (localMember.equals(event.leader())) {
130 isLocalMemberLeader.set(true);
131 }
132 }
133
134 private class DatabaseRow {
135
136 String tableName;
137 String key;
138
139 public DatabaseRow(String tableName, String key) {
140 this.tableName = tableName;
141 this.key = key;
142 }
143
144 @Override
145 public boolean equals(Object obj) {
146 if (this == obj) {
147 return true;
148 }
149 if (!(obj instanceof DatabaseRow)) {
150 return false;
151 }
152 DatabaseRow that = (DatabaseRow) obj;
153
154 return Objects.equals(this.tableName, that.tableName) &&
155 Objects.equals(this.key, that.key);
156 }
157
158 @Override
159 public int hashCode() {
160 return Objects.hash(tableName, key);
161 }
162 }
163}