Merge changes I675fbfe2,I33acebec into dev/ramcloud-new-datamodel
* changes:
WIP: Adding PathInstallRuntime for installing flow entries using FlowPusher
WIP: Initial implementation of runtime #2: take PathIntents and build a plan
diff --git a/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java b/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
old mode 100644
new mode 100755
index e0e1666..90c7a18
--- a/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
+++ b/src/main/java/net/onrc/onos/datagrid/web/DatagridWebRoutable.java
@@ -18,6 +18,7 @@
Router router = new Router(context);
router.attach("/get/map/{map-name}/json", GetMapResource.class);
router.attach("/add/intent/json", IntentResource.class);
+ router.attach("/get/intents/json", IntentResource.class);
return router;
}
diff --git a/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java b/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
index 0d50865..0f552d3 100755
--- a/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
+++ b/src/main/java/net/onrc/onos/datagrid/web/IntentResource.java
@@ -11,6 +11,7 @@
import net.onrc.onos.intent.ShortestPathIntent;
import net.onrc.onos.intent.IntentOperation;
import net.onrc.onos.intent.IntentMap;
+//import net.onrc.onos.intent.Intent.IntentState;
import net.onrc.onos.ofcontroller.networkgraph.INetworkGraphService;
import net.onrc.onos.ofcontroller.networkgraph.NetworkGraph;
import net.onrc.onos.registry.controller.IControllerRegistryService;
@@ -28,6 +29,8 @@
import java.util.LinkedList;
import java.util.Map;
import org.codehaus.jackson.node.ArrayNode;
+import org.codehaus.jackson.node.ObjectNode;
+import org.restlet.resource.Get;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,11 +46,11 @@
private class IntentStatus {
String intentId;
- boolean status;
+ String status;
public IntentStatus() {}
- public IntentStatus(String intentId, boolean status) {
+ public IntentStatus(String intentId, String status) {
this.intentId = intentId;
this.status = status;
}
@@ -60,33 +63,28 @@
this.intentId = intentId;
}
- public boolean getStatus() {
+ public String getStatus() {
return status;
}
- public void setStatus(boolean status) {
+ public void setStatus(String status) {
this.status = status;
}
}
@Post("json")
- public String store(String jsonFlowIntent) throws IOException {
+ public String store(String jsonIntent) throws IOException {
IDatagridService datagridService = (IDatagridService) getContext()
.getAttributes().get(IDatagridService.class.getCanonicalName());
if (datagridService == null) {
log.debug("FlowIntentResource ONOS Datagrid Service not found");
return "";
}
- INetworkGraphService networkGraphService = (INetworkGraphService) getContext()
- .getAttributes().get(
- INetworkGraphService.class.getCanonicalName());
- NetworkGraph graph = networkGraphService.getNetworkGraph();
String reply = "";
ObjectMapper mapper = new ObjectMapper();
JsonNode jNode = null;
try {
- System.out.println("json string " + jsonFlowIntent);
- jNode = mapper.readValue(jsonFlowIntent, JsonNode.class);
+ jNode = mapper.readValue(jsonIntent, JsonNode.class);
} catch (JsonGenerationException ex) {
log.error("JsonGeneration exception ", ex);
} catch (JsonMappingException ex) {
@@ -98,11 +96,15 @@
if (jNode != null) {
Kryo kryo = new Kryo();
reply = parseJsonNode(kryo, jNode.getElements(), datagridService);
- // datagridService.registerIntent(intents);
}
return reply;
}
+ @Get("json")
+ public String retrieve() {
+ return "123";
+ }
+
private String parseJsonNode(Kryo kryo, Iterator<JsonNode> nodes,
IDatagridService datagridService) throws IOException {
LinkedList<IntentOperation> operations = new LinkedList<>();
@@ -119,8 +121,7 @@
data = node.get(fieldName);
parseFields(data, fieldName, fields);
}
- System.out.println("recv fields " + fields);
- boolean status = processIntent(fields, operations);
+ String status = processIntent(fields, operations);
appendIntentStatus(status, (String)fields.get("intent_id"), mapper, arrayNode);
// datagridService.registerIntent(Long.toString(uuid),
// sb.toString().getBytes());
@@ -132,22 +133,25 @@
}
- private void appendIntentStatus(boolean status, final String applnIntentId,
+ private void appendIntentStatus(String status, final String applnIntentId,
ObjectMapper mapper, ArrayNode arrayNode) throws IOException {
+ System.out.println("status " + status);
String intentId = applnIntentId.split(":")[1];
- String boolStr = Boolean.TRUE.toString();
- if (status == false) {
- boolStr = Boolean.FALSE.toString();
- }
- String jsonString = "{\"intent_id\":" + intentId + "," + "\"status\":" + boolStr + "}";
- JsonNode parsedNode = mapper.readValue(jsonString, JsonNode.class);
- arrayNode.add(parsedNode);
+ ObjectNode node = mapper.createObjectNode();
+ node.put("intent_id", intentId);
+ node.put("status", status);
+ arrayNode.add(node);
}
- private boolean processIntent(Map<String, Object> fields, LinkedList<IntentOperation> operations) {
+ private String processIntent(Map<String, Object> fields, LinkedList<IntentOperation> operations) {
String intentType = (String)fields.get("intent_type");
- boolean status = false;
+ String intentOp = (String)fields.get("intent_op");
+ String status = null;
+ IntentOperation.Operator operation = IntentOperation.Operator.ADD;
+ if ((intentOp.equals("remove"))) {
+ operation = IntentOperation.Operator.REMOVE;
+ }
if (intentType.equals("shortest_intent_type")) {
ShortestPathIntent spi = new ShortestPathIntent((String) fields.get("intent_id"),
Long.decode((String) fields.get("srcSwitch")),
@@ -156,8 +160,9 @@
Long.decode((String) fields.get("dstSwitch")),
(long) fields.get("dstPort"),
MACAddress.valueOf((String) fields.get("dstMac")).toLong());
- operations.add(new IntentOperation(IntentOperation.Operator.ADD, spi));
- status = true;
+ operations.add(new IntentOperation(operation, spi));
+ System.out.println("intent operation " + operation.toString());
+ status = (spi.getState()).toString();
} else {
ConstrainedShortestPathIntent cspi = new ConstrainedShortestPathIntent((String) fields.get("intent_id"),
Long.decode((String) fields.get("srcSwitch")),
@@ -167,24 +172,20 @@
(long) fields.get("dstPort"),
MACAddress.valueOf((String) fields.get("dstMac")).toLong(),
(double) fields.get("bandwidth"));
- operations.add(new IntentOperation(IntentOperation.Operator.ADD, cspi));
- status = true;
+ operations.add(new IntentOperation(operation, cspi));
+ status = (cspi.getState()).toString();
}
return status;
}
private void parseFields(JsonNode node, String fieldName, Map<String, Object> fields) {
if ((node.isTextual())) {
- System.out.println("textual fieldname = " + fieldName);
fields.put(fieldName, node.getTextValue());
} else if ((node.isInt())) {
- System.out.println("int fieldname = " + fieldName);
fields.put(fieldName, (long)node.getIntValue());
} else if (node.isDouble()) {
- System.out.println("double fieldname = " + fieldName);
fields.put(fieldName, node.getDoubleValue());
} else if ((node.isLong())) {
- System.out.println("long fieldname = " + fieldName);
fields.put(fieldName, node.getLongValue());
}
}
diff --git a/src/main/java/net/onrc/onos/intent/Intent.java b/src/main/java/net/onrc/onos/intent/Intent.java
index de960f0..3429d1f 100644
--- a/src/main/java/net/onrc/onos/intent/Intent.java
+++ b/src/main/java/net/onrc/onos/intent/Intent.java
@@ -4,7 +4,7 @@
* @author Toshio Koide (t-koide@onlab.us)
*/
public class Intent {
- enum IntentState {
+ public enum IntentState {
CREATED,
INST_REQ,
INST_NACK,
diff --git a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
index 9e23555..3721ceb 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/flowmanager/PerformanceMonitor.java
@@ -1,8 +1,5 @@
package net.onrc.onos.ofcontroller.flowmanager;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
@@ -15,7 +12,7 @@
* Class for collecting performance measurements
*/
public class PerformanceMonitor {
- private final static Map<String, Queue<Measurement>> map = new ConcurrentHashMap<>();;
+ private final static ConcurrentHashMap<String, Queue<Measurement>> map = new ConcurrentHashMap<>();;
private final static Logger log = LoggerFactory.getLogger(PerformanceMonitor.class);
private static long overhead;
private static long experimentStart = Long.MAX_VALUE;
@@ -23,10 +20,10 @@
/**
* Start a performance measurement, identified by a tag
- *
+ *
* Note: Only a single measurement can use the same tag at a time.
* ..... not true anymore.
- *
+ *
* @param tag for performance measurement
*/
public static Measurement start(String tag) {
@@ -37,7 +34,11 @@
Queue<Measurement> list = map.get(tag);
if(list == null) {
list = new ConcurrentLinkedQueue<Measurement>();
- map.put(tag, list);
+ Queue<Measurement> existing_list = map.putIfAbsent(tag, list);
+ if (existing_list != null) {
+ // someone concurrently added, using theirs
+ list = existing_list;
+ }
}
Measurement m = new Measurement();
list.add(m);
@@ -45,12 +46,12 @@
overhead += System.nanoTime() - start;
return m;
}
-
+
/**
- * Stop a performance measurement.
- *
+ * Stop a performance measurement.
+ *
* You must have already started a measurement with tag.
- *
+ *
* @param tag for performance measurement
*/
public static void stop(String tag) {
@@ -65,7 +66,7 @@
}
overhead += System.nanoTime() - time;
}
-
+
/**
* Clear all performance measurements.
*/
@@ -74,7 +75,7 @@
overhead = 0;
experimentStart = Long.MAX_VALUE;
}
-
+
/**
* Write all performance measurements to the log
*/
@@ -104,7 +105,7 @@
if(stop > experimentEnd) {
experimentEnd = stop;
}
- }
+ }
// Collect statistics for average
total += m.elapsed();
count++;
@@ -113,10 +114,10 @@
// Normalize start/stop
start -= experimentStart;
stop -= experimentStart;
- result += key + '=' +
- (avg / normalization) + '/' +
- (start / normalization) + '/' +
- (stop / normalization) + '/' +
+ result += key + '=' +
+ (avg / normalization) + '/' +
+ (start / normalization) + '/' +
+ (stop / normalization) + '/' +
count + '\n';
}
double overheadMs = overhead / normalization;
@@ -152,8 +153,8 @@
public static class Measurement {
long start;
long stop = -1;
-
- /**
+
+ /**
* Start the measurement
*/
public void start() {
@@ -161,7 +162,7 @@
start = System.nanoTime();
}
}
-
+
/**
* Stop the measurement
*/
@@ -169,7 +170,7 @@
long now = System.nanoTime();
stop(now);
}
-
+
/**
* Stop the measurement at a specific time
* @param time to stop
@@ -179,10 +180,10 @@
stop = time;
}
}
-
+
/**
* Compute the elapsed time of the measurement in nanoseconds
- *
+ *
* @return the measurement time in nanoseconds, or -1 if the measurement is stil running.
*/
public long elapsed() {
@@ -193,19 +194,20 @@
return stop - start;
}
}
-
+
/**
* Returns the number of milliseconds for the measurement as a String.
*/
+ @Override
public String toString() {
double milli = elapsed() / normalization;
double startMs = start / normalization;
double stopMs = stop / normalization;
-
+
return milli + "ms/" + startMs + '/' + stopMs;
}
}
-
+
public static void main(String args[]){
// test the measurement overhead
String tag;
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/AbstractNetworkGraph.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/AbstractNetworkGraph.java
deleted file mode 100644
index b2f9247..0000000
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/AbstractNetworkGraph.java
+++ /dev/null
@@ -1,109 +0,0 @@
-package net.onrc.onos.ofcontroller.networkgraph;
-
-import java.net.InetAddress;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import net.floodlightcontroller.util.MACAddress;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class AbstractNetworkGraph implements NetworkGraph {
- @SuppressWarnings("unused")
- private static final Logger log = LoggerFactory.getLogger(AbstractNetworkGraph.class);
-
- // DPID -> Switch
- protected ConcurrentMap<Long, Switch> switches;
-
- protected ConcurrentMap<InetAddress, Set<Device>> addr2Device;
- protected ConcurrentMap<MACAddress, Device> mac2Device;
-
- public AbstractNetworkGraph() {
- // TODO: Does these object need to be stored in Concurrent Collection?
- switches = new ConcurrentHashMap<>();
- addr2Device = new ConcurrentHashMap<>();
- mac2Device = new ConcurrentHashMap<>();
- }
-
- @Override
- public Switch getSwitch(Long dpid) {
- // TODO Check if it is safe to directly return this Object.
- return switches.get(dpid);
- }
-
- @Override
- public Iterable<Switch> getSwitches() {
- // TODO Check if it is safe to directly return this Object.
- return Collections.unmodifiableCollection(switches.values());
- }
-
- @Override
- public Iterable<Link> getLinks() {
- List<Link> linklist = new LinkedList<>();
-
- for (Switch sw : switches.values()) {
- Iterable<Link> links = sw.getOutgoingLinks();
- for (Link l : links) {
- linklist.add(l);
- }
- }
- return linklist;
- }
-
- @Override
- public Iterable<Link> getOutgoingLinksFromSwitch(Long dpid) {
- Switch sw = getSwitch(dpid);
- if (sw == null) {
- return Collections.emptyList();
- }
- Iterable<Link> links = sw.getOutgoingLinks();
- if (links instanceof Collection) {
- return Collections.unmodifiableCollection((Collection<Link>) links);
- } else {
- List<Link> linklist = new LinkedList<>();
- for (Link l : links) {
- linklist.add(l);
- }
- return linklist;
- }
- }
-
- @Override
- public Iterable<Link> getIncomingLinksFromSwitch(Long dpid) {
- Switch sw = getSwitch(dpid);
- if (sw == null) {
- return Collections.emptyList();
- }
- Iterable<Link> links = sw.getIncomingLinks();
- if (links instanceof Collection) {
- return Collections.unmodifiableCollection((Collection<Link>) links);
- } else {
- List<Link> linklist = new LinkedList<>();
- for (Link l : links) {
- linklist.add(l);
- }
- return linklist;
- }
- }
-
-
- @Override
- public Iterable<Device> getDeviceByIp(InetAddress ipAddress) {
- Set<Device> devices = addr2Device.get(ipAddress);
- if (devices == null) {
- return Collections.emptySet();
- }
- return Collections.unmodifiableCollection(devices);
- }
-
- @Override
- public Device getDeviceByMac(MACAddress address) {
- return mac2Device.get(address);
- }
-}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/DeviceImpl.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/DeviceImpl.java
index 808d4a6..4a79edd 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/DeviceImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/DeviceImpl.java
@@ -53,7 +53,7 @@
}
/**
- * Only {@link NetworkGraphImpl} should use this method
+ * Only {@link TopologyManager} should use this method
* @param p
*/
void addAttachmentPoint(Port p) {
@@ -62,7 +62,7 @@
}
/**
- * Only {@link NetworkGraphImpl} should use this method
+ * Only {@link TopologyManager} should use this method
* @param p
*/
boolean removeAttachmentPoint(Port p) {
@@ -70,7 +70,7 @@
}
/**
- * Only {@link NetworkGraphImpl} should use this method
+ * Only {@link TopologyManager} should use this method
* @param p
*/
boolean addIpAddress(InetAddress addr) {
@@ -78,7 +78,7 @@
}
/**
- * Only {@link NetworkGraphImpl} should use this method
+ * Only {@link TopologyManager} should use this method
* @param p
*/
boolean removeIpAddress(InetAddress addr) {
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphListener.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphListener.java
new file mode 100644
index 0000000..88806ef
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphListener.java
@@ -0,0 +1,22 @@
+package net.onrc.onos.ofcontroller.networkgraph;
+
+/**
+ * Interface which needs to be implemented to receive Topology events from
+ * NetworkGraph
+ *
+ * TODO Should these interface hand over Event object or Object in NetworkGraph.
+ */
+public interface INetworkGraphListener {
+ public void putSwitchEvent(SwitchEvent switchEvent);
+ public void removeSwitchEvent(SwitchEvent switchEvent);
+
+ public void putPortEvent(PortEvent portEvent);
+ public void removePortEvent(PortEvent portEvent);
+
+ public void putLinkEvent(LinkEvent linkEvent);
+ public void removeLinkEvent(LinkEvent linkEvent);
+
+ public void putDeviceEvent(DeviceEvent deviceEvent);
+ public void removeDeviceEvent(DeviceEvent deviceEvent);
+
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphService.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphService.java
index 318c0f7..5d94f65 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphService.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/INetworkGraphService.java
@@ -11,11 +11,14 @@
* @return
*/
public NetworkGraph getNetworkGraph();
-
+
+ public void registerNetworkGraphListener(INetworkGraphListener listener);
+ public void deregisterNetworkGraphListener(INetworkGraphListener listener);
+
/**
* Allows a module to get a reference to the southbound interface to
* the network graph.
- * TODO Figure out how to hide the southbound interface from
+ * TODO Figure out how to hide the southbound interface from
* applications/modules that shouldn't touch it
* @return
*/
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
index d34ac46..d5dd9e0 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphDatastore.java
@@ -37,9 +37,9 @@
private static final int NUM_RETRIES = 10;
- private final NetworkGraphImpl graph;
+ private final TopologyManager graph;
- public NetworkGraphDatastore(NetworkGraphImpl graph) {
+ public NetworkGraphDatastore(TopologyManager graph) {
this.graph = graph;
}
@@ -164,7 +164,7 @@
public void addLink(LinkEvent linkEvent) {
log.debug("Adding link {}", linkEvent);
-
+
RCLink rcLink = new RCLink(linkEvent.getSrc().getDpid(), linkEvent.getSrc().getNumber(),
linkEvent.getDst().getDpid(), linkEvent.getDst().getNumber());
@@ -219,6 +219,7 @@
//rcDstPort.read();
rcLink.read();
} catch (ObjectDoesntExistException e) {
+ // XXX Note: This error might be harmless, if triggered by out-dated remove Link event
log.error("Remove link failed {}", linkEvent, e);
return;
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
index feb0f57..88cedac 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphImpl.java
@@ -1,1076 +1,109 @@
package net.onrc.onos.ofcontroller.networkgraph;
import java.net.InetAddress;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
-import net.onrc.onos.datagrid.IDatagridService;
-import net.onrc.onos.datagrid.IEventChannel;
-import net.onrc.onos.datagrid.IEventChannelListener;
-import net.onrc.onos.datastore.topology.RCLink;
-import net.onrc.onos.datastore.topology.RCPort;
-import net.onrc.onos.datastore.topology.RCSwitch;
-import net.onrc.onos.ofcontroller.networkgraph.PortEvent.SwitchPort;
-import net.onrc.onos.ofcontroller.util.EventEntry;
-import net.onrc.onos.ofcontroller.util.Dpid;
+import net.floodlightcontroller.util.MACAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-/**
- * The "NB" read-only Network Map.
- *
- * - Maintain Invariant/Relationships between Topology Objects.
- *
- * TODO To be synchronized based on TopologyEvent Notification.
- *
- * TODO TBD: Caller is expected to maintain parent/child calling order. Parent
- * Object must exist before adding sub component(Add Switch -> Port).
- *
- * TODO TBD: This class may delay the requested change to handle event
- * re-ordering. e.g.) Link Add came in, but Switch was not there.
- *
- */
-public class NetworkGraphImpl extends AbstractNetworkGraph implements
- NetworkGraphDiscoveryInterface, NetworkGraphReplicationInterface {
+public class NetworkGraphImpl implements NetworkGraph {
+ @SuppressWarnings("unused")
+ private static final Logger log = LoggerFactory.getLogger(NetworkGraphImpl.class);
- private static final Logger log = LoggerFactory
- .getLogger(NetworkGraphImpl.class);
+ // DPID -> Switch
+ protected ConcurrentMap<Long, Switch> switches;
- private IEventChannel<byte[], TopologyEvent> eventChannel;
- private static final String EVENT_CHANNEL_NAME = "onos.topology";
- private EventHandler eventHandler = new EventHandler();
+ protected ConcurrentMap<InetAddress, Set<Device>> addr2Device;
+ protected ConcurrentMap<MACAddress, Device> mac2Device;
- private final NetworkGraphDatastore datastore;
-
- public NetworkGraphImpl() {
- super();
- datastore = new NetworkGraphDatastore(this);
- }
-
- /**
- * Event handler class.
- */
- private class EventHandler extends Thread implements
- IEventChannelListener<byte[], TopologyEvent> {
- private BlockingQueue<EventEntry<TopologyEvent>> topologyEvents =
- new LinkedBlockingQueue<EventEntry<TopologyEvent>>();
-
- /**
- * Startup processing.
- */
- private void startup() {
- //
- // TODO: Read all state from the database
- // For now, as a shortcut we read it from the datagrid
- //
- Collection<TopologyEvent> topologyEvents =
- eventChannel.getAllEntries();
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
-
- for (TopologyEvent topologyEvent : topologyEvents) {
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- topologyEvent);
- collection.add(eventEntry);
- }
- processEvents(collection);
- }
-
- /**
- * Run the thread.
- */
- public void run() {
- Collection<EventEntry<TopologyEvent>> collection =
- new LinkedList<EventEntry<TopologyEvent>>();
-
- this.setName("NetworkGraphImpl.EventHandler " + this.getId());
- startup();
-
- //
- // The main loop
- //
- try {
- while (true) {
- EventEntry<TopologyEvent> eventEntry = topologyEvents.take();
- collection.add(eventEntry);
- topologyEvents.drainTo(collection);
-
- processEvents(collection);
- collection.clear();
- }
- } catch (Exception exception) {
- log.debug("Exception processing Topology Events: ", exception);
- }
- }
-
- /**
- * Process all topology events.
- *
- * @param events the events to process.
- */
- private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
- for (EventEntry<TopologyEvent> event : events) {
- TopologyEvent topologyEvent = event.eventData();
- switch (event.eventType()) {
- case ENTRY_ADD:
- log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
- if (topologyEvent.switchEvent != null)
- putSwitchReplicationEvent(topologyEvent.switchEvent);
- if (topologyEvent.portEvent != null)
- putPortReplicationEvent(topologyEvent.portEvent);
- if (topologyEvent.linkEvent != null)
- putLinkReplicationEvent(topologyEvent.linkEvent);
- if (topologyEvent.deviceEvent != null)
- putDeviceReplicationEvent(topologyEvent.deviceEvent);
- break;
- case ENTRY_REMOVE:
- log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
- if (topologyEvent.switchEvent != null)
- removeSwitchReplicationEvent(topologyEvent.switchEvent);
- if (topologyEvent.portEvent != null)
- removePortReplicationEvent(topologyEvent.portEvent);
- if (topologyEvent.linkEvent != null)
- removeLinkReplicationEvent(topologyEvent.linkEvent);
- if (topologyEvent.deviceEvent != null)
- removeDeviceReplicationEvent(topologyEvent.deviceEvent);
- break;
- }
- }
- }
-
- /**
- * Receive a notification that an entry is added.
- *
- * @param value the value for the entry.
- */
- @Override
- public void entryAdded(TopologyEvent value) {
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
- value);
- topologyEvents.add(eventEntry);
- }
-
- /**
- * Receive a notification that an entry is removed.
- *
- * @param value the value for the entry.
- */
- @Override
- public void entryRemoved(TopologyEvent value) {
- EventEntry<TopologyEvent> eventEntry =
- new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_REMOVE,
- value);
- topologyEvents.add(eventEntry);
- }
-
- /**
- * Receive a notification that an entry is updated.
- *
- * @param value the value for the entry.
- */
- @Override
- public void entryUpdated(TopologyEvent value) {
- // NOTE: The ADD and UPDATE events are processed in same way
- entryAdded(value);
- }
- }
-
- /**
- * Startup processing.
- *
- * @param datagridService the datagrid service to use.
- */
- void startup(IDatagridService datagridService) {
- eventChannel = datagridService.addListener(EVENT_CHANNEL_NAME,
- eventHandler,
- byte[].class,
- TopologyEvent.class);
- eventHandler.start();
- }
-
- /**
- * Exception to be thrown when Modification to the Network Graph cannot be continued due to broken invariant.
- *
- * XXX Should this be checked exception or RuntimeException
- */
- public static class BrokenInvariantException extends RuntimeException {
- private static final long serialVersionUID = 1L;
-
- public BrokenInvariantException() {
- super();
- }
-
- public BrokenInvariantException(String message) {
- super(message);
- }
- }
-
- /* ******************************
- * NetworkGraphDiscoveryInterface methods
- * ******************************/
-
- @Override
- public void putSwitchEvent(SwitchEvent switchEvent) {
- if (prepareForAddSwitchEvent(switchEvent)) {
- datastore.addSwitch(switchEvent);
- putSwitch(switchEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(switchEvent);
- eventChannel.addEntry(topologyEvent.getID(),
- topologyEvent);
- }
- // TODO handle invariant violation
+ public NetworkGraphImpl() {
+ // TODO: Does these object need to be stored in Concurrent Collection?
+ switches = new ConcurrentHashMap<>();
+ addr2Device = new ConcurrentHashMap<>();
+ mac2Device = new ConcurrentHashMap<>();
}
@Override
- public void removeSwitchEvent(SwitchEvent switchEvent) {
- if (prepareForRemoveSwitchEvent(switchEvent)) {
- datastore.deactivateSwitch(switchEvent);
- removeSwitch(switchEvent);
- // Send out notification
- eventChannel.removeEntry(switchEvent.getID());
- }
- // TODO handle invariant violation
+ public Switch getSwitch(Long dpid) {
+ // TODO Check if it is safe to directly return this Object.
+ return switches.get(dpid);
}
@Override
- public void putPortEvent(PortEvent portEvent) {
- if (prepareForAddPortEvent(portEvent)) {
- datastore.addPort(portEvent);
- putPort(portEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(portEvent);
- eventChannel.addEntry(topologyEvent.getID(),
- topologyEvent);
- }
- // TODO handle invariant violation
+ public Iterable<Switch> getSwitches() {
+ // TODO Check if it is safe to directly return this Object.
+ return Collections.unmodifiableCollection(switches.values());
}
@Override
- public void removePortEvent(PortEvent portEvent) {
- if (prepareForRemovePortEvent(portEvent)) {
- datastore.deactivatePort(portEvent);
- removePort(portEvent);
- // Send out notification
- eventChannel.removeEntry(portEvent.getID());
- }
- // TODO handle invariant violation
- }
+ public Iterable<Link> getLinks() {
+ List<Link> linklist = new LinkedList<>();
- @Override
- public void putLinkEvent(LinkEvent linkEvent) {
- if (prepareForAddLinkEvent(linkEvent)) {
- datastore.addLink(linkEvent);
- putLink(linkEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(linkEvent);
- eventChannel.addEntry(topologyEvent.getID(),
- topologyEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void removeLinkEvent(LinkEvent linkEvent) {
- if (prepareForRemoveLinkEvent(linkEvent)) {
- datastore.removeLink(linkEvent);
- removeLink(linkEvent);
- // Send out notification
- eventChannel.removeEntry(linkEvent.getID());
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void putDeviceEvent(DeviceEvent deviceEvent) {
- if (prepareForAddDeviceEvent(deviceEvent)) {
-// datastore.addDevice(deviceEvent);
- // Send out notification
- TopologyEvent topologyEvent =
- new TopologyEvent(deviceEvent);
- eventChannel.addEntry(topologyEvent.getID(),
- topologyEvent);
- }
- // TODO handle invariant violation
- // XXX if prepareFor~ method returned false, event should be dropped
- }
-
- @Override
- public void removeDeviceEvent(DeviceEvent deviceEvent) {
- if (prepareForRemoveDeviceEvent(deviceEvent)) {
-// datastore.removeDevice(deviceEvent);
- // Send out notification
- eventChannel.removeEntry(deviceEvent.getID());
- }
- // TODO handle invariant violation
- // XXX if prepareFor~ method returned false, event should be dropped
- }
-
- /* *****************
- * Internal methods to maintain invariants of the network graph
- * *****************/
-
- /**
- *
- * @param swEvt
- * @return true if ready to accept event.
- */
- private boolean prepareForAddSwitchEvent(SwitchEvent swEvt) {
- // No show stopping precondition
- // Prep: remove(deactivate) Ports on Switch, which is not on event
- removePortsNotOnEvent(swEvt);
- return true;
- }
-
- private boolean prepareForRemoveSwitchEvent(SwitchEvent swEvt) {
- // No show stopping precondition
- // Prep: remove(deactivate) Ports on Switch, which is not on event
- // XXX may be remove switch should imply wipe all ports
- removePortsNotOnEvent(swEvt);
- return true;
- }
-
- private void removePortsNotOnEvent(SwitchEvent swEvt) {
- Switch sw = switches.get( swEvt.getDpid() );
- if ( sw != null ) {
- Set<Long> port_noOnEvent = new HashSet<>();
- for( PortEvent portEvent : swEvt.getPorts()) {
- port_noOnEvent.add(portEvent.getNumber());
- }
- // Existing ports not on event should be removed.
- // TODO Should batch eventually for performance?
- List<Port> portsToRemove = new ArrayList<Port>();
- for( Port p : sw.getPorts() ) {
- if ( !port_noOnEvent.contains(p.getNumber()) ) {
- //PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(), p.getNumber());
- // calling Discovery removePort() API to wipe from DB, etc.
- //removePortEvent(rmEvent);
-
- // We can't remove ports here because this will trigger a remove
- // from the switch's port list, which we are currently iterating
- // over.
- portsToRemove.add(p);
- }
- }
- for (Port p : portsToRemove) {
- PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(), p.getNumber());
- // calling Discovery removePort() API to wipe from DB, etc.
- removePortEvent(rmEvent);
- }
- }
- }
-
- private boolean prepareForAddPortEvent(PortEvent portEvt) {
- // Parent Switch must exist
- if ( getSwitch(portEvt.getDpid()) == null) {
- return false;
- }
- // Prep: None
- return true;
- }
-
- private boolean prepareForRemovePortEvent(PortEvent portEvt) {
- // Parent Switch must exist
- Switch sw = getSwitch(portEvt.getDpid());
- if ( sw == null ) {
- log.debug("Switch already removed? {}", portEvt);
- return false;
- }
- Port port = sw.getPort(portEvt.getNumber());
- if ( port == null ) {
- log.debug("Port already removed? {}", portEvt);
- // let it pass
- return true;
- }
-
- // Prep: Remove Link and Device Attachment
- for (Device device : port.getDevices()) {
- log.debug("Removing Device {} on Port {}", device, portEvt);
- DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
- devEvt.addAttachmentPoint(new SwitchPort(port.getSwitch().getDpid(), port.getNumber()));
- // calling Discovery API to wipe from DB, etc.
- removeDeviceEvent(devEvt);
- }
- Set<Link> links = new HashSet<>();
- links.add(port.getOutgoingLink());
- links.add(port.getIncomingLink());
- for ( Link link : links) {
- if (link == null ) {
- continue;
- }
- log.debug("Removing Link {} on Port {}", link, portEvt);
- LinkEvent linkEvent = new LinkEvent(link.getSourceSwitchDpid(), link.getSourcePortNumber(), link.getDestinationSwitchDpid(), link.getDestinationPortNumber());
- // calling Discovery API to wipe from DB, etc.
- removeLinkEvent(linkEvent);
- }
- return true;
- }
-
- private boolean prepareForAddLinkEvent(LinkEvent linkEvt) {
- // Src/Dst Switch must exist
- Switch srcSw = getSwitch(linkEvt.getSrc().dpid);
- Switch dstSw = getSwitch(linkEvt.getDst().dpid);
- if ( srcSw == null || dstSw == null ) {
- return false;
- }
- // Src/Dst Port must exist
- Port srcPort = srcSw.getPort(linkEvt.getSrc().number);
- Port dstPort = dstSw.getPort(linkEvt.getDst().number);
- if ( srcPort == null || dstPort == null ) {
- return false;
- }
-
- // Prep: remove Device attachment on both Ports
- for (Device device : srcPort.getDevices()) {
- DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
- devEvt.addAttachmentPoint(new SwitchPort(srcPort.getSwitch().getDpid(), srcPort.getNumber()));
- // calling Discovery API to wipe from DB, etc.
- removeDeviceEvent(devEvt);
- }
- for (Device device : dstPort.getDevices()) {
- DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
- devEvt.addAttachmentPoint(new SwitchPort(dstPort.getSwitch().getDpid(), dstPort.getNumber()));
- // calling Discovery API to wipe from DB, etc.
- removeDeviceEvent(devEvt);
- }
-
- return true;
- }
-
- private boolean prepareForRemoveLinkEvent(LinkEvent linkEvt) {
- // Src/Dst Switch must exist
- Switch srcSw = getSwitch(linkEvt.getSrc().dpid);
- Switch dstSw = getSwitch(linkEvt.getDst().dpid);
- if ( srcSw == null || dstSw == null ) {
- log.warn("Rejecting removeLink {} because switch doesn't exist", linkEvt);
- return false;
- }
- // Src/Dst Port must exist
- Port srcPort = srcSw.getPort(linkEvt.getSrc().number);
- Port dstPort = dstSw.getPort(linkEvt.getDst().number);
- if ( srcPort == null || dstPort == null ) {
- log.warn("Rejecting removeLink {} because port doesn't exist", linkEvt);
- return false;
- }
-
- Link link = srcPort.getOutgoingLink();
-
- // Link is already gone, or different Link exist in memory
- // XXX Check if we should reject or just accept these cases.
- // it should be harmless to remove the Link on event from DB anyways
- if (link == null ||
- !link.getDestinationPortNumber().equals(linkEvt.getDst().number)
- || !link.getDestinationSwitchDpid().equals(linkEvt.getDst().dpid)) {
- log.warn("Rejecting removeLink {} because link doesn't exist", linkEvt);
- return false;
- }
- // Prep: None
- return true;
- }
-
- /**
- *
- * @param deviceEvt Event will be modified to remove inapplicable attachemntPoints/ipAddress
- * @return false if this event should be dropped.
- */
- private boolean prepareForAddDeviceEvent(DeviceEvent deviceEvt) {
- boolean preconditionBroken = false;
- ArrayList<PortEvent.SwitchPort> failedSwitchPort = new ArrayList<>();
- for ( PortEvent.SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
- // Attached Ports' Parent Switch must exist
- Switch sw = getSwitch(swp.dpid);
- if ( sw == null ) {
- preconditionBroken = true;
- failedSwitchPort.add(swp);
- continue;
- }
- // Attached Ports must exist
- Port port = sw.getPort(swp.number);
- if ( port == null ) {
- preconditionBroken = true;
- failedSwitchPort.add(swp);
- continue;
- }
- // Attached Ports must not have Link
- if ( port.getOutgoingLink() != null || port.getIncomingLink() != null ) {
- preconditionBroken = true;
- failedSwitchPort.add(swp);
- continue;
- }
- }
-
- // Rewriting event to exclude failed attachmentPoint
- // XXX Assumption behind this is that inapplicable device event should
- // be dropped, not deferred. If we decide to defer Device event,
- // rewriting can become a problem
- List<SwitchPort> attachmentPoints = deviceEvt.getAttachmentPoints();
- attachmentPoints.removeAll(failedSwitchPort);
- deviceEvt.setAttachmentPoints(attachmentPoints);
-
- if ( deviceEvt.getAttachmentPoints().isEmpty() && deviceEvt.getIpAddresses().isEmpty() ) {
- // return false to represent: Nothing left to do for this event. Caller should drop event
- return false;
- }
-
- // Should we return false to tell caller that the event was trimmed?
- // if ( preconditionBroken ) {
- // return false;
- // }
-
- return true;
- }
-
- private boolean prepareForRemoveDeviceEvent(DeviceEvent deviceEvt) {
- // No show stopping precondition?
- // Prep: none
- return true;
- }
-
- /* ******************************
- * NetworkGraphReplicationInterface methods
- * ******************************/
-
- @Override
- public void putSwitchReplicationEvent(SwitchEvent switchEvent) {
- if (prepareForAddSwitchEvent(switchEvent)) {
- putSwitch(switchEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void removeSwitchReplicationEvent(SwitchEvent switchEvent) {
- if (prepareForRemoveSwitchEvent(switchEvent)) {
- removeSwitch(switchEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void putPortReplicationEvent(PortEvent portEvent) {
- if (prepareForAddPortEvent(portEvent)) {
- putPort(portEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void removePortReplicationEvent(PortEvent portEvent) {
- if (prepareForRemovePortEvent(portEvent)) {
- removePort(portEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void putLinkReplicationEvent(LinkEvent linkEvent) {
- if (prepareForAddLinkEvent(linkEvent)) {
- putLink(linkEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void removeLinkReplicationEvent(LinkEvent linkEvent) {
- if (prepareForRemoveLinkEvent(linkEvent)) {
- removeLink(linkEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void putDeviceReplicationEvent(DeviceEvent deviceEvent) {
- if (prepareForAddDeviceEvent(deviceEvent)) {
- putDevice(deviceEvent);
- }
- // TODO handle invariant violation
- }
-
- @Override
- public void removeDeviceReplicationEvent(DeviceEvent deviceEvent) {
- if (prepareForRemoveDeviceEvent(deviceEvent)) {
- removeDevice(deviceEvent);
- }
- // TODO handle invariant violation
- }
-
- /* ************************************************
- * Internal In-memory object mutation methods.
- * ************************************************/
-
- void putSwitch(SwitchEvent swEvt) {
- if (swEvt == null) {
- throw new IllegalArgumentException("Switch cannot be null");
- }
-
- Switch sw = switches.get(swEvt.getDpid());
-
- if (sw == null) {
- sw = new SwitchImpl(this, swEvt.getDpid());
- Switch existing = switches.putIfAbsent(swEvt.getDpid(), sw);
- if (existing != null) {
- log.warn(
- "Concurrent putSwitch not expected. Continuing updating {}",
- existing);
- sw = existing;
- }
- }
-
- // Update when more attributes are added to Event object
- // no attribute to update for now
-
- // TODO handle child Port event properly for performance
- for (PortEvent portEvt : swEvt.getPorts() ) {
- putPort(portEvt);
- }
-
- }
-
- void removeSwitch(SwitchEvent swEvt) {
- if (swEvt == null) {
- throw new IllegalArgumentException("Switch cannot be null");
- }
-
- // TODO handle child Port event properly for performance
- for (PortEvent portEvt : swEvt.getPorts() ) {
- removePort(portEvt);
- }
-
- Switch sw = switches.get(swEvt.getDpid());
-
- if (sw == null) {
- log.warn("Switch {} already removed, ignoring", swEvt);
- return;
- }
-
- // Sanity check
- if (!sw.getPorts().isEmpty()) {
- log.warn(
- "Ports on Switch {} should be removed prior to removing Switch. Removing Switch anyways",
- swEvt);
- // XXX Should we remove Port?
- }
- if (!sw.getDevices().isEmpty()) {
- log.warn(
- "Devices on Switch {} should be removed prior to removing Switch. Removing Switch anyways",
- swEvt);
- // XXX Should we remove Device to Switch relation?
- }
- if (!sw.getIncomingLinks().iterator().hasNext()) {
- log.warn(
- "IncomingLinks on Switch {} should be removed prior to removing Switch. Removing Switch anyways",
- swEvt);
- // XXX Should we remove Link?
- }
- if (!sw.getOutgoingLinks().iterator().hasNext()) {
- log.warn(
- "OutgoingLinks on Switch {} should be removed prior to removing Switch. Removing Switch anyways",
- swEvt);
- // XXX Should we remove Link?
- }
-
- boolean removed = switches.remove(swEvt.getDpid(), sw);
- if (removed) {
- log.warn(
- "Switch instance was replaced concurrently while removing {}. Something is not right.",
- sw);
- }
- }
-
- void putPort(PortEvent portEvt) {
- if (portEvt == null) {
- throw new IllegalArgumentException("Port cannot be null");
- }
- Switch sw = switches.get(portEvt.getDpid());
- if (sw == null) {
- throw new BrokenInvariantException(String.format(
- "Switch with dpid %s did not exist.",
- new Dpid(portEvt.getDpid())));
- }
- Port p = sw.getPort(portEvt.getNumber());
- PortImpl port = null;
- if (p != null) {
- port = getPortImpl(p);
- }
-
- if (port == null) {
- port = new PortImpl(this, sw, portEvt.getNumber());
- }
-
- // TODO update attributes
-
- SwitchImpl s = getSwitchImpl(sw);
- s.addPort(port);
- }
-
- void removePort(PortEvent portEvt) {
- if (portEvt == null) {
- throw new IllegalArgumentException("Port cannot be null");
- }
-
- Switch sw = switches.get(portEvt.getDpid());
- if (sw == null) {
- log.warn("Parent Switch for Port {} already removed, ignoring", portEvt);
- return;
- }
-
- Port p = sw.getPort(portEvt.getNumber());
- if (p == null) {
- log.warn("Port {} already removed, ignoring", portEvt);
- return;
- }
-
- // check if there is something referring to this Port
-
- if (!p.getDevices().iterator().hasNext()) {
- log.warn(
- "Devices on Port {} should be removed prior to removing Port. Removing Port anyways",
- portEvt);
- // XXX Should we remove Device to Port relation?
- }
- if (p.getIncomingLink() != null) {
- log.warn(
- "IncomingLinks on Port {} should be removed prior to removing Port. Removing Port anyways",
- portEvt);
- // XXX Should we remove Link?
- }
- if (p.getOutgoingLink() != null) {
- log.warn(
- "OutgoingLinks on Port {} should be removed prior to removing Port. Removing Port anyways",
- portEvt);
- // XXX Should we remove Link?
- }
-
- // remove Port from Switch
- SwitchImpl s = getSwitchImpl(sw);
- s.removePort(p);
- }
-
- void putLink(LinkEvent linkEvt) {
- if (linkEvt == null) {
- throw new IllegalArgumentException("Link cannot be null");
- }
-
- Switch srcSw = switches.get(linkEvt.getSrc().dpid);
- if (srcSw == null) {
- throw new BrokenInvariantException(
- String.format(
- "Switch with dpid %s did not exist.",
- new Dpid(linkEvt.getSrc().dpid)));
- }
-
- Switch dstSw = switches.get(linkEvt.getDst().dpid);
- if (dstSw == null) {
- throw new BrokenInvariantException(
- String.format(
- "Switch with dpid %s did not exist.",
- new Dpid(linkEvt.getDst().dpid)));
- }
-
- Port srcPort = srcSw.getPort(linkEvt.getSrc().number);
- if (srcPort == null) {
- throw new BrokenInvariantException(
- String.format(
- "Src Port %s of a Link did not exist.",
- linkEvt.getSrc() ));
- }
-
- Port dstPort = dstSw.getPort(linkEvt.getDst().number);
- if (dstPort == null) {
- throw new BrokenInvariantException(
- String.format(
- "Dst Port %s of a Link did not exist.",
- linkEvt.getDst() ));
- }
-
- // getting Link instance from destination port incoming Link
- Link l = dstPort.getIncomingLink();
- LinkImpl link = null;
- assert( l == srcPort.getOutgoingLink() );
- if (l != null) {
- link = getLinkImpl(l);
- }
-
- if (link == null) {
- link = new LinkImpl(this, srcPort, dstPort);
- }
-
-
- PortImpl dstPortMem = getPortImpl(dstPort);
- PortImpl srcPortMem = getPortImpl(srcPort);
-
- // Add Link first to avoid further Device addition
-
- // add Link to Port
- dstPortMem.setIncomingLink(link);
- srcPortMem.setOutgoingLink(link);
-
- // remove Device Pointing to Port if any
- for(Device d : dstPortMem.getDevices() ) {
- log.error("Device {} on Port {} should have been removed prior to adding Link {}", d, dstPort, linkEvt);
- DeviceImpl dev = getDeviceImpl(d);
- dev.removeAttachmentPoint(dstPort);
- // This implies that change is made to Device Object.
- // sending Device attachment point removed event
- DeviceEvent rmEvent = new DeviceEvent(d.getMacAddress());
- rmEvent.addAttachmentPoint(new SwitchPort(dstPort.getDpid(), dstPort.getNumber()));
- removeDeviceEvent(rmEvent);
- }
- dstPortMem.removeAllDevice();
- for(Device d : srcPortMem.getDevices() ) {
- log.error("Device {} on Port {} should have been removed prior to adding Link {}", d, srcPort, linkEvt);
- DeviceImpl dev = getDeviceImpl(d);
- dev.removeAttachmentPoint(srcPort);
- // This implies that change is made to Device Object.
- // sending Device attachment point removed event
- DeviceEvent rmEvent = new DeviceEvent(d.getMacAddress());
- rmEvent.addAttachmentPoint(new SwitchPort(dstPort.getDpid(), dstPort.getNumber()));
- removeDeviceEvent(rmEvent);
- }
- srcPortMem.removeAllDevice();
-
- }
-
- void removeLink(LinkEvent linkEvt) {
- if (linkEvt == null) {
- throw new IllegalArgumentException("Link cannot be null");
- }
-
- Switch srcSw = switches.get(linkEvt.getSrc().dpid);
- if (srcSw == null) {
- log.warn("Src Switch for Link {} already removed, ignoring", linkEvt);
- return;
- }
-
- Switch dstSw = switches.get(linkEvt.getDst().dpid);
- if (dstSw == null) {
- log.warn("Dst Switch for Link {} already removed, ignoring", linkEvt);
- return;
- }
-
- Port srcPort = srcSw.getPort(linkEvt.getSrc().number);
- if (srcPort == null) {
- log.warn("Src Port for Link {} already removed, ignoring", linkEvt);
- return;
- }
-
- Port dstPort = dstSw.getPort(linkEvt.getDst().number);
- if (dstPort == null) {
- log.warn("Dst Port for Link {} already removed, ignoring", linkEvt);
- return;
- }
-
- Link l = dstPort.getIncomingLink();
- if ( l == null ) {
- log.warn("Link {} already removed on destination Port", linkEvt);
- }
- l = srcPort.getOutgoingLink();
- if ( l == null ) {
- log.warn("Link {} already removed on src Port", linkEvt);
- }
-
- getPortImpl(dstPort).setIncomingLink(null);
- getPortImpl(srcPort).setOutgoingLink(null);
- }
-
- // XXX Need to rework Device related
- void putDevice(DeviceEvent deviceEvt) {
- if (deviceEvt == null) {
- throw new IllegalArgumentException("Device cannot be null");
- }
-
- Device device = getDeviceByMac(deviceEvt.getMac());
- if ( device == null ) {
- device = new DeviceImpl(this, deviceEvt.getMac());
- Device existing = mac2Device.putIfAbsent(deviceEvt.getMac(), device);
- if (existing != null) {
- log.warn(
- "Concurrent putDevice seems to be in action. Continuing updating {}",
- existing);
- device = existing;
- }
- }
- DeviceImpl memDevice = getDeviceImpl(device);
-
- // for each attachment point
- for (SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
- // Attached Ports' Parent Switch must exist
- Switch sw = getSwitch(swp.dpid);
- if ( sw == null ) {
- log.warn("Switch for the attachment point {} did not exist. skipping mutation", swp);
- continue;
- }
- // Attached Ports must exist
- Port port = sw.getPort(swp.number);
- if ( port == null ) {
- log.warn("Port for the attachment point {} did not exist. skipping mutation", swp);
- continue;
- }
- // Attached Ports must not have Link
- if ( port.getOutgoingLink() != null || port.getIncomingLink() != null ) {
- log.warn("Link (Out:{},In:{}) exist on the attachment point, skipping mutation.", port.getOutgoingLink(), port.getIncomingLink());
- continue;
- }
-
- // finally add Device <-> Port on In-memory structure
- PortImpl memPort = getPortImpl(port);
- memPort.addDevice(device);
- memDevice.addAttachmentPoint(port);
- }
-
- // for each IP address
- for( InetAddress ipAddr : deviceEvt.getIpAddresses() ) {
- // Add Device -> IP
- memDevice.addIpAddress(ipAddr);
-
- // Add IP -> Set<Device>
- boolean updated = false;
- do {
- Set<Device> devices = this.addr2Device.get(ipAddr);
- if ( devices == null ) {
- devices = new HashSet<>();
- Set<Device> existing = this.addr2Device.putIfAbsent(ipAddr, devices);
- if ( existing == null ) {
- // success
- updated = true;
+ for (Switch sw : switches.values()) {
+ Iterable<Link> links = sw.getOutgoingLinks();
+ for (Link l : links) {
+ linklist.add(l);
}
- } else {
- Set<Device> updateDevices = new HashSet<>(devices);
- updateDevices.add(device);
- updated = this.addr2Device.replace(ipAddr, devices, updateDevices);
- }
- if (!updated) {
- log.debug("Collision detected, updating IP to Device mapping retrying.");
- }
- } while( !updated );
- }
- }
-
- void removeDevice(DeviceEvent deviceEvt) {
- if (deviceEvt == null) {
- throw new IllegalArgumentException("Device cannot be null");
- }
-
- Device device = getDeviceByMac(deviceEvt.getMac());
- if ( device == null ) {
- log.warn("Device {} already removed, ignoring", deviceEvt);
- return;
- }
- DeviceImpl memDevice = getDeviceImpl(device);
-
- // for each attachment point
- for (SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
- // Attached Ports' Parent Switch must exist
- Switch sw = getSwitch(swp.dpid);
- if ( sw == null ) {
- log.warn("Switch for the attachment point {} did not exist. skipping attachment point mutation", swp);
- continue;
}
- // Attached Ports must exist
- Port port = sw.getPort(swp.number);
- if ( port == null ) {
- log.warn("Port for the attachment point {} did not exist. skipping attachment point mutation", swp);
- continue;
+ return linklist;
+ }
+
+ @Override
+ public Iterable<Link> getOutgoingLinksFromSwitch(Long dpid) {
+ Switch sw = getSwitch(dpid);
+ if (sw == null) {
+ return Collections.emptyList();
}
-
- // finally remove Device <-> Port on In-memory structure
- PortImpl memPort = getPortImpl(port);
- memPort.removeDevice(device);
- memDevice.removeAttachmentPoint(port);
- }
-
- // for each IP address
- for( InetAddress ipAddr : deviceEvt.getIpAddresses() ) {
- // Remove Device -> IP
- memDevice.removeIpAddress(ipAddr);
-
- // Remove IP -> Set<Device>
- boolean updated = false;
- do {
- Set<Device> devices = this.addr2Device.get(ipAddr);
- if ( devices == null ) {
- // already empty set, nothing to do
- updated = true;
- } else {
- Set<Device> updateDevices = new HashSet<>(devices);
- updateDevices.remove(device);
- updated = this.addr2Device.replace(ipAddr, devices, updateDevices);
- }
- if (!updated) {
- log.debug("Collision detected, updating IP to Device mapping retrying.");
- }
- } while( !updated );
- }
- }
-
- private SwitchImpl getSwitchImpl(Switch sw) {
- if (sw instanceof SwitchImpl) {
- return (SwitchImpl) sw;
- }
- throw new ClassCastException("SwitchImpl expected, but found: " + sw);
- }
-
- private PortImpl getPortImpl(Port p) {
- if (p instanceof PortImpl) {
- return (PortImpl) p;
- }
- throw new ClassCastException("PortImpl expected, but found: " + p);
- }
-
- private LinkImpl getLinkImpl(Link l) {
- if (l instanceof LinkImpl) {
- return (LinkImpl) l;
- }
- throw new ClassCastException("LinkImpl expected, but found: " + l);
- }
-
- private DeviceImpl getDeviceImpl(Device d) {
- if (d instanceof DeviceImpl) {
- return (DeviceImpl) d;
- }
- throw new ClassCastException("DeviceImpl expected, but found: " + d);
- }
-
- @Deprecated
- public void loadWholeTopologyFromDB() {
- // XXX clear everything first?
-
- for (RCSwitch sw : RCSwitch.getAllSwitches()) {
- if ( sw.getStatus() != RCSwitch.STATUS.ACTIVE ) {
- continue;
+ Iterable<Link> links = sw.getOutgoingLinks();
+ if (links instanceof Collection) {
+ return Collections.unmodifiableCollection((Collection<Link>) links);
+ } else {
+ List<Link> linklist = new LinkedList<>();
+ for (Link l : links) {
+ linklist.add(l);
+ }
+ return linklist;
}
- putSwitchReplicationEvent(new SwitchEvent(sw.getDpid()));
- }
+ }
- for (RCPort p : RCPort.getAllPorts()) {
- if (p.getStatus() != RCPort.STATUS.ACTIVE) {
- continue;
+ @Override
+ public Iterable<Link> getIncomingLinksFromSwitch(Long dpid) {
+ Switch sw = getSwitch(dpid);
+ if (sw == null) {
+ return Collections.emptyList();
}
- putPortReplicationEvent(new PortEvent(p.getDpid(), p.getNumber() ));
- }
+ Iterable<Link> links = sw.getIncomingLinks();
+ if (links instanceof Collection) {
+ return Collections.unmodifiableCollection((Collection<Link>) links);
+ } else {
+ List<Link> linklist = new LinkedList<>();
+ for (Link l : links) {
+ linklist.add(l);
+ }
+ return linklist;
+ }
+ }
- // TODO Is Device going to be in DB? If so, read from DB.
- // for (RCDevice d : RCDevice.getAllDevices()) {
- // DeviceEvent devEvent = new DeviceEvent( MACAddress.valueOf(d.getMac()) );
- // for (byte[] portId : d.getAllPortIds() ) {
- // devEvent.addAttachmentPoint( new SwitchPort( RCPort.getDpidFromKey(portId), RCPort.getNumberFromKey(portId) ));
- // }
- // }
- for (RCLink l : RCLink.getAllLinks()) {
- putLinkReplicationEvent( new LinkEvent(l.getSrc().dpid, l.getSrc().number, l.getDst().dpid, l.getDst().number));
- }
+ @Override
+ public Iterable<Device> getDeviceByIp(InetAddress ipAddress) {
+ Set<Device> devices = addr2Device.get(ipAddress);
+ if (devices == null) {
+ return Collections.emptySet();
+ }
+ return Collections.unmodifiableCollection(devices);
+ }
+
+ @Override
+ public Device getDeviceByMac(MACAddress address) {
+ return mac2Device.get(address);
}
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
index 3edf5c2..4b2ef34 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/NetworkGraphModule.java
@@ -5,6 +5,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
import net.floodlightcontroller.core.module.FloodlightModuleException;
@@ -13,30 +14,34 @@
import net.floodlightcontroller.restserver.IRestApiService;
import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.networkgraph.web.NetworkGraphWebRoutable;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
public class NetworkGraphModule implements IFloodlightModule, INetworkGraphService {
// This is initialized as a module for now
// private RCNetworkGraphPublisher eventListener;
-
- private NetworkGraphImpl networkGraph;
+
+ private TopologyManager networkGraph;
//private NetworkGraphDatastore southboundNetworkGraph;
private IDatagridService datagridService;
+ private IControllerRegistryService registryService;
+
+ private CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners;
private IRestApiService restApi;
@Override
public Collection<Class<? extends IFloodlightService>> getModuleServices() {
- List<Class<? extends IFloodlightService>> services =
+ List<Class<? extends IFloodlightService>> services =
new ArrayList<Class<? extends IFloodlightService>>();
services.add(INetworkGraphService.class);
return services;
}
@Override
- public Map<Class<? extends IFloodlightService>, IFloodlightService>
+ public Map<Class<? extends IFloodlightService>, IFloodlightService>
getServiceImpls() {
- Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
+ Map<Class<? extends IFloodlightService>, IFloodlightService> impls =
new HashMap<Class<? extends IFloodlightService>, IFloodlightService>();
impls.put(INetworkGraphService.class, this);
return impls;
@@ -44,20 +49,22 @@
@Override
public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
- List<Class<? extends IFloodlightService>> dependencies =
+ List<Class<? extends IFloodlightService>> dependencies =
new ArrayList<Class<? extends IFloodlightService>>();
dependencies.add(IDatagridService.class);
dependencies.add(IRestApiService.class);
return dependencies;
}
-
+
@Override
public void init(FloodlightModuleContext context)
throws FloodlightModuleException {
restApi = context.getServiceImpl(IRestApiService.class);
datagridService = context.getServiceImpl(IDatagridService.class);
-
- networkGraph = new NetworkGraphImpl();
+ registryService = context.getServiceImpl(IControllerRegistryService.class);
+
+ networkGraphListeners = new CopyOnWriteArrayList<>();
+ networkGraph = new TopologyManager(registryService, networkGraphListeners);
//southboundNetworkGraph = new NetworkGraphDatastore(networkGraph);
}
@@ -77,4 +84,14 @@
return networkGraph;
}
+ @Override
+ public void registerNetworkGraphListener(INetworkGraphListener listener) {
+ networkGraphListeners.addIfAbsent(listener);
+ }
+
+ @Override
+ public void deregisterNetworkGraphListener(INetworkGraphListener listener) {
+ networkGraphListeners.remove(listener);
+ }
+
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyEvent.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyEvent.java
index 927d266..2892e0c 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyEvent.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyEvent.java
@@ -8,6 +8,8 @@
* in a single transaction.
*/
public class TopologyEvent {
+ public static final String NOBODY = "";
+ String originID = NOBODY;
SwitchEvent switchEvent = null; // Set for Switch event
PortEvent portEvent = null; // Set for Port event
LinkEvent linkEvent = null; // Set for Link event
@@ -24,8 +26,9 @@
*
* @param switchEvent the Switch event to use.
*/
- TopologyEvent(SwitchEvent switchEvent) {
+ TopologyEvent(SwitchEvent switchEvent, String originID) {
this.switchEvent = switchEvent;
+ setOriginID(originID);
}
/**
@@ -33,8 +36,9 @@
*
* @param portEvent the Port event to use.
*/
- TopologyEvent(PortEvent portEvent) {
+ TopologyEvent(PortEvent portEvent, String originID) {
this.portEvent = portEvent;
+ setOriginID(originID);
}
/**
@@ -42,8 +46,9 @@
*
* @param linkEvent the Link event to use.
*/
- TopologyEvent(LinkEvent linkEvent) {
+ TopologyEvent(LinkEvent linkEvent, String originID) {
this.linkEvent = linkEvent;
+ setOriginID(originID);
}
/**
@@ -51,8 +56,9 @@
*
* @param deviceEvent the Device event to use.
*/
- TopologyEvent(DeviceEvent deviceEvent) {
+ TopologyEvent(DeviceEvent deviceEvent, String originID) {
this.deviceEvent = deviceEvent;
+ setOriginID(originID);
}
/**
@@ -70,7 +76,7 @@
return linkEvent.toString();
if (deviceEvent != null)
return deviceEvent.toString();
- return null;
+ return "[Empty TopologyEvent]";
}
/**
@@ -89,4 +95,16 @@
return deviceEvent.getID();
return null;
}
+
+ public String getOriginID() {
+ return originID;
+ }
+
+ void setOriginID(String originID) {
+ if (originID != null) {
+ this.originID = originID;
+ } else {
+ this.originID = NOBODY;
+ }
+ }
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
new file mode 100644
index 0000000..d0e215f
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/networkgraph/TopologyManager.java
@@ -0,0 +1,1144 @@
+package net.onrc.onos.ofcontroller.networkgraph;
+
+import java.net.InetAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import net.onrc.onos.datagrid.IDatagridService;
+import net.onrc.onos.datagrid.IEventChannel;
+import net.onrc.onos.datagrid.IEventChannelListener;
+import net.onrc.onos.datastore.topology.RCLink;
+import net.onrc.onos.datastore.topology.RCPort;
+import net.onrc.onos.datastore.topology.RCSwitch;
+import net.onrc.onos.ofcontroller.networkgraph.PortEvent.SwitchPort;
+import net.onrc.onos.ofcontroller.util.EventEntry;
+import net.onrc.onos.ofcontroller.util.Dpid;
+import net.onrc.onos.registry.controller.IControllerRegistryService;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The "NB" read-only Network Map.
+ *
+ * - Maintain Invariant/Relationships between Topology Objects.
+ *
+ * TODO To be synchronized based on TopologyEvent Notification.
+ *
+ * TODO TBD: Caller is expected to maintain parent/child calling order. Parent
+ * Object must exist before adding sub component(Add Switch -> Port).
+ *
+ * TODO TBD: This class may delay the requested change to handle event
+ * re-ordering. e.g.) Link Add came in, but Switch was not there.
+ *
+ */
+public class TopologyManager extends NetworkGraphImpl implements
+ NetworkGraphDiscoveryInterface, NetworkGraphReplicationInterface {
+
+ private static final Logger log = LoggerFactory
+ .getLogger(TopologyManager.class);
+
+ private IEventChannel<byte[], TopologyEvent> eventChannel;
+ private static final String EVENT_CHANNEL_NAME = "onos.topology";
+ private EventHandler eventHandler = new EventHandler();
+
+ private final NetworkGraphDatastore datastore;
+ private final IControllerRegistryService registryService;
+ private CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners;
+
+ public TopologyManager(IControllerRegistryService registryService, CopyOnWriteArrayList<INetworkGraphListener> networkGraphListeners) {
+ super();
+ datastore = new NetworkGraphDatastore(this);
+ this.registryService = registryService;
+ this.networkGraphListeners = networkGraphListeners;
+ }
+
+ /**
+ * Event handler class.
+ */
+ private class EventHandler extends Thread implements
+ IEventChannelListener<byte[], TopologyEvent> {
+ private BlockingQueue<EventEntry<TopologyEvent>> topologyEvents =
+ new LinkedBlockingQueue<EventEntry<TopologyEvent>>();
+
+ /**
+ * Startup processing.
+ */
+ private void startup() {
+ //
+ // TODO: Read all state from the database
+ // For now, as a shortcut we read it from the datagrid
+ //
+ Collection<TopologyEvent> topologyEvents =
+ eventChannel.getAllEntries();
+ Collection<EventEntry<TopologyEvent>> collection =
+ new LinkedList<EventEntry<TopologyEvent>>();
+
+ for (TopologyEvent topologyEvent : topologyEvents) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+ topologyEvent);
+ collection.add(eventEntry);
+ }
+ processEvents(collection);
+ }
+
+ /**
+ * Run the thread.
+ */
+ @Override
+ public void run() {
+ Collection<EventEntry<TopologyEvent>> collection =
+ new LinkedList<EventEntry<TopologyEvent>>();
+
+ this.setName("TopologyManager.EventHandler " + this.getId());
+ startup();
+
+ //
+ // The main loop
+ //
+ try {
+ while (true) {
+ EventEntry<TopologyEvent> eventEntry = topologyEvents.take();
+ collection.add(eventEntry);
+ topologyEvents.drainTo(collection);
+
+ processEvents(collection);
+ collection.clear();
+ }
+ } catch (Exception exception) {
+ log.debug("Exception processing Topology Events: ", exception);
+ }
+ }
+
+ /**
+ * Process all topology events.
+ *
+ * @param events the events to process.
+ */
+ private void processEvents(Collection<EventEntry<TopologyEvent>> events) {
+ for (EventEntry<TopologyEvent> event : events) {
+ if (event.eventData().getOriginID().equals(registryService.getControllerId())) {
+ // ignore event triggered by myself
+ continue;
+ }
+ TopologyEvent topologyEvent = event.eventData();
+ switch (event.eventType()) {
+ case ENTRY_ADD:
+ log.debug("Topology event ENTRY_ADD: {}", topologyEvent);
+ if (topologyEvent.switchEvent != null)
+ putSwitchReplicationEvent(topologyEvent.switchEvent);
+ if (topologyEvent.portEvent != null)
+ putPortReplicationEvent(topologyEvent.portEvent);
+ if (topologyEvent.linkEvent != null)
+ putLinkReplicationEvent(topologyEvent.linkEvent);
+ if (topologyEvent.deviceEvent != null)
+ putDeviceReplicationEvent(topologyEvent.deviceEvent);
+ break;
+ case ENTRY_REMOVE:
+ log.debug("Topology event ENTRY_REMOVE: {}", topologyEvent);
+ if (topologyEvent.switchEvent != null)
+ removeSwitchReplicationEvent(topologyEvent.switchEvent);
+ if (topologyEvent.portEvent != null)
+ removePortReplicationEvent(topologyEvent.portEvent);
+ if (topologyEvent.linkEvent != null)
+ removeLinkReplicationEvent(topologyEvent.linkEvent);
+ if (topologyEvent.deviceEvent != null)
+ removeDeviceReplicationEvent(topologyEvent.deviceEvent);
+ break;
+ }
+ }
+ }
+
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryAdded(TopologyEvent value) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_ADD,
+ value);
+ topologyEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryRemoved(TopologyEvent value) {
+ EventEntry<TopologyEvent> eventEntry =
+ new EventEntry<TopologyEvent>(EventEntry.Type.ENTRY_REMOVE,
+ value);
+ topologyEvents.add(eventEntry);
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param value the value for the entry.
+ */
+ @Override
+ public void entryUpdated(TopologyEvent value) {
+ // NOTE: The ADD and UPDATE events are processed in same way
+ entryAdded(value);
+ }
+ }
+
+ /**
+ * Startup processing.
+ *
+ * @param datagridService the datagrid service to use.
+ */
+ void startup(IDatagridService datagridService) {
+ eventChannel = datagridService.addListener(EVENT_CHANNEL_NAME,
+ eventHandler,
+ byte[].class,
+ TopologyEvent.class);
+ eventHandler.start();
+ }
+
+ /**
+ * Exception to be thrown when Modification to the Network Graph cannot be continued due to broken invariant.
+ *
+ * XXX Should this be checked exception or RuntimeException
+ */
+ public static class BrokenInvariantException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public BrokenInvariantException() {
+ super();
+ }
+
+ public BrokenInvariantException(String message) {
+ super(message);
+ }
+ }
+
+ /* ******************************
+ * NetworkGraphDiscoveryInterface methods
+ * ******************************/
+
+ @Override
+ public void putSwitchEvent(SwitchEvent switchEvent) {
+ if (prepareForAddSwitchEvent(switchEvent)) {
+ datastore.addSwitch(switchEvent);
+ putSwitch(switchEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(switchEvent, registryService.getControllerId());
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void removeSwitchEvent(SwitchEvent switchEvent) {
+ if (prepareForRemoveSwitchEvent(switchEvent)) {
+ datastore.deactivateSwitch(switchEvent);
+ removeSwitch(switchEvent);
+ // Send out notification
+ eventChannel.removeEntry(switchEvent.getID());
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void putPortEvent(PortEvent portEvent) {
+ if (prepareForAddPortEvent(portEvent)) {
+ datastore.addPort(portEvent);
+ putPort(portEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(portEvent, registryService.getControllerId());
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void removePortEvent(PortEvent portEvent) {
+ if (prepareForRemovePortEvent(portEvent)) {
+ datastore.deactivatePort(portEvent);
+ removePort(portEvent);
+ // Send out notification
+ eventChannel.removeEntry(portEvent.getID());
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void putLinkEvent(LinkEvent linkEvent) {
+ if (prepareForAddLinkEvent(linkEvent)) {
+ datastore.addLink(linkEvent);
+ putLink(linkEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(linkEvent, registryService.getControllerId());
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void removeLinkEvent(LinkEvent linkEvent) {
+ removeLinkEvent(linkEvent, false);
+
+ }
+
+ private void removeLinkEvent(LinkEvent linkEvent, boolean dstCheckBeforeDBmodify) {
+ if (prepareForRemoveLinkEvent(linkEvent)) {
+ if (dstCheckBeforeDBmodify) {
+ // write to DB only if it is owner of the dst dpid
+ if (registryService.hasControl(linkEvent.getDst().dpid)) {
+ datastore.removeLink(linkEvent);
+ }
+ } else {
+ datastore.removeLink(linkEvent);
+ }
+ removeLink(linkEvent);
+ // Send out notification
+ eventChannel.removeEntry(linkEvent.getID());
+ }
+ // TODO handle invariant violation
+ }
+
+ @Override
+ public void putDeviceEvent(DeviceEvent deviceEvent) {
+ if (prepareForAddDeviceEvent(deviceEvent)) {
+// datastore.addDevice(deviceEvent);
+// putDevice(deviceEvent);
+ // Send out notification
+ TopologyEvent topologyEvent =
+ new TopologyEvent(deviceEvent, registryService.getControllerId());
+ eventChannel.addEntry(topologyEvent.getID(),
+ topologyEvent);
+ }
+ // TODO handle invariant violation
+ // XXX if prepareFor~ method returned false, event should be dropped
+ }
+
+ @Override
+ public void removeDeviceEvent(DeviceEvent deviceEvent) {
+ if (prepareForRemoveDeviceEvent(deviceEvent)) {
+// datastore.removeDevice(deviceEvent);
+// removeDevice(deviceEvent);
+ // Send out notification
+ eventChannel.removeEntry(deviceEvent.getID());
+ }
+ // TODO handle invariant violation
+ // XXX if prepareFor~ method returned false, event should be dropped
+ }
+
+ /* *****************
+ * Internal methods to maintain invariants of the network graph
+ * *****************/
+
+ /**
+ *
+ * @param swEvt
+ * @return true if ready to accept event.
+ */
+ private boolean prepareForAddSwitchEvent(SwitchEvent swEvt) {
+ // No show stopping precondition
+ // Prep: remove(deactivate) Ports on Switch, which is not on event
+ removePortsNotOnEvent(swEvt);
+ return true;
+ }
+
+ private boolean prepareForRemoveSwitchEvent(SwitchEvent swEvt) {
+ // No show stopping precondition
+ // Prep: remove(deactivate) Ports on Switch, which is not on event
+ // XXX may be remove switch should imply wipe all ports
+ removePortsNotOnEvent(swEvt);
+ return true;
+ }
+
+ private void removePortsNotOnEvent(SwitchEvent swEvt) {
+ Switch sw = switches.get( swEvt.getDpid() );
+ if ( sw != null ) {
+ Set<Long> port_noOnEvent = new HashSet<>();
+ for( PortEvent portEvent : swEvt.getPorts()) {
+ port_noOnEvent.add(portEvent.getNumber());
+ }
+ // Existing ports not on event should be removed.
+ // TODO Should batch eventually for performance?
+ List<Port> portsToRemove = new ArrayList<Port>();
+ for( Port p : sw.getPorts() ) {
+ if ( !port_noOnEvent.contains(p.getNumber()) ) {
+ //PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(), p.getNumber());
+ // calling Discovery removePort() API to wipe from DB, etc.
+ //removePortEvent(rmEvent);
+
+ // We can't remove ports here because this will trigger a remove
+ // from the switch's port list, which we are currently iterating
+ // over.
+ portsToRemove.add(p);
+ }
+ }
+ for (Port p : portsToRemove) {
+ PortEvent rmEvent = new PortEvent(p.getSwitch().getDpid(), p.getNumber());
+ // calling Discovery removePort() API to wipe from DB, etc.
+ removePortEvent(rmEvent);
+ }
+ }
+ }
+
+ private boolean prepareForAddPortEvent(PortEvent portEvt) {
+ // Parent Switch must exist
+ if ( getSwitch(portEvt.getDpid()) == null) {
+ log.warn("Dropping add port event because switch doesn't exist: {}",
+ portEvt);
+ return false;
+ }
+ // Prep: None
+ return true;
+ }
+
+ private boolean prepareForRemovePortEvent(PortEvent portEvt) {
+ Port port = getPort(portEvt.getDpid(), portEvt.getNumber());
+ if ( port == null ) {
+ log.debug("Port already removed? {}", portEvt);
+ // let it pass
+ return true;
+ }
+
+ // Prep: Remove Link and Device Attachment
+ ArrayList<DeviceEvent> deviceEvts = new ArrayList<>();
+ for (Device device : port.getDevices()) {
+ log.debug("Removing Device {} on Port {}", device, portEvt);
+ DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
+ devEvt.addAttachmentPoint(new SwitchPort(port.getSwitch().getDpid(), port.getNumber()));
+ deviceEvts.add(devEvt);
+ }
+ for (DeviceEvent devEvt : deviceEvts) {
+ // calling Discovery API to wipe from DB, etc.
+ removeDeviceEvent(devEvt);
+ }
+
+ Set<Link> links = new HashSet<>();
+ links.add(port.getOutgoingLink());
+ links.add(port.getIncomingLink());
+ for ( Link link : links) {
+ if (link == null ) {
+ continue;
+ }
+ log.debug("Removing Link {} on Port {}", link, portEvt);
+ LinkEvent linkEvent = new LinkEvent(link.getSourceSwitchDpid(), link.getSourcePortNumber(), link.getDestinationSwitchDpid(), link.getDestinationPortNumber());
+ // calling Discovery API to wipe from DB, etc.
+
+ // Call internal remove Link, which will check
+ // ownership of DST dpid and modify DB only if it is the owner
+ removeLinkEvent(linkEvent, true);
+ }
+ return true;
+ }
+
+ private boolean prepareForAddLinkEvent(LinkEvent linkEvt) {
+ // Src/Dst Port must exist
+ Port srcPort = getPort(linkEvt.getSrc().dpid, linkEvt.getSrc().number);
+ Port dstPort = getPort(linkEvt.getDst().dpid, linkEvt.getDst().number);
+ if ( srcPort == null || dstPort == null ) {
+ log.warn("Dropping add link event because port doesn't exist: {}",
+ linkEvt);
+ return false;
+ }
+
+ // Prep: remove Device attachment on both Ports
+ ArrayList<DeviceEvent> deviceEvents = new ArrayList<>();
+ for (Device device : srcPort.getDevices()) {
+ DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
+ devEvt.addAttachmentPoint(new SwitchPort(srcPort.getSwitch().getDpid(), srcPort.getNumber()));
+ deviceEvents.add(devEvt);
+ }
+ for (Device device : dstPort.getDevices()) {
+ DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
+ devEvt.addAttachmentPoint(new SwitchPort(dstPort.getSwitch().getDpid(), dstPort.getNumber()));
+ deviceEvents.add(devEvt);
+ }
+ for (DeviceEvent devEvt : deviceEvents) {
+ // calling Discovery API to wipe from DB, etc.
+ removeDeviceEvent(devEvt);
+ }
+
+ return true;
+ }
+
+ private boolean prepareForRemoveLinkEvent(LinkEvent linkEvt) {
+ // Src/Dst Port must exist
+ Port srcPort = getPort(linkEvt.getSrc().dpid, linkEvt.getSrc().number);
+ Port dstPort = getPort(linkEvt.getDst().dpid, linkEvt.getDst().number);
+ if ( srcPort == null || dstPort == null ) {
+ log.warn("Dropping remove link event because port doesn't exist {}", linkEvt);
+ return false;
+ }
+
+ Link link = srcPort.getOutgoingLink();
+
+ // Link is already gone, or different Link exist in memory
+ // XXX Check if we should reject or just accept these cases.
+ // it should be harmless to remove the Link on event from DB anyways
+ if (link == null ||
+ !link.getDestinationPortNumber().equals(linkEvt.getDst().number)
+ || !link.getDestinationSwitchDpid().equals(linkEvt.getDst().dpid)) {
+ log.warn("Dropping remove link event because link doesn't exist: {}", linkEvt);
+ return false;
+ }
+ // Prep: None
+ return true;
+ }
+
+ /**
+ *
+ * @param deviceEvt Event will be modified to remove inapplicable attachemntPoints/ipAddress
+ * @return false if this event should be dropped.
+ */
+ private boolean prepareForAddDeviceEvent(DeviceEvent deviceEvt) {
+ boolean preconditionBroken = false;
+ ArrayList<PortEvent.SwitchPort> failedSwitchPort = new ArrayList<>();
+ for ( PortEvent.SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
+ // Attached Ports must exist
+ Port port = getPort(swp.dpid, swp.number);
+ if ( port == null ) {
+ preconditionBroken = true;
+ failedSwitchPort.add(swp);
+ continue;
+ }
+ // Attached Ports must not have Link
+ if ( port.getOutgoingLink() != null || port.getIncomingLink() != null ) {
+ preconditionBroken = true;
+ failedSwitchPort.add(swp);
+ continue;
+ }
+ }
+
+ // Rewriting event to exclude failed attachmentPoint
+ // XXX Assumption behind this is that inapplicable device event should
+ // be dropped, not deferred. If we decide to defer Device event,
+ // rewriting can become a problem
+ List<SwitchPort> attachmentPoints = deviceEvt.getAttachmentPoints();
+ attachmentPoints.removeAll(failedSwitchPort);
+ deviceEvt.setAttachmentPoints(attachmentPoints);
+
+ if ( deviceEvt.getAttachmentPoints().isEmpty() && deviceEvt.getIpAddresses().isEmpty() ) {
+ // return false to represent: Nothing left to do for this event. Caller should drop event
+ return false;
+ }
+
+ // Should we return false to tell caller that the event was trimmed?
+ // if ( preconditionBroken ) {
+ // return false;
+ // }
+
+ return true;
+ }
+
+ private boolean prepareForRemoveDeviceEvent(DeviceEvent deviceEvt) {
+ // No show stopping precondition?
+ // Prep: none
+ return true;
+ }
+
+ /* ******************************
+ * NetworkGraphReplicationInterface methods
+ * ******************************/
+
+ @Override
+ public void putSwitchReplicationEvent(SwitchEvent switchEvent) {
+ if (prepareForAddSwitchEvent(switchEvent)) {
+ putSwitch(switchEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchPutSwitchEvent(switchEvent);
+ }
+
+ @Override
+ public void removeSwitchReplicationEvent(SwitchEvent switchEvent) {
+ if (prepareForRemoveSwitchEvent(switchEvent)) {
+ removeSwitch(switchEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchRemoveSwitchEvent(switchEvent);
+ }
+
+ @Override
+ public void putPortReplicationEvent(PortEvent portEvent) {
+ if (prepareForAddPortEvent(portEvent)) {
+ putPort(portEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchPutPortEvent(portEvent);
+ }
+
+ @Override
+ public void removePortReplicationEvent(PortEvent portEvent) {
+ if (prepareForRemovePortEvent(portEvent)) {
+ removePort(portEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchRemovePortEvent(portEvent);
+ }
+
+ @Override
+ public void putLinkReplicationEvent(LinkEvent linkEvent) {
+ if (prepareForAddLinkEvent(linkEvent)) {
+ putLink(linkEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchPutLinkEvent(linkEvent);
+ }
+
+ @Override
+ public void removeLinkReplicationEvent(LinkEvent linkEvent) {
+ if (prepareForRemoveLinkEvent(linkEvent)) {
+ removeLink(linkEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchRemoveLinkEvent(linkEvent);
+ }
+
+ @Override
+ public void putDeviceReplicationEvent(DeviceEvent deviceEvent) {
+ if (prepareForAddDeviceEvent(deviceEvent)) {
+ putDevice(deviceEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchPutDeviceEvent(deviceEvent);
+ }
+
+ @Override
+ public void removeDeviceReplicationEvent(DeviceEvent deviceEvent) {
+ if (prepareForRemoveDeviceEvent(deviceEvent)) {
+ removeDevice(deviceEvent);
+ }
+ // TODO handle invariant violation
+ // trigger instance local topology event handler
+ dispatchRemoveDeviceEvent(deviceEvent);
+ }
+
+ /* ************************************************
+ * Internal In-memory object mutation methods.
+ * ************************************************/
+
+ void putSwitch(SwitchEvent swEvt) {
+ if (swEvt == null) {
+ throw new IllegalArgumentException("Switch cannot be null");
+ }
+
+ Switch sw = switches.get(swEvt.getDpid());
+
+ if (sw == null) {
+ sw = new SwitchImpl(this, swEvt.getDpid());
+ Switch existing = switches.putIfAbsent(swEvt.getDpid(), sw);
+ if (existing != null) {
+ log.warn(
+ "Concurrent putSwitch not expected. Continuing updating {}",
+ existing);
+ sw = existing;
+ }
+ }
+
+ // Update when more attributes are added to Event object
+ // no attribute to update for now
+
+ // TODO handle child Port event properly for performance
+ for (PortEvent portEvt : swEvt.getPorts() ) {
+ putPort(portEvt);
+ }
+
+ }
+
+ void removeSwitch(SwitchEvent swEvt) {
+ if (swEvt == null) {
+ throw new IllegalArgumentException("Switch cannot be null");
+ }
+
+ // TODO handle child Port event properly for performance
+ for (PortEvent portEvt : swEvt.getPorts() ) {
+ removePort(portEvt);
+ }
+
+ Switch sw = switches.get(swEvt.getDpid());
+
+ if (sw == null) {
+ log.warn("Switch {} already removed, ignoring", swEvt);
+ return;
+ }
+
+ // remove all ports if there still exist
+ ArrayList<PortEvent> portsToRemove = new ArrayList<>();
+ for (Port port : sw.getPorts()) {
+ log.warn(
+ "Port {} on Switch {} should be removed prior to removing Switch. Removing Port now",
+ port, swEvt);
+ PortEvent portEvt = new PortEvent(port.getDpid(), port.getNumber());
+ portsToRemove.add(portEvt);
+ }
+ for (PortEvent portEvt : portsToRemove) {
+ // XXX calling removePortEvent() may trigger duplicate event, once at prepare phase, second time here
+ // If event can be squashed, ignored etc. at receiver side it shouldn't be a problem, but if not
+ // need to re-visit this issue.
+
+ // Note: removePortEvent() implies removal of attached Device, etc.
+ // if we decide not to call removePortEvent(), Device needs to be handled properly
+ removePortEvent(portEvt);
+ }
+
+ boolean removed = switches.remove(swEvt.getDpid(), sw);
+ if (removed) {
+ log.warn(
+ "Switch instance was replaced concurrently while removing {}. Something is not right.",
+ sw);
+ }
+ }
+
+ void putPort(PortEvent portEvt) {
+ if (portEvt == null) {
+ throw new IllegalArgumentException("Port cannot be null");
+ }
+ Switch sw = switches.get(portEvt.getDpid());
+ if (sw == null) {
+ throw new BrokenInvariantException(String.format(
+ "Switch with dpid %s did not exist.",
+ new Dpid(portEvt.getDpid())));
+ }
+ Port p = sw.getPort(portEvt.getNumber());
+ PortImpl port = null;
+ if (p != null) {
+ port = getPortImpl(p);
+ }
+
+ if (port == null) {
+ port = new PortImpl(this, sw, portEvt.getNumber());
+ }
+
+ // TODO update attributes
+
+ SwitchImpl s = getSwitchImpl(sw);
+ s.addPort(port);
+ }
+
+ void removePort(PortEvent portEvt) {
+ if (portEvt == null) {
+ throw new IllegalArgumentException("Port cannot be null");
+ }
+
+ Switch sw = switches.get(portEvt.getDpid());
+ if (sw == null) {
+ log.warn("Parent Switch for Port {} already removed, ignoring", portEvt);
+ return;
+ }
+
+ Port p = sw.getPort(portEvt.getNumber());
+ if (p == null) {
+ log.warn("Port {} already removed, ignoring", portEvt);
+ return;
+ }
+
+ // Remove Link and Device Attachment
+ for (Device device : p.getDevices()) {
+ log.debug("Removing Device {} on Port {}", device, portEvt);
+ DeviceEvent devEvt = new DeviceEvent(device.getMacAddress());
+ devEvt.addAttachmentPoint(new SwitchPort(p.getSwitch().getDpid(), p.getNumber()));
+
+ // XXX calling removeDeviceEvent() may trigger duplicate event, once at prepare phase, second time here
+ // If event can be squashed, ignored etc. at receiver side it shouldn't be a problem, but if not
+ // need to re-visit
+
+ // calling Discovery API to wipe from DB, etc.
+ removeDeviceEvent(devEvt);
+ }
+ Set<Link> links = new HashSet<>();
+ links.add(p.getOutgoingLink());
+ links.add(p.getIncomingLink());
+ ArrayList<LinkEvent> linksToRemove = new ArrayList<>();
+ for (Link link : links) {
+ if (link == null) {
+ continue;
+ }
+ log.debug("Removing Link {} on Port {}", link, portEvt);
+ LinkEvent linkEvent = new LinkEvent(link.getSourceSwitchDpid(), link.getSourcePortNumber(), link.getDestinationSwitchDpid(), link.getDestinationPortNumber());
+ linksToRemove.add(linkEvent);
+ }
+ for (LinkEvent linkEvent : linksToRemove) {
+ // XXX calling removeLinkEvent() may trigger duplicate event, once at prepare phase, second time here
+ // If event can be squashed, ignored etc. at receiver side it shouldn't be a problem, but if not
+ // need to re-visit
+
+ // calling Discovery API to wipe from DB, etc.
+ removeLinkEvent(linkEvent);
+ }
+
+ // remove Port from Switch
+ SwitchImpl s = getSwitchImpl(sw);
+ s.removePort(p);
+ }
+
+ void putLink(LinkEvent linkEvt) {
+ if (linkEvt == null) {
+ throw new IllegalArgumentException("Link cannot be null");
+ }
+
+ Port srcPort = getPort(linkEvt.getSrc().dpid, linkEvt.getSrc().number);
+ if (srcPort == null) {
+ throw new BrokenInvariantException(
+ String.format(
+ "Src Port %s of a Link did not exist.",
+ linkEvt.getSrc() ));
+ }
+
+ Port dstPort = getPort(linkEvt.getDst().dpid, linkEvt.getDst().number);
+ if (dstPort == null) {
+ throw new BrokenInvariantException(
+ String.format(
+ "Dst Port %s of a Link did not exist.",
+ linkEvt.getDst() ));
+ }
+
+ // getting Link instance from destination port incoming Link
+ Link l = dstPort.getIncomingLink();
+ LinkImpl link = null;
+ assert( l == srcPort.getOutgoingLink() );
+ if (l != null) {
+ link = getLinkImpl(l);
+ }
+
+ if (link == null) {
+ link = new LinkImpl(this, srcPort, dstPort);
+ }
+
+
+ PortImpl dstPortMem = getPortImpl(dstPort);
+ PortImpl srcPortMem = getPortImpl(srcPort);
+
+ // Add Link first to avoid further Device addition
+
+ // add Link to Port
+ dstPortMem.setIncomingLink(link);
+ srcPortMem.setOutgoingLink(link);
+
+ // remove Device Pointing to Port if any
+ for(Device d : dstPortMem.getDevices() ) {
+ log.error("Device {} on Port {} should have been removed prior to adding Link {}", d, dstPort, linkEvt);
+ DeviceImpl dev = getDeviceImpl(d);
+ dev.removeAttachmentPoint(dstPort);
+ // This implies that change is made to Device Object.
+ // sending Device attachment point removed event
+ DeviceEvent rmEvent = new DeviceEvent(d.getMacAddress());
+ rmEvent.addAttachmentPoint(new SwitchPort(dstPort.getDpid(), dstPort.getNumber()));
+ removeDeviceEvent(rmEvent);
+ }
+ dstPortMem.removeAllDevice();
+ for(Device d : srcPortMem.getDevices() ) {
+ log.error("Device {} on Port {} should have been removed prior to adding Link {}", d, srcPort, linkEvt);
+ DeviceImpl dev = getDeviceImpl(d);
+ dev.removeAttachmentPoint(srcPort);
+ // This implies that change is made to Device Object.
+ // sending Device attachment point removed event
+ DeviceEvent rmEvent = new DeviceEvent(d.getMacAddress());
+ rmEvent.addAttachmentPoint(new SwitchPort(dstPort.getDpid(), dstPort.getNumber()));
+ removeDeviceEvent(rmEvent);
+ }
+ srcPortMem.removeAllDevice();
+
+ }
+
+ void removeLink(LinkEvent linkEvt) {
+ if (linkEvt == null) {
+ throw new IllegalArgumentException("Link cannot be null");
+ }
+
+ Port srcPort = getPort(linkEvt.getSrc().dpid, linkEvt.getSrc().number);
+ if (srcPort == null) {
+ log.warn("Src Port for Link {} already removed, ignoring", linkEvt);
+ return;
+ }
+
+ Port dstPort = getPort(linkEvt.getDst().dpid, linkEvt.getDst().number);
+ if (dstPort == null) {
+ log.warn("Dst Port for Link {} already removed, ignoring", linkEvt);
+ return;
+ }
+
+ Link l = dstPort.getIncomingLink();
+ if ( l == null ) {
+ log.warn("Link {} already removed on destination Port", linkEvt);
+ }
+ l = srcPort.getOutgoingLink();
+ if ( l == null ) {
+ log.warn("Link {} already removed on src Port", linkEvt);
+ }
+
+ getPortImpl(dstPort).setIncomingLink(null);
+ getPortImpl(srcPort).setOutgoingLink(null);
+ }
+
+ // XXX Need to rework Device related
+ void putDevice(DeviceEvent deviceEvt) {
+ if (deviceEvt == null) {
+ throw new IllegalArgumentException("Device cannot be null");
+ }
+
+ Device device = getDeviceByMac(deviceEvt.getMac());
+ if ( device == null ) {
+ device = new DeviceImpl(this, deviceEvt.getMac());
+ Device existing = mac2Device.putIfAbsent(deviceEvt.getMac(), device);
+ if (existing != null) {
+ log.warn(
+ "Concurrent putDevice seems to be in action. Continuing updating {}",
+ existing);
+ device = existing;
+ }
+ }
+ DeviceImpl memDevice = getDeviceImpl(device);
+
+ // for each attachment point
+ for (SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
+ // Attached Ports must exist
+ Port port = getPort(swp.dpid, swp.number);
+ if ( port == null ) {
+ log.warn("Port for the attachment point {} did not exist. skipping mutation", swp);
+ continue;
+ }
+ // Attached Ports must not have Link
+ if ( port.getOutgoingLink() != null || port.getIncomingLink() != null ) {
+ log.warn("Link (Out:{},In:{}) exist on the attachment point, skipping mutation.", port.getOutgoingLink(), port.getIncomingLink());
+ continue;
+ }
+
+ // finally add Device <-> Port on In-memory structure
+ PortImpl memPort = getPortImpl(port);
+ memPort.addDevice(device);
+ memDevice.addAttachmentPoint(port);
+ }
+
+ // for each IP address
+ for( InetAddress ipAddr : deviceEvt.getIpAddresses() ) {
+ // Add Device -> IP
+ memDevice.addIpAddress(ipAddr);
+
+ // Add IP -> Set<Device>
+ boolean updated = false;
+ do {
+ Set<Device> devices = this.addr2Device.get(ipAddr);
+ if ( devices == null ) {
+ devices = new HashSet<>();
+ Set<Device> existing = this.addr2Device.putIfAbsent(ipAddr, devices);
+ if ( existing == null ) {
+ // success
+ updated = true;
+ }
+ } else {
+ Set<Device> updateDevices = new HashSet<>(devices);
+ updateDevices.add(device);
+ updated = this.addr2Device.replace(ipAddr, devices, updateDevices);
+ }
+ if (!updated) {
+ log.debug("Collision detected, updating IP to Device mapping retrying.");
+ }
+ } while( !updated );
+ }
+ }
+
+ void removeDevice(DeviceEvent deviceEvt) {
+ if (deviceEvt == null) {
+ throw new IllegalArgumentException("Device cannot be null");
+ }
+
+ Device device = getDeviceByMac(deviceEvt.getMac());
+ if ( device == null ) {
+ log.warn("Device {} already removed, ignoring", deviceEvt);
+ return;
+ }
+ DeviceImpl memDevice = getDeviceImpl(device);
+
+ // for each attachment point
+ for (SwitchPort swp : deviceEvt.getAttachmentPoints() ) {
+ // Attached Ports must exist
+ Port port = getPort(swp.dpid, swp.number);
+ if ( port == null ) {
+ log.warn("Port for the attachment point {} did not exist. skipping attachment point mutation", swp);
+ continue;
+ }
+
+ // finally remove Device <-> Port on In-memory structure
+ PortImpl memPort = getPortImpl(port);
+ memPort.removeDevice(device);
+ memDevice.removeAttachmentPoint(port);
+ }
+
+ // for each IP address
+ for( InetAddress ipAddr : deviceEvt.getIpAddresses() ) {
+ // Remove Device -> IP
+ memDevice.removeIpAddress(ipAddr);
+
+ // Remove IP -> Set<Device>
+ boolean updated = false;
+ do {
+ Set<Device> devices = this.addr2Device.get(ipAddr);
+ if ( devices == null ) {
+ // already empty set, nothing to do
+ updated = true;
+ } else {
+ Set<Device> updateDevices = new HashSet<>(devices);
+ updateDevices.remove(device);
+ updated = this.addr2Device.replace(ipAddr, devices, updateDevices);
+ }
+ if (!updated) {
+ log.debug("Collision detected, updating IP to Device mapping retrying.");
+ }
+ } while( !updated );
+ }
+ }
+
+ private void dispatchPutSwitchEvent(SwitchEvent switchEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.putSwitchEvent(switchEvent);
+ }
+ }
+
+ private void dispatchRemoveSwitchEvent(SwitchEvent switchEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.removeSwitchEvent(switchEvent);
+ }
+ }
+
+ private void dispatchPutPortEvent(PortEvent portEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.putPortEvent(portEvent);
+ }
+ }
+
+ private void dispatchRemovePortEvent(PortEvent portEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.removePortEvent(portEvent);
+ }
+ }
+
+ private void dispatchPutLinkEvent(LinkEvent linkEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.putLinkEvent(linkEvent);
+ }
+ }
+
+ private void dispatchRemoveLinkEvent(LinkEvent linkEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.removeLinkEvent(linkEvent);
+ }
+ }
+
+ private void dispatchPutDeviceEvent(DeviceEvent deviceEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.putDeviceEvent(deviceEvent);;
+ }
+ }
+
+ private void dispatchRemoveDeviceEvent(DeviceEvent deviceEvent) {
+ for (INetworkGraphListener listener : this.networkGraphListeners) {
+ // TODO Should copy before handing them over to listener
+ listener.removeDeviceEvent(deviceEvent);
+ }
+ }
+
+ // we might want to include this in NetworkGraph interface
+ private Port getPort(Long dpid, Long number) {
+ Switch sw = getSwitch(dpid);
+ if (sw != null) {
+ return sw.getPort(number);
+ }
+ return null;
+ }
+
+ private SwitchImpl getSwitchImpl(Switch sw) {
+ if (sw instanceof SwitchImpl) {
+ return (SwitchImpl) sw;
+ }
+ throw new ClassCastException("SwitchImpl expected, but found: " + sw);
+ }
+
+ private PortImpl getPortImpl(Port p) {
+ if (p instanceof PortImpl) {
+ return (PortImpl) p;
+ }
+ throw new ClassCastException("PortImpl expected, but found: " + p);
+ }
+
+ private LinkImpl getLinkImpl(Link l) {
+ if (l instanceof LinkImpl) {
+ return (LinkImpl) l;
+ }
+ throw new ClassCastException("LinkImpl expected, but found: " + l);
+ }
+
+ private DeviceImpl getDeviceImpl(Device d) {
+ if (d instanceof DeviceImpl) {
+ return (DeviceImpl) d;
+ }
+ throw new ClassCastException("DeviceImpl expected, but found: " + d);
+ }
+
+ @Deprecated
+ public void loadWholeTopologyFromDB() {
+ // XXX May need to clear whole topology first, depending on
+ // how we initially subscribe to replication events
+
+ for (RCSwitch sw : RCSwitch.getAllSwitches()) {
+ if ( sw.getStatus() != RCSwitch.STATUS.ACTIVE ) {
+ continue;
+ }
+ putSwitchReplicationEvent(new SwitchEvent(sw.getDpid()));
+ }
+
+ for (RCPort p : RCPort.getAllPorts()) {
+ if (p.getStatus() != RCPort.STATUS.ACTIVE) {
+ continue;
+ }
+ putPortReplicationEvent(new PortEvent(p.getDpid(), p.getNumber() ));
+ }
+
+ // TODO Is Device going to be in DB? If so, read from DB.
+ // for (RCDevice d : RCDevice.getAllDevices()) {
+ // DeviceEvent devEvent = new DeviceEvent( MACAddress.valueOf(d.getMac()) );
+ // for (byte[] portId : d.getAllPortIds() ) {
+ // devEvent.addAttachmentPoint( new SwitchPort( RCPort.getDpidFromKey(portId), RCPort.getNumberFromKey(portId) ));
+ // }
+ // }
+
+ for (RCLink l : RCLink.getAllLinks()) {
+ // check if src/dst switch/port exist before triggering event
+ Port srcPort = getPort(l.getSrc().dpid, l.getSrc().number);
+ Port dstPort = getPort(l.getDst().dpid, l.getDst().number);
+ if ( srcPort == null || dstPort == null ) {
+ continue;
+ }
+ putLinkReplicationEvent( new LinkEvent(l.getSrc().dpid, l.getSrc().number, l.getDst().dpid, l.getDst().number));
+ }
+ }
+}
diff --git a/src/test/java/net/onrc/onos/intent/MockNetworkGraph.java b/src/test/java/net/onrc/onos/intent/MockNetworkGraph.java
index ec0d54f..cde98f0 100644
--- a/src/test/java/net/onrc/onos/intent/MockNetworkGraph.java
+++ b/src/test/java/net/onrc/onos/intent/MockNetworkGraph.java
@@ -1,12 +1,12 @@
package net.onrc.onos.intent;
-import net.onrc.onos.ofcontroller.networkgraph.AbstractNetworkGraph;
+import net.onrc.onos.ofcontroller.networkgraph.NetworkGraphImpl;
import net.onrc.onos.ofcontroller.networkgraph.Link;
import net.onrc.onos.ofcontroller.networkgraph.LinkImpl;
import net.onrc.onos.ofcontroller.networkgraph.Switch;
import net.onrc.onos.ofcontroller.networkgraph.SwitchImpl;
-public class MockNetworkGraph extends AbstractNetworkGraph {
+public class MockNetworkGraph extends NetworkGraphImpl {
public Switch addSwitch(Long switchId) {
SwitchImpl sw = new SwitchImpl(this, switchId);
switches.put(sw.getDpid(), sw);
diff --git a/web/add-intent.rb b/web/add-intent.rb
deleted file mode 100644
index b1d8dfd..0000000
--- a/web/add-intent.rb
+++ /dev/null
@@ -1,122 +0,0 @@
-require "rest-client"
-require "optparse"
-
-options = { :intent_id => 123, :intent_type => "shortest_intent_type", :max_switches => 4 }
-
-parser = OptionParser.new do |opts|
- opts.banner = "Usage add-intent [options]"
- opts.on('-t', '--max_intents max_intents', 'max. number of intents') do |max_intents|
- options[:max_intents] = max_intents
- end
- opts.on('-a', '--application application_id', 'set application id') do |appl_id|
- options[:application_id] = appl_id.to_i
- end
- opts.on('-i', '--intent_id intent_id', 'global intent id') do |id|
- options[:intent_id] = id.to_i
- end
- opts.on('-s', '--shortest', 'create a shortest path intent') do
- options[:intent_type] = "shortest_intent_type"
- end
- opts.on('-c', '--constrained', 'create a constrained shortest path intent') do
- options[:intent_type] = "constrained_shortest_intent_type"
- end
- opts.on('-m', '--max_switches max_switches', 'max. number of switches') do |max_switches|
- options[:max_switches] = max_switches.to_i
- end
- opts.on('-h', '--help', 'Display help') do
- puts opts
- exit
- end
-end
-parser.parse!
-
-puts options.inspect
-server = options[:server]
-server ||= "127.0.0.1"
-port = options[:port]
-port ||= 8080
-
-def rand_mac
- mac = `openssl rand -hex 6`
- mac.scan(/(..)/).join(":")
-end
-
-def rand_switch
- switch = `openssl rand -hex 5`.chomp
-end
-
-class Intent
- attr_reader :switches
- attr_reader :ports
- attr_reader :intent_id
- attr_reader :application_id
- attr_reader :intent_type
-
- def initialize options
- parse_options options
- end
-
- def create_intent
- json_intents = []
- @switches.each do |sw|
- rest = switches - [sw]
- json_intents = _create_intent sw, rest, json_intents
- end
- json_intents
- end
-
- def parse_options options
- max_switches = options[:max_switches].to_i || 4
- @switches = (1..max_switches).to_a
- @ports = (1..(max_switches - 1)).to_a
- @intent_id = options[:intent_id]
- @intent_id ||= 1
- @application_id = options[:application_id]
- @application_id ||= 1
- @intent_type = options[:intent_type]
- end
-
-
- def _create_intent src_switch, iterable_switches, json_intents
- network_id = 1
- iterable_switches.each_index do |sw_i|
- dst_switch = iterable_switches[sw_i]
- sw_set = @switches - [dst_switch]
- dst_port = sw_set.index(src_switch)
- dst_port = dst_port + 1
- intent = {
- :intent_id => "#{@application_id}:#{@intent_id}",
- :intent_type => @intent_type,
- :srcSwitch => src_switch.to_s,
- :srcPort => @ports[sw_i],
- :srcMac => "00:00:c0:a8:#{mac_format(src_switch)}",
- :dstSwitch => iterable_switches[sw_i].to_s,
- :dstPort => dst_port,
- :dstMac => "00:00:c0:a8:#{mac_format(iterable_switches[sw_i].to_i)}"
- }
-puts intent
- @intent_id = @intent_id + 1
- json_intents << intent
-puts
- end
- #sha256 = Digest::SHA256.new
- #sha256.update intent_hash.to_s
- #puts sha256.hexdigest
- #puts "intent hash = #{intent_hash}"
- json_intents
- end
-
- def mac_format number
- if number > 255
- divisor = number / 256
- remainder = number % 256
- return sprintf("%02x:%02x",divisor ,remainder)
- end
- "00:%02x" % number
- end
-end
-
-intent = Intent.new options
-json_data = intent.create_intent
-response = RestClient.post "http://#{server}:#{port}/wm/onos/datagrid/add/intent/json", json_data.to_json, :content_type => :json, :accept => :json
-puts response.inspect
diff --git a/web/rest-intent/add-get-intent.rb b/web/rest-intent/add-get-intent.rb
new file mode 100644
index 0000000..1627aea
--- /dev/null
+++ b/web/rest-intent/add-get-intent.rb
@@ -0,0 +1,207 @@
+require "rest-client"
+require "optparse"
+
+options = {
+ :rest_op => "add",
+ :intent_id => 123,
+ :intent_type => "shortest_intent_type",
+ :max_switches => 4,
+ :intent_op => "add"
+}
+
+parser = OptionParser.new do |opts|
+ opts.banner = "Usage add-get-intent [options]"
+ opts.on('-g', '--get_intents', 'get intents state') do
+ options[:rest_op] = "get"
+ end
+ opts.on('-t', '--max_intents max_intents', 'max. number of intents') do |max_intents|
+ options[:max_intents] = max_intents
+ end
+ opts.on('-l', '--application application_id', 'set application id') do |appl_id|
+ options[:application_id] = appl_id.to_i
+ end
+ opts.on('-i', '--intent_id intent_id', 'global intent id') do |id|
+ options[:intent_id] = id.to_i
+ end
+ # optional argument
+ opts.on('-s', '--shortest', 'create a shortest path intent') do
+ options[:intent_type] = "shortest_intent_type"
+ end
+ # optional argument
+ opts.on('-c', '--constrained', 'create a constrained shortest path intent') do
+ options[:intent_type] = "constrained_shortest_intent_type"
+ end
+ # optional argument
+ opts.on('-r', '--random_intent', 'create minimum no. of random intents') do
+ options[:random_intent] = true
+ end
+ opts.on('-m', '--max_switches max_switches', 'max. number of switches') do |max_switches|
+ options[:max_switches] = max_switches.to_i
+ end
+ opts.on('-o', '--intent_op add|remove', 'an operation to post an intent') do |operation|
+ options[:intent_op] = operation
+ end
+ opts.on('-w', '--server server', 'server to post intents') do |server|
+ options[:server] = server
+ end
+ opts.on('-p','--port port', 'server port') do |port|
+ options[:port] = port
+ end
+ opts.on('b', '--bulk_limit bulk_limit', 'bulk request upto this limit') do |bulk_limit|
+ options[:bulk_limit] = bulk_limit
+ end
+ opts.on('-h', '--help', 'Display help') do
+ puts opts
+ exit
+ end
+end
+parser.parse!
+
+def rand_mac
+ mac = `openssl rand -hex 6`
+ mac.scan(/(..)/).join(":")
+end
+
+def rand_switch
+ switch = `openssl rand -hex 5`.chomp
+end
+
+class Intent
+ attr_reader :switches, :ports, :intent_id
+ attr_reader :application_id, :intent_type, :intent_op
+ attr_reader :random_intent, :server, :port
+ attr_reader :bulk_limit
+
+ def initialize options
+ parse_options options
+ end
+
+ def post_intent
+ create_specific_intent
+ end
+
+ def get_intent
+ request = RestClient.get "http://#{@server}:#{@port}/wm/onos/datagrid/get/intents/json"
+ puts request
+ end
+
+ private
+
+ def create_specific_intent
+ if @random_intent == true
+ create_random_intent
+ else
+ create_many_intents
+ end
+ end
+
+ # create as many intents as the number of switches
+ def create_many_intents
+ intents = []
+ @switches.each do |sw|
+ rest = @switches - [sw]
+ intents = _create_intent sw, rest, intents
+puts intents.size
+ post_slice intents
+ end
+ post_slice intents, true
+ end
+
+ # pick a random src switch and create intents to all other switches
+ def create_random_intent
+ intents = []
+ sw = @switches.shuffle[0]
+ rest = @switches - [sw]
+ intents = _create_intent sw, rest, intents
+ post_slice intents, true
+ end
+
+ def post_slice intents, last=false
+ @bulk_limit = @bulk_limit.to_i
+ if intents.size >= @bulk_limit
+ post intents.slice!(0..(@bulk_limit - 1))
+ end
+ if last == true
+ loop do
+ new_bulk_limit = intents.size > @bulk_limit ? @bulk_limit : intents.size
+ post intents.slice!(0..(new_bulk_limit - 1))
+ break if new_bulk_limit < @bulk_limit
+ end
+ end
+ end
+
+ def post intents
+ json_data = intents.to_json
+ response = RestClient.post "http://#{@server}:#{@port}/wm/onos/datagrid/#{intent_op}/intent/json", json_data, :content_type => :json, :accept => :json
+ puts response
+ end
+
+ def parse_options options
+ max_switches = options[:max_switches].to_i || 4
+ @switches = (1..max_switches).to_a
+ @ports = (1..(max_switches - 1)).to_a
+ @intent_id = options[:intent_id]
+ @intent_id ||= 1
+ @application_id = options[:application_id]
+ @application_id ||= 1
+ @intent_type = options[:intent_type]
+ @intent_op = options[:intent_op]
+ @intent_op ||= "add"
+ @random_intent = options[:random_intent]
+ @random_intent ||= false
+ @server = options[:server]
+ @server ||= "127.0.0.1"
+ @port = options[:port]
+ @port ||= 8080
+ @bulk_limit = options[:bulk_limit]
+ @bulk_limit ||= 10000
+ end
+
+
+ def _create_intent src_switch, iterable_switches, json_intents
+ network_id = 1
+ iterable_switches.each_index do |sw_i|
+ dst_switch = iterable_switches[sw_i]
+ sw_set = @switches - [dst_switch]
+ dst_port = sw_set.index(src_switch)
+ dst_port = dst_port + 1
+ intent = {
+ :intent_id => "#{@application_id}:#{@intent_id}",
+ :intent_type => @intent_type,
+ :intent_op => @intent_op,
+ :srcSwitch => src_switch.to_s,
+ :srcPort => @ports[sw_i],
+ :srcMac => "00:00:c0:a8:#{mac_format(src_switch)}",
+ :dstSwitch => iterable_switches[sw_i].to_s,
+ :dstPort => dst_port,
+ :dstMac => "00:00:c0:a8:#{mac_format(iterable_switches[sw_i].to_i)}"
+ }
+puts intent.inspect
+ @intent_id = @intent_id + 1
+ json_intents << intent
+puts
+ end
+ #sha256 = Digest::SHA256.new
+ #sha256.update intent_hash.to_s
+ #puts sha256.hexdigest
+ #puts "intent hash = #{intent_hash}"
+ json_intents
+ end
+
+ def mac_format number
+ if number > 255
+ divisor = number / 256
+ remainder = number % 256
+ return sprintf("%02x:%02x",divisor ,remainder)
+ end
+ "00:%02x" % number
+ end
+end
+
+intent = Intent.new options
+if options[:rest_op] == "get"
+ intent.get_intent
+else
+ json_data = intent.post_intent
+end
+