Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
index 1fafc32..473ea6e 100644
--- a/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
+++ b/apps/foo/src/main/java/org/onlab/onos/foo/FooComponent.java
@@ -11,6 +11,9 @@
import org.onlab.onos.net.device.DeviceEvent;
import org.onlab.onos.net.device.DeviceListener;
import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.intent.IntentEvent;
+import org.onlab.onos.net.intent.IntentListener;
+import org.onlab.onos.net.intent.IntentService;
import org.slf4j.Logger;
import static org.slf4j.LoggerFactory.getLogger;
@@ -29,13 +32,18 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected IntentService intentService;
+
private final ClusterEventListener clusterListener = new InnerClusterListener();
private final DeviceListener deviceListener = new InnerDeviceListener();
+ private final IntentListener intentListener = new InnerIntentListener();
@Activate
public void activate() {
clusterService.addListener(clusterListener);
deviceService.addListener(deviceListener);
+ intentService.addListener(intentListener);
log.info("Started");
}
@@ -43,6 +51,7 @@
public void deactivate() {
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
+ intentService.removeListener(intentListener);
log.info("Stopped");
}
@@ -59,6 +68,23 @@
log.info("YEEEEHAAAAW! {}", event);
}
}
+
+ private class InnerIntentListener implements IntentListener {
+ @Override
+ public void event(IntentEvent event) {
+ String message;
+ if (event.type() == IntentEvent.Type.SUBMITTED) {
+ message = "WOW! It looks like someone has some intentions: {}";
+ } else if (event.type() == IntentEvent.Type.INSTALLED) {
+ message = "AWESOME! So far things are going great: {}";
+ } else if (event.type() == IntentEvent.Type.WITHDRAWN) {
+ message = "HMMM! Ambitions are fading apparently: {}";
+ } else {
+ message = "CRAP!!! Things are not turning out as intended: {}";
+ }
+ log.info(message, event.subject());
+ }
+ }
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
new file mode 100644
index 0000000..b3e03b3
--- /dev/null
+++ b/cli/src/main/java/org/onlab/onos/cli/SummaryCommand.java
@@ -0,0 +1,35 @@
+package org.onlab.onos.cli;
+
+import org.apache.karaf.shell.commands.Command;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.net.device.DeviceService;
+import org.onlab.onos.net.flow.FlowRuleService;
+import org.onlab.onos.net.host.HostService;
+import org.onlab.onos.net.intent.IntentService;
+import org.onlab.onos.net.link.LinkService;
+import org.onlab.onos.net.topology.Topology;
+import org.onlab.onos.net.topology.TopologyService;
+
+/**
+ * Provides summary of ONOS model.
+ */
+@Command(scope = "onos", name = "summary",
+ description = "Provides summary of ONOS model")
+public class SummaryCommand extends AbstractShellCommand {
+
+ @Override
+ protected void execute() {
+ TopologyService topologyService = get(TopologyService.class);
+ Topology topology = topologyService.currentTopology();
+ print("nodes=%d, devices=%d, links=%d, hosts=%d, clusters=%s, paths=%d, flows=%d, intents=%d",
+ get(ClusterService.class).getNodes().size(),
+ get(DeviceService.class).getDeviceCount(),
+ get(LinkService.class).getLinkCount(),
+ get(HostService.class).getHostCount(),
+ topologyService.getClusters(topology).size(),
+ topology.pathCount(),
+ get(FlowRuleService.class).getFlowRuleCount(),
+ get(IntentService.class).getIntentCount());
+ }
+
+}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/DevicesListCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/DevicesListCommand.java
index 6f0dd30..f34f97e 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/DevicesListCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/DevicesListCommand.java
@@ -35,7 +35,7 @@
* @param service device service
* @return sorted device list
*/
- protected List<Device> getSortedDevices(DeviceService service) {
+ protected static List<Device> getSortedDevices(DeviceService service) {
List<Device> devices = newArrayList(service.getDevices());
Collections.sort(devices, Comparators.ELEMENT_COMPARATOR);
return devices;
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
index 8b6cefc..902b27b 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/FlowsListCommand.java
@@ -1,6 +1,7 @@
package org.onlab.onos.cli.net;
import static com.google.common.collect.Lists.newArrayList;
+import static org.onlab.onos.cli.net.DevicesListCommand.getSortedDevices;
import java.util.Collections;
import java.util.List;
@@ -46,7 +47,7 @@
DeviceService deviceService = get(DeviceService.class);
FlowRuleService service = get(FlowRuleService.class);
Map<Device, List<FlowEntry>> flows = getSortedFlows(deviceService, service);
- for (Device d : flows.keySet()) {
+ for (Device d : getSortedDevices(deviceService)) {
printFlows(d, flows.get(d));
}
}
@@ -57,14 +58,15 @@
* @param service device service
* @return sorted device list
*/
- protected Map<Device, List<FlowEntry>> getSortedFlows(DeviceService deviceService, FlowRuleService service) {
+ protected Map<Device, List<FlowEntry>> getSortedFlows(DeviceService deviceService,
+ FlowRuleService service) {
Map<Device, List<FlowEntry>> flows = Maps.newHashMap();
List<FlowEntry> rules;
FlowEntryState s = null;
if (state != null && !state.equals("any")) {
s = FlowEntryState.valueOf(state.toUpperCase());
}
- Iterable<Device> devices = uri == null ? deviceService.getDevices() :
+ Iterable<Device> devices = uri == null ? deviceService.getDevices() :
Collections.singletonList(deviceService.getDevice(DeviceId.deviceId(uri)));
for (Device d : devices) {
if (s == null) {
@@ -89,18 +91,16 @@
* @param flows the set of flows for that device.
*/
protected void printFlows(Device d, List<FlowEntry> flows) {
- print("Device: " + d.id());
- if (flows == null | flows.isEmpty()) {
- print(" %s", "No flows.");
- return;
+ boolean empty = flows == null || flows.isEmpty();
+ print("deviceId=%s, flowRuleCount=%d", d.id(), empty ? 0 : flows.size());
+ if (!empty) {
+ for (FlowEntry f : flows) {
+ print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
+ f.packets(), f.life(), f.priority());
+ print(SFMT, f.selector().criteria());
+ print(TFMT, f.treatment().instructions());
+ }
}
- for (FlowEntry f : flows) {
- print(FMT, Long.toHexString(f.id().value()), f.state(), f.bytes(),
- f.packets(), f.life(), f.priority());
- print(SFMT, f.selector().criteria());
- print(TFMT, f.treatment().instructions());
- }
-
}
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentIdCompleter.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentIdCompleter.java
index 5e217d6..5d2e952 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentIdCompleter.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentIdCompleter.java
@@ -24,7 +24,7 @@
Iterator<Intent> it = service.getIntents().iterator();
SortedSet<String> strings = delegate.getStrings();
while (it.hasNext()) {
- strings.add(it.next().getId().toString());
+ strings.add(it.next().id().toString());
}
// Now let the completer do the work for figuring out what to offer.
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/RemoveHostToHostIntentCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentRemoveCommand.java
similarity index 82%
rename from cli/src/main/java/org/onlab/onos/cli/net/RemoveHostToHostIntentCommand.java
rename to cli/src/main/java/org/onlab/onos/cli/net/IntentRemoveCommand.java
index 1840c0d..7684da4 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/RemoveHostToHostIntentCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentRemoveCommand.java
@@ -10,9 +10,9 @@
/**
* Removes host-to-host connectivity intent.
*/
-@Command(scope = "onos", name = "remove-host-intent",
- description = "Removes host-to-host connectivity intent")
-public class RemoveHostToHostIntentCommand extends AbstractShellCommand {
+@Command(scope = "onos", name = "remove-intent",
+ description = "Removes the specified intent")
+public class IntentRemoveCommand extends AbstractShellCommand {
@Argument(index = 0, name = "id", description = "Intent ID",
required = true, multiValued = false)
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/IntentsListCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/IntentsListCommand.java
index a7d260d..a0c9845 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/IntentsListCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/IntentsListCommand.java
@@ -17,8 +17,8 @@
protected void execute() {
IntentService service = get(IntentService.class);
for (Intent intent : service.getIntents()) {
- IntentState state = service.getIntentState(intent.getId());
- print("%s %s %s", intent.getId(), state, intent);
+ IntentState state = service.getIntentState(intent.id());
+ print("%s %s %s", intent.id(), state, intent);
}
}
diff --git a/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java b/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
index 3f90f2a..e62f850 100644
--- a/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
+++ b/cli/src/main/java/org/onlab/onos/cli/net/WipeOutCommand.java
@@ -1,5 +1,6 @@
package org.onlab.onos.cli.net;
+import org.apache.karaf.shell.commands.Argument;
import org.apache.karaf.shell.commands.Command;
import org.onlab.onos.net.Device;
import org.onlab.onos.net.Host;
@@ -18,8 +19,22 @@
description = "Wipes-out the entire network information base, i.e. devices, links, hosts")
public class WipeOutCommand extends ClustersListCommand {
+
+ private static final String DISCLAIMER = "Yes, I know it will delete everything!";
+
+ @Argument(index = 0, name = "disclaimer", description = "Device ID",
+ required = true, multiValued = false)
+ String disclaimer = null;
+
@Override
protected void execute() {
+ if (!disclaimer.equals(DISCLAIMER)) {
+ print("I'm afraid I can't do that...");
+ print("You have to acknowledge by: " + DISCLAIMER);
+ return;
+ }
+
+ print("Good bye...");
DeviceAdminService deviceAdminService = get(DeviceAdminService.class);
DeviceService deviceService = get(DeviceService.class);
for (Device device : deviceService.getDevices()) {
@@ -34,7 +49,7 @@
IntentService intentService = get(IntentService.class);
for (Intent intent : intentService.getIntents()) {
- if (intentService.getIntentState(intent.getId()) == IntentState.INSTALLED) {
+ if (intentService.getIntentState(intent.id()) == IntentState.INSTALLED) {
intentService.withdraw(intent);
}
}
diff --git a/cli/src/main/javadoc/doc-files/intent-states.png b/cli/src/main/javadoc/doc-files/intent-states.png
new file mode 100644
index 0000000..70c333b
--- /dev/null
+++ b/cli/src/main/javadoc/doc-files/intent-states.png
Binary files differ
diff --git a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
index 9ed8a47..92f26fd 100644
--- a/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
+++ b/cli/src/main/resources/OSGI-INF/blueprint/shell-config.xml
@@ -2,6 +2,9 @@
<command-bundle xmlns="http://karaf.apache.org/xmlns/shell/v1.1.0">
<command>
+ <action class="org.onlab.onos.cli.SummaryCommand"/>
+ </command>
+ <command>
<action class="org.onlab.onos.cli.NodesListCommand"/>
</command>
<command>
@@ -61,15 +64,15 @@
<action class="org.onlab.onos.cli.net.IntentsListCommand"/>
</command>
<command>
- <action class="org.onlab.onos.cli.net.AddHostToHostIntentCommand"/>
+ <action class="org.onlab.onos.cli.net.IntentRemoveCommand"/>
<completers>
- <ref component-id="hostIdCompleter"/>
+ <ref component-id="intentIdCompleter"/>
</completers>
</command>
<command>
- <action class="org.onlab.onos.cli.net.RemoveHostToHostIntentCommand"/>
+ <action class="org.onlab.onos.cli.net.AddHostToHostIntentCommand"/>
<completers>
- <ref component-id="intentIdCompleter"/>
+ <ref component-id="hostIdCompleter"/>
</completers>
</command>
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
index 9011a93..47e9fed 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
@@ -115,7 +115,7 @@
}
public int hash() {
- return Objects.hash(deviceId, selector, id);
+ return Objects.hash(deviceId, selector, treatment);
}
@Override
@@ -132,7 +132,7 @@
if (obj instanceof DefaultFlowRule) {
DefaultFlowRule that = (DefaultFlowRule) obj;
return Objects.equals(deviceId, that.deviceId) &&
- //Objects.equals(id, that.id) &&
+ Objects.equals(id, that.id) &&
Objects.equals(priority, that.priority) &&
Objects.equals(selector, that.selector);
@@ -143,7 +143,7 @@
@Override
public String toString() {
return toStringHelper(this)
- .add("id", id)
+ .add("id", Long.toHexString(id.value()))
.add("deviceId", deviceId)
.add("priority", priority)
.add("selector", selector.criteria())
@@ -154,7 +154,7 @@
@Override
public int timeout() {
- return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout;
+ return timeout;
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
index 3f43598..7182916 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficTreatment.java
@@ -1,5 +1,12 @@
package org.onlab.onos.net.flow;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
@@ -8,12 +15,6 @@
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-
-import static org.slf4j.LoggerFactory.getLogger;
-
/**
* Default traffic treatment implementation.
*/
@@ -44,6 +45,25 @@
return new Builder();
}
+ //FIXME: Order of instructions may affect hashcode
+ @Override
+ public int hashCode() {
+ return Objects.hash(instructions);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof DefaultTrafficTreatment) {
+ DefaultTrafficTreatment that = (DefaultTrafficTreatment) obj;
+ return Objects.equals(instructions, that.instructions);
+
+ }
+ return false;
+ }
+
/**
* Builds a list of treatments following the following order.
* Modifications -> Group -> Output (including drop)
@@ -66,6 +86,7 @@
private Builder() {
}
+ @Override
public Builder add(Instruction instruction) {
if (drop) {
return this;
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
index 2ebc5a25..8600c54 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleService.java
@@ -13,6 +13,13 @@
public interface FlowRuleService {
/**
+ * Returns the number of flow rules in the system.
+ *
+ * @return flow rule count
+ */
+ int getFlowRuleCount();
+
+ /**
* Returns the collection of flow entries applied on the specified device.
* This will include flow rules which may not yet have been applied to
* the device.
@@ -59,6 +66,8 @@
*/
Iterable<FlowRule> getFlowRulesById(ApplicationId id);
+ //Future<CompletedBatchOperation> applyBatch(BatchOperation<FlowRuleBatchEntry>)
+
/**
* Adds the specified flow rule listener.
*
@@ -72,7 +81,4 @@
* @param listener flow rule listener
*/
void removeListener(FlowRuleListener listener);
-
-
-
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
index 4d68e74..5ce7eb1 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleStore.java
@@ -10,7 +10,15 @@
public interface FlowRuleStore extends Store<FlowRuleEvent, FlowRuleStoreDelegate> {
/**
+ * Returns the number of flow rule in the store.
+ *
+ * @return number of flow rules
+ */
+ int getFlowRuleCount();
+
+ /**
* Returns the stored flow.
+ *
* @param rule the rule to look for
* @return a flow rule
*/
@@ -60,5 +68,4 @@
* @return flow_removed event, or null if nothing removed
*/
FlowRuleEvent removeFlowRule(FlowEntry rule);
-
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
index 1bf5531..b2ebdee 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/Instructions.java
@@ -3,6 +3,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
+import java.util.Objects;
+
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.L2SubType;
import org.onlab.onos.net.flow.instructions.L2ModificationInstruction.ModEtherInstruction;
@@ -117,6 +119,24 @@
return toStringHelper(type()).toString();
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(type());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof DropInstruction) {
+ DropInstruction that = (DropInstruction) obj;
+ return Objects.equals(type(), that.type());
+
+ }
+ return false;
+ }
}
@@ -140,6 +160,26 @@
return toStringHelper(type().toString())
.add("port", port).toString();
}
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(port, type());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof OutputInstruction) {
+ OutputInstruction that = (OutputInstruction) obj;
+ Objects.equals(port, that.port);
+
+ }
+ return false;
+ }
}
}
+
+
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L2ModificationInstruction.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L2ModificationInstruction.java
index 8c51624..ce7e16b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L2ModificationInstruction.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L2ModificationInstruction.java
@@ -2,6 +2,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
+import java.util.Objects;
+
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
@@ -74,6 +76,25 @@
.add("mac", mac).toString();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(mac, subtype);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ModEtherInstruction) {
+ ModEtherInstruction that = (ModEtherInstruction) obj;
+ return Objects.equals(mac, that.mac) &&
+ Objects.equals(subtype, that.subtype);
+
+ }
+ return false;
+ }
+
}
@@ -103,6 +124,25 @@
.add("id", vlanId).toString();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(vlanId, subtype());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ModVlanIdInstruction) {
+ ModVlanIdInstruction that = (ModVlanIdInstruction) obj;
+ return Objects.equals(vlanId, that.vlanId);
+
+ }
+ return false;
+ }
+
+
}
/**
@@ -131,6 +171,24 @@
.add("pcp", Long.toHexString(vlanPcp)).toString();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(vlanPcp, subtype());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ModVlanPcpInstruction) {
+ ModVlanPcpInstruction that = (ModVlanPcpInstruction) obj;
+ return Objects.equals(vlanPcp, that.vlanPcp);
+
+ }
+ return false;
+ }
+
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L3ModificationInstruction.java b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L3ModificationInstruction.java
index ae82cd9..cf81f86 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L3ModificationInstruction.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/instructions/L3ModificationInstruction.java
@@ -2,6 +2,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
+import java.util.Objects;
+
import org.onlab.packet.IpPrefix;
/**
@@ -66,5 +68,23 @@
.add("ip", ip).toString();
}
+ @Override
+ public int hashCode() {
+ return Objects.hash(ip, subtype());
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof ModIPInstruction) {
+ ModIPInstruction that = (ModIPInstruction) obj;
+ return Objects.equals(ip, that.ip);
+
+ }
+ return false;
+ }
+
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/AbstractIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/AbstractIntent.java
index eefe750..c8a4a05 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/AbstractIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/AbstractIntent.java
@@ -24,7 +24,7 @@
}
@Override
- public IntentId getId() {
+ public IntentId id() {
return id;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/ConnectivityIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/ConnectivityIntent.java
index 70cec58..ed0c5cc 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/ConnectivityIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/ConnectivityIntent.java
@@ -53,7 +53,7 @@
*
* @return traffic match
*/
- public TrafficSelector getTrafficSelector() {
+ public TrafficSelector selector() {
return selector;
}
@@ -62,7 +62,7 @@
*
* @return applied action
*/
- public TrafficTreatment getTrafficTreatment() {
+ public TrafficTreatment treatment() {
return treatment;
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
index 7cef3da..f420fc2 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/HostToHostIntent.java
@@ -80,9 +80,9 @@
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("id", getId())
- .add("selector", getTrafficSelector())
- .add("treatment", getTrafficTreatment())
+ .add("id", id())
+ .add("selector", selector())
+ .add("treatment", treatment())
.add("one", one)
.add("two", two)
.toString();
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/Intent.java b/core/api/src/main/java/org/onlab/onos/net/intent/Intent.java
index d4c630a..3e339d1 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/Intent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/Intent.java
@@ -11,5 +11,5 @@
*
* @return intent identifier
*/
- IntentId getId();
+ IntentId id();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
index c98e788..742a590 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentEvent.java
@@ -1,106 +1,55 @@
package org.onlab.onos.net.intent;
-import com.google.common.base.MoreObjects;
import org.onlab.onos.event.AbstractEvent;
-import java.util.Objects;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
/**
* A class to represent an intent related event.
*/
-public class IntentEvent extends AbstractEvent<IntentState, Intent> {
+public class IntentEvent extends AbstractEvent<IntentEvent.Type, Intent> {
- // TODO: determine a suitable parent class; if one does not exist, consider
- // introducing one
+ public enum Type {
+ /**
+ * Signifies that a new intent has been submitted to the system.
+ */
+ SUBMITTED,
- private final long time;
- private final Intent intent;
- private final IntentState state;
- private final IntentState previous;
+ /**
+ * Signifies that an intent has been successfully installed.
+ */
+ INSTALLED,
- /**
- * Creates an event describing a state change of an intent.
- *
- * @param intent subject intent
- * @param state new intent state
- * @param previous previous intent state
- * @param time time the event created in milliseconds since start of epoch
- * @throws NullPointerException if the intent or state is null
- */
- public IntentEvent(Intent intent, IntentState state, IntentState previous, long time) {
- super(state, intent);
- this.intent = checkNotNull(intent);
- this.state = checkNotNull(state);
- this.previous = previous;
- this.time = time;
+ /**
+ * Signifies that an intent has failed compilation or installation.
+ */
+ FAILED,
+
+ /**
+ * Signifies that an intent has been withdrawn from the system.
+ */
+ WITHDRAWN
}
/**
- * Returns the state of the intent which caused the event.
+ * Creates an event of a given type and for the specified intent and the
+ * current time.
*
- * @return the state of the intent
+ * @param type event type
+ * @param intent subject intent
+ * @param time time the event created in milliseconds since start of epoch
*/
- public IntentState getState() {
- return state;
+ public IntentEvent(Type type, Intent intent, long time) {
+ super(type, intent, time);
}
/**
- * Returns the previous state of the intent which caused the event.
+ * Creates an event of a given type and for the specified intent and the
+ * current time.
*
- * @return the previous state of the intent
+ * @param type event type
+ * @param intent subject intent
*/
- public IntentState getPreviousState() {
- return previous;
+ public IntentEvent(Type type, Intent intent) {
+ super(type, intent);
}
- /**
- * Returns the intent associated with the event.
- *
- * @return the intent
- */
- public Intent getIntent() {
- return intent;
- }
-
- /**
- * Returns the time at which the event was created.
- *
- * @return the time in milliseconds since start of epoch
- */
- public long getTime() {
- return time;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- IntentEvent that = (IntentEvent) o;
- return Objects.equals(this.intent, that.intent)
- && Objects.equals(this.state, that.state)
- && Objects.equals(this.previous, that.previous)
- && Objects.equals(this.time, that.time);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(intent, state, previous, time);
- }
-
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("intent", intent)
- .add("state", state)
- .add("previous", previous)
- .add("time", time)
- .toString();
- }
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
index 20476e5..bd140af 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentState.java
@@ -1,55 +1,70 @@
package org.onlab.onos.net.intent;
/**
- * This class represents the states of an intent.
- *
- * <p>
- * Note: The state is expressed as enum, but there is possibility
- * in the future that we define specific class instead of enum to improve
- * the extensibility of state definition.
- * </p>
+ * Representation of the phases an intent may attain during its lifecycle.
*/
public enum IntentState {
- // FIXME: requires discussion on State vs. EventType and a solid state-transition diagram
- // TODO: consider the impact of conflict detection
- // TODO: consider the impact that external events affect an installed intent
+
/**
- * The beginning state.
- *
+ * Signifies that the intent has been submitted and will start compiling
+ * shortly. However, this compilation may not necessarily occur on the
+ * local controller instance.
+ * <p/>
* All intent in the runtime take this state first.
*/
SUBMITTED,
/**
- * The intent compilation has been completed.
- *
- * An intent translation graph (tree) is completely created.
- * Leaves of the graph are installable intent type.
+ * Signifies that the intent is being compiled into installable intents.
+ * This is a transitional state after which the intent will enter either
+ * {@link #FAILED} state or {@link #INSTALLING} state.
*/
- COMPILED,
+ COMPILING,
/**
- * The intent has been successfully installed.
+ * Signifies that the resulting installable intents are being installed
+ * into the network environment. This is a transitional state after which
+ * the intent will enter either {@link #INSTALLED} state or
+ * {@link #RECOMPILING} state.
+ */
+ INSTALLING,
+
+ /**
+ * The intent has been successfully installed. This is a state where the
+ * intent may remain parked until it is withdrawn by the application or
+ * until the network environment changes in some way to make the original
+ * set of installable intents untenable.
*/
INSTALLED,
/**
- * The intent is being withdrawn.
- *
- * When {@link IntentService#withdraw(Intent)} is called,
- * the intent takes this state first.
+ * Signifies that the intent is being recompiled into installable intents
+ * as an attempt to adapt to an anomaly in the network environment.
+ * This is a transitional state after which the intent will enter either
+ * {@link #FAILED} state or {@link #INSTALLING} state.
+ * <p/>
+ * Exit to the {@link #FAILED} state may be caused by failure to compile
+ * or by compiling into the same set of installable intents which have
+ * previously failed to be installed.
+ */
+ RECOMPILING,
+
+ /**
+ * Indicates that the intent is being withdrawn. This is a transitional
+ * state, triggered by invocation of the
+ * {@link IntentService#withdraw(Intent)} but one with only one outcome,
+ * which is the the intent being placed in the {@link #WITHDRAWN} state.
*/
WITHDRAWING,
/**
- * The intent has been successfully withdrawn.
+ * Indicates that the intent has been successfully withdrawn.
*/
WITHDRAWN,
/**
- * The intent has failed to be compiled, installed, or withdrawn.
- *
- * When the intent failed to be withdrawn, it is still, at least partially installed.
+ * Signifies that the intent has failed compiling, installing or
+ * recompiling states.
*/
- FAILED,
+ FAILED
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
index 037f179..fc023bb 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/IntentStore.java
@@ -10,10 +10,16 @@
public interface IntentStore extends Store<IntentEvent, IntentStoreDelegate> {
/**
- * Creates a new intent.
+ * Submits a new intent into the store. If the returned event is not
+ * null, the manager is expected to dispatch the event and then to kick
+ * off intent compilation process. Otherwise, another node has been elected
+ * to perform the compilation process and the node will learn about
+ * the submittal and results of the intent compilation via the delegate
+ * mechanism.
*
- * @param intent intent
- * @return appropriate event or null if no change resulted
+ * @param intent intent to be submitted
+ * @return event indicating the intent was submitted or null if no
+ * change resulted, e.g. duplicate intent
*/
IntentEvent createIntent(Intent intent);
@@ -68,10 +74,9 @@
*
* @param intentId original intent identifier
* @param installableIntents compiled installable intents
- * @return compiled state transition event
*/
- IntentEvent addInstallableIntents(IntentId intentId,
- List<InstallableIntent> installableIntents);
+ void addInstallableIntents(IntentId intentId,
+ List<InstallableIntent> installableIntents);
/**
* Returns the list of the installable events associated with the specified
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
index af1e84b..be8d309 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntent.java
@@ -1,25 +1,24 @@
package org.onlab.onos.net.intent;
-import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Objects;
-import java.util.Set;
-
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.Sets;
import org.onlab.onos.net.ConnectPoint;
import org.onlab.onos.net.flow.TrafficSelector;
import org.onlab.onos.net.flow.TrafficTreatment;
-import com.google.common.base.MoreObjects;
-import com.google.common.collect.Sets;
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
/**
* Abstraction of multiple source to single destination connectivity intent.
*/
public class MultiPointToSinglePointIntent extends ConnectivityIntent {
- private final Set<ConnectPoint> ingressPorts;
- private final ConnectPoint egressPort;
+ private final Set<ConnectPoint> ingressPoints;
+ private final ConnectPoint egressPoint;
/**
* Creates a new multi-to-single point connectivity intent for the specified
@@ -28,25 +27,25 @@
* @param id intent identifier
* @param match traffic match
* @param action action
- * @param ingressPorts set of ports from which ingress traffic originates
- * @param egressPort port to which traffic will egress
- * @throws NullPointerException if {@code ingressPorts} or
- * {@code egressPort} is null.
- * @throws IllegalArgumentException if the size of {@code ingressPorts} is
+ * @param ingressPoints set of ports from which ingress traffic originates
+ * @param egressPoint port to which traffic will egress
+ * @throws NullPointerException if {@code ingressPoints} or
+ * {@code egressPoint} is null.
+ * @throws IllegalArgumentException if the size of {@code ingressPoints} is
* not more than 1
*/
public MultiPointToSinglePointIntent(IntentId id, TrafficSelector match,
TrafficTreatment action,
- Set<ConnectPoint> ingressPorts,
- ConnectPoint egressPort) {
+ Set<ConnectPoint> ingressPoints,
+ ConnectPoint egressPoint) {
super(id, match, action);
- checkNotNull(ingressPorts);
- checkArgument(!ingressPorts.isEmpty(),
+ checkNotNull(ingressPoints);
+ checkArgument(!ingressPoints.isEmpty(),
"there should be at least one ingress port");
- this.ingressPorts = Sets.newHashSet(ingressPorts);
- this.egressPort = checkNotNull(egressPort);
+ this.ingressPoints = Sets.newHashSet(ingressPoints);
+ this.egressPoint = checkNotNull(egressPoint);
}
/**
@@ -54,8 +53,8 @@
*/
protected MultiPointToSinglePointIntent() {
super();
- this.ingressPorts = null;
- this.egressPort = null;
+ this.ingressPoints = null;
+ this.egressPoint = null;
}
/**
@@ -64,8 +63,8 @@
*
* @return set of ingress ports
*/
- public Set<ConnectPoint> getIngressPorts() {
- return ingressPorts;
+ public Set<ConnectPoint> ingressPoints() {
+ return ingressPoints;
}
/**
@@ -73,8 +72,8 @@
*
* @return egress port
*/
- public ConnectPoint getEgressPort() {
- return egressPort;
+ public ConnectPoint egressPoint() {
+ return egressPoint;
}
@Override
@@ -90,23 +89,23 @@
}
MultiPointToSinglePointIntent that = (MultiPointToSinglePointIntent) o;
- return Objects.equals(this.ingressPorts, that.ingressPorts)
- && Objects.equals(this.egressPort, that.egressPort);
+ return Objects.equals(this.ingressPoints, that.ingressPoints)
+ && Objects.equals(this.egressPoint, that.egressPoint);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), ingressPorts, egressPort);
+ return Objects.hash(super.hashCode(), ingressPoints, egressPoint);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("id", getId())
- .add("match", getTrafficSelector())
- .add("action", getTrafficTreatment())
- .add("ingressPorts", getIngressPorts())
- .add("egressPort", getEgressPort())
+ .add("id", id())
+ .add("match", selector())
+ .add("action", treatment())
+ .add("ingressPoints", ingressPoints())
+ .add("egressPoint", egressPoint())
.toString();
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
index 4c3486f..ff2e917 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/PathIntent.java
@@ -46,7 +46,7 @@
*
* @return traversed links
*/
- public Path getPath() {
+ public Path path() {
return path;
}
@@ -79,11 +79,11 @@
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("id", getId())
- .add("match", getTrafficSelector())
- .add("action", getTrafficTreatment())
- .add("ingressPort", getIngressPort())
- .add("egressPort", getEgressPort())
+ .add("id", id())
+ .add("match", selector())
+ .add("action", treatment())
+ .add("ingressPort", ingressPoint())
+ .add("egressPort", egressPoint())
.add("path", path)
.toString();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/PointToPointIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/PointToPointIntent.java
index 4c86bae..7b7c18a 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/PointToPointIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/PointToPointIntent.java
@@ -14,27 +14,27 @@
*/
public class PointToPointIntent extends ConnectivityIntent {
- private final ConnectPoint ingressPort;
- private final ConnectPoint egressPort;
+ private final ConnectPoint ingressPoint;
+ private final ConnectPoint egressPoint;
/**
* Creates a new point-to-point intent with the supplied ingress/egress
* ports.
*
- * @param id intent identifier
- * @param selector traffic selector
- * @param treatment treatment
- * @param ingressPort ingress port
- * @param egressPort egress port
- * @throws NullPointerException if {@code ingressPort} or {@code egressPort} is null.
+ * @param id intent identifier
+ * @param selector traffic selector
+ * @param treatment treatment
+ * @param ingressPoint ingress port
+ * @param egressPoint egress port
+ * @throws NullPointerException if {@code ingressPoint} or {@code egressPoints} is null.
*/
public PointToPointIntent(IntentId id, TrafficSelector selector,
TrafficTreatment treatment,
- ConnectPoint ingressPort,
- ConnectPoint egressPort) {
+ ConnectPoint ingressPoint,
+ ConnectPoint egressPoint) {
super(id, selector, treatment);
- this.ingressPort = checkNotNull(ingressPort);
- this.egressPort = checkNotNull(egressPort);
+ this.ingressPoint = checkNotNull(ingressPoint);
+ this.egressPoint = checkNotNull(egressPoint);
}
/**
@@ -42,8 +42,8 @@
*/
protected PointToPointIntent() {
super();
- this.ingressPort = null;
- this.egressPort = null;
+ this.ingressPoint = null;
+ this.egressPoint = null;
}
/**
@@ -52,8 +52,8 @@
*
* @return ingress port
*/
- public ConnectPoint getIngressPort() {
- return ingressPort;
+ public ConnectPoint ingressPoint() {
+ return ingressPoint;
}
/**
@@ -61,8 +61,8 @@
*
* @return egress port
*/
- public ConnectPoint getEgressPort() {
- return egressPort;
+ public ConnectPoint egressPoint() {
+ return egressPoint;
}
@Override
@@ -78,23 +78,23 @@
}
PointToPointIntent that = (PointToPointIntent) o;
- return Objects.equals(this.ingressPort, that.ingressPort)
- && Objects.equals(this.egressPort, that.egressPort);
+ return Objects.equals(this.ingressPoint, that.ingressPoint)
+ && Objects.equals(this.egressPoint, that.egressPoint);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), ingressPort, egressPort);
+ return Objects.hash(super.hashCode(), ingressPoint, egressPoint);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("id", getId())
- .add("match", getTrafficSelector())
- .add("action", getTrafficTreatment())
- .add("ingressPort", ingressPort)
- .add("egressPort", egressPort)
+ .add("id", id())
+ .add("match", selector())
+ .add("action", treatment())
+ .add("ingressPoint", ingressPoint)
+ .add("egressPoints", egressPoint)
.toString();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntent.java b/core/api/src/main/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntent.java
index af2616b..2a17bfe 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntent.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntent.java
@@ -17,34 +17,34 @@
*/
public class SinglePointToMultiPointIntent extends ConnectivityIntent {
- private final ConnectPoint ingressPort;
- private final Set<ConnectPoint> egressPorts;
+ private final ConnectPoint ingressPoint;
+ private final Set<ConnectPoint> egressPoints;
/**
* Creates a new single-to-multi point connectivity intent.
*
- * @param id intent identifier
- * @param selector traffic selector
- * @param treatment treatment
- * @param ingressPort port on which traffic will ingress
- * @param egressPorts set of ports on which traffic will egress
- * @throws NullPointerException if {@code ingressPort} or
- * {@code egressPorts} is null
- * @throws IllegalArgumentException if the size of {@code egressPorts} is
+ * @param id intent identifier
+ * @param selector traffic selector
+ * @param treatment treatment
+ * @param ingressPoint port on which traffic will ingress
+ * @param egressPoints set of ports on which traffic will egress
+ * @throws NullPointerException if {@code ingressPoint} or
+ * {@code egressPoints} is null
+ * @throws IllegalArgumentException if the size of {@code egressPoints} is
* not more than 1
*/
public SinglePointToMultiPointIntent(IntentId id, TrafficSelector selector,
TrafficTreatment treatment,
- ConnectPoint ingressPort,
- Set<ConnectPoint> egressPorts) {
+ ConnectPoint ingressPoint,
+ Set<ConnectPoint> egressPoints) {
super(id, selector, treatment);
- checkNotNull(egressPorts);
- checkArgument(!egressPorts.isEmpty(),
+ checkNotNull(egressPoints);
+ checkArgument(!egressPoints.isEmpty(),
"there should be at least one egress port");
- this.ingressPort = checkNotNull(ingressPort);
- this.egressPorts = Sets.newHashSet(egressPorts);
+ this.ingressPoint = checkNotNull(ingressPoint);
+ this.egressPoints = Sets.newHashSet(egressPoints);
}
/**
@@ -52,8 +52,8 @@
*/
protected SinglePointToMultiPointIntent() {
super();
- this.ingressPort = null;
- this.egressPorts = null;
+ this.ingressPoint = null;
+ this.egressPoints = null;
}
/**
@@ -61,8 +61,8 @@
*
* @return ingress port
*/
- public ConnectPoint getIngressPort() {
- return ingressPort;
+ public ConnectPoint ingressPoint() {
+ return ingressPoint;
}
/**
@@ -70,8 +70,8 @@
*
* @return set of egress ports
*/
- public Set<ConnectPoint> getEgressPorts() {
- return egressPorts;
+ public Set<ConnectPoint> egressPoints() {
+ return egressPoints;
}
@Override
@@ -87,23 +87,23 @@
}
SinglePointToMultiPointIntent that = (SinglePointToMultiPointIntent) o;
- return Objects.equals(this.ingressPort, that.ingressPort)
- && Objects.equals(this.egressPorts, that.egressPorts);
+ return Objects.equals(this.ingressPoint, that.ingressPoint)
+ && Objects.equals(this.egressPoints, that.egressPoints);
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), ingressPort, egressPorts);
+ return Objects.hash(super.hashCode(), ingressPoint, egressPoints);
}
@Override
public String toString() {
return MoreObjects.toStringHelper(getClass())
- .add("id", getId())
- .add("match", getTrafficSelector())
- .add("action", getTrafficTreatment())
- .add("ingressPort", ingressPort)
- .add("egressPort", egressPorts)
+ .add("id", id())
+ .add("match", selector())
+ .add("action", treatment())
+ .add("ingressPoint", ingressPoint)
+ .add("egressPort", egressPoints)
.toString();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java b/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
index 2517067..ff97f5b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
+++ b/core/api/src/main/java/org/onlab/onos/net/intent/package-info.java
@@ -4,5 +4,53 @@
* <em>what</em> rather than the <em>how</em>. This makes such instructions
* largely independent of topology and device specifics, thus allowing them to
* survive topology mutations.
+ * <p/>
+ * The controller core provides a suite of built-in intents and their compilers
+ * and installers. However, the intent framework is extensible in that it allows
+ * additional intents and their compilers or installers to be added
+ * dynamically at run-time. This allows others to enhance the initial arsenal of
+ * connectivity and policy-based intents available in base controller software.
+ * <p/>
+ * The following diagram depicts the state transition diagram for each top-level intent:<br>
+ * <img src="doc-files/intent-states.png" alt="ONOS intent states">
+ * <p/>
+ * The controller core accepts the intent specifications and translates them, via a
+ * process referred to as intent compilation, to installable intents, which are
+ * essentially actionable operations on the network environment.
+ * These actions are carried out by intent installation process, which results
+ * in some changes to the environment, e.g. tunnel links being provisioned,
+ * flow rules being installed on the data-plane, optical lambdas being reserved.
+ * <p/>
+ * After an intent is submitted by an application, it will be sent immediately
+ * (but asynchronously) into a compiling phase, then to installing phase and if
+ * all goes according to plan into installed state. Once an application decides
+ * it no longer wishes the intent to hold, it can withdraw it. This describes
+ * the nominal flow. However, it may happen that some issue is encountered.
+ * For example, an application may ask for an objective that is not currently
+ * achievable, e.g. connectivity across to unconnected network segments.
+ * If this is the case, the compiling phase may fail to produce a set of
+ * installable intents and instead result in a failed compile. If this occurs,
+ * only a change in the environment can trigger a transition back to the
+ * compiling state.
+ * <p/>
+ * Similarly, an issue may be encountered during the installation phase in
+ * which case the framework will attempt to recompile the intent to see if an
+ * alternate approach is available. If so, the intent will be sent back to
+ * installing phase. Otherwise, it will be parked in the failed state. Another
+ * scenario that’s very likely to be encountered is where the intent is
+ * successfully compiled and installed, but due to some topology event, such
+ * as a downed or downgraded link, loss of throughput may occur or connectivity
+ * may be lost altogether, thus impacting the viability of a previously
+ * satisfied intent. If this occurs, the framework will attempt to recompile
+ * the intent, and if an alternate approach is available, its installation
+ * will be attempted. Otherwise, the original top-level intent will be parked
+ * in the failed state.
+ * <p/>
+ * Please note that all *ing states, depicted in orange, are transitional and
+ * are expected to last only a brief amount of time. The rest of the states
+ * are parking states where the intent may spent some time; except for the
+ * submitted state of course. There, the intent may pause, but only briefly,
+ * while the system determines where to perform the compilation or while it
+ * performs global recomputation/optimization across all prior intents.
*/
package org.onlab.onos.net.intent;
\ No newline at end of file
diff --git a/core/api/src/main/javadoc/org/onlab/onos/net/intent/doc-files/intent-states.png b/core/api/src/main/javadoc/org/onlab/onos/net/intent/doc-files/intent-states.png
new file mode 100644
index 0000000..9a3e1dc
--- /dev/null
+++ b/core/api/src/main/javadoc/org/onlab/onos/net/intent/doc-files/intent-states.png
Binary files differ
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/FakeIntentManager.java b/core/api/src/test/java/org/onlab/onos/net/intent/FakeIntentManager.java
index 349749e..58c5a9c 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/FakeIntentManager.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/FakeIntentManager.java
@@ -40,8 +40,7 @@
@Override
public void run() {
try {
- List<InstallableIntent> installable = compileIntent(intent);
- installIntents(intent, installable);
+ executeCompilingPhase(intent);
} catch (IntentException e) {
exceptions.add(e);
}
@@ -55,8 +54,8 @@
@Override
public void run() {
try {
- List<InstallableIntent> installable = getInstallable(intent.getId());
- uninstallIntents(intent, installable);
+ List<InstallableIntent> installable = getInstallable(intent.id());
+ executeWithdrawingPhase(intent, installable);
} catch (IntentException e) {
exceptions.add(e);
}
@@ -84,53 +83,60 @@
return installer;
}
- private <T extends Intent> List<InstallableIntent> compileIntent(T intent) {
+ private <T extends Intent> void executeCompilingPhase(T intent) {
+ setState(intent, IntentState.COMPILING);
try {
// For the fake, we compile using a single level pass
List<InstallableIntent> installable = new ArrayList<>();
for (Intent compiled : getCompiler(intent).compile(intent)) {
installable.add((InstallableIntent) compiled);
}
- setState(intent, IntentState.COMPILED);
- return installable;
+ executeInstallingPhase(intent, installable);
+
} catch (IntentException e) {
setState(intent, IntentState.FAILED);
- throw e;
+ dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
- private void installIntents(Intent intent, List<InstallableIntent> installable) {
+ private void executeInstallingPhase(Intent intent,
+ List<InstallableIntent> installable) {
+ setState(intent, IntentState.INSTALLING);
try {
for (InstallableIntent ii : installable) {
registerSubclassInstallerIfNeeded(ii);
getInstaller(ii).install(ii);
}
setState(intent, IntentState.INSTALLED);
- putInstallable(intent.getId(), installable);
+ putInstallable(intent.id(), installable);
+ dispatch(new IntentEvent(IntentEvent.Type.INSTALLED, intent));
+
} catch (IntentException e) {
setState(intent, IntentState.FAILED);
- throw e;
+ dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
- private void uninstallIntents(Intent intent, List<InstallableIntent> installable) {
+ private void executeWithdrawingPhase(Intent intent,
+ List<InstallableIntent> installable) {
+ setState(intent, IntentState.WITHDRAWING);
try {
for (InstallableIntent ii : installable) {
getInstaller(ii).uninstall(ii);
}
+ removeInstallable(intent.id());
setState(intent, IntentState.WITHDRAWN);
- removeInstallable(intent.getId());
+ dispatch(new IntentEvent(IntentEvent.Type.WITHDRAWN, intent));
} catch (IntentException e) {
+ // FIXME: Rework this to always go from WITHDRAWING to WITHDRAWN!
setState(intent, IntentState.FAILED);
- throw e;
+ dispatch(new IntentEvent(IntentEvent.Type.FAILED, intent));
}
}
// Sets the internal state for the given intent and dispatches an event
private void setState(Intent intent, IntentState state) {
- IntentState previous = intentStates.get(intent.getId());
- intentStates.put(intent.getId(), state);
- dispatch(new IntentEvent(intent, state, previous, System.currentTimeMillis()));
+ intentStates.put(intent.id(), state);
}
private void putInstallable(IntentId id, List<InstallableIntent> installable) {
@@ -152,15 +158,15 @@
@Override
public void submit(Intent intent) {
- intents.put(intent.getId(), intent);
+ intents.put(intent.id(), intent);
setState(intent, IntentState.SUBMITTED);
+ dispatch(new IntentEvent(IntentEvent.Type.SUBMITTED, intent));
executeSubmit(intent);
}
@Override
public void withdraw(Intent intent) {
- intents.remove(intent.getId());
- setState(intent, IntentState.WITHDRAWING);
+ intents.remove(intent.id());
executeWithdraw(intent);
}
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
index 825be86..7eb0e19 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/IntentServiceTest.java
@@ -10,11 +10,9 @@
import java.util.Iterator;
import java.util.List;
-import static org.onlab.onos.net.intent.IntentState.*;
import static org.junit.Assert.*;
+import static org.onlab.onos.net.intent.IntentEvent.Type.*;
-// TODO: consider make it categorized as integration test when it become
-// slow test or fragile test
/**
* Suite of tests for the intent service contract.
*/
@@ -64,13 +62,13 @@
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
- assertEquals("incorrect intent state", INSTALLED,
- service.getIntentState(intent.getId()));
+ assertEquals("incorrect intent state", IntentState.INSTALLED,
+ service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
- validateEvents(intent, SUBMITTED, COMPILED, INSTALLED);
+ validateEvents(intent, SUBMITTED, INSTALLED);
// Make sure there is just one intent (and is ours)
assertEquals("incorrect intent count", 1, service.getIntentCount());
@@ -85,19 +83,19 @@
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
- assertEquals("incorrect intent state", WITHDRAWN,
- service.getIntentState(intent.getId()));
+ assertEquals("incorrect intent state", IntentState.WITHDRAWN,
+ service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
- validateEvents(intent, WITHDRAWING, WITHDRAWN);
+ validateEvents(intent, WITHDRAWN);
// TODO: discuss what is the fate of intents after they have been withdrawn
// Make sure that the intent is no longer in the system
// assertEquals("incorrect intent count", 0, service.getIntents().size());
-// assertNull("intent should not be found", service.getIntent(intent.getId()));
-// assertNull("intent state should not be found", service.getIntentState(intent.getId()));
+// assertNull("intent should not be found", service.getIntent(intent.id()));
+// assertNull("intent state should not be found", service.getIntentState(intent.id()));
}
@Test
@@ -113,8 +111,8 @@
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
- assertEquals("incorrect intent state", FAILED,
- service.getIntentState(intent.getId()));
+ assertEquals("incorrect intent state", IntentState.FAILED,
+ service.getIntentState(intent.id()));
}
});
@@ -136,13 +134,13 @@
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
- assertEquals("incorrect intent state", FAILED,
- service.getIntentState(intent.getId()));
+ assertEquals("incorrect intent state", IntentState.FAILED,
+ service.getIntentState(intent.id()));
}
});
// Make sure that all expected events have been emitted
- validateEvents(intent, SUBMITTED, COMPILED, FAILED);
+ validateEvents(intent, SUBMITTED, FAILED);
}
/**
@@ -151,23 +149,23 @@
* considered.
*
* @param intent intent subject
- * @param states list of states for which events are expected
+ * @param types list of event types for which events are expected
*/
- protected void validateEvents(Intent intent, IntentState... states) {
+ protected void validateEvents(Intent intent, IntentEvent.Type... types) {
Iterator<IntentEvent> events = listener.events.iterator();
- for (IntentState state : states) {
+ for (IntentEvent.Type type : types) {
IntentEvent event = events.hasNext() ? events.next() : null;
if (event == null) {
- fail("expected event not found: " + state);
- } else if (intent.equals(event.getIntent())) {
- assertEquals("incorrect state", state, event.getState());
+ fail("expected event not found: " + type);
+ } else if (intent.equals(event.subject())) {
+ assertEquals("incorrect state", type, event.type());
}
}
// Remainder of events should not apply to this intent; make sure.
while (events.hasNext()) {
assertFalse("unexpected event for intent",
- intent.equals(events.next().getIntent()));
+ intent.equals(events.next().subject()));
}
}
@@ -228,8 +226,8 @@
TestTools.assertAfter(GRACE_MS, new Runnable() {
@Override
public void run() {
- assertEquals("incorrect intent state", INSTALLED,
- service.getIntentState(intent.getId()));
+ assertEquals("incorrect intent state", IntentState.INSTALLED,
+ service.getIntentState(intent.id()));
}
});
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntentTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntentTest.java
index d971ba2..66d294a 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntentTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/MultiPointToSinglePointIntentTest.java
@@ -12,10 +12,10 @@
@Test
public void basics() {
MultiPointToSinglePointIntent intent = createOne();
- assertEquals("incorrect id", IID, intent.getId());
- assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
- assertEquals("incorrect ingress", PS1, intent.getIngressPorts());
- assertEquals("incorrect egress", P2, intent.getEgressPort());
+ assertEquals("incorrect id", IID, intent.id());
+ assertEquals("incorrect match", MATCH, intent.selector());
+ assertEquals("incorrect ingress", PS1, intent.ingressPoints());
+ assertEquals("incorrect egress", P2, intent.egressPoint());
}
@Override
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/PathIntentTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/PathIntentTest.java
index bd8dc08..7c15c37 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/PathIntentTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/PathIntentTest.java
@@ -16,12 +16,12 @@
@Test
public void basics() {
PathIntent intent = createOne();
- assertEquals("incorrect id", IID, intent.getId());
- assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
- assertEquals("incorrect action", NOP, intent.getTrafficTreatment());
- assertEquals("incorrect ingress", P1, intent.getIngressPort());
- assertEquals("incorrect egress", P2, intent.getEgressPort());
- assertEquals("incorrect path", PATH1, intent.getPath());
+ assertEquals("incorrect id", IID, intent.id());
+ assertEquals("incorrect match", MATCH, intent.selector());
+ assertEquals("incorrect action", NOP, intent.treatment());
+ assertEquals("incorrect ingress", P1, intent.ingressPoint());
+ assertEquals("incorrect egress", P2, intent.egressPoint());
+ assertEquals("incorrect path", PATH1, intent.path());
}
@Override
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/PointToPointIntentTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/PointToPointIntentTest.java
index 426a3d9..e0c5562 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/PointToPointIntentTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/PointToPointIntentTest.java
@@ -12,10 +12,10 @@
@Test
public void basics() {
PointToPointIntent intent = createOne();
- assertEquals("incorrect id", IID, intent.getId());
- assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
- assertEquals("incorrect ingress", P1, intent.getIngressPort());
- assertEquals("incorrect egress", P2, intent.getEgressPort());
+ assertEquals("incorrect id", IID, intent.id());
+ assertEquals("incorrect match", MATCH, intent.selector());
+ assertEquals("incorrect ingress", P1, intent.ingressPoint());
+ assertEquals("incorrect egress", P2, intent.egressPoint());
}
@Override
diff --git a/core/api/src/test/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntentTest.java b/core/api/src/test/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntentTest.java
index 0561a87..64c9292 100644
--- a/core/api/src/test/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntentTest.java
+++ b/core/api/src/test/java/org/onlab/onos/net/intent/SinglePointToMultiPointIntentTest.java
@@ -12,10 +12,10 @@
@Test
public void basics() {
SinglePointToMultiPointIntent intent = createOne();
- assertEquals("incorrect id", IID, intent.getId());
- assertEquals("incorrect match", MATCH, intent.getTrafficSelector());
- assertEquals("incorrect ingress", P1, intent.getIngressPort());
- assertEquals("incorrect egress", PS2, intent.getEgressPorts());
+ assertEquals("incorrect id", IID, intent.id());
+ assertEquals("incorrect match", MATCH, intent.selector());
+ assertEquals("incorrect ingress", P1, intent.ingressPoint());
+ assertEquals("incorrect egress", PS2, intent.egressPoints());
}
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index 385e300..ce11cea 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -40,14 +40,14 @@
@Component(immediate = true)
@Service
public class FlowRuleManager
-extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
-implements FlowRuleService, FlowRuleProviderRegistry {
+ extends AbstractProviderRegistry<FlowRuleProvider, FlowRuleProviderService>
+ implements FlowRuleService, FlowRuleProviderRegistry {
public static final String FLOW_RULE_NULL = "FlowRule cannot be null";
private final Logger log = getLogger(getClass());
private final AbstractListenerRegistry<FlowRuleEvent, FlowRuleListener>
- listenerRegistry = new AbstractListenerRegistry<>();
+ listenerRegistry = new AbstractListenerRegistry<>();
private final FlowRuleStoreDelegate delegate = new InternalStoreDelegate();
@@ -75,6 +75,11 @@
}
@Override
+ public int getFlowRuleCount() {
+ return store.getFlowRuleCount();
+ }
+
+ @Override
public Iterable<FlowEntry> getFlowEntries(DeviceId deviceId) {
return store.getFlowEntries(deviceId);
}
@@ -98,15 +103,17 @@
for (int i = 0; i < flowRules.length; i++) {
f = flowRules[i];
device = deviceService.getDevice(f.deviceId());
- frp = getProvider(device.providerId());
store.deleteFlowRule(f);
- frp.removeFlowRule(f);
+ if (device != null) {
+ frp = getProvider(device.providerId());
+ frp.removeFlowRule(f);
+ }
}
}
@Override
public void removeFlowRulesById(ApplicationId id) {
- Iterable<FlowRule> rules = getFlowRulesById(id);
+ Iterable<FlowRule> rules = getFlowRulesById(id);
FlowRuleProvider frp;
Device device;
@@ -140,8 +147,8 @@
}
private class InternalFlowRuleProviderService
- extends AbstractProviderService<FlowRuleProvider>
- implements FlowRuleProviderService {
+ extends AbstractProviderService<FlowRuleProvider>
+ implements FlowRuleProviderService {
protected InternalFlowRuleProviderService(FlowRuleProvider provider) {
super(provider);
@@ -160,16 +167,16 @@
FlowRuleProvider frp = getProvider(device.providerId());
FlowRuleEvent event = null;
switch (stored.state()) {
- case ADDED:
- case PENDING_ADD:
+ case ADDED:
+ case PENDING_ADD:
frp.applyFlowRule(stored);
- break;
- case PENDING_REMOVE:
- case REMOVED:
- event = store.removeFlowRule(stored);
- break;
- default:
- break;
+ break;
+ case PENDING_REMOVE:
+ case REMOVED:
+ event = store.removeFlowRule(stored);
+ break;
+ default:
+ break;
}
if (event != null) {
@@ -186,17 +193,17 @@
FlowRuleProvider frp = getProvider(device.providerId());
FlowRuleEvent event = null;
switch (flowRule.state()) {
- case PENDING_REMOVE:
- case REMOVED:
- event = store.removeFlowRule(flowRule);
- frp.removeFlowRule(flowRule);
- break;
- case ADDED:
- case PENDING_ADD:
- frp.applyFlowRule(flowRule);
- break;
- default:
- log.debug("Flow {} has not been installed.", flowRule);
+ case PENDING_REMOVE:
+ case REMOVED:
+ event = store.removeFlowRule(flowRule);
+ frp.removeFlowRule(flowRule);
+ break;
+ case ADDED:
+ case PENDING_ADD:
+ frp.applyFlowRule(flowRule);
+ break;
+ default:
+ log.debug("Flow {} has not been installed.", flowRule);
}
if (event != null) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
index 541a702..de61e8e 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/HostToHostIntentCompiler.java
@@ -71,11 +71,11 @@
private Intent createPathIntent(Path path, Host src, Host dst,
HostToHostIntent intent) {
- TrafficSelector selector = builder(intent.getTrafficSelector())
+ TrafficSelector selector = builder(intent.selector())
.matchEthSrc(src.mac()).matchEthDst(dst.mac()).build();
return new PathIntent(intentIdGenerator.getNewId(),
- selector, intent.getTrafficTreatment(),
+ selector, intent.treatment(),
path.src(), path.dst(), path);
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
index ebcb789..16b75f2 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/IntentManager.java
@@ -1,6 +1,25 @@
package org.onlab.onos.net.intent.impl;
-import com.google.common.collect.ImmutableMap;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.onos.net.intent.IntentState.COMPILING;
+import static org.onlab.onos.net.intent.IntentState.FAILED;
+import static org.onlab.onos.net.intent.IntentState.INSTALLED;
+import static org.onlab.onos.net.intent.IntentState.INSTALLING;
+import static org.onlab.onos.net.intent.IntentState.RECOMPILING;
+import static org.onlab.onos.net.intent.IntentState.WITHDRAWING;
+import static org.onlab.onos.net.intent.IntentState.WITHDRAWN;
+import static org.onlab.util.Tools.namedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -25,15 +44,7 @@
import org.onlab.onos.net.intent.IntentStoreDelegate;
import org.slf4j.Logger;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.onlab.onos.net.intent.IntentState.*;
-import static org.slf4j.LoggerFactory.getLogger;
+import com.google.common.collect.ImmutableMap;
/**
* An implementation of Intent Manager.
@@ -56,6 +67,8 @@
private final AbstractListenerRegistry<IntentEvent, IntentListener>
listenerRegistry = new AbstractListenerRegistry<>();
+ private final ExecutorService executor = newSingleThreadExecutor(namedThreads("onos-intents"));
+
private final IntentStoreDelegate delegate = new InternalStoreDelegate();
private final TopologyChangeDelegate topoDelegate = new InternalTopoChangeDelegate();
@@ -63,7 +76,7 @@
protected IntentStore store;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected FlowTrackerService trackerService;
+ protected ObjectiveTrackerService trackerService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
@@ -89,21 +102,16 @@
checkNotNull(intent, INTENT_NULL);
registerSubclassCompilerIfNeeded(intent);
IntentEvent event = store.createIntent(intent);
- processStoreEvent(event);
+ if (event != null) {
+ eventDispatcher.post(event);
+ executor.execute(new IntentTask(COMPILING, intent));
+ }
}
@Override
public void withdraw(Intent intent) {
checkNotNull(intent, INTENT_NULL);
- IntentEvent event = store.setState(intent, WITHDRAWING);
- List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
- if (installables != null) {
- for (InstallableIntent installable : installables) {
- trackerService.removeTrackedResources(intent.getId(),
- installable.requiredLinks());
- }
- }
- processStoreEvent(event);
+ executor.execute(new IntentTask(WITHDRAWING, intent));
}
// FIXME: implement this method
@@ -207,56 +215,142 @@
}
/**
- * Compiles an intent.
+ * Compiles the specified intent.
*
- * @param intent intent
+ * @param intent intent to be compiled
*/
- private void compileIntent(Intent intent) {
- // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
- // TODO: implement compilation traversing tree structure
+ private void executeCompilingPhase(Intent intent) {
+ // Indicate that the intent is entering the compiling phase.
+ store.setState(intent, COMPILING);
+
+ try {
+ // Compile the intent into installable derivatives.
+ List<InstallableIntent> installable = compileIntent(intent);
+
+ // If all went well, associate the resulting list of installable
+ // intents with the top-level intent and proceed to install.
+ store.addInstallableIntents(intent.id(), installable);
+ executeInstallingPhase(intent);
+
+ } catch (Exception e) {
+ log.warn("Unable to compile intent {} due to: {}", intent.id(), e);
+
+ // If compilation failed, mark the intent as failed.
+ store.setState(intent, FAILED);
+ }
+ }
+
+ // FIXME: To make SDN-IP workable ASAP, only single level compilation is implemented
+ // TODO: implement compilation traversing tree structure
+ private List<InstallableIntent> compileIntent(Intent intent) {
List<InstallableIntent> installable = new ArrayList<>();
for (Intent compiled : getCompiler(intent).compile(intent)) {
InstallableIntent installableIntent = (InstallableIntent) compiled;
installable.add(installableIntent);
- trackerService.addTrackedResources(intent.getId(),
- installableIntent.requiredLinks());
}
- IntentEvent event = store.addInstallableIntents(intent.getId(), installable);
- processStoreEvent(event);
+ return installable;
}
/**
- * Installs an intent.
+ * Installs all installable intents associated with the specified top-level
+ * intent.
*
- * @param intent intent
+ * @param intent intent to be installed
*/
- private void installIntent(Intent intent) {
- List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
- if (installables != null) {
- for (InstallableIntent installable : installables) {
- registerSubclassInstallerIfNeeded(installable);
- getInstaller(installable).install(installable);
- }
- }
- IntentEvent event = store.setState(intent, INSTALLED);
- processStoreEvent(event);
+ private void executeInstallingPhase(Intent intent) {
+ // Indicate that the intent is entering the installing phase.
+ store.setState(intent, INSTALLING);
+ try {
+ List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
+ if (installables != null) {
+ for (InstallableIntent installable : installables) {
+ registerSubclassInstallerIfNeeded(installable);
+ trackerService.addTrackedResources(intent.id(),
+ installable.requiredLinks());
+ getInstaller(installable).install(installable);
+ }
+ }
+ eventDispatcher.post(store.setState(intent, INSTALLED));
+
+ } catch (Exception e) {
+ log.warn("Unable to install intent {} due to: {}", intent.id(), e);
+ uninstallIntent(intent);
+
+ // If compilation failed, kick off the recompiling phase.
+ executeRecompilingPhase(intent);
+ }
}
/**
- * Uninstalls an intent.
+ * Recompiles the specified intent.
*
- * @param intent intent
+ * @param intent intent to be recompiled
+ */
+ private void executeRecompilingPhase(Intent intent) {
+ // Indicate that the intent is entering the recompiling phase.
+ store.setState(intent, RECOMPILING);
+
+ try {
+ // Compile the intent into installable derivatives.
+ List<InstallableIntent> installable = compileIntent(intent);
+
+ // If all went well, compare the existing list of installable
+ // intents with the newly compiled list. If they are the same,
+ // bail, out since the previous approach was determined not to
+ // be viable.
+ List<InstallableIntent> originalInstallable =
+ store.getInstallableIntents(intent.id());
+
+ if (Objects.equals(originalInstallable, installable)) {
+ eventDispatcher.post(store.setState(intent, FAILED));
+ } else {
+ // Otherwise, re-associate the newly compiled installable intents
+ // with the top-level intent and kick off installing phase.
+ store.addInstallableIntents(intent.id(), installable);
+ executeInstallingPhase(intent);
+ }
+ } catch (Exception e) {
+ log.warn("Unable to recompile intent {} due to: {}", intent.id(), e);
+
+ // If compilation failed, mark the intent as failed.
+ eventDispatcher.post(store.setState(intent, FAILED));
+ }
+ }
+
+ /**
+ * Uninstalls the specified intent by uninstalling all of its associated
+ * installable derivatives.
+ *
+ * @param intent intent to be installed
+ */
+ private void executeWithdrawingPhase(Intent intent) {
+ // Indicate that the intent is being withdrawn.
+ store.setState(intent, WITHDRAWING);
+ uninstallIntent(intent);
+
+ // If all went well, disassociate the top-level intent with its
+ // installable derivatives and mark it as withdrawn.
+ store.removeInstalledIntents(intent.id());
+ eventDispatcher.post(store.setState(intent, WITHDRAWN));
+ }
+
+ /**
+ * Uninstalls all installable intents associated with the given intent.
+ *
+ * @param intent intent to be uninstalled
*/
private void uninstallIntent(Intent intent) {
- List<InstallableIntent> installables = store.getInstallableIntents(intent.getId());
- if (installables != null) {
- for (InstallableIntent installable : installables) {
- getInstaller(installable).uninstall(installable);
+ try {
+ List<InstallableIntent> installables = store.getInstallableIntents(intent.id());
+ if (installables != null) {
+ for (InstallableIntent installable : installables) {
+ getInstaller(installable).uninstall(installable);
+ }
}
+ } catch (IntentException e) {
+ log.warn("Unable to uninstall intent {} due to: {}", intent.id(), e);
}
- store.removeInstalledIntents(intent.getId());
- store.setState(intent, WITHDRAWN);
}
/**
@@ -309,63 +403,61 @@
}
}
- /**
- * Handles state transition of submitted intents.
- */
- private void processStoreEvent(IntentEvent event) {
- eventDispatcher.post(event);
- Intent intent = event.getIntent();
- try {
- switch (event.getState()) {
- case SUBMITTED:
- compileIntent(intent);
- break;
- case COMPILED:
- installIntent(intent);
- break;
- case INSTALLED:
- break;
- case WITHDRAWING:
- uninstallIntent(intent);
- break;
- case WITHDRAWN:
- break;
- case FAILED:
- break;
- default:
- throw new IllegalStateException("the state of IntentEvent is illegal: " +
- event.getState());
- }
- } catch (IntentException e) {
- store.setState(intent, FAILED);
- }
-
- }
-
// Store delegate to re-post events emitted from the store.
private class InternalStoreDelegate implements IntentStoreDelegate {
@Override
public void notify(IntentEvent event) {
- processStoreEvent(event);
+ eventDispatcher.post(event);
+ if (event.type() == IntentEvent.Type.SUBMITTED) {
+ executor.execute(new IntentTask(COMPILING, event.subject()));
+ }
}
}
// Topology change delegate
private class InternalTopoChangeDelegate implements TopologyChangeDelegate {
@Override
- public void bumpIntents(Iterable<IntentId> intentIds) {
- for (IntentId intentId : intentIds) {
- compileIntent(getIntent(intentId));
- }
- }
-
- @Override
- public void failIntents(Iterable<IntentId> intentIds) {
+ public void triggerCompile(Iterable<IntentId> intentIds,
+ boolean compileAllFailed) {
+ // Attempt recompilation of the specified intents first.
for (IntentId intentId : intentIds) {
Intent intent = getIntent(intentId);
uninstallIntent(intent);
- compileIntent(intent);
+
+ executeRecompilingPhase(intent);
+ }
+
+ if (compileAllFailed) {
+ // If required, compile all currently failed intents.
+ for (Intent intent : getIntents()) {
+ if (getIntentState(intent.id()) == FAILED) {
+ executeCompilingPhase(intent);
+ }
+ }
}
}
}
+
+ // Auxiliary runnable to perform asynchronous tasks.
+ private class IntentTask implements Runnable {
+ private final IntentState state;
+ private final Intent intent;
+
+ public IntentTask(IntentState state, Intent intent) {
+ this.state = state;
+ this.intent = intent;
+ }
+
+ @Override
+ public void run() {
+ if (state == COMPILING) {
+ executeCompilingPhase(intent);
+ } else if (state == RECOMPILING) {
+ executeRecompilingPhase(intent);
+ } else if (state == WITHDRAWING) {
+ executeWithdrawingPhase(intent);
+ }
+ }
+ }
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTracker.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
similarity index 82%
rename from core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTracker.java
rename to core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
index 8f4a5c7..d84c367 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTracker.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTracker.java
@@ -19,12 +19,15 @@
import org.slf4j.Logger;
import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.collect.Multimaps.synchronizedSetMultimap;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.onos.net.link.LinkEvent.Type.LINK_REMOVED;
import static org.onlab.util.Tools.namedThreads;
import static org.slf4j.LoggerFactory.getLogger;
@@ -34,7 +37,7 @@
*/
@Component
@Service
-public class FlowTracker implements FlowTrackerService {
+public class ObjectiveTracker implements ObjectiveTrackerService {
private final Logger log = getLogger(getClass());
@@ -110,19 +113,26 @@
@Override
public void run() {
if (event.reasons() == null) {
- delegate.bumpIntents(intentsByLink.values());
+ delegate.triggerCompile(new HashSet<IntentId>(), true);
+
} else {
+ Set<IntentId> toBeRecompiled = new HashSet<>();
+ boolean recompileOnly = true;
+
+ // Scan through the list of reasons and keep accruing all
+ // intents that need to be recompiled.
for (Event reason : event.reasons()) {
if (reason instanceof LinkEvent) {
LinkEvent linkEvent = (LinkEvent) reason;
- if (linkEvent.type() == LinkEvent.Type.LINK_ADDED ||
- linkEvent.type() == LinkEvent.Type.LINK_UPDATED) {
- delegate.bumpIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
- } else if (linkEvent.type() == LinkEvent.Type.LINK_REMOVED) {
- delegate.failIntents(intentsByLink.get(new LinkKey(linkEvent.subject())));
+ if (linkEvent.type() == LINK_REMOVED) {
+ Set<IntentId> intentIds = intentsByLink.get(new LinkKey(linkEvent.subject()));
+ toBeRecompiled.addAll(intentIds);
}
+ recompileOnly = recompileOnly && linkEvent.type() == LINK_REMOVED;
}
}
+
+ delegate.triggerCompile(toBeRecompiled, !recompileOnly);
}
}
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTrackerService.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTrackerService.java
similarity index 96%
rename from core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTrackerService.java
rename to core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTrackerService.java
index b96de7c..15496ff 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/FlowTrackerService.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/ObjectiveTrackerService.java
@@ -9,7 +9,7 @@
* Auxiliary service for tracking intent path flows and for notifying the
* intent service of environment changes via topology change delegate.
*/
-public interface FlowTrackerService {
+public interface ObjectiveTrackerService {
/**
* Sets a topology change delegate.
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
index f9cfa67..a0995e4 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/PathIntentInstaller.java
@@ -1,6 +1,7 @@
package org.onlab.onos.net.intent.impl;
import static org.onlab.onos.net.flow.DefaultTrafficTreatment.builder;
+import static org.slf4j.LoggerFactory.getLogger;
import java.util.Iterator;
@@ -21,6 +22,7 @@
import org.onlab.onos.net.intent.IntentExtensionService;
import org.onlab.onos.net.intent.IntentInstaller;
import org.onlab.onos.net.intent.PathIntent;
+import org.slf4j.Logger;
/**
* Installer for {@link PathIntent path connectivity intents}.
@@ -28,6 +30,8 @@
@Component(immediate = true)
public class PathIntentInstaller implements IntentInstaller<PathIntent> {
+ private final Logger log = getLogger(getClass());
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected IntentExtensionService intentManager;
@@ -49,8 +53,8 @@
@Override
public void install(PathIntent intent) {
TrafficSelector.Builder builder =
- DefaultTrafficSelector.builder(intent.getTrafficSelector());
- Iterator<Link> links = intent.getPath().links().iterator();
+ DefaultTrafficSelector.builder(intent.selector());
+ Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
while (links.hasNext()) {
@@ -70,8 +74,8 @@
@Override
public void uninstall(PathIntent intent) {
TrafficSelector.Builder builder =
- DefaultTrafficSelector.builder(intent.getTrafficSelector());
- Iterator<Link> links = intent.getPath().links().iterator();
+ DefaultTrafficSelector.builder(intent.selector());
+ Iterator<Link> links = intent.path().links().iterator();
ConnectPoint prev = links.next().dst();
while (links.hasNext()) {
@@ -82,6 +86,7 @@
FlowRule rule = new DefaultFlowRule(link.src().deviceId(),
builder.build(), treatment,
123, appId, 600);
+
flowRuleService.removeFlowRules(rule);
prev = link.dst();
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/intent/impl/TopologyChangeDelegate.java b/core/net/src/main/java/org/onlab/onos/net/intent/impl/TopologyChangeDelegate.java
index 8c39c75..30e6899 100644
--- a/core/net/src/main/java/org/onlab/onos/net/intent/impl/TopologyChangeDelegate.java
+++ b/core/net/src/main/java/org/onlab/onos/net/intent/impl/TopologyChangeDelegate.java
@@ -9,18 +9,14 @@
/**
* Notifies that topology has changed in such a way that the specified
- * intents should be recompiled.
+ * intents should be recompiled. If the {@code compileAllFailed} parameter
+ * is true, then all intents in {@link org.onlab.onos.net.intent.IntentState#FAILED}
+ * state should be compiled as well.
*
* @param intentIds intents that should be recompiled
+ * @param compileAllFailed true implies full compile of all failed intents
+ * is required; false for selective recompile only
*/
- void bumpIntents(Iterable<IntentId> intentIds);
-
- /**
- * Notifies that topology has changed in such a way that the specified
- * intents should be marked failed and then recompiled.
- *
- * @param intentIds intents that should be failed and recompiled
- */
- void failIntents(Iterable<IntentId> intentIds);
+ void triggerCompile(Iterable<IntentId> intentIds, boolean compileAllFailed);
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
index 10e9b39..3f8c0a8 100644
--- a/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/device/impl/DistributedDeviceManagerTest.java
@@ -36,8 +36,6 @@
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
import org.onlab.onos.store.device.impl.DistributedDeviceStore;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
import org.onlab.packet.IpPrefix;
import java.util.ArrayList;
@@ -95,7 +93,6 @@
private DistributedDeviceStore dstore;
private TestMastershipManager masterManager;
private EventDeliveryService eventService;
- private KryoSerializationManager serializationMgr;
@Before
public void setUp() {
@@ -111,10 +108,7 @@
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
-
- dstore = new TestDistributedDeviceStore(storeManager, serializationMgr);
+ dstore = new TestDistributedDeviceStore(storeManager);
dstore.activate();
mgr.store = dstore;
@@ -140,7 +134,6 @@
mgr.deactivate();
dstore.deactivate();
- serializationMgr.deactivate();
storeManager.deactivate();
}
@@ -306,10 +299,8 @@
private class TestDistributedDeviceStore extends DistributedDeviceStore {
- public TestDistributedDeviceStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
new file mode 100644
index 0000000..74c22f1
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterManagementMessageSubjects.java
@@ -0,0 +1,10 @@
+package org.onlab.onos.store.cluster.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+public final class ClusterManagementMessageSubjects {
+ // avoid instantiation
+ private ClusterManagementMessageSubjects() {}
+
+ public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
similarity index 91%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
index 961ed4f..30b847f 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEvent.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEvent.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
import org.onlab.onos.cluster.ControllerNode;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
similarity index 69%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
index 1f5fd3f..cdfd145 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMembershipEventType.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/ClusterMembershipEventType.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging.impl;
+package org.onlab.onos.store.cluster.impl;
public enum ClusterMembershipEventType {
NEW_MEMBER,
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
index 5d04a46..2bdf5a0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessage.java
@@ -11,14 +11,15 @@
private final NodeId sender;
private final MessageSubject subject;
- private final Object payload;
+ private final byte[] payload;
+ // TODO: add field specifying Serializer for payload
/**
* Creates a cluster message.
*
* @param subject message subject
*/
- public ClusterMessage(NodeId sender, MessageSubject subject, Object payload) {
+ public ClusterMessage(NodeId sender, MessageSubject subject, byte[] payload) {
this.sender = sender;
this.subject = subject;
this.payload = payload;
@@ -47,7 +48,7 @@
*
* @return message payload.
*/
- public Object payload() {
+ public byte[] payload() {
return payload;
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
index d85f488..4d76ce3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/SerializationService.java
@@ -1,7 +1,7 @@
package org.onlab.onos.store.cluster.messaging;
/**
- * Service for encoding & decoding intra-cluster messages.
+ * Service for encoding & decoding intra-cluster message payload.
*/
public interface SerializationService {
@@ -11,7 +11,7 @@
* @param buffer byte buffer with message(s)
* @return parsed message
*/
- Object decode(byte[] data);
+ <T> T decode(byte[] data);
/**
* Encodes the specified message into the given byte buffer.
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index 2e8937c..babe4d3 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -12,17 +12,20 @@
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
import org.onlab.onos.cluster.ControllerNode;
import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEvent;
+import org.onlab.onos.store.cluster.impl.ClusterMembershipEventType;
import org.onlab.onos.store.cluster.impl.ClusterNodesDelegate;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
import org.onlab.onos.store.cluster.messaging.ClusterMessage;
import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.netty.Endpoint;
import org.onlab.netty.Message;
import org.onlab.netty.MessageHandler;
@@ -44,16 +47,35 @@
private final Timer timer = new Timer("onos-controller-heatbeats");
public static final long HEART_BEAT_INTERVAL_MILLIS = 1000L;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ // TODO: This probably should not be a OSGi service.
+ //@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
private MessagingService messagingService;
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(ClusterMessage.class)
+ .register(ClusterMembershipEvent.class)
+ .build()
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
+ // TODO: initialize messagingService
+ // TODO: setPayloadSerializer, which is capable of
+ // (1) serialize ClusterMessage - ClusterMessage.payload
+ // (2) serialize ClusterMessage.payload using user specified serializer
+// messagingService.setPayloadSerializer(...);
log.info("Started");
}
@Deactivate
public void deactivate() {
+ // TODO: cleanup messageingService if needed.
log.info("Stopped");
}
@@ -85,7 +107,7 @@
checkArgument(node != null, "Unknown nodeId: %s", toNodeId);
Endpoint nodeEp = new Endpoint(node.ip().toString(), node.tcpPort());
try {
- messagingService.sendAsync(nodeEp, message.subject().value(), message);
+ messagingService.sendAsync(nodeEp, message.subject().value(), SERIALIZER.encode(message));
return true;
} catch (IOException e) {
log.error("Failed to send cluster message to nodeId: " + toNodeId, e);
@@ -119,7 +141,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.LEAVING_MEMBER, node))));
members.remove(node.id());
}
@@ -131,7 +153,7 @@
broadcast(new ClusterMessage(
localNode.id(),
new MessageSubject("CLUSTER_MEMBERSHIP_EVENT"),
- new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode)));
+ SERIALIZER.encode(new ClusterMembershipEvent(ClusterMembershipEventType.HEART_BEAT, localNode))));
}
}
@@ -140,7 +162,7 @@
@Override
public void handle(ClusterMessage message) {
- ClusterMembershipEvent event = (ClusterMembershipEvent) message.payload();
+ ClusterMembershipEvent event = SERIALIZER.decode(message.payload());
ControllerNode node = event.node();
if (event.type() == ClusterMembershipEventType.HEART_BEAT) {
log.info("Node {} sent a hearbeat", node.id());
@@ -165,7 +187,8 @@
@Override
public void handle(Message message) {
- handler.handle((ClusterMessage) message.payload());
+ ClusterMessage clusterMessage = SERIALIZER.decode(message.payload());
+ handler.handle(clusterMessage);
}
}
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
deleted file mode 100644
index 11f6228..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterMessageSubjects.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package org.onlab.onos.store.cluster.messaging.impl;
-
-import org.onlab.onos.store.cluster.messaging.MessageSubject;
-
-public final class ClusterMessageSubjects {
- // avoid instantiation
- private ClusterMessageSubjects() {}
-
- public static final MessageSubject CLUSTER_MEMBERSHIP_EVENT = new MessageSubject("CLUSTER_MEMBERSHIP_EVENT");
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
similarity index 94%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
index 10368aa..bf47f49 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/MessageSerializer.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/MessageSerializer.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.impl;
+package org.onlab.onos.store.cluster.messaging.impl;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -52,7 +52,7 @@
@Override
- public Object decode(byte[] data) {
+ public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
similarity index 95%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
index b70da73..132f27a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyAdvertisement.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
import java.util.Map;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
similarity index 97%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
index 095752b..033a1de 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/AntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/common/impl/AntiEntropyReply.java
@@ -1,4 +1,4 @@
-package org.onlab.onos.store.cluster.messaging;
+package org.onlab.onos.store.common.impl;
import java.util.Map;
import java.util.Set;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
index 301884c..d05659b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyAdvertisement.java
@@ -8,7 +8,7 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyAdvertisement;
+import org.onlab.onos.store.common.impl.AntiEntropyAdvertisement;
// TODO DeviceID needs to be changed to something like (ProviderID, DeviceID)
// TODO: Handle Port as part of these messages, or separate messages for Ports?
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
index 011713e..e7a4d0a 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/DeviceAntiEntropyReply.java
@@ -10,7 +10,7 @@
import org.onlab.onos.net.Device;
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.store.Timestamp;
-import org.onlab.onos.store.cluster.messaging.AntiEntropyReply;
+import org.onlab.onos.store.common.impl.AntiEntropyReply;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
index 8316769..2f1e504 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStore.java
@@ -4,6 +4,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+
import org.apache.commons.lang3.concurrent.ConcurrentException;
import org.apache.commons.lang3.concurrent.ConcurrentInitializer;
import org.apache.felix.scr.annotations.Activate;
@@ -12,6 +13,7 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
+import org.onlab.onos.cluster.ClusterService;
import org.onlab.onos.net.AnnotationsUtil;
import org.onlab.onos.net.DefaultAnnotations;
import org.onlab.onos.net.DefaultDevice;
@@ -33,10 +35,18 @@
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.ClockService;
import org.onlab.onos.store.Timestamp;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.common.impl.MastershipBasedTimestamp;
import org.onlab.onos.store.common.impl.Timestamped;
+import org.onlab.onos.store.serializers.KryoPoolUtil;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.util.KryoPool;
import org.onlab.util.NewConcurrentHashMap;
import org.slf4j.Logger;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
@@ -96,8 +106,35 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected ClockService clockService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterCommunicationService clusterCommunicator;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer() {
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .register(InternalDeviceEvent.class)
+ .register(InternalPortEvent.class)
+ .register(InternalPortStatusEvent.class)
+ .register(Timestamped.class)
+ .register(MastershipBasedTimestamp.class)
+ .build()
+ .populate(1);
+ }
+
+ };
+
@Activate
public void activate() {
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE, new InternalDeviceEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE, new InternalPortEventListener());
+ clusterCommunicator.addSubscriber(
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE, new InternalPortStatusEventListener());
log.info("Started");
}
@@ -133,8 +170,14 @@
final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
DeviceEvent event = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalDeviceEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a device update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -298,19 +341,21 @@
List<PortDescription> portDescriptions) {
Timestamp newTimestamp = clockService.getTimestamp(deviceId);
- List<Timestamped<PortDescription>> deltaDescs = new ArrayList<>(portDescriptions.size());
- for (PortDescription e : portDescriptions) {
- deltaDescs.add(new Timestamped<PortDescription>(e, newTimestamp));
- }
+ Timestamped<List<PortDescription>> timestampedPortDescriptions =
+ new Timestamped<>(portDescriptions, newTimestamp);
- List<DeviceEvent> events = updatePortsInternal(providerId, deviceId,
- new Timestamped<>(portDescriptions, newTimestamp));
+ List<DeviceEvent> events = updatePortsInternal(providerId, deviceId, timestampedPortDescriptions);
if (!events.isEmpty()) {
- // FIXME: broadcast deltaDesc, UP
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortEvent(providerId, deviceId, timestampedPortDescriptions));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return events;
-
}
private List<DeviceEvent> updatePortsInternal(ProviderId providerId,
@@ -437,8 +482,14 @@
final Timestamped<PortDescription> deltaDesc = new Timestamped<>(portDescription, newTimestamp);
DeviceEvent event = updatePortStatusInternal(providerId, deviceId, deltaDesc);
if (event != null) {
- // FIXME: broadcast deltaDesc
- log.debug("broadcast deltaDesc");
+ log.info("Notifying peers of a port status update topology event for providerId: {} and deviceId: {}",
+ providerId, deviceId);
+ try {
+ notifyPeers(new InternalPortStatusEvent(providerId, deviceId, deltaDesc));
+ } catch (IOException e) {
+ log.error("Failed to notify peers of a port status update topology event or providerId: "
+ + providerId + " and deviceId: " + deviceId, e);
+ }
}
return event;
}
@@ -749,4 +800,70 @@
return portDescs.put(newOne.value().portNumber(), newOne);
}
}
+
+ private void notifyPeers(InternalDeviceEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.DEVICE_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.PORT_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private void notifyPeers(InternalPortStatusEvent event) throws IOException {
+ ClusterMessage message = new ClusterMessage(
+ clusterService.getLocalNode().id(),
+ GossipDeviceStoreMessageSubjects.PORT_STATUS_UPDATE,
+ SERIALIZER.encode(event));
+ clusterCommunicator.broadcast(message);
+ }
+
+ private class InternalDeviceEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+ log.info("Received device update event from peer: {}", message.sender());
+ InternalDeviceEvent event = (InternalDeviceEvent) SERIALIZER.decode(message.payload());
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();
+ createOrUpdateDeviceInternal(providerId, deviceId, deviceDescription);
+ }
+ }
+
+ private class InternalPortEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port update event from peer: {}", message.sender());
+ InternalPortEvent event = (InternalPortEvent) SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<List<PortDescription>> portDescriptions = event.portDescriptions();
+
+ updatePortsInternal(providerId, deviceId, portDescriptions);
+ }
+ }
+
+ private class InternalPortStatusEventListener implements ClusterMessageHandler {
+ @Override
+ public void handle(ClusterMessage message) {
+
+ log.info("Received port status update event from peer: {}", message.sender());
+ InternalPortStatusEvent event = (InternalPortStatusEvent) SERIALIZER.decode(message.payload());
+
+ ProviderId providerId = event.providerId();
+ DeviceId deviceId = event.deviceId();
+ Timestamped<PortDescription> portDescription = event.portDescription();
+
+ updatePortStatusInternal(providerId, deviceId, portDescription);
+ }
+ }
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
new file mode 100644
index 0000000..58fed70
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/GossipDeviceStoreMessageSubjects.java
@@ -0,0 +1,15 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+/**
+ * MessageSubjects used by GossipDeviceStore.
+ */
+public final class GossipDeviceStoreMessageSubjects {
+
+ private GossipDeviceStoreMessageSubjects() {}
+
+ public static final MessageSubject DEVICE_UPDATE = new MessageSubject("peer-device-update");
+ public static final MessageSubject PORT_UPDATE = new MessageSubject("peer-port-update");
+ public static final MessageSubject PORT_STATUS_UPDATE = new MessageSubject("peer-port-status-update");
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
new file mode 100644
index 0000000..26f1d7f
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalDeviceEvent.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.DeviceDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a device
+ * change event.
+ */
+public class InternalDeviceEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<DeviceDescription> deviceDescription;
+
+ protected InternalDeviceEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<DeviceDescription> deviceDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.deviceDescription = deviceDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<DeviceDescription> deviceDescription() {
+ return deviceDescription;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
new file mode 100644
index 0000000..48e3be6
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortEvent.java
@@ -0,0 +1,40 @@
+package org.onlab.onos.store.device.impl;
+
+import java.util.List;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a port
+ * change event.
+ */
+public class InternalPortEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<List<PortDescription>> portDescriptions;
+
+ protected InternalPortEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<List<PortDescription>> portDescriptions) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescriptions = portDescriptions;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<List<PortDescription>> portDescriptions() {
+ return portDescriptions;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
new file mode 100644
index 0000000..0bdfdbf
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/device/impl/InternalPortStatusEvent.java
@@ -0,0 +1,38 @@
+package org.onlab.onos.store.device.impl;
+
+import org.onlab.onos.net.DeviceId;
+import org.onlab.onos.net.device.PortDescription;
+import org.onlab.onos.net.provider.ProviderId;
+import org.onlab.onos.store.common.impl.Timestamped;
+
+/**
+ * Information published by GossipDeviceStore to notify peers of a port
+ * status change event.
+ */
+public class InternalPortStatusEvent {
+
+ private final ProviderId providerId;
+ private final DeviceId deviceId;
+ private final Timestamped<PortDescription> portDescription;
+
+ protected InternalPortStatusEvent(
+ ProviderId providerId,
+ DeviceId deviceId,
+ Timestamped<PortDescription> portDescription) {
+ this.providerId = providerId;
+ this.deviceId = deviceId;
+ this.portDescription = portDescription;
+ }
+
+ public DeviceId deviceId() {
+ return deviceId;
+ }
+
+ public ProviderId providerId() {
+ return providerId;
+ }
+
+ public Timestamped<PortDescription> portDescription() {
+ return portDescription;
+ }
+}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f3c8f34..d49e00b 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -58,6 +58,11 @@
@Override
+ public int getFlowRuleCount() {
+ return flowEntries.size();
+ }
+
+ @Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
new file mode 100644
index 0000000..f4dadad
--- /dev/null
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/serializers/ClusterMessageSerializer.java
@@ -0,0 +1,36 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.onos.cluster.NodeId;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.esotericsoftware.kryo.Serializer;
+import com.esotericsoftware.kryo.io.Input;
+import com.esotericsoftware.kryo.io.Output;
+
+public final class ClusterMessageSerializer extends Serializer<ClusterMessage> {
+
+ public ClusterMessageSerializer() {
+ // does not accept null
+ super(false);
+ }
+
+ @Override
+ public void write(Kryo kryo, Output output, ClusterMessage message) {
+ kryo.writeClassAndObject(output, message.sender());
+ kryo.writeClassAndObject(output, message.subject());
+ output.writeInt(message.payload().length);
+ output.writeBytes(message.payload());
+ }
+
+ @Override
+ public ClusterMessage read(Kryo kryo, Input input,
+ Class<ClusterMessage> type) {
+ NodeId sender = (NodeId) kryo.readClassAndObject(input);
+ MessageSubject subject = (MessageSubject) kryo.readClassAndObject(input);
+ int payloadSize = input.readInt();
+ byte[] payload = input.readBytes(payloadSize);
+ return new ClusterMessage(sender, subject, payload);
+ }
+}
\ No newline at end of file
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index bba12f2..e63fcaa 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -7,6 +7,7 @@
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.MessageSerializer;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
index 94de9b2..fa42a6b 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/device/impl/GossipDeviceStoreTest.java
@@ -5,6 +5,7 @@
import static org.onlab.onos.net.DeviceId.deviceId;
import static org.onlab.onos.net.device.DeviceEvent.Type.*;
+import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
@@ -19,6 +20,11 @@
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
+import org.onlab.onos.cluster.ClusterEventListener;
+import org.onlab.onos.cluster.ClusterService;
+import org.onlab.onos.cluster.ControllerNode;
+import org.onlab.onos.cluster.ControllerNode.State;
+import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.MastershipTerm;
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.net.Annotations;
@@ -37,6 +43,11 @@
import org.onlab.onos.net.device.PortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.ClockService;
+import org.onlab.onos.store.cluster.messaging.ClusterCommunicationService;
+import org.onlab.onos.store.cluster.messaging.ClusterMessage;
+import org.onlab.onos.store.cluster.messaging.ClusterMessageHandler;
+import org.onlab.onos.store.cluster.messaging.MessageSubject;
+import org.onlab.packet.IpPrefix;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
@@ -105,7 +116,10 @@
deviceClockManager.setMastershipTerm(DID1, MastershipTerm.of(MYSELF, 1));
deviceClockManager.setMastershipTerm(DID2, MastershipTerm.of(MYSELF, 2));
- gossipDeviceStore = new TestGossipDeviceStore(clockService);
+ ClusterCommunicationService clusterCommunicator = new TestClusterCommunicationService();
+ ClusterService clusterService = new TestClusterService();
+
+ gossipDeviceStore = new TestGossipDeviceStore(clockService, clusterService, clusterCommunicator);
gossipDeviceStore.activate();
deviceStore = gossipDeviceStore;
}
@@ -541,8 +555,65 @@
private static final class TestGossipDeviceStore extends GossipDeviceStore {
- public TestGossipDeviceStore(ClockService clockService) {
+ public TestGossipDeviceStore(
+ ClockService clockService,
+ ClusterService clusterService,
+ ClusterCommunicationService clusterCommunicator) {
this.clockService = clockService;
+ this.clusterService = clusterService;
+ this.clusterCommunicator = clusterCommunicator;
+ }
+ }
+
+ private static final class TestClusterCommunicationService implements ClusterCommunicationService {
+ @Override
+ public boolean broadcast(ClusterMessage message) throws IOException { return true; }
+ @Override
+ public boolean unicast(ClusterMessage message, NodeId nodeId) throws IOException { return true; }
+ @Override
+ public boolean multicast(ClusterMessage message, Set<NodeId> nodeIds) throws IOException { return true; }
+ @Override
+ public void addSubscriber(MessageSubject subject, ClusterMessageHandler subscriber) {}
+ }
+
+ private static final class TestClusterService implements ClusterService {
+
+ private static final ControllerNode ONOS1 =
+ new DefaultControllerNode(new NodeId("N1"), IpPrefix.valueOf("127.0.0.1"));
+ private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
+ private final Map<NodeId, ControllerNode.State> nodeStates = new HashMap<>();
+
+ public TestClusterService() {
+ nodes.put(new NodeId("N1"), ONOS1);
+ nodeStates.put(new NodeId("N1"), ControllerNode.State.ACTIVE);
+ }
+
+ @Override
+ public ControllerNode getLocalNode() {
+ return ONOS1;
+ }
+
+ @Override
+ public Set<ControllerNode> getNodes() {
+ return Sets.newHashSet(nodes.values());
+ }
+
+ @Override
+ public ControllerNode getNode(NodeId nodeId) {
+ return nodes.get(nodeId);
+ }
+
+ @Override
+ public State getState(NodeId nodeId) {
+ return nodeStates.get(nodeId);
+ }
+
+ @Override
+ public void addListener(ClusterEventListener listener) {
+ }
+
+ @Override
+ public void removeListener(ClusterEventListener listener) {
}
}
}
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index f83ac59..61a7374 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -57,7 +57,7 @@
rawNodes = theInstance.getMap("nodes");
OptionalCacheLoader<NodeId, DefaultControllerNode> nodeLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawNodes);
+ = new OptionalCacheLoader<>(serializer, rawNodes);
nodes = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawNodes.addEntryListener(new RemoteCacheEventHandler<>(nodes), true);
diff --git a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
index 18e6e96..aafbe4b 100644
--- a/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
+++ b/core/store/hz/cluster/src/main/java/org/onlab/onos/store/cluster/impl/DistributedMastershipStore.java
@@ -52,7 +52,7 @@
rawMasters = theInstance.getMap("masters");
OptionalCacheLoader<DeviceId, NodeId> nodeLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawMasters);
+ = new OptionalCacheLoader<>(serializer, rawMasters);
masters = new AbsentInvalidatingLoadingCache<>(newBuilder().build(nodeLoader));
rawMasters.addEntryListener(new RemoteMasterShipEventHandler(masters), true);
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
index 0302105..ff9f43a 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/AbstractHazelcastStore.java
@@ -15,7 +15,8 @@
import org.onlab.onos.event.Event;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.StoreDelegate;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.KryoSerializer;
+import org.onlab.onos.store.serializers.Serializer;
import org.slf4j.Logger;
import static com.google.common.base.Preconditions.checkNotNull;
@@ -24,7 +25,7 @@
/**
* Abstraction of a distributed store based on Hazelcast.
*/
-@Component(componentAbstract = true)
+@Component
public abstract class AbstractHazelcastStore<E extends Event, D extends StoreDelegate<E>>
extends AbstractStore<E, D> {
@@ -33,13 +34,13 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected StoreService storeService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
- protected KryoSerializationService kryoSerializationService;
+ protected Serializer serializer;
protected HazelcastInstance theInstance;
@Activate
public void activate() {
+ serializer = new KryoSerializer();
theInstance = storeService.getHazelcastInstance();
}
@@ -50,7 +51,7 @@
* @return serialized object
*/
protected byte[] serialize(Object obj) {
- return kryoSerializationService.serialize(obj);
+ return serializer.encode(obj);
}
/**
@@ -61,7 +62,7 @@
* @return deserialized object
*/
protected <T> T deserialize(byte[] bytes) {
- return kryoSerializationService.deserialize(bytes);
+ return serializer.decode(bytes);
}
diff --git a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
index f96fdd8..6631594 100644
--- a/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
+++ b/core/store/hz/common/src/main/java/org/onlab/onos/store/common/OptionalCacheLoader.java
@@ -2,7 +2,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
-import org.onlab.onos.store.serializers.KryoSerializationService;
+import org.onlab.onos.store.serializers.Serializer;
import com.google.common.base.Optional;
import com.google.common.cache.CacheLoader;
@@ -18,28 +18,28 @@
public final class OptionalCacheLoader<K, V> extends
CacheLoader<K, Optional<V>> {
- private final KryoSerializationService kryoSerializationService;
+ private final Serializer serializer;
private IMap<byte[], byte[]> rawMap;
/**
* Constructor.
*
- * @param kryoSerializationService to use for serialization
+ * @param serializer to use for serialization
* @param rawMap underlying IMap
*/
- public OptionalCacheLoader(KryoSerializationService kryoSerializationService, IMap<byte[], byte[]> rawMap) {
- this.kryoSerializationService = checkNotNull(kryoSerializationService);
+ public OptionalCacheLoader(Serializer serializer, IMap<byte[], byte[]> rawMap) {
+ this.serializer = checkNotNull(serializer);
this.rawMap = checkNotNull(rawMap);
}
@Override
public Optional<V> load(K key) throws Exception {
- byte[] keyBytes = kryoSerializationService.serialize(key);
+ byte[] keyBytes = serializer.encode(key);
byte[] valBytes = rawMap.get(keyBytes);
if (valBytes == null) {
return Optional.absent();
}
- V dev = kryoSerializationService.deserialize(valBytes);
+ V dev = serializer.decode(valBytes);
return Optional.of(dev);
}
}
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
index 2ecd525..0016939 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/device/impl/DistributedDeviceStore.java
@@ -88,7 +88,7 @@
// TODO decide on Map name scheme to avoid collision
rawDevices = theInstance.getMap("devices");
final OptionalCacheLoader<DeviceId, DefaultDevice> deviceLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawDevices);
+ = new OptionalCacheLoader<>(serializer, rawDevices);
devices = new AbsentInvalidatingLoadingCache<>(newBuilder().build(deviceLoader));
// refresh/populate cache based on notification from other instance
devicesListener = rawDevices.addEntryListener(new RemoteDeviceEventHandler(devices), includeValue);
@@ -98,7 +98,7 @@
rawDevicePorts = theInstance.getMap("devicePorts");
final OptionalCacheLoader<DeviceId, Map<PortNumber, Port>> devicePortLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawDevicePorts);
+ = new OptionalCacheLoader<>(serializer, rawDevicePorts);
devicePorts = new AbsentInvalidatingLoadingCache<>(newBuilder().build(devicePortLoader));
// refresh/populate cache based on notification from other instance
portsListener = rawDevicePorts.addEntryListener(new RemotePortEventHandler(devicePorts), includeValue);
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
index f3c8f34..d49e00b 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/flow/impl/DistributedFlowRuleStore.java
@@ -58,6 +58,11 @@
@Override
+ public int getFlowRuleCount() {
+ return flowEntries.size();
+ }
+
+ @Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
diff --git a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
index 5f5184f..3dd42a3 100644
--- a/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
+++ b/core/store/hz/net/src/main/java/org/onlab/onos/store/link/impl/DistributedLinkStore.java
@@ -71,7 +71,7 @@
// TODO decide on Map name scheme to avoid collision
rawLinks = theInstance.getMap("links");
final OptionalCacheLoader<LinkKey, DefaultLink> linkLoader
- = new OptionalCacheLoader<>(kryoSerializationService, rawLinks);
+ = new OptionalCacheLoader<>(serializer, rawLinks);
links = new AbsentInvalidatingLoadingCache<>(newBuilder().build(linkLoader));
// refresh/populate cache based on notification from other instance
linksListener = rawLinks.addEntryListener(new RemoteLinkEventHandler(links), includeValue);
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
index 80c9464..7e2924b 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/device/impl/DistributedDeviceStoreTest.java
@@ -36,9 +36,6 @@
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.hazelcast.config.Config;
@@ -63,7 +60,6 @@
private static final PortNumber P3 = PortNumber.portNumber(3);
private DistributedDeviceStore deviceStore;
- private KryoSerializationManager serializationMgr;
private StoreManager storeManager;
@@ -85,10 +81,7 @@
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
-
- deviceStore = new TestDistributedDeviceStore(storeManager, serializationMgr);
+ deviceStore = new TestDistributedDeviceStore(storeManager);
deviceStore.activate();
}
@@ -96,8 +89,6 @@
public void tearDown() throws Exception {
deviceStore.deactivate();
- serializationMgr.deactivate();
-
storeManager.deactivate();
}
@@ -392,10 +383,8 @@
}
private class TestDistributedDeviceStore extends DistributedDeviceStore {
- public TestDistributedDeviceStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ public TestDistributedDeviceStore(StoreService storeService) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
}
}
}
diff --git a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
index a76e901..dd959b5 100644
--- a/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
+++ b/core/store/hz/net/src/test/java/org/onlab/onos/store/link/impl/DistributedLinkStoreTest.java
@@ -30,9 +30,6 @@
import org.onlab.onos.store.common.StoreManager;
import org.onlab.onos.store.common.StoreService;
import org.onlab.onos.store.common.TestStoreManager;
-import org.onlab.onos.store.serializers.KryoSerializationManager;
-import org.onlab.onos.store.serializers.KryoSerializationService;
-
import com.google.common.collect.Iterables;
import com.hazelcast.config.Config;
import com.hazelcast.core.Hazelcast;
@@ -51,7 +48,6 @@
private static final PortNumber P3 = PortNumber.portNumber(3);
private StoreManager storeManager;
- private KryoSerializationManager serializationMgr;
private DistributedLinkStore linkStore;
@@ -71,17 +67,13 @@
storeManager = new TestStoreManager(Hazelcast.newHazelcastInstance(config));
storeManager.activate();
- serializationMgr = new KryoSerializationManager();
- serializationMgr.activate();
-
- linkStore = new TestDistributedLinkStore(storeManager, serializationMgr);
+ linkStore = new TestDistributedLinkStore(storeManager);
linkStore.activate();
}
@After
public void tearDown() throws Exception {
linkStore.deactivate();
- serializationMgr.deactivate();
storeManager.deactivate();
}
@@ -361,10 +353,8 @@
class TestDistributedLinkStore extends DistributedLinkStore {
- TestDistributedLinkStore(StoreService storeService,
- KryoSerializationService kryoSerializationService) {
+ TestDistributedLinkStore(StoreService storeService) {
this.storeService = storeService;
- this.kryoSerializationService = kryoSerializationService;
}
}
}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
index f1a12fe..0c33cfe 100644
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoPoolUtil.java
@@ -2,6 +2,7 @@
import java.net.URI;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashMap;
import org.onlab.onos.cluster.ControllerNode;
@@ -21,6 +22,8 @@
import org.onlab.onos.net.MastershipRole;
import org.onlab.onos.net.Port;
import org.onlab.onos.net.PortNumber;
+import org.onlab.onos.net.device.DefaultDeviceDescription;
+import org.onlab.onos.net.device.DefaultPortDescription;
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
@@ -47,6 +50,7 @@
.register(
//
ArrayList.class,
+ Arrays.asList().getClass(),
HashMap.class,
//
ControllerNode.State.class,
@@ -54,8 +58,10 @@
DefaultAnnotations.class,
DefaultControllerNode.class,
DefaultDevice.class,
+ DefaultDeviceDescription.class,
MastershipRole.class,
Port.class,
+ DefaultPortDescription.class,
Element.class,
Link.Type.class
)
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
deleted file mode 100644
index 1b5cac4..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationManager.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Service;
-import org.onlab.util.KryoPool;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.nio.ByteBuffer;
-
-/**
- * Serialization service using Kryo.
- */
-@Component(immediate = true)
-@Service
-public class KryoSerializationManager implements KryoSerializationService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
- private KryoPool serializerPool;
-
-
- @Activate
- public void activate() {
- setupKryoPool();
- log.info("Started");
- }
-
- @Deactivate
- public void deactivate() {
- log.info("Stopped");
- }
-
- /**
- * Sets up the common serialzers pool.
- */
- protected void setupKryoPool() {
- serializerPool = KryoPool.newBuilder()
- .register(KryoPoolUtil.API)
- .build()
- .populate(1);
- }
-
- @Override
- public byte[] serialize(final Object obj) {
- return serializerPool.serialize(obj);
- }
-
- @Override
- public <T> T deserialize(final byte[] bytes) {
- if (bytes == null) {
- return null;
- }
- return serializerPool.deserialize(bytes);
- }
-
- @Override
- public void serialize(Object obj, ByteBuffer buffer) {
- serializerPool.serialize(obj, buffer);
- }
-
- @Override
- public <T> T deserialize(ByteBuffer buffer) {
- return serializerPool.deserialize(buffer);
- }
-
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
deleted file mode 100644
index 385128c..0000000
--- a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializationService.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package org.onlab.onos.store.serializers;
-
-import java.nio.ByteBuffer;
-
-// TODO: To be replaced with SerializationService from IOLoop activity
-/**
- * Service to serialize Objects into byte array.
- */
-public interface KryoSerializationService {
-
- /**
- * Serializes the specified object into bytes using one of the
- * pre-registered serializers.
- *
- * @param obj object to be serialized
- * @return serialized bytes
- */
- public byte[] serialize(final Object obj);
-
- /**
- * Serializes the specified object into bytes using one of the
- * pre-registered serializers.
- *
- * @param obj object to be serialized
- * @param buffer to write serialized bytes
- */
- public void serialize(final Object obj, ByteBuffer buffer);
-
- /**
- * Deserializes the specified bytes into an object using one of the
- * pre-registered serializers.
- *
- * @param bytes bytes to be deserialized
- * @return deserialized object
- */
- public <T> T deserialize(final byte[] bytes);
-
- /**
- * Deserializes the specified bytes into an object using one of the
- * pre-registered serializers.
- *
- * @param buffer bytes to be deserialized
- * @return deserialized object
- */
- public <T> T deserialize(final ByteBuffer buffer);
-}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
new file mode 100644
index 0000000..93ee854
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/KryoSerializer.java
@@ -0,0 +1,55 @@
+package org.onlab.onos.store.serializers;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer implementation using Kryo.
+ */
+public class KryoSerializer implements Serializer {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ protected KryoPool serializerPool;
+
+
+ public KryoSerializer() {
+ setupKryoPool();
+ }
+
+ /**
+ * Sets up the common serialzers pool.
+ */
+ protected void setupKryoPool() {
+ serializerPool = KryoPool.newBuilder()
+ .register(KryoPoolUtil.API)
+ .build()
+ .populate(1);
+ }
+
+ @Override
+ public byte[] encode(final Object obj) {
+ return serializerPool.serialize(obj);
+ }
+
+ @Override
+ public <T> T decode(final byte[] bytes) {
+ if (bytes == null) {
+ return null;
+ }
+ return serializerPool.deserialize(bytes);
+ }
+
+ @Override
+ public void encode(Object obj, ByteBuffer buffer) {
+ serializerPool.serialize(obj, buffer);
+ }
+
+ @Override
+ public <T> T decode(ByteBuffer buffer) {
+ return serializerPool.deserialize(buffer);
+ }
+
+}
diff --git a/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java
new file mode 100644
index 0000000..12cf3bc
--- /dev/null
+++ b/core/store/serializers/src/main/java/org/onlab/onos/store/serializers/Serializer.java
@@ -0,0 +1,42 @@
+package org.onlab.onos.store.serializers;
+
+import java.nio.ByteBuffer;
+
+// TODO: To be replaced with SerializationService from IOLoop activity
+/**
+ * Service to serialize Objects into byte array.
+ */
+public interface Serializer {
+
+ /**
+ * Serializes the specified object into bytes.
+ *
+ * @param obj object to be serialized
+ * @return serialized bytes
+ */
+ public byte[] encode(final Object obj);
+
+ /**
+ * Serializes the specified object into bytes.
+ *
+ * @param obj object to be serialized
+ * @param buffer to write serialized bytes
+ */
+ public void encode(final Object obj, ByteBuffer buffer);
+
+ /**
+ * Deserializes the specified bytes into an object.
+ *
+ * @param bytes bytes to be deserialized
+ * @return deserialized object
+ */
+ public <T> T decode(final byte[] bytes);
+
+ /**
+ * Deserializes the specified bytes into an object.
+ *
+ * @param buffer bytes to be deserialized
+ * @return deserialized object
+ */
+ public <T> T decode(final ByteBuffer buffer);
+}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 5fa92f3..7ff797c 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -57,6 +57,11 @@
@Override
+ public int getFlowRuleCount() {
+ return flowEntries.size();
+ }
+
+ @Override
public synchronized FlowEntry getFlowEntry(FlowRule rule) {
for (FlowEntry f : flowEntries.get(rule.deviceId())) {
if (f.equals(rule)) {
@@ -98,6 +103,7 @@
public synchronized void deleteFlowRule(FlowRule rule) {
FlowEntry entry = getFlowEntry(rule);
if (entry == null) {
+ //log.warn("Cannot find rule {}", rule);
return;
}
entry.setState(FlowEntryState.PENDING_REMOVE);
@@ -120,7 +126,7 @@
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
- flowEntries.put(did, rule);
+ //flowEntries.put(did, rule);
return null;
}
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
index 4143548..732d753 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleIntentStore.java
@@ -1,12 +1,6 @@
package org.onlab.onos.store.trivial.impl;
-import static org.onlab.onos.net.intent.IntentState.COMPILED;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.ImmutableSet;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -21,13 +15,18 @@
import org.onlab.onos.store.AbstractStore;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.onlab.onos.net.intent.IntentState.*;
+import static org.slf4j.LoggerFactory.getLogger;
@Component(immediate = true)
@Service
public class SimpleIntentStore
- extends AbstractStore<IntentEvent, IntentStoreDelegate>
- implements IntentStore {
+ extends AbstractStore<IntentEvent, IntentStoreDelegate>
+ implements IntentStore {
private final Logger log = getLogger(getClass());
private final Map<IntentId, Intent> intents = new HashMap<>();
@@ -46,7 +45,7 @@
@Override
public IntentEvent createIntent(Intent intent) {
- intents.put(intent.getId(), intent);
+ intents.put(intent.id(), intent);
return this.setState(intent, IntentState.SUBMITTED);
}
@@ -54,7 +53,7 @@
public IntentEvent removeIntent(IntentId intentId) {
Intent intent = intents.remove(intentId);
installable.remove(intentId);
- IntentEvent event = this.setState(intent, IntentState.WITHDRAWN);
+ IntentEvent event = this.setState(intent, WITHDRAWN);
states.remove(intentId);
return event;
}
@@ -79,19 +78,21 @@
return states.get(id);
}
- // TODO return dispatch event here... replace with state transition methods
@Override
- public IntentEvent setState(Intent intent, IntentState newState) {
- IntentId id = intent.getId();
- IntentState oldState = states.get(id);
- states.put(id, newState);
- return new IntentEvent(intent, newState, oldState, System.currentTimeMillis());
+ public IntentEvent setState(Intent intent, IntentState state) {
+ IntentId id = intent.id();
+ states.put(id, state);
+ IntentEvent.Type type = (state == SUBMITTED ? IntentEvent.Type.SUBMITTED :
+ (state == INSTALLED ? IntentEvent.Type.INSTALLED :
+ (state == FAILED ? IntentEvent.Type.FAILED :
+ state == WITHDRAWN ? IntentEvent.Type.WITHDRAWN :
+ null)));
+ return type == null ? null : new IntentEvent(type, intent);
}
@Override
- public IntentEvent addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
+ public void addInstallableIntents(IntentId intentId, List<InstallableIntent> result) {
installable.put(intentId, result);
- return this.setState(intents.get(intentId), COMPILED);
}
@Override
diff --git a/features/features.xml b/features/features.xml
index 44606eb..491d46b 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -53,6 +53,8 @@
<feature>onos-api</feature>
<bundle>mvn:org.onlab.onos/onos-core-net/1.0.0-SNAPSHOT</bundle>
<bundle>mvn:org.onlab.onos/onos-core-dist/1.0.0-SNAPSHOT</bundle>
+ <bundle>mvn:org.onlab.onos/onos-core-serializers/1.0.0-SNAPSHOT</bundle>
+ <bundle>mvn:org.onlab.onos/onlab-netty/1.0.0-SNAPSHOT</bundle>
</feature>
<feature name="onos-core-hazelcast" version="1.0.0"
diff --git a/tools/dev/bash_profile b/tools/dev/bash_profile
index 270370a..f4ed3c3 100644
--- a/tools/dev/bash_profile
+++ b/tools/dev/bash_profile
@@ -33,6 +33,7 @@
alias op='onos-package'
alias ot='onos-test'
alias ol='onos-log'
+alias pub='onos-push-update-bundle'
# Short-hand for tailing the ONOS (karaf) log
alias tl='$ONOS_ROOT/tools/dev/bin/onos-local-log'
@@ -89,5 +90,5 @@
}
function nuke {
- spy | cut -c7-11 | xargs kill
+ spy "$@" | cut -c7-11 | xargs kill
}
diff --git a/tools/test/bin/onos-install b/tools/test/bin/onos-install
index a87ff17..38934fd 100755
--- a/tools/test/bin/onos-install
+++ b/tools/test/bin/onos-install
@@ -32,6 +32,10 @@
# Remove any previous ON.Lab bits from ~/.m2 repo
rm -fr ~/.m2/repository/org/onlab
+
+ # Drop log level for the console
+ echo "log4j.logger.org.apache.sshd = WARN" >> $ONOS_INSTALL_DIR/$KARAF_DIST/etc/org.ops4j.pax.logging.cfg
+
"
# Configure the ONOS installation
diff --git a/tools/test/bin/onos-patch-vm b/tools/test/bin/onos-patch-vm
index 52038ce..ccc4007 100755
--- a/tools/test/bin/onos-patch-vm
+++ b/tools/test/bin/onos-patch-vm
@@ -15,7 +15,7 @@
ssh $remote "
sudo perl -pi.bak -e \"s/127.0.1.1.*/127.0.1.1 $name/g\" /etc/hosts
- sudo perl -pi.bak -e \"local \$/ = ''; s/.*/$name/g\" /etc/hostname
+ sudo bash -c \"echo $name >/etc/hostname\"
sudo hostname $name
" 2>/dev/null
diff --git a/tools/test/bin/onos-push-keys b/tools/test/bin/onos-push-keys
index fd49f86..247d331 100755
--- a/tools/test/bin/onos-push-keys
+++ b/tools/test/bin/onos-push-keys
@@ -9,5 +9,9 @@
remote=$ONOS_USER@${1:-$OCI}
scp -q ~/.ssh/id_rsa.pub $remote:/tmp
-ssh $remote "cat /tmp/id_rsa.pub >> ~/.ssh/authorized_keys"
+ssh $remote "
+ cat /tmp/id_rsa.pub >> ~/.ssh/authorized_keys
+ sort -u ~/.ssh/authorized_keys > ~/.ssh/authorized_keys.bak
+ mv ~/.ssh/authorized_keys.bak ~/.ssh/authorized_keys
+"
ssh -n -o PasswordAuthentication=no $remote true
diff --git a/tools/test/bin/onos-push-update-bundle b/tools/test/bin/onos-push-update-bundle
new file mode 100755
index 0000000..6073bac
--- /dev/null
+++ b/tools/test/bin/onos-push-update-bundle
@@ -0,0 +1,21 @@
+#!/bin/bash
+#-------------------------------------------------------------------------------
+# Pushes the specified bundle to the remote ONOS cell machines and updates it.
+#-------------------------------------------------------------------------------
+
+[ ! -d "$ONOS_ROOT" ] && echo "ONOS_ROOT is not defined" >&2 && exit 1
+. $ONOS_ROOT/tools/build/envDefaults
+
+cd ~/.m2/repository
+jar=$(find org/onlab -type f -name '*.jar' | grep $1 | grep -v -e -tests | head -n 1)
+
+[ -z "$jar" ] && echo "No bundle $1 found for" && exit 1
+
+bundle=$(echo $(basename $jar .jar) | sed 's/-[0-9].*//g')
+
+nodes=$(env | sort | egrep "OC[0-9]+" | cut -d= -f2)
+for node in $nodes; do
+ scp -q $jar $ONOS_USER@$node:$ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar
+ ssh $ONOS_USER@$node "ls -l $ONOS_INSTALL_DIR/$KARAF_DIST/system/$jar"
+ ssh $ONOS_USER@$node "$ONOS_INSTALL_DIR/bin/onos \"bundle:update -f $bundle\"" 2>/dev/null
+done
diff --git a/tools/test/topos/tower.py b/tools/test/topos/tower.py
new file mode 100644
index 0000000..b8a5f7c
--- /dev/null
+++ b/tools/test/topos/tower.py
@@ -0,0 +1,101 @@
+#!/usr/bin/env python
+from mininet.cli import CLI
+from mininet.net import Mininet
+from mininet.node import RemoteController, OVSKernelSwitch
+
+MAC = 12
+DPID = 16
+
+def string_to_hex(s, length):
+ """ Convert a string like 00:00 in to hex 0x0000 format"""
+ tmp = '{0:#x}'.format(int(s.replace(':', '').lstrip('0'),length))
+ return tmp
+
+def hex_to_string(h, length):
+ """Convert a hex number from 0x0000 to 00:00 format"""
+ tmp = h.lstrip('0x').zfill(length)
+ tmp = ':'.join(a+b for a,b in zip(tmp[::2], tmp[1::2]))
+ return tmp
+
+class Tower(object):
+ """ Create a tower topology from semi-scratch in Mininet """
+
+ def __init__(self, cname='flare', cip='15.255.126.183', k=4, h=6,
+ proto=None):
+ """Create tower topology for mininet
+ cname: controller name
+ cip: controller ip
+ k: number of leaf switches
+ h: number of hosts perl leaf switch
+ """
+
+ # We are creating the controller with local-loopback on purpose to avoid
+ # having the switches connect immediately. Instead, we'll set controller
+ # explicitly for each switch after configuring it as we want.
+ self.flare = RemoteController(cname, '127.0.0.1', 6633)
+ self.net = Mininet(controller=self.flare, switch = OVSKernelSwitch,
+ build=False)
+
+ self.cip = cip
+ self.spines = []
+ self.leaves = []
+ self.hosts = []
+ self.proto = proto
+
+ # Create the two spine switches
+ self.spines.append(self.net.addSwitch('s1'))
+ self.spines.append(self.net.addSwitch('s2'))
+
+ # Create two links between the spine switches
+ self.net.addLink(self.spines[0], self.spines[1])
+ self.net.addLink(self.spines[1], self.spines[0])
+
+ # Now create the leaf switches, their hosts and connect them together
+ i = 1
+ c = 0
+ while i <= k:
+ self.leaves.append(self.net.addSwitch('s1%d' % i))
+ for spine in self.spines:
+ self.net.addLink(self.leaves[i-1], spine)
+
+ j = 1
+ while j <= h:
+ self.hosts.append(self.net.addHost('h%d%d' % (i, j)))
+ self.net.addLink(self.hosts[c], self.leaves[i-1])
+ j+=1
+ c+=1
+
+ i+=1
+
+ def run(self):
+ """ Runs the created network topology and launches mininet cli"""
+ self.run_silent()
+ CLI(self.net)
+ self.net.stop()
+
+ def run_silent(self):
+ """ Runs silently - for unit testing """
+ self.net.build()
+
+ # Start the switches, configure them with desired protocols and only
+ # then set the controller
+ for sw in self.spines:
+ sw.start([self.flare])
+ if self.proto:
+ sw.cmd('ovs-vsctl set bridge %(sw)s protocols=%(proto)s' % \
+ { 'sw': sw.name, 'proto': self.proto})
+ sw.cmdPrint('ovs-vsctl set-controller %(sw)s tcp:%(ctl)s:6633' % \
+ {'sw': sw.name, 'ctl': self.cip})
+
+ for sw in self.leaves:
+ sw.start([self.flare])
+ sw.cmdPrint('ovs-vsctl set-controller %(sw)s tcp:%(ctl)s:6633' % \
+ {'sw': sw.name, 'ctl': self.cip})
+
+ def pingAll(self):
+ """ PingAll to create flows - for unit testing """
+ self.net.pingAll()
+
+ def stop(self):
+ "Stops the topology. You should call this after run_silent"
+ self.net.stop()
diff --git a/utils/misc/src/main/java/org/onlab/util/Timer.java b/utils/misc/src/main/java/org/onlab/util/Timer.java
index 7719fa1..6a3be49 100644
--- a/utils/misc/src/main/java/org/onlab/util/Timer.java
+++ b/utils/misc/src/main/java/org/onlab/util/Timer.java
@@ -28,8 +28,9 @@
private static synchronized void initTimer() {
if (Timer.timer == null) {
- Timer.timer = new HashedWheelTimer();
- Timer.timer.start();
+ HashedWheelTimer hwTimer = new HashedWheelTimer();
+ hwTimer.start();
+ Timer.timer = hwTimer;
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
index f4024a4..1772a3c 100644
--- a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -8,16 +8,15 @@
* This class provides a base implementation of Response, with methods to retrieve the
* result and query to see if the result is ready. The result can only be retrieved when
* it is ready and the get methods will block if the result is not ready yet.
- * @param <T> type of response.
*/
-public class AsyncResponse<T> implements Response<T> {
+public class AsyncResponse implements Response {
- private T value;
+ private byte[] value;
private boolean done = false;
private final long start = System.nanoTime();
@Override
- public T get(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ public byte[] get(long timeout, TimeUnit timeUnit) throws TimeoutException {
timeout = timeUnit.toNanos(timeout);
boolean interrupted = false;
try {
@@ -43,7 +42,7 @@
}
@Override
- public T get() throws InterruptedException {
+ public byte[] get() throws InterruptedException {
throw new UnsupportedOperationException();
}
@@ -57,11 +56,10 @@
* available.
* @param data response data.
*/
- @SuppressWarnings("unchecked")
- public synchronized void setResponse(Object data) {
+ public synchronized void setResponse(byte[] data) {
if (!done) {
done = true;
- value = (T) data;
+ value = data;
this.notifyAll();
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
index 6ba4bdf..b038db8 100644
--- a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -5,6 +5,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+//FIXME: Should be move out to test or app
/**
* Message handler that echos the message back to the sender.
*/
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
index d6a87b5..660f2b9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -1,5 +1,9 @@
package org.onlab.netty;
+import java.util.Objects;
+
+import com.google.common.base.MoreObjects;
+
/**
* Representation of a TCP/UDP communication end point.
*/
@@ -32,16 +36,15 @@
@Override
public String toString() {
- return "Endpoint [port=" + port + ", host=" + host + "]";
+ return MoreObjects.toStringHelper(getClass())
+ .add("port", port)
+ .add("host", host)
+ .toString();
}
@Override
public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + ((host == null) ? 0 : host.hashCode());
- result = prime * result + port;
- return result;
+ return Objects.hash(host, port);
}
@Override
@@ -55,17 +58,8 @@
if (getClass() != obj.getClass()) {
return false;
}
- Endpoint other = (Endpoint) obj;
- if (host == null) {
- if (other.host != null) {
- return false;
- }
- } else if (!host.equals(other.host)) {
- return false;
- }
- if (port != other.port) {
- return false;
- }
- return true;
+ Endpoint that = (Endpoint) obj;
+ return Objects.equals(this.port, that.port) &&
+ Objects.equals(this.host, that.host);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
index 96cbe79..938ec7b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -8,12 +8,13 @@
*/
public final class InternalMessage implements Message {
+ public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
+
private long id;
private Endpoint sender;
private String type;
- private Object payload;
+ private byte[] payload;
private transient NettyMessagingService messagingService;
- public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
// Must be created using the Builder.
private InternalMessage() {}
@@ -31,7 +32,7 @@
}
@Override
- public Object payload() {
+ public byte[] payload() {
return payload;
}
@@ -40,7 +41,7 @@
}
@Override
- public void respond(Object data) throws IOException {
+ public void respond(byte[] data) throws IOException {
Builder builder = new Builder(messagingService);
InternalMessage message = builder.withId(this.id)
// FIXME: Sender should be messagingService.localEp.
@@ -55,7 +56,7 @@
/**
* Builder for InternalMessages.
*/
- public static class Builder {
+ public static final class Builder {
private InternalMessage message;
public Builder(NettyMessagingService messagingService) {
@@ -77,7 +78,7 @@
message.sender = sender;
return this;
}
- public Builder withPayload(Object payload) {
+ public Builder withPayload(byte[] payload) {
message.payload = payload;
return this;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
index 6df0b23..b8efb51 100644
--- a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -6,10 +6,11 @@
import java.util.ArrayList;
import java.util.HashMap;
+//FIXME: Should be move out to test or app
/**
* Kryo Serializer.
*/
-public class KryoSerializer implements Serializer {
+public class KryoSerializer {
private KryoPool serializerPool;
@@ -27,29 +28,26 @@
HashMap.class,
ArrayList.class,
InternalMessage.class,
- Endpoint.class
+ Endpoint.class,
+ byte[].class
)
.build()
.populate(1);
}
- @Override
public <T> T decode(byte[] data) {
return serializerPool.deserialize(data);
}
- @Override
public byte[] encode(Object payload) {
return serializerPool.serialize(payload);
}
- @Override
public <T> T decode(ByteBuffer buffer) {
return serializerPool.deserialize(buffer);
}
- @Override
public void encode(Object obj, ByteBuffer buffer) {
serializerPool.serialize(obj, buffer);
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
index 23c4073..366898b 100644
--- a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -12,6 +12,6 @@
@Override
public void handle(Message message) {
- log.info("Received message. Payload: " + message.payload());
+ log.info("Received message. Payload has {} bytes", message.payload().length);
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
index 54b9526..87a8bb6 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Message.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -12,12 +12,12 @@
* Returns the payload of this message.
* @return message payload.
*/
- public Object payload();
+ public byte[] payload();
/**
- * Sends a reply back to the sender of this messge.
+ * Sends a reply back to the sender of this message.
* @param data payload of the response.
* @throws IOException if there is a communication error.
*/
- public void respond(Object data) throws IOException;
+ public void respond(byte[] data) throws IOException;
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
index a0d34a5..d4832e5 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -14,14 +14,14 @@
public class MessageDecoder extends ReplayingDecoder<DecoderState> {
private final NettyMessagingService messagingService;
- private final Serializer serializer;
+
+ private static final KryoSerializer SERIALIZER = new KryoSerializer();
private int contentLength;
- public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+ public MessageDecoder(NettyMessagingService messagingService) {
super(DecoderState.READ_HEADER_VERSION);
this.messagingService = messagingService;
- this.serializer = serializer;
}
@Override
@@ -48,7 +48,7 @@
checkState(serializerVersion == MessageEncoder.SERIALIZER_VERSION, "Unexpected serializer version");
checkpoint(DecoderState.READ_CONTENT);
case READ_CONTENT:
- InternalMessage message = serializer.decode(buffer.readBytes(contentLength).nioBuffer());
+ InternalMessage message = SERIALIZER.decode(buffer.readBytes(contentLength).nioBuffer());
message.setMessagingService(messagingService);
out.add(message);
checkpoint(DecoderState.READ_HEADER_VERSION);
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
index 5581747..716efb9 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -17,11 +17,7 @@
public static final int SERIALIZER_VERSION = 1;
- private final Serializer serializer;
-
- public MessageEncoder(Serializer serializer) {
- this.serializer = serializer;
- }
+ private static final KryoSerializer SERIALIZER = new KryoSerializer();
@Override
protected void encode(
@@ -35,12 +31,17 @@
// write preamble
out.writeBytes(PREAMBLE);
- byte[] payload = serializer.encode(message);
+ try {
+ SERIALIZER.encode(message);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ byte[] payload = SERIALIZER.encode(message);
// write payload length
out.writeInt(payload.length);
- // write serializer version
+ // write payloadSerializer version
out.writeInt(SERIALIZER_VERSION);
// write payload.
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
index ebad442..08676ac 100644
--- a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -11,10 +11,10 @@
* The message is specified using the type and payload.
* @param ep end point to send the message to.
* @param type type of message.
- * @param payload message payload.
+ * @param payload message payload bytes.
* @throws IOException
*/
- public void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+ public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Sends a message synchronously and waits for a response.
@@ -24,7 +24,7 @@
* @return a response future
* @throws IOException
*/
- public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+ public Response sendAndReceive(Endpoint ep, String type, byte[] payload) throws IOException;
/**
* Registers a new message handler for message type.
@@ -38,4 +38,4 @@
* @param type message type
*/
public void unregisterHandler(String type);
-}
+}
\ No newline at end of file
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
index f0c4861..48aeb30 100644
--- a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -43,7 +43,7 @@
private final EventLoopGroup bossGroup = new NioEventLoopGroup();
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
- private final Cache<Long, AsyncResponse<?>> responseFutures = CacheBuilder.newBuilder()
+ private final Cache<Long, AsyncResponse> responseFutures = CacheBuilder.newBuilder()
.maximumSize(100000)
.weakValues()
// TODO: Once the entry expires, notify blocking threads (if any).
@@ -52,8 +52,6 @@
private final GenericKeyedObjectPool<Endpoint, Channel> channels
= new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
- protected Serializer serializer;
-
public NettyMessagingService() {
// TODO: Default port should be configurable.
this(8080);
@@ -83,7 +81,7 @@
}
@Override
- public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+ public void sendAsync(Endpoint ep, String type, byte[] payload) throws IOException {
InternalMessage message = new InternalMessage.Builder(this)
.withId(RandomUtils.nextLong())
.withSender(localEp)
@@ -108,9 +106,9 @@
}
@Override
- public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+ public Response sendAndReceive(Endpoint ep, String type, byte[] payload)
throws IOException {
- AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+ AsyncResponse futureResponse = new AsyncResponse();
Long messageId = RandomUtils.nextLong();
responseFutures.put(messageId, futureResponse);
InternalMessage message = new InternalMessage.Builder(this)
@@ -133,10 +131,6 @@
handlers.remove(type);
}
- public void setSerializer(Serializer serializer) {
- this.serializer = serializer;
- }
-
private MessageHandler getMessageHandler(String type) {
return handlers.get(type);
}
@@ -201,13 +195,13 @@
private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
private final ChannelHandler dispatcher = new InboundMessageDispatcher();
- private final ChannelHandler encoder = new MessageEncoder(serializer);
+ private final ChannelHandler encoder = new MessageEncoder();
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline()
.addLast("encoder", encoder)
- .addLast("decoder", new MessageDecoder(NettyMessagingService.this, serializer))
+ .addLast("decoder", new MessageDecoder(NettyMessagingService.this))
.addLast("handler", dispatcher);
}
}
@@ -236,12 +230,13 @@
String type = message.type();
if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
try {
- AsyncResponse<?> futureResponse =
+ AsyncResponse futureResponse =
NettyMessagingService.this.responseFutures.getIfPresent(message.id());
if (futureResponse != null) {
futureResponse.setResponse(message.payload());
+ } else {
+ log.warn("Received a reply. But was unable to locate the request handle");
}
- log.warn("Received a reply. But was unable to locate the request handle");
} finally {
NettyMessagingService.this.responseFutures.invalidate(message.id());
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
index 04675ce..150755e 100644
--- a/utils/netty/src/main/java/org/onlab/netty/Response.java
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -7,26 +7,24 @@
* Response object returned when making synchronous requests.
* Can you used to check is a response is ready and/or wait for a response
* to become available.
- *
- * @param <T> type of response.
*/
-public interface Response<T> {
+public interface Response {
/**
* Gets the response waiting for a designated timeout period.
* @param timeout timeout period (since request was sent out)
* @param tu unit of time.
- * @return response
+ * @return response payload
* @throws TimeoutException if the timeout expires before the response arrives.
*/
- public T get(long timeout, TimeUnit tu) throws TimeoutException;
+ public byte[] get(long timeout, TimeUnit tu) throws TimeoutException;
/**
* Gets the response waiting for indefinite timeout period.
- * @return response
+ * @return response payload
* @throws InterruptedException if the thread is interrupted before the response arrives.
*/
- public T get() throws InterruptedException;
+ public byte[] get() throws InterruptedException;
/**
* Checks if the response is ready without blocking.
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
deleted file mode 100644
index 46550d4..0000000
--- a/utils/netty/src/main/java/org/onlab/netty/Serializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package org.onlab.netty;
-
-import java.nio.ByteBuffer;
-
-/**
- * Interface for encoding/decoding message payloads.
- */
-public interface Serializer {
-
- /**
- * Decodes the specified byte array to a POJO.
- *
- * @param data byte array.
- * @return POJO
- */
- public <T> T decode(byte[] data);
-
- /**
- * Encodes the specified POJO into a byte array.
- *
- * @param data POJO to be encoded
- * @return byte array.
- */
- public byte[] encode(Object data);
-
- /**
- * Encodes the specified POJO into a byte buffer.
- *
- * @param data POJO to be encoded
- * @param buffer to write serialized bytes
- */
- public void encode(final Object data, ByteBuffer buffer);
-
- /**
- * Decodes the specified byte buffer to a POJO.
- *
- * @param buffer bytes to be decoded
- * @return POJO
- */
- public <T> T decode(final ByteBuffer buffer);
-}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
index 5ce8f2e..3869948 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -8,6 +8,7 @@
import com.codahale.metrics.Timer;
+// FIXME: Should be move out to test or app
public final class SimpleClient {
private SimpleClient() {
}
@@ -23,7 +24,7 @@
final int warmup = 100;
for (int i = 0; i < warmup; i++) {
Timer.Context context = sendAsyncTimer.time();
- messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World");
+ messaging.sendAsync(new Endpoint("localhost", 8080), "simple", "Hello World".getBytes());
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAsyncTimer);
@@ -32,10 +33,10 @@
final int iterations = 1000000;
for (int i = 0; i < iterations; i++) {
Timer.Context context = sendAndReceiveTimer.time();
- Response<String> response = messaging
+ Response response = messaging
.sendAndReceive(new Endpoint("localhost", 8080), "echo",
- "Hello World");
- System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+ "Hello World".getBytes());
+ System.out.println("Got back:" + new String(response.get(2, TimeUnit.SECONDS)));
context.stop();
}
metrics.registerMetric(component, feature, "AsyncTimer", sendAndReceiveTimer);
@@ -44,8 +45,6 @@
public static class TestNettyMessagingService extends NettyMessagingService {
public TestNettyMessagingService(int port) throws Exception {
super(port);
- Serializer serializer = new KryoSerializer();
- this.serializer = serializer;
}
}
}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
index 6a93149..b8ae5b0 100644
--- a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -1,12 +1,12 @@
package org.onlab.netty;
+//FIXME: Should be move out to test or app
public final class SimpleServer {
private SimpleServer() {}
public static void main(String... args) throws Exception {
NettyMessagingService server = new NettyMessagingService(8080);
server.activate();
- server.setSerializer(new KryoSerializer());
server.registerHandler("simple", new LoggingHandler());
server.registerHandler("echo", new EchoHandler());
}
diff --git a/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
new file mode 100644
index 0000000..36d2a1e
--- /dev/null
+++ b/utils/netty/src/test/java/org/onlab/netty/PingPongTest.java
@@ -0,0 +1,30 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.lang3.RandomUtils;
+import static org.junit.Assert.*;
+import org.junit.Test;
+
+/**
+ * Simple ping-pong test that exercises NettyMessagingService.
+ */
+public class PingPongTest {
+
+ @Test
+ public void testPingPong() throws Exception {
+ NettyMessagingService pinger = new NettyMessagingService(8085);
+ NettyMessagingService ponger = new NettyMessagingService(9086);
+ try {
+ pinger.activate();
+ ponger.activate();
+ ponger.registerHandler("echo", new EchoHandler());
+ byte[] payload = RandomUtils.nextBytes(100);
+ Response response = pinger.sendAndReceive(new Endpoint("localhost", 9086), "echo", payload);
+ assertArrayEquals(payload, response.get(10000, TimeUnit.MILLISECONDS));
+ } finally {
+ pinger.deactivate();
+ ponger.deactivate();
+ }
+ }
+}