Reworked AbstractDistributedStore RemoteEventHandler to allow delegating various events to accommodate cache-specific behaviours.
diff --git a/core/api/src/main/java/org/onlab/onos/store/StoreDelegate.java b/core/api/src/main/java/org/onlab/onos/store/StoreDelegate.java
index e2c5cd3..c7b0465 100644
--- a/core/api/src/main/java/org/onlab/onos/store/StoreDelegate.java
+++ b/core/api/src/main/java/org/onlab/onos/store/StoreDelegate.java
@@ -8,6 +8,11 @@
*/
public interface StoreDelegate<E extends Event> {
+ /**
+ * Notifies the delegate via the specified event.
+ *
+ * @param event store generated event
+ */
void notify(E event);
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
index b9ca46f..8923da9 100644
--- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
@@ -160,7 +160,7 @@
public void deviceDisconnected() {
connectDevice(DID1, SW1);
connectDevice(DID2, SW1);
- validateEvents(DEVICE_ADDED, DEVICE_ADDED);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED);
assertTrue("device should be available", service.isAvailable(DID1));
// Disconnect
@@ -179,7 +179,7 @@
@Test
public void deviceUpdated() {
connectDevice(DID1, SW1);
- validateEvents(DEVICE_ADDED);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED);
connectDevice(DID1, SW2);
validateEvents(DEVICE_UPDATED);
@@ -199,7 +199,7 @@
pds.add(new DefaultPortDescription(P2, true));
pds.add(new DefaultPortDescription(P3, true));
providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED, PORT_ADDED);
pds.clear();
pds.add(new DefaultPortDescription(P1, false));
@@ -215,7 +215,7 @@
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
providerService.portStatusChanged(DID1, new DefaultPortDescription(P1, false));
validateEvents(PORT_UPDATED);
@@ -230,7 +230,7 @@
pds.add(new DefaultPortDescription(P1, true));
pds.add(new DefaultPortDescription(P2, true));
providerService.updatePorts(DID1, pds);
- validateEvents(DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED, PORT_ADDED, PORT_ADDED);
assertEquals("wrong port count", 2, service.getPorts(DID1).size());
Port port = service.getPort(DID1, P1);
@@ -244,10 +244,10 @@
connectDevice(DID2, SW2);
assertEquals("incorrect device count", 2, service.getDeviceCount());
admin.removeDevice(DID1);
+ validateEvents(DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_ADDED, DEVICE_REMOVED, DEVICE_REMOVED);
assertNull("device should not be found", service.getDevice(DID1));
assertNotNull("device should be found", service.getDevice(DID2));
assertEquals("incorrect device count", 1, service.getDeviceCount());
-
}
protected void validateEvents(Enum... types) {
diff --git a/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
index ed19199..64ae3c8 100644
--- a/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
+++ b/core/store/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
@@ -7,7 +7,6 @@
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ISet;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -38,6 +37,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.cache.CacheBuilder.newBuilder;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
@@ -82,7 +82,7 @@
= new OptionalCacheLoader<>(storeService, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
- rawDevices.addEntryListener(new RemoteEventHandler<>(devices), includeValue);
+ rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
// TODO cache availableDevices
availableDevices = theInstance.getSet("availableDevices");
@@ -92,35 +92,25 @@
= new OptionalCacheLoader<>(storeService, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
- rawDevicePorts.addEntryListener(new RemoteEventHandler<>(devicePorts), includeValue);
+ rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
+
+ loadDeviceCache();
log.info("Started");
}
@Deactivate
public void deactivate() {
-
log.info("Stopped");
}
@Override
public int getDeviceCount() {
- // TODO IMap size or cache size?
- return rawDevices.size();
+ return devices.asMap().size();
}
@Override
public Iterable<Device> getDevices() {
-// TODO Revisit if we ever need to do this.
-// log.info("{}:{}", rawMap.size(), cache.size());
-// if (rawMap.size() != cache.size()) {
-// for (Entry<byte[], byte[]> e : rawMap.entrySet()) {
-// final DeviceId key = deserialize(e.getKey());
-// final DefaultDevice val = deserialize(e.getValue());
-// cache.put(key, val);
-// }
-// }
-
// TODO builder v.s. copyOf. Guava semms to be using copyOf?
Builder<Device> builder = ImmutableSet.builder();
for (Optional<DefaultDevice> e : devices.asMap().values()) {
@@ -131,6 +121,17 @@
return builder.build();
}
+ private void loadDeviceCache() {
+ log.info("{}:{}", rawDevices.size(), devices.size());
+ if (rawDevices.size() != devices.size()) {
+ for (Map.Entry<byte[], byte[]> e : rawDevices.entrySet()) {
+ final DeviceId key = deserialize(e.getKey());
+ final DefaultDevice val = deserialize(e.getValue());
+ devices.put(key, Optional.of(val));
+ }
+ }
+ }
+
@Override
public Device getDevice(DeviceId deviceId) {
// TODO revisit if ignoring exception is safe.
@@ -162,7 +163,7 @@
availableDevices.add(deviceIdBytes);
}
- return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, device, null);
+ return new DeviceEvent(DEVICE_ADDED, device, null);
}
// Updates the device and returns the appropriate event if necessary.
@@ -343,5 +344,48 @@
}
}
+ private class RemoteDeviceEventHandler extends RemoteEventHandler<DeviceId, DefaultDevice> {
+ public RemoteDeviceEventHandler(LoadingCache<DeviceId, Optional<DefaultDevice>> cache) {
+ super(cache);
+ }
+
+ @Override
+ protected void onAdd(DeviceId deviceId, DefaultDevice device) {
+ delegate.notify(new DeviceEvent(DEVICE_ADDED, device));
+ }
+
+ @Override
+ protected void onRemove(DeviceId deviceId, DefaultDevice device) {
+ delegate.notify(new DeviceEvent(DEVICE_REMOVED, device));
+ }
+
+ @Override
+ protected void onUpdate(DeviceId deviceId, DefaultDevice device) {
+ delegate.notify(new DeviceEvent(DEVICE_UPDATED, device));
+ }
+ }
+
+ private class RemotePortEventHandler extends RemoteEventHandler<DeviceId, Map<PortNumber, Port>> {
+ public RemotePortEventHandler(LoadingCache<DeviceId, Optional<Map<PortNumber, Port>>> cache) {
+ super(cache);
+ }
+
+ @Override
+ protected void onAdd(DeviceId deviceId, Map<PortNumber, Port> ports) {
+// delegate.notify(new DeviceEvent(PORT_ADDED, getDevice(deviceId)));
+ }
+
+ @Override
+ protected void onRemove(DeviceId deviceId, Map<PortNumber, Port> ports) {
+// delegate.notify(new DeviceEvent(PORT_REMOVED, getDevice(deviceId)));
+ }
+
+ @Override
+ protected void onUpdate(DeviceId deviceId, Map<PortNumber, Port> ports) {
+// delegate.notify(new DeviceEvent(PORT_UPDATED, getDevice(deviceId)));
+ }
+ }
+
+
// TODO cache serialized DeviceID if we suffer from serialization cost
}
diff --git a/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java b/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java
index 2d0fb07..e7c2d58 100644
--- a/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java
+++ b/core/store/src/main/java/org/onlab/onos/store/impl/AbstractDistributedStore.java
@@ -6,7 +6,6 @@
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.MapEvent;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Reference;
@@ -25,7 +24,7 @@
*/
@Component(componentAbstract = true)
public abstract class AbstractDistributedStore<E extends Event, D extends StoreDelegate<E>>
- extends AbstractStore<E, D> {
+ extends AbstractStore<E, D> {
protected final Logger log = getLogger(getClass());
@@ -67,7 +66,7 @@
* @param <K> IMap key type after deserialization
* @param <V> IMap value type after deserialization
*/
- public final class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
+ public class RemoteEventHandler<K, V> extends EntryAdapter<byte[], byte[]> {
private LoadingCache<K, Optional<V>> cache;
@@ -86,26 +85,58 @@
}
@Override
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ K key = deserialize(event.getKey());
+ V newVal = deserialize(event.getValue());
+ Optional<V> newValue = Optional.of(newVal);
+ cache.asMap().putIfAbsent(key, newValue);
+ onAdd(key, newVal);
+ }
+
+ @Override
public void entryUpdated(EntryEvent<byte[], byte[]> event) {
- K key = storeService.<K>deserialize(event.getKey());
- final V oldVal = storeService.<V>deserialize(event.getOldValue());
+ K key = deserialize(event.getKey());
+ V oldVal = deserialize(event.getOldValue());
Optional<V> oldValue = Optional.fromNullable(oldVal);
- final V newVal = storeService.<V>deserialize(event.getValue());
+ V newVal = deserialize(event.getValue());
Optional<V> newValue = Optional.of(newVal);
cache.asMap().replace(key, oldValue, newValue);
+ onUpdate(key, newVal);
}
@Override
public void entryRemoved(EntryEvent<byte[], byte[]> event) {
- cache.invalidate(storeService.<K>deserialize(event.getKey()));
+ K key = deserialize(event.getKey());
+ V val = deserialize(event.getValue());
+ cache.invalidate(key);
+ onRemove(key, val);
}
- @Override
- public void entryAdded(EntryEvent<byte[], byte[]> event) {
- K key = storeService.<K>deserialize(event.getKey());
- final V newVal = storeService.<V>deserialize(event.getValue());
- Optional<V> newValue = Optional.of(newVal);
- cache.asMap().putIfAbsent(key, newValue);
+ /**
+ * Cache entry addition hook.
+ *
+ * @param key new key
+ * @param newVal new value
+ */
+ protected void onAdd(K key, V newVal) {
+ }
+
+ /**
+ * Cache entry update hook.
+ *
+ * @param key new key
+ * @param newVal new value
+ */
+ protected void onUpdate(K key, V newVal) {
+ }
+
+ /**
+ * Cache entry remove hook.
+ *
+ * @param key new key
+ * @param val old value
+ */
+ protected void onRemove(K key, V val) {
}
}