Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
Conflicts:
core/net/src/main/java/org/onlab/onos/net/device/impl/DeviceManager.java
core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
Change-Id: Ia1274657b27e01366a4a87196a13068d7104ee80
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
new file mode 100644
index 0000000..74c22f1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -0,0 +1,10 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+public final class ClusterManagementMessageSubjects {
+ // avoid instantiation
+ private ClusterManagementMessageSubjects() {}
+
+ public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 961ed4f..30b847f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.ControllerNode;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
similarity index 69%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index 1f5fd3f..cdfd145 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
public enum ClusterMembershipEventType {
NEW_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index 9408cc9..5e64a39 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -7,11 +7,9 @@
import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ClusterEvent;
import org.onlab.onos.cluster.ClusterStore;
import org.onlab.onos.cluster.ClusterStoreDelegate;
@@ -37,8 +35,8 @@
/**
* Distributed implementation of the cluster nodes store.
*/
-@Component(immediate = true)
-@Service
+//@Component(immediate = true)
+//@Service
public class DistributedClusterStore
extends AbstractStore<ClusterEvent, ClusterStoreDelegate>
implements ClusterStore {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
deleted file mode 100644
index 98e80f7..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ /dev/null
@@ -1,117 +0,0 @@
-package org.onlab.onos.store.cluster.impl;
-
-import de.javakaffee.kryoserializers.URISerializer;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultLink;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Element;
-import org.onlab.onos.net.Link;
-import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-import org.onlab.onos.store.cluster.messaging.SerializationService;
-import org.onlab.onos.store.serializers.ConnectPointSerializer;
-import org.onlab.onos.store.serializers.DefaultLinkSerializer;
-import org.onlab.onos.store.serializers.DefaultPortSerializer;
-import org.onlab.onos.store.serializers.DeviceIdSerializer;
-import org.onlab.onos.store.serializers.IpPrefixSerializer;
-import org.onlab.onos.store.serializers.LinkKeySerializer;
-import org.onlab.onos.store.serializers.NodeIdSerializer;
-import org.onlab.onos.store.serializers.PortNumberSerializer;
-import org.onlab.onos.store.serializers.ProviderIdSerializer;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-/**
- * Factory for parsing messages sent between cluster members.
- */
-@Component(immediate = true)
-@Service
-public class MessageSerializer implements SerializationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- private static final int METADATA_LENGTH = 12; // 8 + 4
- private static final int LENGTH_OFFSET = 8;
-
- private static final long MARKER = 0xfeedcafebeaddeadL;
-
- private KryoPool serializerPool;
-
- @Activate
- public void activate() {
- setupKryoPool();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- /**
- * Sets up the common serialzers pool.
- */
- protected void setupKryoPool() {
- // FIXME Slice out types used in common to separate pool/namespace.
- serializerPool = KryoPool.newBuilder()
- .register(ArrayList.class,
- HashMap.class,
-
- ControllerNode.State.class,
- Device.Type.class,
-
- DefaultControllerNode.class,
- DefaultDevice.class,
- MastershipRole.class,
- Port.class,
- Element.class,
-
- Link.Type.class,
-
- MessageSubject.class
- )
- .register(IpPrefix.class, new IpPrefixSerializer())
- .register(URI.class, new URISerializer())
- .register(NodeId.class, new NodeIdSerializer())
- .register(ProviderId.class, new ProviderIdSerializer())
- .register(DeviceId.class, new DeviceIdSerializer())
- .register(PortNumber.class, new PortNumberSerializer())
- .register(DefaultPort.class, new DefaultPortSerializer())
- .register(LinkKey.class, new LinkKeySerializer())
- .register(ConnectPoint.class, new ConnectPointSerializer())
- .register(DefaultLink.class, new DefaultLinkSerializer())
- .build()
- .populate(1);
- }
-
-
- @Override
- public Object decode(byte[] data) {
- return serializerPool.deserialize(data);
- }
-
- @Override
- public byte[] encode(Object payload) {
- return serializerPool.serialize(payload);
- }
-}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
index 5966f12..0bc31fa 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterCommunicationAdminService.java
@@ -3,6 +3,8 @@
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
+// TODO: This service interface can be removed, once we properly start
+// using ClusterService
/**
* Service for administering communications manager.
*/
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index ee558dd..b74f887 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -2,6 +2,7 @@
import org.onlab.onos.cluster.NodeId;
+// TODO: Should payload type be ByteBuffer?
/**
* Base message for cluster-wide communications.
*/
@@ -9,14 +10,14 @@
private final NodeId sender;
private final MessageSubject subject;
- private final Object payload;
+ private final byte[] payload;
/**
* Creates a cluster message.
*
* @param subject message subject
*/
- public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+ public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
this.sender = sender;
this.subject = subject;
this.payload = payload;
@@ -45,7 +46,7 @@
*
* @return message payload.
*/
- public Object payload() {
+ public byte[] payload() {
return payload;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
index 7ec27ec..4dd7bc2 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
@@ -10,4 +10,4 @@
* @param message cluster message.
*/
public void handle(ClusterMessage message);
-}
\ No newline at end of file
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index ee8d9c1..43df15f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -1,5 +1,9 @@
package org.onlab.onos.store.cluster.messaging;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
/**
* Representation of a message subject.
* Cluster messages have associated subjects that dictate how they get handled
@@ -10,7 +14,7 @@
private final String value;
public MessageSubject(String value) {
- this.value = value;
+ this.value = checkNotNull(value);
}
public String value() {
@@ -21,4 +25,29 @@
public String toString() {
return value;
}
+
+ @Override
+ public int hashCode() {
+ return value.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ MessageSubject that = (MessageSubject) obj;
+ return Objects.equals(this.value, that.value);
+ }
+
+ // for serializer
+ protected MessageSubject() {
+ this.value = "";
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
index d85f488..4d76ce3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.cluster.messaging;
/**
- * Service for encoding & decoding intra-cluster messages.
+ * Service for encoding and decoding intra-cluster message payloads.
*/
public interface SerializationService {
@@ -11,7 +11,7 @@
* @param buffer byte buffer with message(s)
* @return parsed message
*/
- Object decode(byte[] data);
+ <T> T decode(byte[] data);
/**
* Encodes the specified message into the given byte buffer.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index d4fd9c0..1b11873 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -3,30 +3,36 @@
import static com.google.common.base.Preconditions.checkArgument;
import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
-
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.ClusterMessageSerializer;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MessageSubjectSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
import org.onlab.netty.MessagingService;
+import org.onlab.netty.NettyMessagingService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,28 +44,57 @@
private final Logger log = LoggerFactory.getLogger(getClass());
private ControllerNode localNode;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ private ClusterService clusterService;
+
private ClusterNodesDelegate nodesDelegate;
- private Map<NodeId, ControllerNode> members = new HashMap<>();
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ // TODO: This probably should not be an OSGi service.
private MessagingService messagingService;
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(ClusterMessage.class, new ClusterMessageSerializer())
+ .register(ClusterMembershipEvent.class)
+ .register(byte[].class)
+ .register(MessageSubject.class, new MessageSubjectSerializer())
+ .build()
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
+ localNode = clusterService.getLocalNode();
+ NettyMessagingService netty = new NettyMessagingService(localNode.tcpPort());
+ // FIXME: workaround until it becomes a service.
+ try {
+ netty.activate();
+ } catch (Exception e) {
+ // TODO: decide how to handle an activation failure; for now it is only logged.
+ log.error("NettyMessagingService#activate", e);
+ }
+ messagingService = netty;
log.info("Started");
}
@Deactivate
public void deactivate() {
+ // TODO: clean up messagingService if needed.
log.info("Stopped");
}
@Override
public boolean broadcast(ClusterMessage message) {
boolean ok = true;
- for (ControllerNode node : members.values()) {
+ for (ControllerNode node : clusterService.getNodes()) {
if (!node.equals(localNode)) {
ok = unicast(message, node.id()) && ok;
}
@@ -80,11 +115,12 @@
@Override
public boolean unicast(ClusterMessage message, NodeId toNodeId) {
- ControllerNode node = members.get(toNodeId);
+ ControllerNode node = clusterService.getNode(toNodeId);
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), message);
+ messagingService.sendAsync(nodeEp,
+ message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -110,7 +146,7 @@
@Override
public void addNode(ControllerNode node) {
- members.put(node.id(), node);
+ //members.put(node.id(), node);
}
@Override
@@ -118,8 +154,8 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
- members.remove(node.id());
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
+ //members.remove(node.id());
}
// Sends a heart beat to all peers.
@@ -130,7 +166,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
}
}
@@ -139,7 +175,7 @@
@Override
public void handle(ClusterMessage message) {
- ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+ ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
@@ -154,7 +190,7 @@
}
}
- private static class InternalClusterMessageHandler implements MessageHandler {
+ private final class InternalClusterMessageHandler implements MessageHandler {
private final ClusterMessageHandler handler;
@@ -164,7 +200,13 @@
@Override
public void handle(Message message) {
- handler.handle((ClusterMessage) message.payload());
+ try {
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ handler.handle(clusterMessage);
+ } catch (Exception e) {
+ log.error("Exception caught during ClusterMessageHandler", e);
+ throw e;
+ }
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
deleted file mode 100644
index d1f75ae..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
-public final class ClusterMessageSubjects {
- private ClusterMessageSubjects() {}
- public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
new file mode 100644
index 0000000..bf47f49
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -0,0 +1,63 @@
+package org.onlab.onos.store.cluster.messaging.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.cluster.messaging.SerializationService;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory for parsing messages sent between cluster members.
+ */
+@Component(immediate = true)
+@Service
+public class MessageSerializer implements SerializationService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final int METADATA_LENGTH = 12; // 8 + 4
+ private static final int LENGTH_OFFSET = 8;
+
+ private static final long MARKER = 0xfeedcafebeaddeadL;
+
+ private KryoPool serializerPool;
+
+ @Activate
+ public void activate() {
+ setupKryoPool();
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ log.info("Stopped");
+ }
+
+ /**
+ * Sets up the common serializers pool.
+ */
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ // TODO: Should MessageSubject be in API bundle?
+ .register(MessageSubject.class)
+ .build()
+ .populate(1);
+ }
+
+
+ @Override
+ public <T> T decode(byte[] data) {
+ return serializerPool.deserialize(data);
+ }
+
+ @Override
+ public byte[] encode(Object payload) {
+ return serializerPool.serialize(payload);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java
new file mode 100644
index 0000000..6c1e71b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Implementation of the cluster messaging mechanism.
+ */
+package org.onlab.onos.store.cluster.messaging.impl;
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
index b70da73..132f27a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
import java.util.Map;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
index 095752b..033a1de 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
import java.util.Map;
import java.util.Set;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
similarity index 73%
rename from core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
index 2005582..0f4f894 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/impl/OnosTimestamp.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/MastershipBasedTimestamp.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.impl;
+package org.onlab.onos.store.common.impl;
import static com.google.common.base.Preconditions.checkArgument;
@@ -9,12 +9,11 @@
import com.google.common.base.MoreObjects;
import com.google.common.collect.ComparisonChain;
-// If it is store specific, implement serializable interfaces?
/**
* Default implementation of Timestamp.
* TODO: Better documentation.
*/
-public final class OnosTimestamp implements Timestamp {
+public final class MastershipBasedTimestamp implements Timestamp {
private final int termNumber;
private final int sequenceNumber;
@@ -25,15 +24,16 @@
* @param termNumber the mastership termNumber
* @param sequenceNumber the sequenceNumber number within the termNumber
*/
- public OnosTimestamp(int termNumber, int sequenceNumber) {
+ public MastershipBasedTimestamp(int termNumber, int sequenceNumber) {
this.termNumber = termNumber;
this.sequenceNumber = sequenceNumber;
}
@Override
public int compareTo(Timestamp o) {
- checkArgument(o instanceof OnosTimestamp, "Must be OnosTimestamp", o);
- OnosTimestamp that = (OnosTimestamp) o;
+ checkArgument(o instanceof MastershipBasedTimestamp,
+ "Must be MastershipBasedTimestamp", o);
+ MastershipBasedTimestamp that = (MastershipBasedTimestamp) o;
return ComparisonChain.start()
.compare(this.termNumber, that.termNumber)
@@ -51,10 +51,10 @@
if (this == obj) {
return true;
}
- if (!(obj instanceof OnosTimestamp)) {
+ if (!(obj instanceof MastershipBasedTimestamp)) {
return false;
}
- OnosTimestamp that = (OnosTimestamp) obj;
+ MastershipBasedTimestamp that = (MastershipBasedTimestamp) obj;
return Objects.equals(this.termNumber, that.termNumber) &&
Objects.equals(this.sequenceNumber, that.sequenceNumber);
}
@@ -84,4 +84,11 @@
public int sequenceNumber() {
return sequenceNumber;
}
+
+ // Default constructor for serialization
+ @Deprecated
+ protected MastershipBasedTimestamp() {
+ this.termNumber = -1;
+ this.sequenceNumber = -1;
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
new file mode 100644
index 0000000..77b0a87
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/Timestamped.java
@@ -0,0 +1,89 @@
+package org.onlab.onos.store.common.impl;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Objects;
+
+import org.onlab.onos.store.Timestamp;
+
+import com.google.common.base.MoreObjects;
+
+/**
+ * Wrapper class to store a timestamped value.
+ * @param <T> type of the wrapped value
+ */
+public final class Timestamped<T> {
+
+ private final Timestamp timestamp;
+ private final T value;
+
+ /**
+ * Creates a time stamped value.
+ *
+ * @param value value to be timestamped
+ * @param timestamp the timestamp
+ */
+ public Timestamped(T value, Timestamp timestamp) {
+ this.value = checkNotNull(value);
+ this.timestamp = checkNotNull(timestamp);
+ }
+
+ /**
+ * Returns the value.
+ * @return value
+ */
+ public T value() {
+ return value;
+ }
+
+ /**
+ * Returns the time stamp.
+ * @return time stamp
+ */
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ /**
+ * Tests if this timestamped value is newer than the other.
+ *
+ * @param other timestamped value
+ * @return true if this instance is newer.
+ */
+ public boolean isNewer(Timestamped<T> other) {
+ return this.timestamp.compareTo(checkNotNull(other).timestamp()) > 0;
+ }
+
+ @Override
+ public int hashCode() {
+ return timestamp.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (!(obj instanceof Timestamped)) {
+ return false;
+ }
+ @SuppressWarnings("unchecked")
+ Timestamped<T> that = (Timestamped<T>) obj;
+ return Objects.equals(this.timestamp, that.timestamp);
+ }
+
+ @Override
+ public String toString() {
+ return MoreObjects.toStringHelper(getClass())
+ .add("timestamp", timestamp)
+ .add("value", value)
+ .toString();
+ }
+
+ // Default constructor for serialization
+ @Deprecated
+ private Timestamped() {
+ this.value = null;
+ this.timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
new file mode 100644
index 0000000..992fd49
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/package-info.java
@@ -0,0 +1,5 @@
+/**
+ * Common abstractions and facilities for implementing a distributed store
+ * using a gossip protocol.
+ */
+package org.onlab.onos.store.common.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
index 301884c..d05659b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -8,7 +8,7 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
index 011713e..e7a4d0a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -10,7 +10,7 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+import org.onlab.onos.store.common.impl.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
similarity index 81%
rename from core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
index a99482f..e1e3692 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosClockService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceClockManager.java
@@ -12,14 +12,18 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.impl.OnosTimestamp;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.slf4j.Logger;
+/**
+ * Clock service to issue Timestamp based on Device Mastership.
+ */
@Component(immediate = true)
@Service
-public class OnosClockService implements ClockService {
+public class DeviceClockManager implements ClockService, ClockProviderService {
private final Logger log = getLogger(getClass());
@@ -43,7 +47,7 @@
if (term == null) {
throw new IllegalStateException("Requesting timestamp for a deviceId without mastership");
}
- return new OnosTimestamp(term.termNumber(), ticker.incrementAndGet());
+ return new MastershipBasedTimestamp(term.termNumber(), ticker.incrementAndGet());
}
@Override
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
new file mode 100644
index 0000000..f39413b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -0,0 +1,945 @@
+package org.onlab.onos.store.device.impl;
+
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import org.apache.commons.lang3.concurrent.ConcurrentException;
+import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.AnnotationsUtil;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.Device.Type;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.AbstractStore;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.util.KryoPool;
+import org.onlab.util.NewConcurrentHashMap;
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Predicates.notNull;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import static org.slf4j.LoggerFactory.getLogger;
+import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static com.google.common.base.Verify.verify;
+
+// TODO: give me a better name
+/**
+ * Manages inventory of infrastructure devices using gossip protocol to distribute
+ * information.
+ */
+@Component(immediate = true)
+@Service
+public class GossipDeviceStore
+ extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
+ implements DeviceStore {
+
+ private final Logger log = getLogger(getClass());
+
+ public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
+
+ // TODO: Check if inner Map can be replaced with plain Map
+ // innerMap is used to lock a Device, thus instance should never be replaced.
+ // collection of Description given from various providers
+ private final ConcurrentMap<DeviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions>>
+ deviceDescs = Maps.newConcurrentMap();
+
+ // cache of Device and Ports generated by compositing descriptions from providers
+ private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
+
+    // updated under the per-device lock only, so entries of different devices may be written concurrently
+    private final Map<DeviceId, Timestamp> offline = Maps.newConcurrentMap();
+    private final Map<DeviceId, Timestamp> removalRequest = Maps.newConcurrentMap();
+
+ // available(=UP) devices
+ private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClockService clockService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ @Override
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(InternalDeviceEvent.class, new InternalDeviceEventSerializer())
+ .register(InternalDeviceOfflineEvent.class, new InternalDeviceOfflineEventSerializer())
+ .register(InternalDeviceRemovedEvent.class)
+ .register(InternalPortEvent.class, new InternalPortEventSerializer())
+ .register(InternalPortStatusEvent.class, new InternalPortStatusEventSerializer())
+ .register(Timestamp.class)
+ .register(Timestamped.class)
+ .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .build()
+ .populate(1);
+ }
+
+ };
+
+ @Activate
+ public void activate() {
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE, new InternalDeviceOfflineEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED, new InternalDeviceRemovedEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
+ log.info("Started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ deviceDescs.clear();
+ devices.clear();
+ devicePorts.clear();
+ availableDevices.clear();
+ log.info("Stopped");
+ }
+
+ @Override
+ public int getDeviceCount() {
+ return devices.size();
+ }
+
+ @Override
+ public Iterable<Device> getDevices() {
+ return Collections.unmodifiableCollection(devices.values());
+ }
+
+ @Override
+ public Device getDevice(DeviceId deviceId) {
+ return devices.get(deviceId);
+ }
+
+ @Override
+ public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
+ DeviceId deviceId,
+ DeviceDescription deviceDescription) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
+ DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
+ if (event != null) {
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device update topology event for providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
+ }
+ return event;
+ }
+
+ private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deltaDesc) {
+
+ // Collection of DeviceDescriptions for a Device
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = getDeviceDescriptions(deviceId);
+
+ synchronized (providerDescs) {
+ // locking per device
+
+ if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+ log.debug("Ignoring outdated event: {}", deltaDesc);
+ return null;
+ }
+
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deltaDesc));
+
+ final Device oldDevice = devices.get(deviceId);
+ final Device newDevice;
+
+ if (deltaDesc == descs.getDeviceDesc() ||
+ deltaDesc.isNewer(descs.getDeviceDesc())) {
+ // on new device or valid update
+ descs.putDeviceDesc(deltaDesc);
+ newDevice = composeDevice(deviceId, providerDescs);
+ } else {
+ // outdated event, ignored.
+ return null;
+ }
+ if (oldDevice == null) {
+ // ADD
+ return createDevice(providerId, newDevice, deltaDesc.timestamp());
+ } else {
+ // UPDATE or ignore (no change or stale)
+ return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp());
+ }
+ }
+ }
+
+ // Creates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
+ private DeviceEvent createDevice(ProviderId providerId,
+ Device newDevice, Timestamp timestamp) {
+
+ // update composed device cache
+ Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+ verify(oldDevice == null,
+ "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+ providerId, oldDevice, newDevice);
+
+ if (!providerId.isAncillary()) {
+ markOnline(newDevice.id(), timestamp);
+ }
+
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
+ }
+
+ // Updates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
+ private DeviceEvent updateDevice(ProviderId providerId,
+ Device oldDevice,
+ Device newDevice, Timestamp newTimestamp) {
+
+ // We allow only certain attributes to trigger update
+ if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
+ !Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
+ !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
+
+ boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+ if (!replaced) {
+ verify(replaced,
+ "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, oldDevice, devices.get(newDevice.id())
+ , newDevice);
+ }
+ if (!providerId.isAncillary()) {
+ markOnline(newDevice.id(), newTimestamp);
+ }
+ return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
+ }
+
+ // Otherwise merely attempt to change availability if primary provider
+ if (!providerId.isAncillary()) {
+ boolean added = markOnline(newDevice.id(), newTimestamp);
+ return !added ? null :
+ new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
+ }
+ return null;
+ }
+
+    @Override
+    public DeviceEvent markOffline(DeviceId deviceId) {
+        Timestamp timestamp = clockService.getTimestamp(deviceId);
+        DeviceEvent event = markOfflineInternal(deviceId, timestamp);
+        if (event != null) { // gossip only requests that changed local state
+            log.info("Notifying peers of a device offline topology event for deviceId: {}", deviceId);
+            try {
+                notifyPeers(new InternalDeviceOfflineEvent(deviceId, timestamp));
+            } catch (IOException e) {
+                // the trailing Throwable argument makes the logger record the cause
+                log.error("Failed to notify peers of a device offline topology event for deviceId: {}",
+                        deviceId, e);
+            }
+        }
+        return event;
+    }
+
+    // Applies an off-line request; returns an event only if availability actually changed.
+    private DeviceEvent markOfflineInternal(DeviceId deviceId, Timestamp timestamp) {
+        Map<ProviderId, DeviceDescriptions> providerDescs
+                = getDeviceDescriptions(deviceId);
+
+        // locking device
+        synchronized (providerDescs) {
+            if (providerDescs.isEmpty()) {
+                // unknown or already removed device: there is no primary description to compare against
+                return null;
+            }
+            // accept off-line if given timestamp is newer than
+            // the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(providerDescs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event ignore
+                return null;
+            }
+            offline.put(deviceId, timestamp);
+            Device device = devices.get(deviceId);
+            if (device == null) {
+                return null;
+            }
+            if (availableDevices.remove(deviceId)) {
+                // TODO: broadcast ... DOWN only?
+                return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+            }
+            return null;
+        }
+    }
+
+    /**
+     * Brings the device on-line, unless an off-line request that is at least
+     * as recent as the given timestamp has been recorded for it.
+     *
+     * @param deviceId identifier of the device
+     * @param timestamp timestamp of the event triggering this change
+     * @return true if the request was accepted and the device was not already available
+     */
+    // Guarded by deviceDescs value (=Device lock)
+    private boolean markOnline(DeviceId deviceId, Timestamp timestamp) {
+        final Timestamp lastOffline = offline.get(deviceId);
+        if (lastOffline != null && lastOffline.compareTo(timestamp) >= 0) {
+            // off-line request is not older than this event; keep the device down
+            return false;
+        }
+
+        // event is newer than any recorded off-line request (or none exists):
+        // forget the request and flag the device as available
+        offline.remove(deviceId);
+        return availableDevices.add(deviceId);
+    }
+
+ @Override
+ public synchronized List<DeviceEvent> updatePorts(ProviderId providerId,
+ DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+
+ Timestamped<List<PortDescription>> timestampedPortDescriptions =
+ new Timestamped<>(portDescriptions, newTimestamp);
+
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
+ if (!events.isEmpty()) {
+ log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
+ }
+ return events;
+ }
+
+    // Applies a timestamped batch of port descriptions under the Device lock and
+    // returns the resulting events; the list is empty (never null) when nothing changed.
+    private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
+                                                  DeviceId deviceId,
+                                                  Timestamped<List<PortDescription>> portDescriptions) {
+        Device device = devices.get(deviceId);
+        checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+        ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+        checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+        List<DeviceEvent> events = new ArrayList<>();
+        synchronized (descsMap) {
+            if (isDeviceRemoved(deviceId, portDescriptions.timestamp())) {
+                log.debug("Ignoring outdated events: {}", portDescriptions);
+                // callers (e.g. updatePorts) dereference the result, so never hand back null
+                return Collections.emptyList();
+            }
+
+            DeviceDescriptions descs = descsMap.get(providerId);
+            // every provider must provide DeviceDescription.
+            checkArgument(descs != null,
+                    "Device description for Device ID %s from Provider %s was not found",
+                    deviceId, providerId);
+
+            Map<PortNumber, Port> ports = getPortMap(deviceId);
+            final Timestamp newTimestamp = portDescriptions.timestamp();
+
+            // Add new ports
+            Set<PortNumber> processed = new HashSet<>();
+            for (PortDescription portDescription : portDescriptions.value()) {
+                final PortNumber number = portDescription.portNumber();
+                processed.add(number);
+
+                final Port oldPort = ports.get(number);
+                final Port newPort;
+
+                final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+                if (existingPortDesc == null ||
+                        newTimestamp.compareTo(existingPortDesc.timestamp()) >= 0) {
+                    // on new port or valid update
+                    // update description
+                    descs.putPortDesc(new Timestamped<>(portDescription,
+                            portDescriptions.timestamp()));
+                    newPort = composePort(device, number, descsMap);
+                } else {
+                    // outdated event, ignored.
+                    continue;
+                }
+
+                // updatePort() may yield null; such entries are filtered out below
+                events.add(oldPort == null ?
+                        createPort(device, newPort, ports) :
+                        updatePort(device, oldPort, newPort, ports));
+            }
+
+            events.addAll(pruneOldPorts(device, ports, processed));
+        }
+        return FluentIterable.from(events).filter(notNull()).toList();
+    }
+
+ // Creates a new port based on the port description adds it to the map and
+ // Returns corresponding event.
+ // Guarded by deviceDescs value (=Device lock)
+ private DeviceEvent createPort(Device device, Port newPort,
+ Map<PortNumber, Port> ports) {
+ ports.put(newPort.number(), newPort);
+ return new DeviceEvent(PORT_ADDED, device, newPort);
+ }
+
+    // Replaces the cached port and reports PORT_UPDATED, but only when the
+    // enabled state or the annotations differ; otherwise nothing happens.
+    // Guarded by deviceDescs value (=Device lock)
+    private DeviceEvent updatePort(Device device, Port oldPort,
+                                   Port newPort,
+                                   Map<PortNumber, Port> ports) {
+        final boolean sameState = oldPort.isEnabled() == newPort.isEnabled();
+        if (sameState && AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
+            // nothing observable changed; keep the cached instance
+            return null;
+        }
+        ports.put(oldPort.number(), newPort);
+        return new DeviceEvent(PORT_UPDATED, device, newPort);
+    }
+
+    // Drops every cached port whose number is absent from the processed set,
+    // emitting one PORT_REMOVED event per dropped port.
+    // Guarded by deviceDescs value (=Device lock)
+    private List<DeviceEvent> pruneOldPorts(Device device,
+                                            Map<PortNumber, Port> ports,
+                                            Set<PortNumber> processed) {
+        final List<DeviceEvent> removals = new ArrayList<>();
+        for (Iterator<Entry<PortNumber, Port>> it = ports.entrySet().iterator(); it.hasNext();) {
+            final Entry<PortNumber, Port> cached = it.next();
+            if (processed.contains(cached.getKey())) {
+                // still reported by the provider, keep it
+                continue;
+            }
+            removals.add(new DeviceEvent(PORT_REMOVED, device, cached.getValue()));
+            it.remove();
+        }
+        return removals;
+    }
+
+ // Gets the map of ports for the specified device; if one does not already
+ // exist, it creates and registers a new one.
+ private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
+ return createIfAbsentUnchecked(devicePorts, deviceId,
+ NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+ }
+
+ private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
+ DeviceId deviceId) {
+ return createIfAbsentUnchecked(deviceDescs, deviceId,
+ NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
+ }
+
+ @Override
+ public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+ PortDescription portDescription) {
+ Timestamp newTimestamp = clockService.getTimestamp(deviceId);
+ final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
+ DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
+ if (event != null) {
+ log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port status update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
+ }
+ return event;
+ }
+
+ private DeviceEvent updatePortStatusInternal(ProviderId providerId, DeviceId deviceId,
+ Timestamped<PortDescription> deltaDesc) {
+
+ Device device = devices.get(deviceId);
+ checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
+
+ ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
+ checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
+
+ synchronized (descsMap) {
+
+ if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
+ log.debug("Ignoring outdated event: {}", deltaDesc);
+ return null;
+ }
+
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // assuming every provider must give a DeviceDescription
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+ ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+ final PortNumber number = deltaDesc.value().portNumber();
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
+ final Timestamped<PortDescription> existingPortDesc = descs.getPortDesc(number);
+ if (existingPortDesc == null ||
+ deltaDesc == existingPortDesc ||
+ deltaDesc.isNewer(existingPortDesc)) {
+ // on new port or valid update
+ // update description
+ descs.putPortDesc(deltaDesc);
+ newPort = composePort(device, number, descsMap);
+ } else {
+ // outdated event, ignored.
+ return null;
+ }
+
+ if (oldPort == null) {
+ return createPort(device, newPort, ports);
+ } else {
+ return updatePort(device, oldPort, newPort, ports);
+ }
+ }
+ }
+
+    @Override
+    public List<Port> getPorts(DeviceId deviceId) {
+        final Map<PortNumber, Port> portMap = devicePorts.get(deviceId);
+        // snapshot of the cached ports; a device without a port map has none
+        return portMap == null
+                ? Collections.<Port>emptyList()
+                : ImmutableList.copyOf(portMap.values());
+    }
+
+ @Override
+ public Port getPort(DeviceId deviceId, PortNumber portNumber) {
+ Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+ return ports == null ? null : ports.get(portNumber);
+ }
+
+ @Override
+ public boolean isAvailable(DeviceId deviceId) {
+ return availableDevices.contains(deviceId);
+ }
+
+    @Override
+    public synchronized DeviceEvent removeDevice(DeviceId deviceId) {
+        Timestamp timestamp = clockService.getTimestamp(deviceId);
+        DeviceEvent event = removeDeviceInternal(deviceId, timestamp);
+        if (event != null) { // gossip only removals that changed local state
+            log.info("Notifying peers of a device removed topology event for deviceId: {}", deviceId);
+            try {
+                notifyPeers(new InternalDeviceRemovedEvent(deviceId, timestamp));
+            } catch (IOException e) {
+                // the trailing Throwable argument makes the logger record the cause
+                log.error("Failed to notify peers of a device removed topology event for deviceId: {}",
+                        deviceId, e);
+            }
+        }
+        return event;
+    }
+
+    // Applies a removal request under the Device lock; returns DEVICE_REMOVED if a device was dropped.
+    private DeviceEvent removeDeviceInternal(DeviceId deviceId, Timestamp timestamp) {
+        Map<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+        synchronized (descs) {
+            if (descs.isEmpty()) {
+                return null; // unknown or already removed: no primary description to compare against
+            }
+            // accept removal only if newer than the latest Timestamp from Primary provider
+            DeviceDescriptions primDescs = getPrimaryDescriptions(descs);
+            Timestamp lastTimestamp = primDescs.getLatestTimestamp();
+            if (timestamp.compareTo(lastTimestamp) <= 0) {
+                // outdated event ignore
+                return null;
+            }
+            removalRequest.put(deviceId, timestamp);
+            // must run while the device is still cached, or availability is never cleared
+            markOfflineInternal(deviceId, timestamp);
+            Device device = devices.remove(deviceId);
+            // should DEVICE_REMOVED carry removed ports?
+            Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+            if (ports != null) {
+                ports.clear();
+            }
+            descs.clear();
+            return device == null ? null : new DeviceEvent(DEVICE_REMOVED, device, null);
+        }
+    }
+
+    // Tells whether a removal request at least as recent as the given timestamp is recorded.
+    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
+        final Timestamp removedAt = removalRequest.get(deviceId);
+        if (removedAt == null) {
+            // no removal has been requested for this device
+            return false;
+        }
+        return removedAt.compareTo(timestampToCheck) >= 0;
+    }
+
+ /**
+ * Returns a Device, merging description given from multiple Providers.
+ *
+ * @param deviceId device identifier
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Device instance
+ */
+ private Device composeDevice(DeviceId deviceId,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ checkArgument(!providerDescs.isEmpty(), "No Device descriptions supplied");
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+
+ DeviceDescriptions desc = providerDescs.get(primary);
+
+ final DeviceDescription base = desc.getDeviceDesc().value();
+ Type type = base.type();
+ String manufacturer = base.manufacturer();
+ String hwVersion = base.hwVersion();
+ String swVersion = base.swVersion();
+ String serialNumber = base.serialNumber();
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+ annotations = merge(annotations, base.annotations());
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // TODO: should keep track of Description timestamp
+ // and only merge conflicting keys when timestamp is newer
+ // Currently assuming there will never be a key conflict between
+ // providers
+
+ // annotation merging. not so efficient, should revisit later
+ annotations = merge(annotations, e.getValue().getDeviceDesc().value().annotations());
+ }
+
+ return new DefaultDevice(primary, deviceId , type, manufacturer,
+ hwVersion, swVersion, serialNumber, annotations);
+ }
+
+ /**
+ * Returns a Port, merging description given from multiple Providers.
+ *
+ * @param device device the port is on
+ * @param number port number
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Port instance
+ */
+ private Port composePort(Device device, PortNumber number,
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
+
+ ProviderId primary = pickPrimaryPID(providerDescs);
+ DeviceDescriptions primDescs = providerDescs.get(primary);
+ // if no primary, assume not enabled
+ // TODO: revisit this default port enabled/disabled behavior
+ boolean isEnabled = false;
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+
+ final Timestamped<PortDescription> portDesc = primDescs.getPortDesc(number);
+ if (portDesc != null) {
+ isEnabled = portDesc.value().isEnabled();
+ annotations = merge(annotations, portDesc.value().annotations());
+ }
+
+ for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
+ if (e.getKey().equals(primary)) {
+ continue;
+ }
+ // TODO: should keep track of Description timestamp
+ // and only merge conflicting keys when timestamp is newer
+ // Currently assuming there will never be a key conflict between
+ // providers
+
+ // annotation merging. not so efficient, should revisit later
+ final Timestamped<PortDescription> otherPortDesc = e.getValue().getPortDesc(number);
+ if (otherPortDesc != null) {
+ annotations = merge(annotations, otherPortDesc.value().annotations());
+ }
+ }
+
+ return new DefaultPort(device, number, isEnabled, annotations);
+ }
+
+    /**
+     * @return first non-ancillary ProviderId found, else an arbitrary ancillary one, or null if there is none
+     */
+    private ProviderId pickPrimaryPID(Map<ProviderId, DeviceDescriptions> providerDescs) {
+        ProviderId candidate = null;
+        for (ProviderId pid : providerDescs.keySet()) {
+            if (!pid.isAncillary()) {
+                // a genuine primary provider wins immediately
+                return pid;
+            }
+            if (candidate == null) {
+                candidate = pid;
+            }
+        }
+        return candidate;
+    }
+
+    // Looks up the descriptions supplied by the provider chosen by pickPrimaryPID().
+    private DeviceDescriptions getPrimaryDescriptions(
+            Map<ProviderId, DeviceDescriptions> providerDescs) {
+        return providerDescs.get(pickPrimaryPID(providerDescs));
+    }
+
+ public static final class InitDeviceDescs
+ implements ConcurrentInitializer<DeviceDescriptions> {
+
+ private final Timestamped<DeviceDescription> deviceDesc;
+
+ public InitDeviceDescs(Timestamped<DeviceDescription> deviceDesc) {
+ this.deviceDesc = checkNotNull(deviceDesc);
+ }
+ @Override
+ public DeviceDescriptions get() throws ConcurrentException {
+ return new DeviceDescriptions(deviceDesc);
+ }
+ }
+
+
+ /**
+ * Collection of Descriptions of a Device and its Ports, as given by a single Provider.
+ */
+ public static class DeviceDescriptions {
+
+ private final AtomicReference<Timestamped<DeviceDescription>> deviceDesc;
+ private final ConcurrentMap<PortNumber, Timestamped<PortDescription>> portDescs;
+
+ public DeviceDescriptions(Timestamped<DeviceDescription> desc) {
+ this.deviceDesc = new AtomicReference<>(checkNotNull(desc));
+ this.portDescs = new ConcurrentHashMap<>();
+ }
+
+ Timestamp getLatestTimestamp() {
+ Timestamp latest = deviceDesc.get().timestamp();
+ for (Timestamped<PortDescription> desc : portDescs.values()) {
+ if (desc.timestamp().compareTo(latest) > 0) {
+ latest = desc.timestamp();
+ }
+ }
+ return latest;
+ }
+
+ public Timestamped<DeviceDescription> getDeviceDesc() {
+ return deviceDesc.get();
+ }
+
+ public Timestamped<PortDescription> getPortDesc(PortNumber number) {
+ return portDescs.get(number);
+ }
+
+ /**
+ * Puts DeviceDescription, merging annotations as necessary.
+ *
+ * @param newDesc new DeviceDescription
+ * @return previous DeviceDescription
+ */
+ public synchronized Timestamped<DeviceDescription> putDeviceDesc(Timestamped<DeviceDescription> newDesc) {
+ Timestamped<DeviceDescription> oldOne = deviceDesc.get();
+ Timestamped<DeviceDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = union(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<DeviceDescription>(
+ new DefaultDeviceDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ return deviceDesc.getAndSet(newOne);
+ }
+
+ /**
+ * Puts PortDescription, merging annotations as necessary.
+ *
+ * @param newDesc new PortDescription
+ * @return previous PortDescription
+ */
+ public synchronized Timestamped<PortDescription> putPortDesc(Timestamped<PortDescription> newDesc) {
+ Timestamped<PortDescription> oldOne = portDescs.get(newDesc.value().portNumber());
+ Timestamped<PortDescription> newOne = newDesc;
+ if (oldOne != null) {
+ SparseAnnotations merged = union(oldOne.value().annotations(),
+ newDesc.value().annotations());
+ newOne = new Timestamped<PortDescription>(
+ new DefaultPortDescription(newDesc.value(), merged),
+ newDesc.timestamp());
+ }
+ return portDescs.put(newOne.value().portNumber(), newOne);
+ }
+ }
+
+ private void notifyPeers(InternalDeviceEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalDeviceOfflineEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_OFFLINE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalDeviceRemovedEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_REMOVED,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private class InternalDeviceEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device update event from peer: {}", message.sender());
+ InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+
+ createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
+ }
+ }
+
+ private class InternalDeviceOfflineEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device offline event from peer: {}", message.sender());
+ InternalDeviceOfflineEvent event = (InternalDeviceOfflineEvent) SERIALIZER.decode(message.payload());
+
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
+
+ markOfflineInternal(deviceId, timestamp);
+ }
+ }
+
+ private class InternalDeviceRemovedEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received device removed event from peer: {}", message.sender());
+ InternalDeviceRemovedEvent event = (InternalDeviceRemovedEvent) SERIALIZER.decode(message.payload());
+
+ DeviceId deviceId = event.deviceId();
+ Timestamp timestamp = event.timestamp();
+
+ removeDeviceInternal(deviceId, timestamp);
+ }
+ }
+
+ private class InternalPortEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port update event from peer: {}", message.sender());
+ InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
+
+ updatePortsInternal(providerId, deviceId, portDescriptions);
+ }
+ }
+
+ private class InternalPortStatusEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port status update event from peer: {}", message.sender());
+ InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<PortDescription> portDescription = event.portDescription();
+
+ updatePortStatusInternal(providerId, deviceId, portDescription);
+ }
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
new file mode 100644
index 0000000..5272182
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -0,0 +1,17 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by GossipDeviceStore for peer-to-peer communication.
+ */
+public final class GossipDeviceStoreMessageSubjects {
+
+ private GossipDeviceStoreMessageSubjects() {}
+
+ public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
+ public static final MessageSubject DEVICE_OFFLINE = new MessageSubject("peer-device-offline");
+ public static final MessageSubject DEVICE_REMOVED = new MessageSubject("peer-device-removed");
+ public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
+ public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..4214384
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * change event.
+ */
+public class InternalDeviceEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<DeviceDescription> deviceDescription;
+
+ protected InternalDeviceEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deviceDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.deviceDescription = deviceDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<DeviceDescription> deviceDescription() {
+ return deviceDescription;
+ }
+
+ // for serializer
+ protected InternalDeviceEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.deviceDescription = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
new file mode 100644
index 0000000..0d3d013
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEventSerializer.java
@@ -0,0 +1,43 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceEvent}.
+ */
+public class InternalDeviceEventSerializer extends Serializer<InternalDeviceEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalDeviceEvent}.
+ */
+ public InternalDeviceEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalDeviceEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.deviceDescription());
+ }
+
+ @Override
+ public InternalDeviceEvent read(Kryo kryo, Input input,
+ Class<InternalDeviceEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<DeviceDescription> deviceDescription
+ = (Timestamped<DeviceDescription>) kryo.readClassAndObject(input);
+
+ return new InternalDeviceEvent(providerId, deviceId, deviceDescription);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
new file mode 100644
index 0000000..d8942d6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * going offline.
+ */
+public class InternalDeviceOfflineEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates an InternalDeviceOfflineEvent.
+ * @param deviceId identifier of device going offline.
+ * @param timestamp timestamp of when the device went offline.
+ */
+ public InternalDeviceOfflineEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private InternalDeviceOfflineEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
new file mode 100644
index 0000000..7059636
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceOfflineEventSerializer.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalDeviceOfflineEvent}.
+ */
+public class InternalDeviceOfflineEventSerializer extends Serializer<InternalDeviceOfflineEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalDeviceOfflineEvent}.
+ */
+ public InternalDeviceOfflineEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalDeviceOfflineEvent event) {
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.timestamp());
+ }
+
+ @Override
+ public InternalDeviceOfflineEvent read(Kryo kryo, Input input,
+ Class<InternalDeviceOfflineEvent> type) {
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamp timestamp = (Timestamp) kryo.readClassAndObject(input);
+
+ return new InternalDeviceOfflineEvent(deviceId, timestamp);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
new file mode 100644
index 0000000..6c8b905
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceRemovedEvent.java
@@ -0,0 +1,39 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.Timestamp;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * being administratively removed.
+ */
+public class InternalDeviceRemovedEvent {
+
+ private final DeviceId deviceId;
+ private final Timestamp timestamp;
+
+ /**
+ * Creates an InternalDeviceRemovedEvent.
+ * @param deviceId identifier of the removed device.
+ * @param timestamp timestamp of when the device was administratively removed.
+ */
+ public InternalDeviceRemovedEvent(DeviceId deviceId, Timestamp timestamp) {
+ this.deviceId = deviceId;
+ this.timestamp = timestamp;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public Timestamp timestamp() {
+ return timestamp;
+ }
+
+ // for serializer
+ @SuppressWarnings("unused")
+ private InternalDeviceRemovedEvent() {
+ deviceId = null;
+ timestamp = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..64e77ca
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,47 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a port
+ * change event.
+ */
+public class InternalPortEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<List<PortDescription>> portDescriptions;
+
+ protected InternalPortEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<List<PortDescription>> portDescriptions) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescriptions = portDescriptions;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<List<PortDescription>> portDescriptions() {
+ return portDescriptions;
+ }
+
+ // for serializer
+ protected InternalPortEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescriptions = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
new file mode 100644
index 0000000..6fff395
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEventSerializer.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortEvent}.
+ */
+public class InternalPortEventSerializer extends Serializer<InternalPortEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalPortEvent}.
+ */
+ public InternalPortEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalPortEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.portDescriptions());
+ }
+
+ @Override
+ public InternalPortEvent read(Kryo kryo, Input input,
+ Class<InternalPortEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<List<PortDescription>> portDescriptions
+ = (Timestamped<List<PortDescription>>) kryo.readClassAndObject(input);
+
+ return new InternalPortEvent(providerId, deviceId, portDescriptions);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..7d3854b
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,45 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a port
+ * status change event.
+ */
+public class InternalPortStatusEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<PortDescription> portDescription;
+
+ protected InternalPortStatusEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<PortDescription> portDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescription = portDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<PortDescription> portDescription() {
+ return portDescription;
+ }
+
+ // for serializer
+ protected InternalPortStatusEvent() {
+ this.providerId = null;
+ this.deviceId = null;
+ this.portDescription = null;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
new file mode 100644
index 0000000..6ec4122
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEventSerializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link InternalPortStatusEvent}.
+ */
+public class InternalPortStatusEventSerializer extends Serializer<InternalPortStatusEvent> {
+
+ /**
+ * Creates a serializer for {@link InternalPortStatusEvent}.
+ */
+ public InternalPortStatusEventSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, InternalPortStatusEvent event) {
+ kryo.writeClassAndObject(output, event.providerId());
+ kryo.writeClassAndObject(output, event.deviceId());
+ kryo.writeClassAndObject(output, event.portDescription());
+ }
+
+ @Override
+ public InternalPortStatusEvent read(Kryo kryo, Input input,
+ Class<InternalPortStatusEvent> type) {
+ ProviderId providerId = (ProviderId) kryo.readClassAndObject(input);
+ DeviceId deviceId = (DeviceId) kryo.readClassAndObject(input);
+ Timestamped<PortDescription> portDescription = (Timestamped<PortDescription>) kryo.readClassAndObject(input);
+
+ return new InternalPortStatusEvent(providerId, deviceId, portDescription);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
deleted file mode 100644
index d912983..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/OnosDistributedDeviceStore.java
+++ /dev/null
@@ -1,339 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import static com.google.common.base.Predicates.notNull;
-import static com.google.common.base.Preconditions.checkState;
-
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ImmutableSet.Builder;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.device.DeviceDescription;
-import org.onlab.onos.net.device.DeviceEvent;
-import org.onlab.onos.net.device.DeviceStore;
-import org.onlab.onos.net.device.DeviceStoreDelegate;
-import org.onlab.onos.net.device.PortDescription;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.onos.store.AbstractStore;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-import org.slf4j.Logger;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkArgument;
-import static org.onlab.onos.net.device.DeviceEvent.Type.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-/**
- * Manages inventory of infrastructure devices using a protocol that takes into consideration
- * the order in which device events occur.
- */
-@Component(immediate = true)
-@Service
-public class OnosDistributedDeviceStore
- extends AbstractStore<DeviceEvent, DeviceStoreDelegate>
- implements DeviceStore {
-
- private final Logger log = getLogger(getClass());
-
- public static final String DEVICE_NOT_FOUND = "Device with ID %s not found";
-
- private ConcurrentMap<DeviceId, VersionedValue<Device>> devices;
- private ConcurrentMap<DeviceId, Map<PortNumber, VersionedValue<Port>>> devicePorts;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected ClockService clockService;
-
- @Activate
- public void activate() {
-
- devices = new ConcurrentHashMap<>();
- devicePorts = new ConcurrentHashMap<>();
-
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- @Override
- public int getDeviceCount() {
- return devices.size();
- }
-
- @Override
- public Iterable<Device> getDevices() {
- Builder<Device> builder = ImmutableSet.builder();
- synchronized (this) {
- for (VersionedValue<Device> device : devices.values()) {
- builder.add(device.entity());
- }
- return builder.build();
- }
- }
-
- @Override
- public Device getDevice(DeviceId deviceId) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- return device.entity();
- }
-
- @Override
- public DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
- DeviceDescription deviceDescription) {
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- VersionedValue<Device> device = devices.get(deviceId);
-
- if (device == null) {
- return createDevice(providerId, deviceId, deviceDescription, newTimestamp);
- }
-
- checkState(newTimestamp.compareTo(device.timestamp()) > 0,
- "Existing device has a timestamp in the future!");
-
- return updateDevice(providerId, device.entity(), deviceDescription, newTimestamp);
- }
-
- // Creates the device and returns the appropriate event if necessary.
- private DeviceEvent createDevice(ProviderId providerId, DeviceId deviceId,
- DeviceDescription desc, Timestamp timestamp) {
- Device device = new DefaultDevice(providerId, deviceId, desc.type(),
- desc.manufacturer(),
- desc.hwVersion(), desc.swVersion(),
- desc.serialNumber());
-
- devices.put(deviceId, new VersionedValue<>(device, true, timestamp));
- // TODO,FIXME: broadcast a message telling peers of a device event.
- return new DeviceEvent(DEVICE_ADDED, device, null);
- }
-
- // Updates the device and returns the appropriate event if necessary.
- private DeviceEvent updateDevice(ProviderId providerId, Device device,
- DeviceDescription desc, Timestamp timestamp) {
- // We allow only certain attributes to trigger update
- if (!Objects.equals(device.hwVersion(), desc.hwVersion()) ||
- !Objects.equals(device.swVersion(), desc.swVersion())) {
-
- Device updated = new DefaultDevice(providerId, device.id(),
- desc.type(),
- desc.manufacturer(),
- desc.hwVersion(),
- desc.swVersion(),
- desc.serialNumber());
- devices.put(device.id(), new VersionedValue<Device>(updated, true, timestamp));
- // FIXME: broadcast a message telling peers of a device event.
- return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, updated, null);
- }
-
- // Otherwise merely attempt to change availability
- Device updated = new DefaultDevice(providerId, device.id(),
- desc.type(),
- desc.manufacturer(),
- desc.hwVersion(),
- desc.swVersion(),
- desc.serialNumber());
-
- VersionedValue<Device> oldDevice = devices.put(device.id(),
- new VersionedValue<Device>(updated, true, timestamp));
- if (!oldDevice.isUp()) {
- return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
- } else {
- return null;
- }
- }
-
- @Override
- public DeviceEvent markOffline(DeviceId deviceId) {
- VersionedValue<Device> device = devices.get(deviceId);
- boolean willRemove = device != null && device.isUp();
- if (!willRemove) {
- return null;
- }
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- if (replaceIfLatest(device.entity(), false, timestamp)) {
- return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device.entity(), null);
- }
- return null;
- }
-
- // Replace existing value if its timestamp is older.
- private synchronized boolean replaceIfLatest(Device device, boolean isUp, Timestamp timestamp) {
- VersionedValue<Device> existingValue = devices.get(device.id());
- if (timestamp.compareTo(existingValue.timestamp()) > 0) {
- devices.put(device.id(), new VersionedValue<Device>(device, isUp, timestamp));
- return true;
- }
- return false;
- }
-
- @Override
- public List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
- List<PortDescription> portDescriptions) {
- List<DeviceEvent> events = new ArrayList<>();
- synchronized (this) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
- Timestamp newTimestamp = clockService.getTimestamp(deviceId);
-
- // Add new ports
- Set<PortNumber> processed = new HashSet<>();
- for (PortDescription portDescription : portDescriptions) {
- VersionedValue<Port> port = ports.get(portDescription.portNumber());
- if (port == null) {
- events.add(createPort(device, portDescription, ports, newTimestamp));
- }
- checkState(newTimestamp.compareTo(port.timestamp()) > 0,
- "Existing port state has a timestamp in the future!");
- events.add(updatePort(device.entity(), port.entity(), portDescription, ports, newTimestamp));
- processed.add(portDescription.portNumber());
- }
-
- updatePortMap(deviceId, ports);
-
- events.addAll(pruneOldPorts(device.entity(), ports, processed));
- }
- return FluentIterable.from(events).filter(notNull()).toList();
- }
-
- // Creates a new port based on the port description adds it to the map and
- // Returns corresponding event.
- //@GuardedBy("this")
- private DeviceEvent createPort(VersionedValue<Device> device, PortDescription portDescription,
- Map<PortNumber, VersionedValue<Port>> ports, Timestamp timestamp) {
- Port port = new DefaultPort(device.entity(), portDescription.portNumber(),
- portDescription.isEnabled());
- ports.put(port.number(), new VersionedValue<Port>(port, true, timestamp));
- updatePortMap(device.entity().id(), ports);
- return new DeviceEvent(PORT_ADDED, device.entity(), port);
- }
-
- // Checks if the specified port requires update and if so, it replaces the
- // existing entry in the map and returns corresponding event.
- //@GuardedBy("this")
- private DeviceEvent updatePort(Device device, Port port,
- PortDescription portDescription,
- Map<PortNumber, VersionedValue<Port>> ports,
- Timestamp timestamp) {
- if (port.isEnabled() != portDescription.isEnabled()) {
- VersionedValue<Port> updatedPort = new VersionedValue<Port>(
- new DefaultPort(device, portDescription.portNumber(),
- portDescription.isEnabled()),
- portDescription.isEnabled(),
- timestamp);
- ports.put(port.number(), updatedPort);
- updatePortMap(device.id(), ports);
- return new DeviceEvent(PORT_UPDATED, device, updatedPort.entity());
- }
- return null;
- }
-
- // Prunes the specified list of ports based on which ports are in the
- // processed list and returns list of corresponding events.
- //@GuardedBy("this")
- private List<DeviceEvent> pruneOldPorts(Device device,
- Map<PortNumber, VersionedValue<Port>> ports,
- Set<PortNumber> processed) {
- List<DeviceEvent> events = new ArrayList<>();
- Iterator<PortNumber> iterator = ports.keySet().iterator();
- while (iterator.hasNext()) {
- PortNumber portNumber = iterator.next();
- if (!processed.contains(portNumber)) {
- events.add(new DeviceEvent(PORT_REMOVED, device,
- ports.get(portNumber).entity()));
- iterator.remove();
- }
- }
- if (!events.isEmpty()) {
- updatePortMap(device.id(), ports);
- }
- return events;
- }
-
- // Gets the map of ports for the specified device; if one does not already
- // exist, it creates and registers a new one.
- // WARN: returned value is a copy, changes made to the Map
- // needs to be written back using updatePortMap
- //@GuardedBy("this")
- private Map<PortNumber, VersionedValue<Port>> getPortMap(DeviceId deviceId) {
- Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
- if (ports == null) {
- ports = new HashMap<>();
- // this probably is waste of time in most cases.
- updatePortMap(deviceId, ports);
- }
- return ports;
- }
-
- //@GuardedBy("this")
- private void updatePortMap(DeviceId deviceId, Map<PortNumber, VersionedValue<Port>> ports) {
- devicePorts.put(deviceId, ports);
- }
-
- @Override
- public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
- PortDescription portDescription) {
- VersionedValue<Device> device = devices.get(deviceId);
- checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
- Map<PortNumber, VersionedValue<Port>> ports = getPortMap(deviceId);
- VersionedValue<Port> port = ports.get(portDescription.portNumber());
- Timestamp timestamp = clockService.getTimestamp(deviceId);
- return updatePort(device.entity(), port.entity(), portDescription, ports, timestamp);
- }
-
- @Override
- public List<Port> getPorts(DeviceId deviceId) {
- Map<PortNumber, VersionedValue<Port>> versionedPorts = devicePorts.get(deviceId);
- if (versionedPorts == null) {
- return Collections.emptyList();
- }
- List<Port> ports = new ArrayList<>();
- for (VersionedValue<Port> port : versionedPorts.values()) {
- ports.add(port.entity());
- }
- return ports;
- }
-
- @Override
- public Port getPort(DeviceId deviceId, PortNumber portNumber) {
- Map<PortNumber, VersionedValue<Port>> ports = devicePorts.get(deviceId);
- return ports == null ? null : ports.get(portNumber).entity();
- }
-
- @Override
- public boolean isAvailable(DeviceId deviceId) {
- return devices.get(deviceId).isUp();
- }
-
- @Override
- public DeviceEvent removeDevice(DeviceId deviceId) {
- VersionedValue<Device> previousDevice = devices.remove(deviceId);
- return previousDevice == null ? null :
- new DeviceEvent(DEVICE_REMOVED, previousDevice.entity(), null);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
index aa644db..c1f5aad 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/package-info.java
@@ -1,4 +1,4 @@
/**
* Implementation of device store using distributed distributed p2p synchronization protocol.
*/
-package org.onlab.onos.store.device.impl;
\ No newline at end of file
+package org.onlab.onos.store.device.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 5a5592a..d49e00b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,6 +1,5 @@
package org.onlab.onos.store.flow.impl;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -13,9 +12,10 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
@@ -30,18 +30,18 @@
/**
* Manages inventory of flow rules using trivial in-memory implementation.
*/
-//FIXME: I LIE I AM NOT DISTRIBUTED
+//FIXME I LIE. I AIN'T DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
-extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
-implements FlowRuleStore {
+ extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ implements FlowRuleStore {
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
- private final Multimap<DeviceId, FlowRule> flowEntries =
- ArrayListMultimap.<DeviceId, FlowRule>create();
+ private final Multimap<DeviceId, FlowEntry> flowEntries =
+ ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -58,8 +58,13 @@
@Override
- public synchronized FlowRule getFlowRule(FlowRule rule) {
- for (FlowRule f : flowEntries.get(rule.deviceId())) {
+ public int getFlowRuleCount() {
+ return flowEntries.size();
+ }
+
+ @Override
+ public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+ for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
@@ -68,8 +73,8 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
- Collection<FlowRule> rules = flowEntries.get(deviceId);
+ public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ Collection<FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
@@ -77,7 +82,7 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+ public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
@@ -87,7 +92,7 @@
@Override
public synchronized void storeFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+ FlowEntry f = new DefaultFlowEntry(rule);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
@@ -97,57 +102,41 @@
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
- DeviceId did = f.deviceId();
-
- /*
- * find the rule and mark it for deletion.
- * Ultimately a flow removed will come remove it.
- */
-
- if (flowEntries.containsEntry(did, f)) {
- //synchronized (flowEntries) {
- flowEntries.remove(did, f);
- flowEntries.put(did, f);
- flowEntriesById.remove(rule.appId(), rule);
- //}
+ FlowEntry entry = getFlowEntry(rule);
+ if (entry == null) {
+ return;
}
+ entry.setState(FlowEntryState.PENDING_REMOVE);
}
@Override
- public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+ public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
- if (flowEntries.containsEntry(did, rule)) {
- //synchronized (flowEntries) {
- // Multimaps support duplicates so we have to remove our rule
- // and replace it with the current version.
- flowEntries.remove(did, rule);
- flowEntries.put(did, rule);
- //}
+ FlowEntry stored = getFlowEntry(rule);
+ if (stored != null) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
- return new FlowRuleEvent(RULE_ADDED, rule);
+ return null;
}
@Override
- public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
- //synchronized (this) {
+ public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+ // This is where one could mark a rule as removed and still keep it in the store.
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
- //}
}
-
-
-
-
-
-
-
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
index 5df25b4..a59b151 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/OnosDistributedLinkStore.java
@@ -42,6 +42,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
+//TODO: Add support for multiple providers and annotations
/**
* Manages inventory of infrastructure links using a protocol that takes into consideration
* the order in which events occur.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
index 127dc84..c675f84 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/link/impl/package-info.java
@@ -1,4 +1,4 @@
/**
* Implementation of link store using distributed p2p synchronization protocol.
*/
-package org.onlab.onos.store.link.impl;
\ No newline at end of file
+package org.onlab.onos.store.link.impl;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
new file mode 100644
index 0000000..c0cefd6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+
+ /**
+ * Creates a serializer for {@link ClusterMessage}.
+ */
+ public ClusterMessageSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ClusterMessage message) {
+ kryo.writeClassAndObject(output, message.sender());
+ kryo.writeClassAndObject(output, message.subject());
+ output.writeInt(message.payload().length);
+ output.writeBytes(message.payload());
+ }
+
+ @Override
+ public ClusterMessage read(Kryo kryo, Input input,
+ Class<ClusterMessage> type) {
+ NodeId sender = (NodeId) kryo.readClassAndObject(input);
+ MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
+ int payloadSize = input.readInt();
+ byte[] payload = input.readBytes(payloadSize);
+ return new ClusterMessage(sender, subject, payload);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
new file mode 100644
index 0000000..516915e
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MastershipBasedTimestampSerializer.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+// To be used if Timestamp ever needs to cross bundle boundary.
+/**
+ * Kryo Serializer for {@link MastershipBasedTimestamp}.
+ */
+public class MastershipBasedTimestampSerializer extends Serializer<MastershipBasedTimestamp> {
+
+ /**
+ * Creates a serializer for {@link MastershipBasedTimestamp}.
+ */
+ public MastershipBasedTimestampSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, MastershipBasedTimestamp object) {
+ output.writeInt(object.termNumber());
+ output.writeInt(object.sequenceNumber());
+ }
+
+ @Override
+ public MastershipBasedTimestamp read(Kryo kryo, Input input, Class<MastershipBasedTimestamp> type) {
+ final int term = input.readInt();
+ final int sequence = input.readInt();
+ return new MastershipBasedTimestamp(term, sequence);
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
new file mode 100644
index 0000000..bb6b292
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/MessageSubjectSerializer.java
@@ -0,0 +1,31 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class MessageSubjectSerializer extends Serializer<MessageSubject> {
+
+ /**
+ * Creates a serializer for {@link MessageSubject}.
+ */
+ public MessageSubjectSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+
+ @Override
+ public void write(Kryo kryo, Output output, MessageSubject object) {
+ output.writeString(object.value());
+ }
+
+ @Override
+ public MessageSubject read(Kryo kryo, Input input,
+ Class<MessageSubject> type) {
+ return new MessageSubject(input.readString());
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java
deleted file mode 100644
index 192e035..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/OnosTimestampSerializer.java
+++ /dev/null
@@ -1,35 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import org.onlab.onos.store.impl.OnosTimestamp;
-
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-/**
- * Kryo Serializer for {@link OnosTimestamp}.
- */
-public class OnosTimestampSerializer extends Serializer<OnosTimestamp> {
-
- /**
- * Default constructor.
- */
- public OnosTimestampSerializer() {
- // non-null, immutable
- super(false, true);
- }
-
- @Override
- public void write(Kryo kryo, Output output, OnosTimestamp object) {
- output.writeInt(object.termNumber());
- output.writeInt(object.sequenceNumber());
- }
-
- @Override
- public OnosTimestamp read(Kryo kryo, Input input, Class<OnosTimestamp> type) {
- final int term = input.readInt();
- final int sequence = input.readInt();
- return new OnosTimestamp(term, sequence);
- }
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
index 567861e..9ae9d7b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
@@ -125,7 +125,8 @@
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
- return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+ current, reasons);
}
}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index bba12f2..e63fcaa 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -7,6 +7,7 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
new file mode 100644
index 0000000..ea63ef8
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/MastershipBasedTimestampTest.java
@@ -0,0 +1,95 @@
+package org.onlab.onos.store.common.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.serializers.MastershipBasedTimestampSerializer;
+import org.onlab.util.KryoPool;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test of {@link MastershipBasedTimestamp}.
+ */
+public class MastershipBasedTimestampTest {
+
+ private static final Timestamp TS_1_1 = new MastershipBasedTimestamp(1, 1);
+ private static final Timestamp TS_1_2 = new MastershipBasedTimestamp(1, 2);
+ private static final Timestamp TS_2_1 = new MastershipBasedTimestamp(2, 1);
+ private static final Timestamp TS_2_2 = new MastershipBasedTimestamp(2, 2);
+
+ @Test
+ public final void testBasic() {
+ final int termNumber = 5;
+ final int sequenceNumber = 6;
+ MastershipBasedTimestamp ts = new MastershipBasedTimestamp(termNumber,
+ sequenceNumber);
+
+ assertEquals(termNumber, ts.termNumber());
+ assertEquals(sequenceNumber, ts.sequenceNumber());
+ }
+
+ @Test
+ public final void testCompareTo() {
+ assertTrue(TS_1_1.compareTo(TS_1_1) == 0);
+ assertTrue(TS_1_1.compareTo(new MastershipBasedTimestamp(1, 1)) == 0);
+
+ assertTrue(TS_1_1.compareTo(TS_1_2) < 0);
+ assertTrue(TS_1_2.compareTo(TS_1_1) > 0);
+
+ assertTrue(TS_1_2.compareTo(TS_2_1) < 0);
+ assertTrue(TS_1_2.compareTo(TS_2_2) < 0);
+ assertTrue(TS_2_1.compareTo(TS_1_1) > 0);
+ assertTrue(TS_2_2.compareTo(TS_1_1) > 0);
+ }
+
+ @Test
+ public final void testEqualsObject() {
+ new EqualsTester()
+ .addEqualityGroup(new MastershipBasedTimestamp(1, 1),
+ new MastershipBasedTimestamp(1, 1), TS_1_1)
+ .addEqualityGroup(new MastershipBasedTimestamp(1, 2),
+ new MastershipBasedTimestamp(1, 2), TS_1_2)
+ .addEqualityGroup(new MastershipBasedTimestamp(2, 1),
+ new MastershipBasedTimestamp(2, 1), TS_2_1)
+ .addEqualityGroup(new MastershipBasedTimestamp(2, 2),
+ new MastershipBasedTimestamp(2, 2), TS_2_2)
+ .testEquals();
+ }
+
+ @Test
+ public final void testKryoSerializable() {
+ final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+ final KryoPool kryos = KryoPool.newBuilder()
+ .register(MastershipBasedTimestamp.class)
+ .build();
+
+ kryos.serialize(TS_2_1, buffer);
+ buffer.flip();
+ Timestamp copy = kryos.deserialize(buffer);
+
+ new EqualsTester()
+ .addEqualityGroup(TS_2_1, copy)
+ .testEquals();
+ }
+
+ @Test
+ public final void testKryoSerializableWithHandcraftedSerializer() {
+ final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+ final KryoPool kryos = KryoPool.newBuilder()
+ .register(MastershipBasedTimestamp.class, new MastershipBasedTimestampSerializer())
+ .build();
+
+ kryos.serialize(TS_1_2, buffer);
+ buffer.flip();
+ Timestamp copy = kryos.deserialize(buffer);
+
+ new EqualsTester()
+ .addEqualityGroup(TS_1_2, copy)
+ .testEquals();
+ }
+
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
new file mode 100644
index 0000000..4b44d40
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/common/impl/TimestampedTest.java
@@ -0,0 +1,94 @@
+package org.onlab.onos.store.common.impl;
+
+import static org.junit.Assert.*;
+
+import java.nio.ByteBuffer;
+
+import org.junit.Test;
+import org.onlab.onos.store.Timestamp;
+import org.onlab.util.KryoPool;
+
+import com.google.common.testing.EqualsTester;
+
+/**
+ * Test of {@link Timestamped}.
+ */
+public class TimestampedTest {
+
+ private static final Timestamp TS_1_1 = new MastershipBasedTimestamp(1, 1);
+ private static final Timestamp TS_1_2 = new MastershipBasedTimestamp(1, 2);
+ private static final Timestamp TS_2_1 = new MastershipBasedTimestamp(2, 1);
+
+ @Test
+ public final void testHashCode() {
+ Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+ Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+ assertTrue("value does not impact hashCode",
+ a.hashCode() == b.hashCode());
+ }
+
+ @Test
+ public final void testEquals() {
+ Timestamped<String> a = new Timestamped<>("a", TS_1_1);
+ Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+ assertTrue("value does not impact equality",
+ a.equals(b));
+
+ new EqualsTester()
+ .addEqualityGroup(new Timestamped<>("a", TS_1_1),
+ new Timestamped<>("b", TS_1_1),
+ new Timestamped<>("c", TS_1_1))
+ .addEqualityGroup(new Timestamped<>("a", TS_1_2),
+ new Timestamped<>("b", TS_1_2),
+ new Timestamped<>("c", TS_1_2))
+ .addEqualityGroup(new Timestamped<>("a", TS_2_1),
+ new Timestamped<>("b", TS_2_1),
+ new Timestamped<>("c", TS_2_1))
+ .testEquals();
+
+ }
+
+ @Test
+ public final void testValue() {
+ final Integer n = Integer.valueOf(42);
+ Timestamped<Integer> tsv = new Timestamped<>(n, TS_1_1);
+ assertSame(n, tsv.value());
+
+ }
+
+ @Test(expected = NullPointerException.class)
+ public final void testValueNonNull() {
+ new Timestamped<>(null, TS_1_1);
+ }
+
+ @Test(expected = NullPointerException.class)
+ public final void testTimestampNonNull() {
+ new Timestamped<>("Foo", null);
+ }
+
+ @Test
+ public final void testIsNewer() {
+ Timestamped<String> a = new Timestamped<>("a", TS_1_2);
+ Timestamped<String> b = new Timestamped<>("b", TS_1_1);
+ assertTrue(a.isNewer(b));
+ assertFalse(b.isNewer(a));
+ }
+
+ @Test
+ public final void testKryoSerializable() {
+ final ByteBuffer buffer = ByteBuffer.allocate(1 * 1024 * 1024);
+ final KryoPool kryos = KryoPool.newBuilder()
+ .register(Timestamped.class,
+ MastershipBasedTimestamp.class)
+ .build();
+
+ Timestamped<String> original = new Timestamped<>("foobar", TS_1_1);
+ kryos.serialize(original, buffer);
+ buffer.flip();
+ Timestamped<String> copy = kryos.deserialize(buffer);
+
+ new EqualsTester()
+ .addEqualityGroup(original, copy)
+ .testEquals();
+ }
+}
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
new file mode 100644
index 0000000..fa42a6b
--- /dev/null
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -0,0 +1,619 @@
+package org.onlab.onos.store.device.impl;
+
+import static org.junit.Assert.*;
+import static org.onlab.onos.net.Device.Type.SWITCH;
+import static org.onlab.onos.net.DeviceId.deviceId;
+import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.device.DeviceEvent;
+import org.onlab.onos.net.device.DeviceStore;
+import org.onlab.onos.net.device.DeviceStoreDelegate;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.packet.IpPrefix;
+
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+
+
+// TODO add tests for remote replication
+/**
+ * Test of the gossip based distributed DeviceStore implementation.
+ */
+public class GossipDeviceStoreTest {
+
+ private static final ProviderId PID = new ProviderId("of", "foo");
+ private static final ProviderId PIDA = new ProviderId("of", "bar", true);
+ private static final DeviceId DID1 = deviceId("of:foo");
+ private static final DeviceId DID2 = deviceId("of:bar");
+ private static final String MFR = "whitebox";
+ private static final String HW = "1.1.x";
+ private static final String SW1 = "3.8.1";
+ private static final String SW2 = "3.9.5";
+ private static final String SN = "43311-12345";
+
+ private static final PortNumber P1 = PortNumber.portNumber(1);
+ private static final PortNumber P2 = PortNumber.portNumber(2);
+ private static final PortNumber P3 = PortNumber.portNumber(3);
+
+ private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+ .set("A1", "a1")
+ .set("B1", "b1")
+ .build();
+ private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+ .remove("A1")
+ .set("B3", "b3")
+ .build();
+ private static final SparseAnnotations A2 = DefaultAnnotations.builder()
+ .set("A2", "a2")
+ .set("B2", "b2")
+ .build();
+ private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
+ .remove("A2")
+ .set("B4", "b4")
+ .build();
+
+ private static final NodeId MYSELF = new NodeId("myself");
+
+ private GossipDeviceStore gossipDeviceStore;
+ private DeviceStore deviceStore;
+
+ private DeviceClockManager deviceClockManager;
+ private ClockService clockService;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ }
+
+
+ @Before
+ public void setUp() throws Exception {
+ deviceClockManager = new DeviceClockManager();
+ deviceClockManager.activate();
+ clockService = deviceClockManager;
+
+ deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
+ deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
+
+ ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+ ClusterService clusterService = new TestClusterService();
+
+ gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
+ gossipDeviceStore.activate();
+ deviceStore = gossipDeviceStore;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ gossipDeviceStore.deactivate();
+ deviceClockManager.deactivate();
+ }
+
+ private void putDevice(DeviceId deviceId, String swVersion,
+ SparseAnnotations... annotations) {
+ DeviceDescription description =
+ new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+ HW, swVersion, SN, annotations);
+ deviceStore.createOrUpdateDevice(PID, deviceId, description);
+ }
+
+ private void putDeviceAncillary(DeviceId deviceId, String swVersion,
+ SparseAnnotations... annotations) {
+ DeviceDescription description =
+ new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
+ HW, swVersion, SN, annotations);
+ deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
+ }
+
+ private static void assertDevice(DeviceId id, String swVersion, Device device) {
+ assertNotNull(device);
+ assertEquals(id, device.id());
+ assertEquals(MFR, device.manufacturer());
+ assertEquals(HW, device.hwVersion());
+ assertEquals(swVersion, device.swVersion());
+ assertEquals(SN, device.serialNumber());
+ }
+
+ /**
+ * Verifies that Annotations created by merging {@code annotations} is
+ * equal to actual Annotations.
+ *
+ * @param actual Annotations to check
+ * @param annotations SparseAnnotations to merge, in order, into the expected Annotations
+ */
+ private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+ DefaultAnnotations expected = DefaultAnnotations.builder().build();
+ for (SparseAnnotations a : annotations) {
+ expected = DefaultAnnotations.merge(expected, a);
+ }
+ assertEquals(expected.keys(), actual.keys());
+ for (String key : expected.keys()) {
+ assertEquals(expected.value(key), actual.value(key));
+ }
+ }
+
+ @Test
+ public final void testGetDeviceCount() {
+ assertEquals("initialy empty", 0, deviceStore.getDeviceCount());
+
+ putDevice(DID1, SW1);
+ putDevice(DID2, SW2);
+ putDevice(DID1, SW1);
+
+ assertEquals("expect 2 uniq devices", 2, deviceStore.getDeviceCount());
+ }
+
+ @Test
+ public final void testGetDevices() {
+ assertEquals("initialy empty", 0, Iterables.size(deviceStore.getDevices()));
+
+ putDevice(DID1, SW1);
+ putDevice(DID2, SW2);
+ putDevice(DID1, SW1);
+
+ assertEquals("expect 2 uniq devices",
+ 2, Iterables.size(deviceStore.getDevices()));
+
+ Map<DeviceId, Device> devices = new HashMap<>();
+ for (Device device : deviceStore.getDevices()) {
+ devices.put(device.id(), device);
+ }
+
+ assertDevice(DID1, SW1, devices.get(DID1));
+ assertDevice(DID2, SW2, devices.get(DID2));
+
+ // add case for new node?
+ }
+
+ @Test
+ public final void testGetDevice() {
+
+ putDevice(DID1, SW1);
+
+ assertDevice(DID1, SW1, deviceStore.getDevice(DID1));
+ assertNull("DID2 shouldn't be there", deviceStore.getDevice(DID2));
+ }
+
+ @Test
+ public final void testCreateOrUpdateDevice() {
+ DeviceDescription description =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW1, SN);
+ DeviceEvent event = deviceStore.createOrUpdateDevice(PID, DID1, description);
+ assertEquals(DEVICE_ADDED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+
+ DeviceDescription description2 =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW2, SN);
+ DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+ assertEquals(DEVICE_UPDATED, event2.type());
+ assertDevice(DID1, SW2, event2.subject());
+
+ assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+ }
+
+ @Test
+ public final void testCreateOrUpdateDeviceAncillary() {
+ DeviceDescription description =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW1, SN, A2);
+ DeviceEvent event = deviceStore.createOrUpdateDevice(PIDA, DID1, description);
+ assertEquals(DEVICE_ADDED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertEquals(PIDA, event.subject().providerId());
+ assertAnnotationsEquals(event.subject().annotations(), A2);
+ assertFalse("Ancillary will not bring device up", deviceStore.isAvailable(DID1));
+
+ DeviceDescription description2 =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW2, SN, A1);
+ DeviceEvent event2 = deviceStore.createOrUpdateDevice(PID, DID1, description2);
+ assertEquals(DEVICE_UPDATED, event2.type());
+ assertDevice(DID1, SW2, event2.subject());
+ assertEquals(PID, event2.subject().providerId());
+ assertAnnotationsEquals(event2.subject().annotations(), A1, A2);
+ assertTrue(deviceStore.isAvailable(DID1));
+
+ assertNull("No change expected", deviceStore.createOrUpdateDevice(PID, DID1, description2));
+
+ // For now, Ancillary is ignored once primary appears
+ assertNull("No change expected", deviceStore.createOrUpdateDevice(PIDA, DID1, description));
+
+ // But, Ancillary annotations will be in effect
+ DeviceDescription description3 =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW1, SN, A2_2);
+ DeviceEvent event3 = deviceStore.createOrUpdateDevice(PIDA, DID1, description3);
+ assertEquals(DEVICE_UPDATED, event3.type());
+ // basic information will be the one from Primary
+ assertDevice(DID1, SW2, event3.subject());
+ assertEquals(PID, event3.subject().providerId());
+ // but annotation from Ancillary will be merged
+ assertAnnotationsEquals(event3.subject().annotations(), A1, A2, A2_2);
+ assertTrue(deviceStore.isAvailable(DID1));
+ }
+
+
+ @Test
+ public final void testMarkOffline() {
+
+ putDevice(DID1, SW1);
+ assertTrue(deviceStore.isAvailable(DID1));
+
+ DeviceEvent event = deviceStore.markOffline(DID1);
+ assertEquals(DEVICE_AVAILABILITY_CHANGED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertFalse(deviceStore.isAvailable(DID1));
+
+ DeviceEvent event2 = deviceStore.markOffline(DID1);
+ assertNull("No change, no event", event2);
+}
+
+ @Test
+ public final void testUpdatePorts() {
+ putDevice(DID1, SW1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true),
+ new DefaultPortDescription(P2, true)
+ );
+
+ List<DeviceEvent> events = deviceStore.updatePorts(PID, DID1, pds);
+
+ Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+ for (DeviceEvent event : events) {
+ assertEquals(PORT_ADDED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertTrue("PortNumber is one of expected",
+ expectedPorts.remove(event.port().number()));
+ assertTrue("Port is enabled", event.port().isEnabled());
+ }
+ assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+ List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, false),
+ new DefaultPortDescription(P2, true),
+ new DefaultPortDescription(P3, true)
+ );
+
+ events = deviceStore.updatePorts(PID, DID1, pds2);
+ assertFalse("event should be triggered", events.isEmpty());
+ for (DeviceEvent event : events) {
+ PortNumber num = event.port().number();
+ if (P1.equals(num)) {
+ assertEquals(PORT_UPDATED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertFalse("Port is disabled", event.port().isEnabled());
+ } else if (P2.equals(num)) {
+ fail("P2 event not expected.");
+ } else if (P3.equals(num)) {
+ assertEquals(PORT_ADDED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertTrue("Port is enabled", event.port().isEnabled());
+ } else {
+ fail("Unknown port number encountered: " + num);
+ }
+ }
+
+ List<PortDescription> pds3 = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, false),
+ new DefaultPortDescription(P2, true)
+ );
+ events = deviceStore.updatePorts(PID, DID1, pds3);
+ assertFalse("event should be triggered", events.isEmpty());
+ for (DeviceEvent event : events) {
+ PortNumber num = event.port().number();
+ if (P1.equals(num)) {
+ fail("P1 event not expected.");
+ } else if (P2.equals(num)) {
+ fail("P2 event not expected.");
+ } else if (P3.equals(num)) {
+ assertEquals(PORT_REMOVED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertTrue("Port was enabled", event.port().isEnabled());
+ } else {
+ fail("Unknown port number encountered: " + num);
+ }
+ }
+
+ }
+
+ @Test
+ public final void testUpdatePortStatus() {
+ putDevice(DID1, SW1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
+
+ DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+ new DefaultPortDescription(P1, false));
+ assertEquals(PORT_UPDATED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertEquals(P1, event.port().number());
+ assertFalse("Port is disabled", event.port().isEnabled());
+
+ }
+ @Test
+ public final void testUpdatePortStatusAncillary() {
+ putDeviceAncillary(DID1, SW1);
+ putDevice(DID1, SW1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true, A1)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
+
+ DeviceEvent event = deviceStore.updatePortStatus(PID, DID1,
+ new DefaultPortDescription(P1, false, A1_2));
+ assertEquals(PORT_UPDATED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ assertEquals(P1, event.port().number());
+ assertAnnotationsEquals(event.port().annotations(), A1, A1_2);
+ assertFalse("Port is disabled", event.port().isEnabled());
+
+ DeviceEvent event2 = deviceStore.updatePortStatus(PIDA, DID1,
+ new DefaultPortDescription(P1, true));
+ assertNull("Ancillary is ignored if primary exists", event2);
+
+ // but, Ancillary annotation update will be notified
+ DeviceEvent event3 = deviceStore.updatePortStatus(PIDA, DID1,
+ new DefaultPortDescription(P1, true, A2));
+ assertEquals(PORT_UPDATED, event3.type());
+ assertDevice(DID1, SW1, event3.subject());
+ assertEquals(P1, event3.port().number());
+ assertAnnotationsEquals(event3.port().annotations(), A1, A1_2, A2);
+ assertFalse("Port is disabled", event3.port().isEnabled());
+
+ // a port reported only by Ancillary will be notified as disabled
+ DeviceEvent event4 = deviceStore.updatePortStatus(PIDA, DID1,
+ new DefaultPortDescription(P2, true));
+ assertEquals(PORT_ADDED, event4.type());
+ assertDevice(DID1, SW1, event4.subject());
+ assertEquals(P2, event4.port().number());
+ assertAnnotationsEquals(event4.port().annotations());
+ assertFalse("Port is disabled if not given from primary provider",
+ event4.port().isEnabled());
+ }
+
+ @Test
+ public final void testGetPorts() {
+ putDevice(DID1, SW1);
+ putDevice(DID2, SW1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true),
+ new DefaultPortDescription(P2, true)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
+
+ Set<PortNumber> expectedPorts = Sets.newHashSet(P1, P2);
+ List<Port> ports = deviceStore.getPorts(DID1);
+ for (Port port : ports) {
+ assertTrue("Port is enabled", port.isEnabled());
+ assertTrue("PortNumber is one of expected",
+ expectedPorts.remove(port.number()));
+ }
+ assertTrue("Event for all expectedport appeared", expectedPorts.isEmpty());
+
+
+ assertTrue("DID2 has no ports", deviceStore.getPorts(DID2).isEmpty());
+ }
+
+ @Test
+ public final void testGetPort() {
+ putDevice(DID1, SW1);
+ putDevice(DID2, SW1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true),
+ new DefaultPortDescription(P2, false)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
+
+ Port port1 = deviceStore.getPort(DID1, P1);
+ assertEquals(P1, port1.number());
+ assertTrue("Port is enabled", port1.isEnabled());
+
+ Port port2 = deviceStore.getPort(DID1, P2);
+ assertEquals(P2, port2.number());
+ assertFalse("Port is disabled", port2.isEnabled());
+
+ Port port3 = deviceStore.getPort(DID1, P3);
+ assertNull("P3 not expected", port3);
+ }
+
+ @Test
+ public final void testRemoveDevice() {
+ putDevice(DID1, SW1, A1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true, A2)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
+ putDevice(DID2, SW1);
+
+ assertEquals(2, deviceStore.getDeviceCount());
+ assertEquals(1, deviceStore.getPorts(DID1).size());
+ assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
+ assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
+
+ DeviceEvent event = deviceStore.removeDevice(DID1);
+ assertEquals(DEVICE_REMOVED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+
+ assertEquals(1, deviceStore.getDeviceCount());
+ assertEquals(0, deviceStore.getPorts(DID1).size());
+
+ // put the Device and Port back without annotations
+ putDevice(DID1, SW1);
+ List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true)
+ );
+ deviceStore.updatePorts(PID, DID1, pds2);
+
+ // annotations should not survive
+ assertEquals(2, deviceStore.getDeviceCount());
+ assertEquals(1, deviceStore.getPorts(DID1).size());
+ assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
+ assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
+ }
+
+ // If Delegates should be called only on remote events,
+ // then Simple* should never call them, thus no test is required.
+ // TODO add test for Port events when we have them
+ @Ignore("Ignore until Delegate spec. is clear.")
+ @Test
+ public final void testEvents() throws InterruptedException {
+ final CountDownLatch addLatch = new CountDownLatch(1);
+ DeviceStoreDelegate checkAdd = new DeviceStoreDelegate() {
+ @Override
+ public void notify(DeviceEvent event) {
+ assertEquals(DEVICE_ADDED, event.type());
+ assertDevice(DID1, SW1, event.subject());
+ addLatch.countDown();
+ }
+ };
+ final CountDownLatch updateLatch = new CountDownLatch(1);
+ DeviceStoreDelegate checkUpdate = new DeviceStoreDelegate() {
+ @Override
+ public void notify(DeviceEvent event) {
+ assertEquals(DEVICE_UPDATED, event.type());
+ assertDevice(DID1, SW2, event.subject());
+ updateLatch.countDown();
+ }
+ };
+ final CountDownLatch removeLatch = new CountDownLatch(1);
+ DeviceStoreDelegate checkRemove = new DeviceStoreDelegate() {
+ @Override
+ public void notify(DeviceEvent event) {
+ assertEquals(DEVICE_REMOVED, event.type());
+ assertDevice(DID1, SW2, event.subject());
+ removeLatch.countDown();
+ }
+ };
+
+ DeviceDescription description =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW1, SN);
+ deviceStore.setDelegate(checkAdd);
+ deviceStore.createOrUpdateDevice(PID, DID1, description);
+ assertTrue("Add event fired", addLatch.await(1, TimeUnit.SECONDS));
+
+
+ DeviceDescription description2 =
+ new DefaultDeviceDescription(DID1.uri(), SWITCH, MFR,
+ HW, SW2, SN);
+ deviceStore.unsetDelegate(checkAdd);
+ deviceStore.setDelegate(checkUpdate);
+ deviceStore.createOrUpdateDevice(PID, DID1, description2);
+ assertTrue("Update event fired", updateLatch.await(1, TimeUnit.SECONDS));
+
+ deviceStore.unsetDelegate(checkUpdate);
+ deviceStore.setDelegate(checkRemove);
+ deviceStore.removeDevice(DID1);
+ assertTrue("Remove event fired", removeLatch.await(1, TimeUnit.SECONDS));
+ }
+
+ private static final class TestGossipDeviceStore extends GossipDeviceStore {
+
+ public TestGossipDeviceStore(
+ ClockService clockService,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
+ this.clockService = clockService;
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+ }
+ }
+
+ private static final class TestClusterCommunicationService implements ClusterCommunicationService {
+ @Override
+ public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+ @Override
+ public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+ @Override
+ public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+ }
+
+ private static final class TestClusterService implements ClusterService {
+
+ private static final ControllerNode ONOS1 =
+ new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
+ private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
+ private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
+
+ public TestClusterService() {
+ nodes.put(new NodeId("N1"), ONOS1);
+ nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return ONOS1;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(nodes.values());
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return nodes.get(nodeId);
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return nodeStates.get(nodeId);
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
+ }
+ }
+}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index f83ac59..61a7374 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -57,7 +57,7 @@
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
+ = new OptionalCacheLoader<>(serializer, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
diff --git a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
index 9178e90..bf1bb38 100644
--- a/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
+++ b/core/store/hz/cluster/src/test/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStoreTest.java
@@ -30,8 +30,7 @@
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.KryoSerializer;
import org.onlab.packet.IpPrefix;
import com.google.common.collect.Sets;
@@ -57,7 +56,7 @@
private DistributedMastershipStore dms;
private TestDistributedMastershipStore testStore;
- private KryoSerializationManager serializationMgr;
+ private KryoSerializer serializationMgr;
private StoreManager storeMgr;
@BeforeClass
@@ -76,8 +75,7 @@
storeMgr = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeMgr.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
+ serializationMgr = new KryoSerializer();
dms = new TestDistributedMastershipStore(storeMgr, serializationMgr);
dms.clusterService = new TestClusterService();
@@ -90,8 +88,6 @@
public void tearDown() throws Exception {
dms.deactivate();
- serializationMgr.deactivate();
-
storeMgr.deactivate();
}
@@ -234,9 +230,9 @@
private class TestDistributedMastershipStore extends
DistributedMastershipStore {
public TestDistributedMastershipStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ KryoSerializer kryoSerialization) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
+ this.serializer = kryoSerialization;
}
//helper to populate master/backup structures
@@ -260,6 +256,7 @@
}
}
+ //debug helper: dumps store contents to stdout.
public void dump() {
System.out.println("standbys");
for (Map.Entry<byte [], byte []> e : standbys.entrySet()) {
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
index 0302105..a22dd89 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
@@ -15,7 +15,8 @@
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.StoreSerializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,7 +25,7 @@
/**
* Abstraction of a distributed store based on Hazelcast.
*/
-@Component(componentAbstract = true)
+@Component
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
@@ -33,13 +34,13 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected KryoSerializationService kryoSerializationService;
+ protected StoreSerializer serializer;
protected HazelcastInstance theInstance;
@Activate
public void activate() {
+ serializer = new KryoSerializer();
theInstance = storeService.getHazelcastInstance();
}
@@ -50,7 +51,7 @@
* @return serialized object
*/
protected byte[] serialize(Object obj) {
- return kryoSerializationService.serialize(obj);
+ return serializer.encode(obj);
}
/**
@@ -61,7 +62,7 @@
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
- return kryoSerializationService.deserialize(bytes);
+ return serializer.decode(bytes);
}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
index f96fdd8..d5fc380 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
@@ -2,7 +2,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.StoreSerializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
@@ -18,28 +18,28 @@
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
- private final KryoSerializationService kryoSerializationService;
+ private final StoreSerializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
- * @param kryoSerializationService to use for serialization
+ * @param serializer to use for serialization
* @param rawMap underlying IMap
*/
- public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
- this.kryoSerializationService = checkNotNull(kryoSerializationService);
+ public OptionalCacheLoader(StoreSerializer serializer, IMap<byte[], byte[]> rawMap) {
+ this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
- byte[] keyBytes = kryoSerializationService.serialize(key);
+ byte[] keyBytes = serializer.encode(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
- V dev = kryoSerializationService.deserialize(valBytes);
+ V dev = serializer.decode(valBytes);
return Optional.of(dev);
}
}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
index 5feb1ba..0016939 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
@@ -47,6 +47,7 @@
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
+//TODO: Add support for multiple providers and annotations
/**
* Manages inventory of infrastructure devices using Hazelcast-backed map.
*/
@@ -87,7 +88,7 @@
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
+ = new OptionalCacheLoader<>(serializer, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
@@ -97,7 +98,7 @@
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
+ = new OptionalCacheLoader<>(serializer, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java
new file mode 100644
index 0000000..b68620a
--- /dev/null
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockProviderService.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.device.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
+
+// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
+/**
+ * Dummy implementation of {@link ClockProviderService}.
+ */
+@Component(immediate = true)
+@Service
+public class NoOpClockProviderService implements ClockProviderService {
+
+ @Override
+ public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
+ }
+}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java
deleted file mode 100644
index 2c443e9..0000000
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/NoOpClockService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.onlab.onos.store.device.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.MastershipTerm;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-
-// FIXME: Code clone in onos-core-trivial, onos-core-hz-net
-/**
- * Dummy implementation of {@link ClockService}.
- */
-@Component(immediate = true)
-@Service
-public class NoOpClockService implements ClockService {
-
- @Override
- public Timestamp getTimestamp(DeviceId deviceId) {
- return new Timestamp() {
-
- @Override
- public int compareTo(Timestamp o) {
- throw new IllegalStateException("Never expected to be used.");
- }
- };
- }
-
- @Override
- public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
- }
-}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index 6ec7c51..d49e00b 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -1,6 +1,5 @@
package org.onlab.onos.store.flow.impl;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -13,9 +12,10 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
@@ -28,20 +28,20 @@
import com.google.common.collect.Multimap;
/**
- * TEMPORARY: Manages inventory of flow rules using distributed store implementation.
+ * Manages inventory of flow rules using a trivial in-memory implementation.
*/
-//FIXME: I LIE I AM NOT DISTRIBUTED
+//FIXME I LIE. I AIN'T DISTRIBUTED
@Component(immediate = true)
@Service
public class DistributedFlowRuleStore
-extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
-implements FlowRuleStore {
+ extends AbstractStore<FlowRuleEvent, FlowRuleStoreDelegate>
+ implements FlowRuleStore {
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
- private final Multimap<DeviceId, FlowRule> flowEntries =
- ArrayListMultimap.<DeviceId, FlowRule>create();
+ private final Multimap<DeviceId, FlowEntry> flowEntries =
+ ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -58,8 +58,13 @@
@Override
- public synchronized FlowRule getFlowRule(FlowRule rule) {
- for (FlowRule f : flowEntries.get(rule.deviceId())) {
+ public int getFlowRuleCount() {
+ return flowEntries.size();
+ }
+
+ @Override
+ public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+ for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
@@ -68,8 +73,8 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
- Collection<FlowRule> rules = flowEntries.get(deviceId);
+ public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ Collection<FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
@@ -77,7 +82,7 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+ public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
@@ -87,7 +92,7 @@
@Override
public synchronized void storeFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+ FlowEntry f = new DefaultFlowEntry(rule);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
@@ -97,57 +102,41 @@
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
- DeviceId did = f.deviceId();
-
- /*
- * find the rule and mark it for deletion.
- * Ultimately a flow removed will come remove it.
- */
-
- if (flowEntries.containsEntry(did, f)) {
- //synchronized (flowEntries) {
- flowEntries.remove(did, f);
- flowEntries.put(did, f);
- flowEntriesById.remove(rule.appId(), rule);
- //}
+ FlowEntry entry = getFlowEntry(rule);
+ if (entry == null) {
+ return;
}
+ entry.setState(FlowEntryState.PENDING_REMOVE);
}
@Override
- public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+ public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
- if (flowEntries.containsEntry(did, rule)) {
- //synchronized (flowEntries) {
- // Multimaps support duplicates so we have to remove our rule
- // and replace it with the current version.
- flowEntries.remove(did, rule);
- flowEntries.put(did, rule);
- //}
+ FlowEntry stored = getFlowEntry(rule);
+ if (stored != null) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
- return new FlowRuleEvent(RULE_ADDED, rule);
+ return null;
}
@Override
- public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
- //synchronized (this) {
+ public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+ // This is where one could mark a rule as removed and still keep it in the store.
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
- //}
}
-
-
-
-
-
-
-
}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
index 5161f2f..3dd42a3 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
@@ -38,6 +38,7 @@
import com.google.common.collect.ImmutableSet.Builder;
import com.hazelcast.core.IMap;
+//TODO: Add support for multiple providers and annotations
/**
* Manages inventory of infrastructure links using Hazelcast-backed map.
*/
@@ -70,7 +71,7 @@
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
+ = new OptionalCacheLoader<>(serializer, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
index 4728850..04f5fce 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/topology/impl/DistributedTopologyStore.java
@@ -125,7 +125,8 @@
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
- return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+ current, reasons);
}
}
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
index 80c9464..7e2924b 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
@@ -36,9 +36,6 @@
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
@@ -63,7 +60,6 @@
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
- private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
@@ -85,10 +81,7 @@
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
-
- deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
+ deviceStore = new TestDistributedDeviceStore(storeManager);
deviceStore.activate();
}
@@ -96,8 +89,6 @@
public void tearDown() throws Exception {
deviceStore.deactivate();
- serializationMgr.deactivate();
-
storeManager.deactivate();
}
@@ -392,10 +383,8 @@
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
- public TestDistributedDeviceStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
}
}
}
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
index a76e901..dd959b5 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
@@ -30,9 +30,6 @@
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
@@ -51,7 +48,6 @@
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
- private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
@@ -71,17 +67,13 @@
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
-
- linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
+ linkStore = new TestDistributedLinkStore(storeManager);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
- serializationMgr.deactivate();
storeManager.deactivate();
}
@@ -361,10 +353,8 @@
class TestDistributedLinkStore extends DistributedLinkStore {
- TestDistributedLinkStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ TestDistributedLinkStore(StoreService storeService) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
}
}
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
index 46badcb..14a64d2 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ConnectPointSerializer.java
@@ -3,7 +3,6 @@
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.ElementId;
import org.onlab.onos.net.PortNumber;
-
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@@ -15,7 +14,7 @@
public class ConnectPointSerializer extends Serializer<ConnectPoint> {
/**
- * Default constructor.
+ * Creates {@link ConnectPoint} serializer instance.
*/
public ConnectPointSerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
index 5ee273d..06d01b5 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultLinkSerializer.java
@@ -16,7 +16,7 @@
public class DefaultLinkSerializer extends Serializer<DefaultLink> {
/**
- * Default constructor.
+ * Creates {@link DefaultLink} serializer instance.
*/
public DefaultLinkSerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
index 8455e80..5dc310b 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DefaultPortSerializer.java
@@ -16,7 +16,7 @@
Serializer<DefaultPort> {
/**
- * Default constructor.
+ * Creates {@link DefaultPort} serializer instance.
*/
public DefaultPortSerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
index c63b676..36d0a21 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/DeviceIdSerializer.java
@@ -14,6 +14,14 @@
*/
public final class DeviceIdSerializer extends Serializer<DeviceId> {
+ /**
+ * Creates {@link DeviceId} serializer instance.
+ */
+ public DeviceIdSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
@Override
public void write(Kryo kryo, Output output, DeviceId object) {
kryo.writeObject(output, object.uri());
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
index 244cc57..734033f 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableMapSerializer.java
@@ -19,6 +19,9 @@
private final MapSerializer mapSerializer = new MapSerializer();
+ /**
+ * Creates {@link ImmutableMap} serializer instance.
+ */
public ImmutableMapSerializer() {
// non-null, immutable
super(false, true);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
index c08bf9a..051a843 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ImmutableSetSerializer.java
@@ -18,6 +18,9 @@
private final CollectionSerializer serializer = new CollectionSerializer();
+ /**
+ * Creates {@link ImmutableSet} serializer instance.
+ */
public ImmutableSetSerializer() {
// non-null, immutable
super(false, true);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java
new file mode 100644
index 0000000..b923df7
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpAddressSerializer.java
@@ -0,0 +1,41 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.packet.IpAddress;
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+/**
+ * Kryo Serializer for {@link IpAddress}; wire form is octet count, octets, then prefix length.
+ */
+public class IpAddressSerializer extends Serializer<IpAddress> {
+
+ /**
+ * Creates {@link IpAddress} serializer instance.
+ */
+ public IpAddressSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output,
+ IpAddress object) {
+ byte[] octs = object.toOctets();
+ output.writeInt(octs.length); // length prefix consumed first by read()
+ output.writeBytes(octs);
+ output.writeInt(object.prefixLength());
+ }
+
+ @Override
+ public IpAddress read(Kryo kryo, Input input,
+ Class<IpAddress> type) {
+ int octLen = input.readInt();
+ byte[] octs = new byte[octLen];
+ input.readBytes(octs); // fills the whole array or throws; read(byte[]) could stop short silently
+ int prefLen = input.readInt();
+ return IpAddress.valueOf(octs, prefLen);
+ }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
index 2dbec57..2e92692 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/IpPrefixSerializer.java
@@ -13,7 +13,7 @@
public final class IpPrefixSerializer extends Serializer<IpPrefix> {
/**
- * Default constructor.
+ * Creates {@link IpPrefix} serializer instance.
*/
public IpPrefixSerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
new file mode 100644
index 0000000..0c33cfe
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -0,0 +1,85 @@
+package org.onlab.onos.store.serializers;
+
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.DefaultControllerNode;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
+import org.onlab.onos.net.DefaultDevice;
+import org.onlab.onos.net.DefaultLink;
+import org.onlab.onos.net.DefaultPort;
+import org.onlab.onos.net.Device;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.Element;
+import org.onlab.onos.net.Link;
+import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.MastershipRole;
+import org.onlab.onos.net.Port;
+import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.IpPrefix;
+import org.onlab.util.KryoPool;
+
+import de.javakaffee.kryoserializers.URISerializer;
+
+public final class KryoPoolUtil {
+ // Holder of pre-built KryoPool registrations (MISC, API) that other pools can register as a unit.
+ /**
+ * KryoPool which can serialize ON.lab misc classes ({@link IpPrefix} and {@link IpAddress}).
+ */
+ public static final KryoPool MISC = KryoPool.newBuilder()
+ .register(IpPrefix.class, new IpPrefixSerializer())
+ .register(IpAddress.class, new IpAddressSerializer())
+ .build();
+
+ // TODO: Populate other classes
+ /**
+ * KryoPool which can serialize API bundle classes; also includes everything in {@link #MISC}.
+ */
+ public static final KryoPool API = KryoPool.newBuilder()
+ .register(MISC)
+ .register(
+ // JDK collection types
+ ArrayList.class,
+ Arrays.asList().getClass(), // the List implementation class that Arrays.asList returns
+ HashMap.class,
+ // API types registered by class only (no explicit serializer given in this call)
+ ControllerNode.State.class,
+ Device.Type.class,
+ DefaultAnnotations.class,
+ DefaultControllerNode.class,
+ DefaultDevice.class,
+ DefaultDeviceDescription.class,
+ MastershipRole.class, // NOTE(review): registered again below with MastershipRoleSerializer - confirm intended
+ Port.class,
+ DefaultPortDescription.class,
+ Element.class,
+ Link.Type.class
+ )
+ .register(URI.class, new URISerializer())
+ .register(NodeId.class, new NodeIdSerializer())
+ .register(ProviderId.class, new ProviderIdSerializer())
+ .register(DeviceId.class, new DeviceIdSerializer())
+ .register(PortNumber.class, new PortNumberSerializer())
+ .register(DefaultPort.class, new DefaultPortSerializer())
+ .register(LinkKey.class, new LinkKeySerializer())
+ .register(ConnectPoint.class, new ConnectPointSerializer())
+ .register(DefaultLink.class, new DefaultLinkSerializer())
+ .register(MastershipTerm.class, new MastershipTermSerializer())
+ .register(MastershipRole.class, new MastershipRoleSerializer())
+
+ .build();
+
+
+ // not to be instantiated
+ private KryoPoolUtil() {}
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
deleted file mode 100644
index 04d1a88..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import de.javakaffee.kryoserializers.URISerializer;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.ControllerNode;
-import org.onlab.onos.cluster.DefaultControllerNode;
-import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.net.ConnectPoint;
-import org.onlab.onos.net.DefaultAnnotations;
-import org.onlab.onos.net.DefaultDevice;
-import org.onlab.onos.net.DefaultLink;
-import org.onlab.onos.net.DefaultPort;
-import org.onlab.onos.net.Device;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.Element;
-import org.onlab.onos.net.Link;
-import org.onlab.onos.net.LinkKey;
-import org.onlab.onos.net.MastershipRole;
-import org.onlab.onos.net.Port;
-import org.onlab.onos.net.PortNumber;
-import org.onlab.onos.net.provider.ProviderId;
-import org.onlab.packet.IpPrefix;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.URI;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-
-/**
- * Serialization service using Kryo.
- */
-@Component(immediate = true)
-@Service
-public class KryoSerializationManager implements KryoSerializationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private KryoPool serializerPool;
-
-
- @Activate
- public void activate() {
- setupKryoPool();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- /**
- * Sets up the common serialzers pool.
- */
- protected void setupKryoPool() {
- // FIXME Slice out types used in common to separate pool/namespace.
- serializerPool = KryoPool.newBuilder()
- .register(ArrayList.class,
- HashMap.class,
-
- ControllerNode.State.class,
- Device.Type.class,
-
- DefaultAnnotations.class,
- DefaultControllerNode.class,
- DefaultDevice.class,
- MastershipRole.class,
- Port.class,
- Element.class,
-
- Link.Type.class
- )
- .register(IpPrefix.class, new IpPrefixSerializer())
- .register(URI.class, new URISerializer())
- .register(NodeId.class, new NodeIdSerializer())
- .register(ProviderId.class, new ProviderIdSerializer())
- .register(DeviceId.class, new DeviceIdSerializer())
- .register(PortNumber.class, new PortNumberSerializer())
- .register(DefaultPort.class, new DefaultPortSerializer())
- .register(LinkKey.class, new LinkKeySerializer())
- .register(ConnectPoint.class, new ConnectPointSerializer())
- .register(DefaultLink.class, new DefaultLinkSerializer())
- .build()
- .populate(1);
- }
-
- @Override
- public byte[] serialize(final Object obj) {
- return serializerPool.serialize(obj);
- }
-
- @Override
- public <T> T deserialize(final byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- return serializerPool.deserialize(bytes);
- }
-
- @Override
- public void serialize(Object obj, ByteBuffer buffer) {
- serializerPool.serialize(obj, buffer);
- }
-
- @Override
- public <T> T deserialize(ByteBuffer buffer) {
- return serializerPool.deserialize(buffer);
- }
-
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
deleted file mode 100644
index 385128c..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import java.nio.ByteBuffer;
-
-// TODO: To be replaced with SerializationService from IOLoop activity
-/**
- * Service to serialize Objects into byte array.
- */
-public interface KryoSerializationService {
-
- /**
- * Serializes the specified object into bytes using one of the
- * pre-registered serializers.
- *
- * @param obj object to be serialized
- * @return serialized bytes
- */
- public byte[] serialize(final Object obj);
-
- /**
- * Serializes the specified object into bytes using one of the
- * pre-registered serializers.
- *
- * @param obj object to be serialized
- * @param buffer to write serialized bytes
- */
- public void serialize(final Object obj, ByteBuffer buffer);
-
- /**
- * Deserializes the specified bytes into an object using one of the
- * pre-registered serializers.
- *
- * @param bytes bytes to be deserialized
- * @return deserialized object
- */
- public <T> T deserialize(final byte[] bytes);
-
- /**
- * Deserializes the specified bytes into an object using one of the
- * pre-registered serializers.
- *
- * @param buffer bytes to be deserialized
- * @return deserialized object
- */
- public <T> T deserialize(final ByteBuffer buffer);
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
new file mode 100644
index 0000000..738086e
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * StoreSerializer implementation using Kryo.
+ */
+public class KryoSerializer implements StoreSerializer {
+
+ private final Logger log = LoggerFactory.getLogger(getClass()); // NOTE(review): not referenced within this class
+ protected KryoPool serializerPool; // assigned by setupKryoPool()
+
+ /** Creates a serializer whose pool is built by {@link #setupKryoPool()}. */
+ public KryoSerializer() {
+ setupKryoPool(); // NOTE(review): overridable call from a constructor; runs before subclass fields are initialized
+ }
+
+ /**
+ * Sets up the common serializers pool.
+ */
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .build()
+ .populate(1);
+ }
+
+ @Override
+ public byte[] encode(final Object obj) {
+ return serializerPool.serialize(obj);
+ }
+
+ @Override
+ public <T> T decode(final byte[] bytes) {
+ if (bytes == null) { // null input decodes to null rather than reaching the pool
+ return null;
+ }
+ return serializerPool.deserialize(bytes);
+ }
+
+ @Override
+ public void encode(Object obj, ByteBuffer buffer) {
+ serializerPool.serialize(obj, buffer);
+ }
+
+ @Override
+ public <T> T decode(ByteBuffer buffer) {
+ return serializerPool.deserialize(buffer);
+ }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
index f635f3c..bafee4f 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/LinkKeySerializer.java
@@ -2,6 +2,7 @@
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.LinkKey;
+
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@@ -13,7 +14,7 @@
public class LinkKeySerializer extends Serializer<LinkKey> {
/**
- * Default constructor.
+ * Creates {@link LinkKey} serializer instance.
*/
public LinkKeySerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
index 3903491..dab5aa8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipRoleSerializer.java
@@ -12,6 +12,14 @@
*/
public class MastershipRoleSerializer extends Serializer<MastershipRole> {
+ /**
+ * Creates {@link MastershipRole} serializer instance.
+ */
+ public MastershipRoleSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
@Override
public MastershipRole read(Kryo kryo, Input input, Class<MastershipRole> type) {
final String role = kryo.readObject(input, String.class);
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
index a5d6198..0ac61a8 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/MastershipTermSerializer.java
@@ -2,7 +2,6 @@
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
-
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
@@ -13,9 +12,17 @@
*/
public class MastershipTermSerializer extends Serializer<MastershipTerm> {
+ /**
+ * Creates {@link MastershipTerm} serializer instance.
+ */
+ public MastershipTermSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
@Override
public MastershipTerm read(Kryo kryo, Input input, Class<MastershipTerm> type) {
- final NodeId node = new NodeId(kryo.readObject(input, String.class));
+ final NodeId node = new NodeId(input.readString());
final int term = input.readInt();
return MastershipTerm.of(node, term);
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
index ef9d3f1..460b63d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/NodeIdSerializer.java
@@ -4,6 +4,7 @@
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
+
import org.onlab.onos.cluster.NodeId;
/**
@@ -11,14 +12,22 @@
*/
public final class NodeIdSerializer extends Serializer<NodeId> {
+ /**
+ * Creates {@link NodeId} serializer instance.
+ */
+ public NodeIdSerializer() {
+ // non-null, immutable
+ super(false, true);
+ }
+
@Override
public void write(Kryo kryo, Output output, NodeId object) {
- kryo.writeObject(output, object.toString());
+ output.writeString(object.toString());
}
@Override
public NodeId read(Kryo kryo, Input input, Class<NodeId> type) {
- final String id = kryo.readObject(input, String.class);
+ final String id = input.readString();
return new NodeId(id);
}
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
index 02805bb..3792966 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/PortNumberSerializer.java
@@ -14,7 +14,7 @@
Serializer<PortNumber> {
/**
- * Default constructor.
+ * Creates {@link PortNumber} serializer instance.
*/
public PortNumberSerializer() {
// non-null, immutable
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
index 1a1c6f6..060ac7d 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/ProviderIdSerializer.java
@@ -13,7 +13,7 @@
public class ProviderIdSerializer extends Serializer<ProviderId> {
/**
- * Default constructor.
+ * Creates {@link ProviderId} serializer instance.
*/
public ProviderIdSerializer() {
// non-null, immutable
@@ -24,13 +24,15 @@
public void write(Kryo kryo, Output output, ProviderId object) {
output.writeString(object.scheme());
output.writeString(object.id());
+ output.writeBoolean(object.isAncillary());
}
@Override
public ProviderId read(Kryo kryo, Input input, Class<ProviderId> type) {
String scheme = input.readString();
String id = input.readString();
- return new ProviderId(scheme, id);
+ boolean isAncillary = input.readBoolean();
+ return new ProviderId(scheme, id, isAncillary);
}
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
new file mode 100644
index 0000000..6c43a1b
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/StoreSerializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.serializers;
+
+import java.nio.ByteBuffer;
+
+// TODO: To be replaced with SerializationService from IOLoop activity
+/**
+ * Serializer that encodes objects to bytes and decodes bytes back to objects.
+ */
+public interface StoreSerializer {
+
+ /**
+ * Serializes the specified object into bytes.
+ *
+ * @param obj object to be serialized
+ * @return serialized bytes
+ */
+ public byte[] encode(final Object obj);
+
+ /**
+ * Serializes the specified object into the given buffer.
+ *
+ * @param obj object to be serialized
+ * @param buffer buffer to write the serialized bytes into
+ */
+ public void encode(final Object obj, ByteBuffer buffer);
+
+ /**
+ * Deserializes the specified bytes into an object.
+ *
+ * @param bytes bytes to be deserialized
+ * @return deserialized object
+ */
+ public <T> T decode(final byte[] bytes);
+
+ /**
+ * Deserializes the contents of the specified buffer into an object.
+ *
+ * @param buffer buffer holding the bytes to be deserialized
+ * @return deserialized object
+ */
+ public <T> T decode(final ByteBuffer buffer);
+}
diff --git a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
similarity index 61%
rename from core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
rename to core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
index c972d1a..d651d56 100644
--- a/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTests.java
+++ b/core/store/serializers/src/test/java/org/onlab/onos/store/serializers/KryoSerializerTest.java
@@ -1,12 +1,10 @@
package org.onlab.onos.store.serializers;
+import static org.junit.Assert.assertEquals;
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.PortNumber.portNumber;
-import java.net.URI;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
import org.junit.After;
import org.junit.Before;
@@ -14,7 +12,9 @@
import org.junit.Test;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.net.Annotations;
import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DefaultPort;
@@ -24,7 +24,9 @@
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.util.KryoPool;
@@ -32,10 +34,10 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.testing.EqualsTester;
-import de.javakaffee.kryoserializers.URISerializer;
+public class KryoSerializerTest {
-public class KryoSerializerTests {
private static final ProviderId PID = new ProviderId("of", "foo");
+ private static final ProviderId PIDA = new ProviderId("of", "foo", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
private static final PortNumber P1 = portNumber(1);
@@ -48,44 +50,23 @@
private static final String SW2 = "3.9.5";
private static final String SN = "43311-12345";
private static final Device DEV1 = new DefaultDevice(PID, DID1, Device.Type.SWITCH, MFR, HW, SW1, SN);
+ private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+ .set("A1", "a1")
+ .set("B1", "b1")
+ .build();
+ private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+ .remove("A1")
+ .set("B3", "b3")
+ .build();
private static KryoPool kryos;
@BeforeClass
public static void setUpBeforeClass() throws Exception {
kryos = KryoPool.newBuilder()
- .register(
- ArrayList.class,
- HashMap.class
- )
- .register(
- Device.Type.class,
- Link.Type.class
-
-// ControllerNode.State.class,
-// DefaultControllerNode.class,
-// MastershipRole.class,
-// Port.class,
-// Element.class,
- )
- .register(ConnectPoint.class, new ConnectPointSerializer())
- .register(DefaultLink.class, new DefaultLinkSerializer())
- .register(DefaultPort.class, new DefaultPortSerializer())
- .register(DeviceId.class, new DeviceIdSerializer())
+ .register(KryoPoolUtil.API)
.register(ImmutableMap.class, new ImmutableMapSerializer())
.register(ImmutableSet.class, new ImmutableSetSerializer())
- .register(IpPrefix.class, new IpPrefixSerializer())
- .register(LinkKey.class, new LinkKeySerializer())
- .register(NodeId.class, new NodeIdSerializer())
- .register(PortNumber.class, new PortNumberSerializer())
- .register(ProviderId.class, new ProviderIdSerializer())
-
- .register(DefaultDevice.class)
-
- .register(URI.class, new URISerializer())
-
- .register(MastershipRole.class, new MastershipRoleSerializer())
- .register(MastershipTerm.class, new MastershipTermSerializer())
.build();
}
@@ -112,10 +93,12 @@
@Test
- public final void test() {
+ public final void testSerialization() {
testSerialized(new ConnectPoint(DID1, P1));
testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT));
testSerialized(new DefaultPort(DEV1, P1, true));
+ testSerialized(new DefaultLink(PID, CP1, CP2, Link.Type.DIRECT, A1));
+ testSerialized(new DefaultPort(DEV1, P1, true, A1_2));
testSerialized(DID1);
testSerialized(ImmutableMap.of(DID1, DEV1, DID2, DEV1));
testSerialized(ImmutableMap.of(DID1, DEV1));
@@ -124,10 +107,41 @@
testSerialized(ImmutableSet.of(DID1));
testSerialized(ImmutableSet.of());
testSerialized(IpPrefix.valueOf("192.168.0.1/24"));
+ testSerialized(IpAddress.valueOf("192.168.0.1"));
testSerialized(new LinkKey(CP1, CP2));
testSerialized(new NodeId("SomeNodeIdentifier"));
testSerialized(P1);
testSerialized(PID);
+ testSerialized(PIDA);
+ testSerialized(new NodeId("bar"));
+ testSerialized(MastershipTerm.of(new NodeId("foo"), 2));
+ for (MastershipRole role : MastershipRole.values()) {
+ testSerialized(role);
+ }
+ }
+
+ @Test
+ public final void testAnnotations() {
+ // Annotations does not define equals, so round-tripped copies are compared manually
+ final byte[] a1Bytes = kryos.serialize(A1);
+ SparseAnnotations copiedA1 = kryos.deserialize(a1Bytes);
+ assertAnnotationsEquals(copiedA1, A1);
+
+ final byte[] a12Bytes = kryos.serialize(A1_2);
+ SparseAnnotations copiedA12 = kryos.deserialize(a12Bytes);
+ assertAnnotationsEquals(copiedA12, A1_2);
+ }
+
+ // code clone: asserts that actual has the same keys and values as the union of the given annotations
+ public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+ SparseAnnotations expected = DefaultAnnotations.builder().build();
+ for (SparseAnnotations a : annotations) {
+ expected = DefaultAnnotations.union(expected, a);
+ }
+ assertEquals(expected.keys(), actual.keys());
+ for (String key : expected.keys()) {
+ assertEquals(expected.value(key), actual.value(key));
+ }
+ }
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java
new file mode 100644
index 0000000..ff4b31a
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockProviderService.java
@@ -0,0 +1,20 @@
+package org.onlab.onos.store.trivial.impl;
+
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.MastershipTerm;
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.store.ClockProviderService;
+
+//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
+/**
+ * Dummy implementation of {@link ClockProviderService}.
+ */
+@Component(immediate = true)
+@Service
+public class NoOpClockProviderService implements ClockProviderService {
+
+ @Override
+ public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) { // intentionally empty: arguments are ignored
+ }
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java
deleted file mode 100644
index b3f8320..0000000
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/NoOpClockService.java
+++ /dev/null
@@ -1,32 +0,0 @@
-package org.onlab.onos.store.trivial.impl;
-
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.cluster.MastershipTerm;
-import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.store.ClockService;
-import org.onlab.onos.store.Timestamp;
-
-//FIXME: Code clone in onos-core-trivial, onos-core-hz-net
-/**
- * Dummy implementation of {@link ClockService}.
- */
-@Component(immediate = true)
-@Service
-public class NoOpClockService implements ClockService {
-
- @Override
- public Timestamp getTimestamp(DeviceId deviceId) {
- return new Timestamp() {
-
- @Override
- public int compareTo(Timestamp o) {
- throw new IllegalStateException("Never expected to be used.");
- }
- };
- }
-
- @Override
- public void setMastershipTerm(DeviceId deviceId, MastershipTerm term) {
- }
-}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
index 0b0ae37..514a22e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStore.java
@@ -2,6 +2,8 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
@@ -9,7 +11,7 @@
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.onos.net.Annotations;
+import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
import org.onlab.onos.net.DefaultPort;
@@ -28,10 +30,10 @@
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
+import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,12 +49,13 @@
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Predicates.notNull;
+import static com.google.common.base.Verify.verify;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
import static org.apache.commons.lang3.concurrent.ConcurrentUtils.createIfAbsentUnchecked;
+import static org.onlab.onos.net.DefaultAnnotations.union;
import static org.onlab.onos.net.DefaultAnnotations.merge;
-// TODO: synchronization should be done in more fine-grained manner.
/**
* Manages inventory of infrastructure devices using trivial in-memory
* structures implementation.
@@ -70,14 +73,14 @@
// collection of Description given from various providers
private final ConcurrentMap<DeviceId,
ConcurrentMap<ProviderId, DeviceDescriptions>>
- deviceDescs = new ConcurrentHashMap<>();
+ deviceDescs = Maps.newConcurrentMap();
// cache of Device and Ports generated by compositing descriptions from providers
- private final ConcurrentMap<DeviceId, Device> devices = new ConcurrentHashMap<>();
- private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = new ConcurrentHashMap<>();
+ private final ConcurrentMap<DeviceId, Device> devices = Maps.newConcurrentMap();
+ private final ConcurrentMap<DeviceId, ConcurrentMap<PortNumber, Port>> devicePorts = Maps.newConcurrentMap();
// available(=UP) devices
- private final Set<DeviceId> availableDevices = new HashSet<>();
+ private final Set<DeviceId> availableDevices = Sets.newConcurrentHashSet();
@Activate
@@ -87,6 +90,10 @@
@Deactivate
public void deactivate() {
+ deviceDescs.clear();
+ devices.clear();
+ devicePorts.clear();
+ availableDevices.clear();
log.info("Stopped");
}
@@ -106,117 +113,142 @@
}
@Override
- public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId, DeviceId deviceId,
+ public DeviceEvent createOrUpdateDevice(ProviderId providerId,
+ DeviceId deviceId,
DeviceDescription deviceDescription) {
+
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
- = createIfAbsentUnchecked(deviceDescs, deviceId,
- new InitConcurrentHashMap<ProviderId, DeviceDescriptions>());
+ = getDeviceDescriptions(deviceId);
- Device oldDevice = devices.get(deviceId);
+ synchronized (providerDescs) {
+ // locking per device
- DeviceDescriptions descs
- = createIfAbsentUnchecked(providerDescs, providerId,
- new InitDeviceDescs(deviceDescription));
+ DeviceDescriptions descs
+ = createIfAbsentUnchecked(providerDescs, providerId,
+ new InitDeviceDescs(deviceDescription));
- // update description
- descs.putDeviceDesc(deviceDescription);
- Device newDevice = composeDevice(deviceId, providerDescs);
+ Device oldDevice = devices.get(deviceId);
+ // update description
+ descs.putDeviceDesc(deviceDescription);
+ Device newDevice = composeDevice(deviceId, providerDescs);
- if (oldDevice == null) {
- // ADD
- return createDevice(providerId, newDevice);
- } else {
- // UPDATE or ignore (no change or stale)
- return updateDevice(providerId, oldDevice, newDevice);
+ if (oldDevice == null) {
+ // ADD
+ return createDevice(providerId, newDevice);
+ } else {
+ // UPDATE or ignore (no change or stale)
+ return updateDevice(providerId, oldDevice, newDevice);
+ }
}
}
// Creates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createDevice(ProviderId providerId, Device newDevice) {
// update composed device cache
- synchronized (this) {
- devices.putIfAbsent(newDevice.id(), newDevice);
- if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
- }
+ Device oldDevice = devices.putIfAbsent(newDevice.id(), newDevice);
+ verify(oldDevice == null,
+ "Unexpected Device in cache. PID:%s [old=%s, new=%s]",
+ providerId, oldDevice, newDevice);
+
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_ADDED, newDevice, null);
}
// Updates the device and returns the appropriate event if necessary.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updateDevice(ProviderId providerId, Device oldDevice, Device newDevice) {
// We allow only certain attributes to trigger update
if (!Objects.equals(oldDevice.hwVersion(), newDevice.hwVersion()) ||
!Objects.equals(oldDevice.swVersion(), newDevice.swVersion()) ||
- !isAnnotationsEqual(oldDevice.annotations(), newDevice.annotations())) {
+ !AnnotationsUtil.isEqual(oldDevice.annotations(), newDevice.annotations())) {
- synchronized (this) {
- devices.replace(newDevice.id(), oldDevice, newDevice);
- if (!providerId.isAncillary()) {
- availableDevices.add(newDevice.id());
- }
+ boolean replaced = devices.replace(newDevice.id(), oldDevice, newDevice);
+ if (!replaced) {
+ verify(replaced,
+ "Replacing devices cache failed. PID:%s [expected:%s, found:%s, new=%s]",
+ providerId, oldDevice, devices.get(newDevice.id())
+ , newDevice);
+ }
+ if (!providerId.isAncillary()) {
+ availableDevices.add(newDevice.id());
}
return new DeviceEvent(DeviceEvent.Type.DEVICE_UPDATED, newDevice, null);
}
// Otherwise merely attempt to change availability if primary provider
if (!providerId.isAncillary()) {
- synchronized (this) {
boolean added = availableDevices.add(newDevice.id());
return !added ? null :
new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, newDevice, null);
- }
}
return null;
}
@Override
public DeviceEvent markOffline(DeviceId deviceId) {
- synchronized (this) {
+ ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs
+ = getDeviceDescriptions(deviceId);
+
+ // locking device
+ synchronized (providerDescs) {
Device device = devices.get(deviceId);
- boolean removed = (device != null) && availableDevices.remove(deviceId);
- return !removed ? null :
- new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ if (device == null) {
+ return null;
+ }
+ boolean removed = availableDevices.remove(deviceId);
+ if (removed) {
+ // TODO: broadcast ... DOWN only?
+ return new DeviceEvent(DEVICE_AVAILABILITY_CHANGED, device, null);
+ }
+ return null;
}
}
@Override
- public synchronized List<DeviceEvent> updatePorts(ProviderId providerId, DeviceId deviceId,
- List<PortDescription> portDescriptions) {
+ public List<DeviceEvent> updatePorts(ProviderId providerId,
+ DeviceId deviceId,
+ List<PortDescription> portDescriptions) {
- // TODO: implement multi-provider
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
-
-
List<DeviceEvent> events = new ArrayList<>();
- synchronized (this) {
- ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
+ synchronized (descsMap) {
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // every provider must provide DeviceDescription.
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
+
+ Map<PortNumber, Port> ports = getPortMap(deviceId);
// Add new ports
Set<PortNumber> processed = new HashSet<>();
for (PortDescription portDescription : portDescriptions) {
- PortNumber number = portDescription.portNumber();
- Port oldPort = ports.get(number);
+ final PortNumber number = portDescription.portNumber();
+ processed.add(portDescription.portNumber());
+
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
+// event suppression hook?
+
// update description
descs.putPortDesc(portDescription);
- Port newPort = composePort(device, number, descsMap);
+ newPort = composePort(device, number, descsMap);
events.add(oldPort == null ?
- createPort(device, newPort, ports) :
- updatePort(device, oldPort, newPort, ports));
- processed.add(portDescription.portNumber());
+ createPort(device, newPort, ports) :
+ updatePort(device, oldPort, newPort, ports));
}
events.addAll(pruneOldPorts(device, ports, processed));
@@ -226,19 +258,21 @@
// Creates a new port based on the port description adds it to the map and
// Returns corresponding event.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent createPort(Device device, Port newPort,
- ConcurrentMap<PortNumber, Port> ports) {
+ Map<PortNumber, Port> ports) {
ports.put(newPort.number(), newPort);
return new DeviceEvent(PORT_ADDED, device, newPort);
}
// Checks if the specified port requires update and if so, it replaces the
// existing entry in the map and returns corresponding event.
+ // Guarded by deviceDescs value (=Device lock)
private DeviceEvent updatePort(Device device, Port oldPort,
Port newPort,
- ConcurrentMap<PortNumber, Port> ports) {
+ Map<PortNumber, Port> ports) {
if (oldPort.isEnabled() != newPort.isEnabled() ||
- !isAnnotationsEqual(oldPort.annotations(), newPort.annotations())) {
+ !AnnotationsUtil.isEqual(oldPort.annotations(), newPort.annotations())) {
ports.put(oldPort.number(), newPort);
return new DeviceEvent(PORT_UPDATED, device, newPort);
@@ -248,6 +282,7 @@
// Prunes the specified list of ports based on which ports are in the
// processed list and returns list of corresponding events.
+ // Guarded by deviceDescs value (=Device lock)
private List<DeviceEvent> pruneOldPorts(Device device,
Map<PortNumber, Port> ports,
Set<PortNumber> processed) {
@@ -268,11 +303,17 @@
// exist, it creates and registers a new one.
private ConcurrentMap<PortNumber, Port> getPortMap(DeviceId deviceId) {
return createIfAbsentUnchecked(devicePorts, deviceId,
- new InitConcurrentHashMap<PortNumber, Port>());
+ NewConcurrentHashMap.<PortNumber, Port>ifNeeded());
+ }
+
+ private ConcurrentMap<ProviderId, DeviceDescriptions> getDeviceDescriptions(
+ DeviceId deviceId) {
+ return createIfAbsentUnchecked(deviceDescs, deviceId,
+ NewConcurrentHashMap.<ProviderId, DeviceDescriptions>ifNeeded());
}
@Override
- public synchronized DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
+ public DeviceEvent updatePortStatus(ProviderId providerId, DeviceId deviceId,
PortDescription portDescription) {
Device device = devices.get(deviceId);
checkArgument(device != null, DEVICE_NOT_FOUND, deviceId);
@@ -280,19 +321,22 @@
ConcurrentMap<ProviderId, DeviceDescriptions> descsMap = deviceDescs.get(deviceId);
checkArgument(descsMap != null, DEVICE_NOT_FOUND, deviceId);
- DeviceDescriptions descs = descsMap.get(providerId);
- // assuming all providers must to give DeviceDescription
- checkArgument(descs != null,
- "Device description for Device ID %s from Provider %s was not found",
- deviceId, providerId);
+ synchronized (descsMap) {
+ DeviceDescriptions descs = descsMap.get(providerId);
+ // assuming all providers must give DeviceDescription
+ checkArgument(descs != null,
+ "Device description for Device ID %s from Provider %s was not found",
+ deviceId, providerId);
- synchronized (this) {
ConcurrentMap<PortNumber, Port> ports = getPortMap(deviceId);
final PortNumber number = portDescription.portNumber();
- Port oldPort = ports.get(number);
+ final Port oldPort = ports.get(number);
+ final Port newPort;
+
// update description
descs.putPortDesc(portDescription);
- Port newPort = composePort(device, number, descsMap);
+ newPort = composePort(device, number, descsMap);
+
if (oldPort == null) {
return createPort(device, newPort, ports);
} else {
@@ -323,31 +367,19 @@
@Override
public DeviceEvent removeDevice(DeviceId deviceId) {
- synchronized (this) {
+ ConcurrentMap<ProviderId, DeviceDescriptions> descs = getDeviceDescriptions(deviceId);
+ synchronized (descs) {
Device device = devices.remove(deviceId);
- return device == null ? null :
- new DeviceEvent(DEVICE_REMOVED, device, null);
- }
- }
-
- private static boolean isAnnotationsEqual(Annotations lhs, Annotations rhs) {
- if (lhs == rhs) {
- return true;
- }
- if (lhs == null || rhs == null) {
- return false;
- }
-
- if (!lhs.keys().equals(rhs.keys())) {
- return false;
- }
-
- for (String key : lhs.keys()) {
- if (!lhs.value(key).equals(rhs.value(key))) {
- return false;
+ // should DEVICE_REMOVED carry removed ports?
+ Map<PortNumber, Port> ports = devicePorts.get(deviceId);
+ if (ports != null) {
+ ports.clear();
}
+ availableDevices.remove(deviceId);
+ descs.clear();
+ return device == null ? null :
+ new DeviceEvent(DEVICE_REMOVED, device, null);
}
- return true;
}
/**
@@ -366,14 +398,14 @@
DeviceDescriptions desc = providerDescs.get(primary);
- // base
- Type type = desc.getDeviceDesc().type();
- String manufacturer = desc.getDeviceDesc().manufacturer();
- String hwVersion = desc.getDeviceDesc().hwVersion();
- String swVersion = desc.getDeviceDesc().swVersion();
- String serialNumber = desc.getDeviceDesc().serialNumber();
+ final DeviceDescription base = desc.getDeviceDesc();
+ Type type = base.type();
+ String manufacturer = base.manufacturer();
+ String hwVersion = base.hwVersion();
+ String swVersion = base.swVersion();
+ String serialNumber = base.serialNumber();
DefaultAnnotations annotations = DefaultAnnotations.builder().build();
- annotations = merge(annotations, desc.getDeviceDesc().annotations());
+ annotations = merge(annotations, base.annotations());
for (Entry<ProviderId, DeviceDescriptions> e : providerDescs.entrySet()) {
if (e.getKey().equals(primary)) {
@@ -392,7 +424,14 @@
hwVersion, swVersion, serialNumber, annotations);
}
- // probably want composePort"s" also
+ /**
+ * Returns a Port, merging description given from multiple Providers.
+ *
+ * @param device device the port is on
+ * @param number port number
+ * @param providerDescs Collection of Descriptions from multiple providers
+ * @return Port instance
+ */
private Port composePort(Device device, PortNumber number,
ConcurrentMap<ProviderId, DeviceDescriptions> providerDescs) {
@@ -445,18 +484,11 @@
return fallBackPrimary;
}
- // TODO: can be made generic
- private static final class InitConcurrentHashMap<K, V> implements
- ConcurrentInitializer<ConcurrentMap<K, V>> {
- @Override
- public ConcurrentMap<K, V> get() throws ConcurrentException {
- return new ConcurrentHashMap<>();
- }
- }
-
public static final class InitDeviceDescs
implements ConcurrentInitializer<DeviceDescriptions> {
+
private final DeviceDescription deviceDesc;
+
public InitDeviceDescs(DeviceDescription deviceDesc) {
this.deviceDesc = checkNotNull(deviceDesc);
}
@@ -471,8 +503,6 @@
* Collection of Description of a Device and it's Ports given from a Provider.
*/
private static class DeviceDescriptions {
- // private final DeviceId id;
- // private final ProviderId pid;
private final AtomicReference<DeviceDescription> deviceDesc;
private final ConcurrentMap<PortNumber, PortDescription> portDescs;
@@ -490,10 +520,6 @@
return portDescs.get(number);
}
- public Collection<PortDescription> getPortDescs() {
- return Collections.unmodifiableCollection(portDescs.values());
- }
-
/**
* Puts DeviceDescription, merging annotations as necessary.
*
@@ -504,7 +530,7 @@
DeviceDescription oldOne = deviceDesc.get();
DeviceDescription newOne = newDesc;
if (oldOne != null) {
- SparseAnnotations merged = merge(oldOne.annotations(),
+ SparseAnnotations merged = union(oldOne.annotations(),
newDesc.annotations());
newOne = new DefaultDeviceDescription(newOne, merged);
}
@@ -521,7 +547,7 @@
PortDescription oldOne = portDescs.get(newDesc.portNumber());
PortDescription newOne = newDesc;
if (oldOne != null) {
- SparseAnnotations merged = merge(oldOne.annotations(),
+ SparseAnnotations merged = union(oldOne.annotations(),
newDesc.annotations());
newOne = new DefaultPortDescription(newOne, merged);
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index d12d00e..7ff797c 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -12,9 +12,10 @@
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.ApplicationId;
import org.onlab.onos.net.DeviceId;
-import org.onlab.onos.net.flow.DefaultFlowRule;
+import org.onlab.onos.net.flow.DefaultFlowEntry;
+import org.onlab.onos.net.flow.FlowEntry;
+import org.onlab.onos.net.flow.FlowEntry.FlowEntryState;
import org.onlab.onos.net.flow.FlowRule;
-import org.onlab.onos.net.flow.FlowRule.FlowRuleState;
import org.onlab.onos.net.flow.FlowRuleEvent;
import org.onlab.onos.net.flow.FlowRuleEvent.Type;
import org.onlab.onos.net.flow.FlowRuleStore;
@@ -38,8 +39,8 @@
private final Logger log = getLogger(getClass());
// store entries as a pile of rules, no info about device tables
- private final Multimap<DeviceId, FlowRule> flowEntries =
- ArrayListMultimap.<DeviceId, FlowRule>create();
+ private final Multimap<DeviceId, FlowEntry> flowEntries =
+ ArrayListMultimap.<DeviceId, FlowEntry>create();
private final Multimap<ApplicationId, FlowRule> flowEntriesById =
ArrayListMultimap.<ApplicationId, FlowRule>create();
@@ -56,8 +57,13 @@
@Override
- public synchronized FlowRule getFlowRule(FlowRule rule) {
- for (FlowRule f : flowEntries.get(rule.deviceId())) {
+    public synchronized int getFlowRuleCount() { // synchronized like every other accessor: flowEntries is not thread-safe
+        return flowEntries.size(); // number of stored (device, entry) pairs
+    }
+
+ @Override
+ public synchronized FlowEntry getFlowEntry(FlowRule rule) {
+ for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
return f;
}
@@ -66,8 +72,8 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntries(DeviceId deviceId) {
- Collection<FlowRule> rules = flowEntries.get(deviceId);
+ public synchronized Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
+ Collection<FlowEntry> rules = flowEntries.get(deviceId);
if (rules == null) {
return Collections.emptyList();
}
@@ -75,7 +81,7 @@
}
@Override
- public synchronized Iterable<FlowRule> getFlowEntriesByAppId(ApplicationId appId) {
+ public synchronized Iterable<FlowRule> getFlowRulesByAppId(ApplicationId appId) {
Collection<FlowRule> rules = flowEntriesById.get(appId);
if (rules == null) {
return Collections.emptyList();
@@ -85,7 +91,7 @@
@Override
public synchronized void storeFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_ADD);
+ FlowEntry f = new DefaultFlowEntry(rule);
DeviceId did = f.deviceId();
if (!flowEntries.containsEntry(did, f)) {
flowEntries.put(did, f);
@@ -95,51 +101,42 @@
@Override
public synchronized void deleteFlowRule(FlowRule rule) {
- FlowRule f = new DefaultFlowRule(rule, FlowRuleState.PENDING_REMOVE);
- DeviceId did = f.deviceId();
-
- /*
- * find the rule and mark it for deletion.
- * Ultimately a flow removed will come remove it.
- */
-
- if (flowEntries.containsEntry(did, f)) {
- flowEntries.remove(did, f);
- flowEntries.put(did, f);
- flowEntriesById.remove(rule.appId(), rule);
+ FlowEntry entry = getFlowEntry(rule);
+ if (entry == null) {
+ //log.warn("Cannot find rule {}", rule);
+ return;
}
+ entry.setState(FlowEntryState.PENDING_REMOVE);
}
@Override
- public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowRule rule) {
+ public synchronized FlowRuleEvent addOrUpdateFlowRule(FlowEntry rule) {
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
- FlowRule stored = getFlowRule(rule);
+ FlowEntry stored = getFlowEntry(rule);
if (stored != null) {
- // Multimaps support duplicates so we have to remove our rule
- // and replace it with the current version.
- flowEntries.remove(did, rule);
- flowEntries.put(did, rule);
-
- if (stored.state() == FlowRuleState.PENDING_ADD) {
+ stored.setBytes(rule.bytes());
+ stored.setLife(rule.life());
+ stored.setPackets(rule.packets());
+ if (stored.state() == FlowEntryState.PENDING_ADD) {
+ stored.setState(FlowEntryState.ADDED);
return new FlowRuleEvent(Type.RULE_ADDED, rule);
}
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
- flowEntries.put(did, rule);
+ //flowEntries.put(did, rule);
return null;
}
@Override
- public synchronized FlowRuleEvent removeFlowRule(FlowRule rule) {
- //synchronized (this) {
+ public synchronized FlowRuleEvent removeFlowRule(FlowEntry rule) {
+ // This is where one could mark a rule as removed and still keep it in the store.
if (flowEntries.remove(rule.deviceId(), rule)) {
return new FlowRuleEvent(RULE_REMOVED, rule);
} else {
return null;
}
- //}
}
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
new file mode 100644
index 0000000..732d753
--- /dev/null
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
@@ -0,0 +1,149 @@
+package org.onlab.onos.store.trivial.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.intent.InstallableIntent;
+import org.onlab.onos.net.intent.Intent;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentId;
+import org.onlab.onos.net.intent.IntentState;
+import org.onlab.onos.net.intent.IntentStore;
+import org.onlab.onos.net.intent.IntentStoreDelegate;
+import org.onlab.onos.store.AbstractStore;
+import org.slf4j.Logger;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.onlab.onos.net.intent.IntentState.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Trivial in-memory implementation of the intent store.
+ * <p>
+ * Tracks, per intent ID, the intent itself, its current state and the
+ * installable intents registered for it.
+ * <p>
+ * NOTE(review): backed by plain HashMaps with no synchronization; presumably
+ * callers serialize access - confirm.
+ */
+@Component(immediate = true)
+@Service
+public class SimpleIntentStore
+        extends AbstractStore<IntentEvent, IntentStoreDelegate>
+        implements IntentStore {
+
+    private final Logger log = getLogger(getClass());
+
+    // Intents by ID
+    private final Map<IntentId, Intent> intents = new HashMap<>();
+    // Last recorded state of each intent
+    private final Map<IntentId, IntentState> states = new HashMap<>();
+    // Installable intents registered for each intent
+    private final Map<IntentId, List<InstallableIntent>> installable = new HashMap<>();
+
+    @Activate
+    public void activate() {
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        log.info("Stopped");
+    }
+
+    /**
+     * Stores the intent and records it as SUBMITTED.
+     *
+     * @param intent intent to store
+     * @return event for the SUBMITTED state
+     */
+    @Override
+    public IntentEvent createIntent(Intent intent) {
+        intents.put(intent.id(), intent);
+        return this.setState(intent, IntentState.SUBMITTED);
+    }
+
+    /**
+     * Removes the intent along with its state and installable intents.
+     *
+     * @param intentId ID of the intent to remove
+     * @return WITHDRAWN event for the removed intent, or null if no intent
+     *         was stored under the given ID
+     */
+    @Override
+    public IntentEvent removeIntent(IntentId intentId) {
+        Intent intent = intents.remove(intentId);
+        installable.remove(intentId);
+        // Unknown ID: there is nothing to withdraw, and setState() would
+        // dereference the null intent; just drop any leftovers below.
+        IntentEvent event = intent == null ? null : this.setState(intent, WITHDRAWN);
+        states.remove(intentId);
+        return event;
+    }
+
+    @Override
+    public long getIntentCount() {
+        return intents.size();
+    }
+
+    /**
+     * Returns an immutable snapshot of the stored intents.
+     *
+     * @return copy of all intents currently in the store
+     */
+    @Override
+    public Iterable<Intent> getIntents() {
+        return ImmutableSet.copyOf(intents.values());
+    }
+
+    @Override
+    public Intent getIntent(IntentId intentId) {
+        return intents.get(intentId);
+    }
+
+    @Override
+    public IntentState getIntentState(IntentId id) {
+        return states.get(id);
+    }
+
+    /**
+     * Records the new state of the intent.
+     *
+     * @param intent intent whose state changes; must not be null
+     * @param state  new state
+     * @return event matching the new state, or null when the state is not
+     *         one of SUBMITTED, INSTALLED, FAILED or WITHDRAWN
+     */
+    @Override
+    public IntentEvent setState(Intent intent, IntentState state) {
+        IntentId id = intent.id();
+        states.put(id, state);
+        IntentEvent.Type type = (state == SUBMITTED ? IntentEvent.Type.SUBMITTED :
+                                 (state == INSTALLED ? IntentEvent.Type.INSTALLED :
+                                  (state == FAILED ? IntentEvent.Type.FAILED :
+                                   state == WITHDRAWN ? IntentEvent.Type.WITHDRAWN :
+                                   null)));
+        return type == null ? null : new IntentEvent(type, intent);
+    }
+
+    @Override
+    public void addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
+        installable.put(intentId, result);
+    }
+
+    @Override
+    public List<InstallableIntent> getInstallableIntents(IntentId intentId) {
+        return installable.get(intentId);
+    }
+
+    @Override
+    public void removeInstalledIntents(IntentId intentId) {
+        installable.remove(intentId);
+    }
+
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
index a0e569d..daf28df 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleLinkStore.java
@@ -1,36 +1,51 @@
package org.onlab.onos.store.trivial.impl;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
import com.google.common.collect.HashMultimap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Multimap;
+import com.google.common.collect.SetMultimap;
+import org.apache.commons.lang3.concurrent.ConcurrentUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultLink;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
+import org.onlab.onos.net.SparseAnnotations;
+import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.LinkKey;
+import org.onlab.onos.net.Provided;
+import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkDescription;
import org.onlab.onos.net.link.LinkEvent;
import org.onlab.onos.net.link.LinkStore;
import org.onlab.onos.net.link.LinkStoreDelegate;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.AbstractStore;
+import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
import java.util.Collections;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import static org.onlab.onos.net.DefaultAnnotations.union;
+import static org.onlab.onos.net.DefaultAnnotations.merge;
import static org.onlab.onos.net.Link.Type.DIRECT;
import static org.onlab.onos.net.Link.Type.INDIRECT;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
import static org.slf4j.LoggerFactory.getLogger;
+import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
+import static com.google.common.base.Predicates.notNull;
/**
* Manages inventory of infrastructure links using trivial in-memory structures
@@ -45,11 +60,17 @@
private final Logger log = getLogger(getClass());
// Link inventory
- private final Map<LinkKey, DefaultLink> links = new ConcurrentHashMap<>();
+ private final ConcurrentMap<LinkKey,
+ ConcurrentMap<ProviderId, LinkDescription>>
+ linkDescs = new ConcurrentHashMap<>();
+
+ // Link instance cache
+ private final ConcurrentMap<LinkKey, Link> links = new ConcurrentHashMap<>();
// Egress and ingress link sets
- private final Multimap<DeviceId, Link> srcLinks = HashMultimap.create();
- private final Multimap<DeviceId, Link> dstLinks = HashMultimap.create();
+ private final SetMultimap<DeviceId, LinkKey> srcLinks = createSynchronizedHashMultiMap();
+ private final SetMultimap<DeviceId, LinkKey> dstLinks = createSynchronizedHashMultiMap();
+
@Activate
public void activate() {
@@ -58,6 +79,10 @@
@Deactivate
public void deactivate() {
+ linkDescs.clear();
+ links.clear();
+ srcLinks.clear();
+ dstLinks.clear();
log.info("Stopped");
}
@@ -68,17 +93,29 @@
@Override
public Iterable<Link> getLinks() {
- return Collections.unmodifiableSet(new HashSet<Link>(links.values()));
+ return Collections.unmodifiableCollection(links.values());
}
@Override
public Set<Link> getDeviceEgressLinks(DeviceId deviceId) {
- return ImmutableSet.copyOf(srcLinks.get(deviceId));
+ // lock for iteration
+ synchronized (srcLinks) {
+ return FluentIterable.from(srcLinks.get(deviceId))
+ .transform(lookupLink())
+ .filter(notNull())
+ .toSet();
+ }
}
@Override
public Set<Link> getDeviceIngressLinks(DeviceId deviceId) {
- return ImmutableSet.copyOf(dstLinks.get(deviceId));
+ // lock for iteration
+ synchronized (dstLinks) {
+ return FluentIterable.from(dstLinks.get(deviceId))
+ .transform(lookupLink())
+ .filter(notNull())
+ .toSet();
+ }
}
@Override
@@ -89,9 +126,9 @@
@Override
public Set<Link> getEgressLinks(ConnectPoint src) {
Set<Link> egress = new HashSet<>();
- for (Link link : srcLinks.get(src.deviceId())) {
- if (link.src().equals(src)) {
- egress.add(link);
+ for (LinkKey linkKey : srcLinks.get(src.deviceId())) {
+ if (linkKey.src().equals(src)) {
+ egress.add(links.get(linkKey));
}
}
return egress;
@@ -100,9 +137,9 @@
@Override
public Set<Link> getIngressLinks(ConnectPoint dst) {
Set<Link> ingress = new HashSet<>();
- for (Link link : dstLinks.get(dst.deviceId())) {
- if (link.dst().equals(dst)) {
- ingress.add(link);
+ for (LinkKey linkKey : dstLinks.get(dst.deviceId())) {
+ if (linkKey.dst().equals(dst)) {
+ ingress.add(links.get(linkKey));
}
}
return ingress;
@@ -112,56 +149,172 @@
public LinkEvent createOrUpdateLink(ProviderId providerId,
LinkDescription linkDescription) {
LinkKey key = new LinkKey(linkDescription.src(), linkDescription.dst());
- DefaultLink link = links.get(key);
- if (link == null) {
- return createLink(providerId, key, linkDescription);
+
+ ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
+ synchronized (descs) {
+ final Link oldLink = links.get(key);
+ // update description
+ createOrUpdateLinkDescription(descs, providerId, linkDescription);
+ final Link newLink = composeLink(descs);
+ if (oldLink == null) {
+ return createLink(key, newLink);
+ }
+ return updateLink(key, oldLink, newLink);
}
- return updateLink(providerId, link, key, linkDescription);
+ }
+
+ // Guarded by linkDescs value (=locking each Link)
+ private LinkDescription createOrUpdateLinkDescription(
+ ConcurrentMap<ProviderId, LinkDescription> descs,
+ ProviderId providerId,
+ LinkDescription linkDescription) {
+ // Stores linkDescription as providerId's entry in descs and returns what descs.put() returns (the previous entry, or null).
+ // If the provider already had a description, its annotations are combined with the new ones via union().
+ LinkDescription oldDesc = descs.get(providerId);
+ LinkDescription newDesc = linkDescription;
+ if (oldDesc != null) {
+ SparseAnnotations merged = union(oldDesc.annotations(),
+ linkDescription.annotations());
+ newDesc = new DefaultLinkDescription(
+ linkDescription.src(),
+ linkDescription.dst(),
+ linkDescription.type(), merged);
+ }
+ return descs.put(providerId, newDesc);
+ }
// Creates and stores the link and returns the appropriate event.
- private LinkEvent createLink(ProviderId providerId, LinkKey key,
- LinkDescription linkDescription) {
- DefaultLink link = new DefaultLink(providerId, key.src(), key.dst(),
- linkDescription.type());
- synchronized (this) {
- links.put(key, link);
- srcLinks.put(link.src().deviceId(), link);
- dstLinks.put(link.dst().deviceId(), link);
+ // Guarded by linkDescs value (=locking each Link)
+ private LinkEvent createLink(LinkKey key, Link newLink) {
+
+ if (newLink.providerId().isAncillary()) {
+ // TODO: revisit ancillary only Link handling
+
+ // currently treating ancillary only as down (not visible outside)
+ return null;
}
- return new LinkEvent(LINK_ADDED, link);
+
+ links.put(key, newLink);
+ srcLinks.put(newLink.src().deviceId(), key);
+ dstLinks.put(newLink.dst().deviceId(), key);
+ return new LinkEvent(LINK_ADDED, newLink);
}
// Updates, if necessary the specified link and returns the appropriate event.
- private LinkEvent updateLink(ProviderId providerId, DefaultLink link,
- LinkKey key, LinkDescription linkDescription) {
- if (link.type() == INDIRECT && linkDescription.type() == DIRECT) {
- synchronized (this) {
- srcLinks.remove(link.src().deviceId(), link);
- dstLinks.remove(link.dst().deviceId(), link);
+ // Guarded by linkDescs value (=locking each Link)
+ private LinkEvent updateLink(LinkKey key, Link oldLink, Link newLink) {
- DefaultLink updated =
- new DefaultLink(providerId, link.src(), link.dst(),
- linkDescription.type());
- links.put(key, updated);
- srcLinks.put(link.src().deviceId(), updated);
- dstLinks.put(link.dst().deviceId(), updated);
- return new LinkEvent(LINK_UPDATED, updated);
- }
+ if (newLink.providerId().isAncillary()) {
+ // TODO: revisit ancillary only Link handling
+
+ // currently treating ancillary only as down (not visible outside)
+ return null;
+ }
+
+ if ((oldLink.type() == INDIRECT && newLink.type() == DIRECT) ||
+ !AnnotationsUtil.isEqual(oldLink.annotations(), newLink.annotations())) {
+
+ links.put(key, newLink);
+ // strictly speaking the following can be omitted
+ srcLinks.put(oldLink.src().deviceId(), key);
+ dstLinks.put(oldLink.dst().deviceId(), key);
+ return new LinkEvent(LINK_UPDATED, newLink);
}
return null;
}
@Override
public LinkEvent removeLink(ConnectPoint src, ConnectPoint dst) {
- synchronized (this) {
- Link link = links.remove(new LinkKey(src, dst));
+ final LinkKey key = new LinkKey(src, dst);
+ ConcurrentMap<ProviderId, LinkDescription> descs = getLinkDescriptions(key);
+ synchronized (descs) {
+ Link link = links.remove(key);
+ descs.clear();
if (link != null) {
- srcLinks.remove(link.src().deviceId(), link);
- dstLinks.remove(link.dst().deviceId(), link);
+ srcLinks.remove(link.src().deviceId(), key);
+ dstLinks.remove(link.dst().deviceId(), key);
return new LinkEvent(LINK_REMOVED, link);
}
return null;
}
}
+
+ private static <K, V> SetMultimap<K, V> createSynchronizedHashMultiMap() {
+ return synchronizedSetMultimap(HashMultimap.<K, V>create());
+ }
+
+ /**
+ * @return the first non-ancillary (primary) ProviderId iterated; else the first ancillary one seen; null if providerDescs is empty
+ */
+ private ProviderId pickPrimaryPID(
+ ConcurrentMap<ProviderId, LinkDescription> providerDescs) {
+
+ ProviderId fallBackPrimary = null;
+ for (Entry<ProviderId, LinkDescription> e : providerDescs.entrySet()) {
+ if (!e.getKey().isAncillary()) {
+ return e.getKey();
+ } else if (fallBackPrimary == null) {
+ // remember the first ancillary provider seen (iteration order, not truly random) as a fallback in case there is no primary
+ fallBackPrimary = e.getKey();
+ }
+ }
+ return fallBackPrimary;
+ }
+
+ private Link composeLink(ConcurrentMap<ProviderId, LinkDescription> descs) {
+ ProviderId primary = pickPrimaryPID(descs);
+ LinkDescription base = descs.get(primary);
+
+ ConnectPoint src = base.src();
+ ConnectPoint dst = base.dst();
+ Type type = base.type();
+ DefaultAnnotations annotations = DefaultAnnotations.builder().build();
+ annotations = merge(annotations, base.annotations());
+
+ for (Entry<ProviderId, LinkDescription> e : descs.entrySet()) {
+ if (primary.equals(e.getKey())) {
+ continue;
+ }
+
+ // TODO: keep track of each Description's timestamp
+ // and only merge conflicting keys when the timestamp is newer.
+ // Currently assumes there will never be an annotation key conflict between
+ // providers.
+
+ // fold this non-primary provider's annotations into the result; not so efficient, should revisit later
+ annotations = merge(annotations, e.getValue().annotations());
+ }
+
+ return new DefaultLink(primary , src, dst, type, annotations);
+ }
+
+ private ConcurrentMap<ProviderId, LinkDescription> getLinkDescriptions(LinkKey key) {
+ return ConcurrentUtils.createIfAbsentUnchecked(linkDescs, key,
+ NewConcurrentHashMap.<ProviderId, LinkDescription>ifNeeded());
+ }
+
+ private final Function<LinkKey, Link> lookupLink = new LookupLink();
+ private Function<LinkKey, Link> lookupLink() {
+ return lookupLink;
+ }
+
+ private final class LookupLink implements Function<LinkKey, Link> {
+ @Override
+ public Link apply(LinkKey input) {
+ return links.get(input);
+ }
+ }
+
+ private static final Predicate<Provided> IS_PRIMARY = new IsPrimary();
+ private static final Predicate<Provided> isPrimary() {
+ return IS_PRIMARY;
+ }
+
+ private static final class IsPrimary implements Predicate<Provided> {
+
+ @Override
+ public boolean apply(Provided input) {
+ return !input.providerId().isAncillary();
+ }
+ }
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
index 4e7d5ed..bd7db8a 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleTopologyStore.java
@@ -124,7 +124,8 @@
// Promote the new topology to current and return a ready-to-send event.
synchronized (this) {
current = newTopology;
- return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED, current);
+ return new TopologyEvent(TopologyEvent.Type.TOPOLOGY_CHANGED,
+ current, reasons);
}
}
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
index a0d6e1c..146086a 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleDeviceStoreTest.java
@@ -103,17 +103,19 @@
simpleDeviceStore.deactivate();
}
- private void putDevice(DeviceId deviceId, String swVersion) {
+ private void putDevice(DeviceId deviceId, String swVersion,
+ SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
- HW, swVersion, SN);
+ HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PID, deviceId, description);
}
- private void putDeviceAncillary(DeviceId deviceId, String swVersion) {
+ private void putDeviceAncillary(DeviceId deviceId, String swVersion,
+ SparseAnnotations... annotations) {
DeviceDescription description =
new DefaultDeviceDescription(deviceId.uri(), SWITCH, MFR,
- HW, swVersion, SN);
+ HW, swVersion, SN, annotations);
deviceStore.createOrUpdateDevice(PIDA, deviceId, description);
}
@@ -126,6 +128,7 @@
assertEquals(SN, device.serialNumber());
}
+ // TODO: move this helper out into a shared test utility (it is also used by other store tests)
/**
* Verifies that Annotations created by merging {@code annotations} is
* equal to actual Annotations.
@@ -133,7 +136,7 @@
* @param actual Annotations to check
* @param annotations
*/
- private static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
+ public static void assertAnnotationsEquals(Annotations actual, SparseAnnotations... annotations) {
DefaultAnnotations expected = DefaultAnnotations.builder().build();
for (SparseAnnotations a : annotations) {
expected = DefaultAnnotations.merge(expected, a);
@@ -347,6 +350,7 @@
assertFalse("Port is disabled", event.port().isEnabled());
}
+
@Test
public final void testUpdatePortStatusAncillary() {
putDeviceAncillary(DID1, SW1);
@@ -435,16 +439,37 @@
@Test
public final void testRemoveDevice() {
- putDevice(DID1, SW1);
+ putDevice(DID1, SW1, A1);
+ List<PortDescription> pds = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true, A2)
+ );
+ deviceStore.updatePorts(PID, DID1, pds);
putDevice(DID2, SW1);
assertEquals(2, deviceStore.getDeviceCount());
+ assertEquals(1, deviceStore.getPorts(DID1).size());
+ assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations(), A1);
+ assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations(), A2);
DeviceEvent event = deviceStore.removeDevice(DID1);
assertEquals(DEVICE_REMOVED, event.type());
assertDevice(DID1, SW1, event.subject());
assertEquals(1, deviceStore.getDeviceCount());
+ assertEquals(0, deviceStore.getPorts(DID1).size());
+
+ // put the same Device and Port back, this time without annotations
+ putDevice(DID1, SW1);
+ List<PortDescription> pds2 = Arrays.<PortDescription>asList(
+ new DefaultPortDescription(P1, true)
+ );
+ deviceStore.updatePorts(PID, DID1, pds2);
+
+ // annotations from before the removal (A1, A2) should not survive
+ assertEquals(2, deviceStore.getDeviceCount());
+ assertEquals(1, deviceStore.getPorts(DID1).size());
+ assertAnnotationsEquals(deviceStore.getDevice(DID1).annotations());
+ assertAnnotationsEquals(deviceStore.getPort(DID1, P1).annotations());
}
// If Delegates should be called only on remote events,
diff --git a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
index eb4a312..8a16609 100644
--- a/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
+++ b/core/store/trivial/src/test/java/org/onlab/onos/store/trivial/impl/SimpleLinkStoreTest.java
@@ -4,7 +4,9 @@
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.Link.Type.*;
import static org.onlab.onos.net.link.LinkEvent.Type.*;
+import static org.onlab.onos.store.trivial.impl.SimpleDeviceStoreTest.assertAnnotationsEquals;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -18,10 +20,12 @@
import org.junit.Ignore;
import org.junit.Test;
import org.onlab.onos.net.ConnectPoint;
+import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Link;
import org.onlab.onos.net.LinkKey;
import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.SparseAnnotations;
import org.onlab.onos.net.Link.Type;
import org.onlab.onos.net.link.DefaultLinkDescription;
import org.onlab.onos.net.link.LinkEvent;
@@ -37,6 +41,7 @@
public class SimpleLinkStoreTest {
private static final ProviderId PID = new ProviderId("of", "foo");
+ private static final ProviderId PIDA = new ProviderId("of", "bar", true);
private static final DeviceId DID1 = deviceId("of:foo");
private static final DeviceId DID2 = deviceId("of:bar");
@@ -44,6 +49,23 @@
private static final PortNumber P2 = PortNumber.portNumber(2);
private static final PortNumber P3 = PortNumber.portNumber(3);
+ private static final SparseAnnotations A1 = DefaultAnnotations.builder()
+ .set("A1", "a1")
+ .set("B1", "b1")
+ .build();
+ private static final SparseAnnotations A1_2 = DefaultAnnotations.builder()
+ .remove("A1")
+ .set("B3", "b3")
+ .build();
+ private static final SparseAnnotations A2 = DefaultAnnotations.builder()
+ .set("A2", "a2")
+ .set("B2", "b2")
+ .build();
+ private static final SparseAnnotations A2_2 = DefaultAnnotations.builder()
+ .remove("A2")
+ .set("B4", "b4")
+ .build();
+
private SimpleLinkStore simpleLinkStore;
private LinkStore linkStore;
@@ -69,16 +91,17 @@
}
private void putLink(DeviceId srcId, PortNumber srcNum,
- DeviceId dstId, PortNumber dstNum, Type type) {
+ DeviceId dstId, PortNumber dstNum, Type type,
+ SparseAnnotations... annotations) {
ConnectPoint src = new ConnectPoint(srcId, srcNum);
ConnectPoint dst = new ConnectPoint(dstId, dstNum);
- linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type));
+ linkStore.createOrUpdateLink(PID, new DefaultLinkDescription(src, dst, type, annotations));
}
- private void putLink(LinkKey key, Type type) {
+ private void putLink(LinkKey key, Type type, SparseAnnotations... annotations) {
putLink(key.src().deviceId(), key.src().port(),
key.dst().deviceId(), key.dst().port(),
- type);
+ type, annotations);
}
private static void assertLink(DeviceId srcId, PortNumber srcNum,
@@ -270,14 +293,67 @@
}
@Test
+ public final void testCreateOrUpdateLinkAncillary() {
+ ConnectPoint src = new ConnectPoint(DID1, P1);
+ ConnectPoint dst = new ConnectPoint(DID2, P2);
+
+ // add Ancillary link first: an ancillary-only link produces no event
+ LinkEvent event = linkStore.createOrUpdateLink(PIDA,
+ new DefaultLinkDescription(src, dst, INDIRECT, A1));
+
+ assertNull("Ancillary only link is ignored", event);
+
+ // add Primary link: link is added, carrying both providers' annotations
+ LinkEvent event2 = linkStore.createOrUpdateLink(PID,
+ new DefaultLinkDescription(src, dst, INDIRECT, A2));
+
+ assertLink(DID1, P1, DID2, P2, INDIRECT, event2.subject());
+ assertAnnotationsEquals(event2.subject().annotations(), A2, A1);
+ assertEquals(LINK_ADDED, event2.type());
+
+ // update link type (Primary): INDIRECT -> DIRECT
+ LinkEvent event3 = linkStore.createOrUpdateLink(PID,
+ new DefaultLinkDescription(src, dst, DIRECT, A2));
+ assertLink(DID1, P1, DID2, P2, DIRECT, event3.subject());
+ assertAnnotationsEquals(event3.subject().annotations(), A2, A1);
+ assertEquals(LINK_UPDATED, event3.type());
+
+
+ // same type, no annotations supplied: no change
+ LinkEvent event4 = linkStore.createOrUpdateLink(PID,
+ new DefaultLinkDescription(src, dst, DIRECT));
+ assertNull("No change event expected", event4);
+
+ // update link annotation (Primary)
+ LinkEvent event5 = linkStore.createOrUpdateLink(PID,
+ new DefaultLinkDescription(src, dst, DIRECT, A2_2));
+ assertLink(DID1, P1, DID2, P2, DIRECT, event5.subject());
+ assertAnnotationsEquals(event5.subject().annotations(), A2, A2_2, A1);
+ assertEquals(LINK_UPDATED, event5.type());
+
+ // update link annotation (Ancillary)
+ LinkEvent event6 = linkStore.createOrUpdateLink(PIDA,
+ new DefaultLinkDescription(src, dst, DIRECT, A1_2));
+ assertLink(DID1, P1, DID2, P2, DIRECT, event6.subject());
+ assertAnnotationsEquals(event6.subject().annotations(), A2, A2_2, A1, A1_2);
+ assertEquals(LINK_UPDATED, event6.type());
+
+ // update link type (Ancillary): ignored
+ LinkEvent event7 = linkStore.createOrUpdateLink(PIDA,
+ new DefaultLinkDescription(src, dst, EDGE));
+ assertNull("Ancillary change other than annotation is ignored", event7);
+ }
+
+
+ @Test
public final void testRemoveLink() {
final ConnectPoint d1P1 = new ConnectPoint(DID1, P1);
final ConnectPoint d2P2 = new ConnectPoint(DID2, P2);
LinkKey linkId1 = new LinkKey(d1P1, d2P2);
LinkKey linkId2 = new LinkKey(d2P2, d1P1);
- putLink(linkId1, DIRECT);
- putLink(linkId2, DIRECT);
+ putLink(linkId1, DIRECT, A1);
+ putLink(linkId2, DIRECT, A2);
// DID1,P1 => DID2,P2
// DID2,P2 => DID1,P1
@@ -285,10 +361,41 @@
LinkEvent event = linkStore.removeLink(d1P1, d2P2);
assertEquals(LINK_REMOVED, event.type());
+ assertAnnotationsEquals(event.subject().annotations(), A1);
LinkEvent event2 = linkStore.removeLink(d1P1, d2P2);
assertNull(event2);
assertLink(linkId2, DIRECT, linkStore.getLink(d2P2, d1P1));
+ assertAnnotationsEquals(linkStore.getLink(d2P2, d1P1).annotations(), A2);
+
+ // annotations, etc. should not survive remove
+ putLink(linkId1, DIRECT);
+ assertLink(linkId1, DIRECT, linkStore.getLink(d1P1, d2P2));
+ assertAnnotationsEquals(linkStore.getLink(d1P1, d2P2).annotations());
+ }
+
+ @Test
+ public final void testAncillaryOnlyNotVisible() {
+ ConnectPoint src = new ConnectPoint(DID1, P1);
+ ConnectPoint dst = new ConnectPoint(DID2, P2);
+
+ // add Ancillary link
+ linkStore.createOrUpdateLink(PIDA,
+ new DefaultLinkDescription(src, dst, INDIRECT, A1));
+
+ // Ancillary only link should not be visible
+ assertEquals(0, linkStore.getLinkCount());
+
+ assertTrue(Iterables.isEmpty(linkStore.getLinks()));
+
+ assertNull(linkStore.getLink(src, dst));
+
+ assertEquals(Collections.emptySet(), linkStore.getIngressLinks(dst));
+
+ assertEquals(Collections.emptySet(), linkStore.getEgressLinks(src));
+
+ assertEquals(Collections.emptySet(), linkStore.getDeviceEgressLinks(DID1));
+ assertEquals(Collections.emptySet(), linkStore.getDeviceIngressLinks(DID2));
}
// If Delegates should be called only on remote events,