Merge branch 'master' of ssh://gerrit.onlab.us:29418/onos-next
diff --git a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
index 318aebd..081efed 100644
--- a/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
+++ b/apps/config/src/main/java/org/onlab/onos/config/AddressEntry.java
@@ -3,8 +3,6 @@
import java.util.List;
import org.codehaus.jackson.annotate.JsonProperty;
-import org.onlab.packet.IpPrefix;
-import org.onlab.packet.MacAddress;
/**
* Represents a set of addresses bound to a port.
@@ -12,8 +10,8 @@
public class AddressEntry {
private String dpid;
private short portNumber;
- private List<IpPrefix> ipAddresses;
- private MacAddress macAddress;
+ private List<String> ipAddresses;
+ private String macAddress;
public String getDpid() {
return dpid;
@@ -33,21 +31,21 @@
this.portNumber = portNumber;
}
- public List<IpPrefix> getIpAddresses() {
+ public List<String> getIpAddresses() {
return ipAddresses;
}
@JsonProperty("ips")
- public void setIpAddresses(List<IpPrefix> ipAddresses) {
- this.ipAddresses = ipAddresses;
+ public void setIpAddresses(List<String> strIps) {
+ this.ipAddresses = strIps;
}
- public MacAddress getMacAddress() {
+ public String getMacAddress() {
return macAddress;
}
@JsonProperty("mac")
- public void setMacAddress(MacAddress macAddress) {
+ public void setMacAddress(String macAddress) {
this.macAddress = macAddress;
}
}
diff --git a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
index 985c4a2..4f1a48a 100644
--- a/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
+++ b/apps/config/src/main/java/org/onlab/onos/config/NetworkConfigReader.java
@@ -5,6 +5,8 @@
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -17,10 +19,10 @@
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.PortAddresses;
+import org.onlab.packet.IpPrefix;
+import org.onlab.packet.MacAddress;
import org.slf4j.Logger;
-import com.google.common.collect.Sets;
-
/**
* Simple configuration module to read in supplementary network configuration
* from a file.
@@ -51,9 +53,29 @@
DeviceId.deviceId(dpidToUri(entry.getDpid())),
PortNumber.portNumber(entry.getPortNumber()));
+                // Addresses arrive as plain strings from the config entry; convert
+                // them here so that a malformed value only costs that one address.
+                Set<IpPrefix> ipAddresses = new HashSet<IpPrefix>();
+
+                // The list stays null when the entry never had its IPs set, so
+                // guard it the same way the MAC address is guarded below.
+                if (entry.getIpAddresses() != null) {
+                    for (String strIp : entry.getIpAddresses()) {
+                        try {
+                            ipAddresses.add(IpPrefix.valueOf(strIp));
+                        } catch (IllegalArgumentException e) {
+                            log.warn("Bad format for IP address in config: {}", strIp);
+                        }
+                    }
+                }
+
+                // A missing or malformed MAC leaves the binding without one.
+                MacAddress macAddress = null;
+                if (entry.getMacAddress() != null) {
+                    try {
+                        macAddress = MacAddress.valueOf(entry.getMacAddress());
+                    } catch (IllegalArgumentException e) {
+                        log.warn("Bad format for MAC address in config: {}",
+                                entry.getMacAddress());
+                    }
+                }
+
PortAddresses addresses = new PortAddresses(cp,
- Sets.newHashSet(entry.getIpAddresses()),
- entry.getMacAddress());
+ ipAddresses, macAddress);
hostAdminService.bindAddressesToPort(addresses);
}
diff --git a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
index f7fbdbb..aaf5350 100644
--- a/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
+++ b/apps/fwd/src/main/java/org/onlab/onos/fwd/ReactiveForwarding.java
@@ -38,6 +38,8 @@
@Component(immediate = true)
public class ReactiveForwarding {
+ private static final int TIMEOUT = 10;
+
private final Logger log = getLogger(getClass());
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
@@ -184,15 +186,15 @@
Ethernet inPkt = context.inPacket().parsed();
TrafficSelector.Builder builder = new DefaultTrafficSelector.Builder();
builder.matchEthType(inPkt.getEtherType())
- .matchEthSrc(inPkt.getSourceMAC())
- .matchEthDst(inPkt.getDestinationMAC())
- .matchInport(context.inPacket().receivedFrom().port());
+ .matchEthSrc(inPkt.getSourceMAC())
+ .matchEthDst(inPkt.getDestinationMAC())
+ .matchInport(context.inPacket().receivedFrom().port());
TrafficTreatment.Builder treat = new DefaultTrafficTreatment.Builder();
treat.setOutput(portNumber);
FlowRule f = new DefaultFlowRule(context.inPacket().receivedFrom().deviceId(),
- builder.build(), treat.build(), 0, appId);
+ builder.build(), treat.build(), 0, appId, TIMEOUT);
flowRuleService.applyFlowRules(f);
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
index f705a94..bb4805b 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultFlowRule.java
@@ -27,11 +27,12 @@
private final ApplicationId appId;
- private boolean expired;
+ private final int timeout;
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
TrafficTreatment treatment, int priority, FlowRuleState state,
- long life, long packets, long bytes, long flowId, boolean expired) {
+ long life, long packets, long bytes, long flowId, boolean expired,
+ int timeout) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
@@ -39,26 +40,30 @@
this.state = state;
this.appId = ApplicationId.valueOf((int) (flowId >> 32));
this.id = FlowId.valueOf(flowId);
- this.expired = expired;
this.life = life;
this.packets = packets;
this.bytes = bytes;
this.created = System.currentTimeMillis();
+ this.timeout = timeout;
}
public DefaultFlowRule(DeviceId deviceId, TrafficSelector selector,
- TrafficTreatment treatement, int priority, ApplicationId appId) {
- this(deviceId, selector, treatement, priority, FlowRuleState.CREATED, appId);
+ TrafficTreatment treatement, int priority, ApplicationId appId,
+ int timeout) {
+ this(deviceId, selector, treatement, priority,
+ FlowRuleState.CREATED, appId, timeout);
}
public DefaultFlowRule(FlowRule rule, FlowRuleState state) {
this(rule.deviceId(), rule.selector(), rule.treatment(),
- rule.priority(), state, rule.id(), rule.appId());
+ rule.priority(), state, rule.id(), rule.appId(),
+ rule.timeout());
}
private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment,
- int priority, FlowRuleState state, ApplicationId appId) {
+ int priority, FlowRuleState state, ApplicationId appId,
+ int timeout) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
@@ -69,13 +74,16 @@
this.bytes = 0;
this.appId = appId;
+ this.timeout = timeout;
+
this.id = FlowId.valueOf((((long) appId().id()) << 32) | (this.hash() & 0xffffffffL));
this.created = System.currentTimeMillis();
}
private DefaultFlowRule(DeviceId deviceId,
TrafficSelector selector, TrafficTreatment treatment,
- int priority, FlowRuleState state, FlowId flowId, ApplicationId appId) {
+ int priority, FlowRuleState state, FlowId flowId, ApplicationId appId,
+ int timeout) {
this.deviceId = deviceId;
this.priority = priority;
this.selector = selector;
@@ -86,6 +94,7 @@
this.bytes = 0;
this.appId = appId;
this.id = flowId;
+ this.timeout = timeout;
this.created = System.currentTimeMillis();
}
@@ -149,7 +158,7 @@
* @see java.lang.Object#equals(java.lang.Object)
*/
public int hashCode() {
- return Objects.hash(deviceId, id);
+ return Objects.hash(deviceId, selector, priority);
}
public int hash() {
@@ -170,7 +179,10 @@
if (obj instanceof DefaultFlowRule) {
DefaultFlowRule that = (DefaultFlowRule) obj;
return Objects.equals(deviceId, that.deviceId) &&
- Objects.equals(id, that.id);
+ //Objects.equals(id, that.id) &&
+ Objects.equals(priority, that.priority) &&
+ Objects.equals(selector, that.selector);
+
}
return false;
}
@@ -181,16 +193,16 @@
.add("id", id)
.add("deviceId", deviceId)
.add("priority", priority)
- .add("selector", selector)
- .add("treatment", treatment)
+ .add("selector", selector.criteria())
+ .add("treatment", treatment == null ? "N/A" : treatment.instructions())
.add("created", created)
.add("state", state)
.toString();
}
@Override
- public boolean expired() {
- return expired;
+ public int timeout() {
+ return timeout > MAX_TIMEOUT ? MAX_TIMEOUT : this.timeout;
}
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
index 8f68ea5..d792c7e 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/DefaultTrafficSelector.java
@@ -3,8 +3,9 @@
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criteria;
@@ -16,22 +17,42 @@
public final class DefaultTrafficSelector implements TrafficSelector {
- private final List<Criterion> selector;
+ private final Set<Criterion> selector;
- private DefaultTrafficSelector(List<Criterion> selector) {
- this.selector = Collections.unmodifiableList(selector);
+ private DefaultTrafficSelector(Set<Criterion> selector) {
+ this.selector = Collections.unmodifiableSet(selector);
}
@Override
- public List<Criterion> criteria() {
+ public Set<Criterion> criteria() {
return selector;
}
+    /**
+     * Hash derived from the set of criteria held by this selector.
+     */
+    @Override
+    public int hashCode() {
+        return Objects.hash(selector);
+    }
+
+    /**
+     * Selectors are equal when the other object is also a
+     * DefaultTrafficSelector and both hold equal criteria sets.
+     */
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (!(obj instanceof DefaultTrafficSelector)) {
+            return false;
+        }
+        DefaultTrafficSelector other = (DefaultTrafficSelector) obj;
+        return Objects.equals(selector, other.selector);
+    }
+
+
+
public static class Builder implements TrafficSelector.Builder {
private final Logger log = getLogger(getClass());
- private final List<Criterion> selector = new LinkedList<>();
+ private final Set<Criterion> selector = new HashSet<>();
@Override
public Builder add(Criterion criterion) {
@@ -39,38 +60,47 @@
return this;
}
+ @Override
public Builder matchInport(PortNumber port) {
return add(Criteria.matchInPort(port));
}
+ @Override
public Builder matchEthSrc(MacAddress addr) {
return add(Criteria.matchEthSrc(addr));
}
+ @Override
public Builder matchEthDst(MacAddress addr) {
return add(Criteria.matchEthDst(addr));
}
+ @Override
public Builder matchEthType(short ethType) {
return add(Criteria.matchEthType(ethType));
}
+ @Override
public Builder matchVlanId(VlanId vlanId) {
return add(Criteria.matchVlanId(vlanId));
}
+ @Override
public Builder matchVlanPcp(Byte vlanPcp) {
return add(Criteria.matchVlanPcp(vlanPcp));
}
+ @Override
public Builder matchIPProtocol(Byte proto) {
return add(Criteria.matchIPProtocol(proto));
}
+ @Override
public Builder matchIPSrc(IpPrefix ip) {
return add(Criteria.matchIPSrc(ip));
}
+ @Override
public Builder matchIPDst(IpPrefix ip) {
return add(Criteria.matchIPDst(ip));
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
index 2728e21..4d1b3cf 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRule.java
@@ -9,6 +9,7 @@
*/
public interface FlowRule {
+ static final int MAX_TIMEOUT = 60;
public enum FlowRuleState {
/**
@@ -112,10 +113,9 @@
long bytes();
/**
- * Indicates that this flow has expired at the device.
- *
- * @return true if it has expired, false otherwise
+ * Returns the timeout for this flow requested by an application.
+ * @return integer value of the timeout
*/
- boolean expired();
+ int timeout();
}
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
index b2c3d30..c4e2f92 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProvider.java
@@ -8,6 +8,8 @@
*/
public interface FlowRuleProvider extends Provider {
+ static final int POLL_INTERVAL = 5;
+
/**
* Instructs the provider to apply the specified flow rules to their
* respective devices.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
index 01e4372..2076103 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/FlowRuleProviderService.java
@@ -17,27 +17,6 @@
void flowRemoved(FlowRule flowRule);
/**
- * Signals that a flow rule is missing for some network traffic.
- *
- * @param flowRule information about traffic in need of flow rule(s)
- */
- void flowMissing(FlowRule flowRule);
-
- /**
- * Signals that a flow rule is on the switch but not in the store.
- *
- * @param flowRule the extra flow rule
- */
- void extraneousFlow(FlowRule flowRule);
-
- /**
- * Signals that a flow rule was indeed added.
- *
- * @param flowRule the added flow rule
- */
- void flowAdded(FlowRule flowRule);
-
- /**
* Pushes the collection of flow entries currently applied on the given
* device.
*
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
index 249d1f9..c704c8f 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/TrafficSelector.java
@@ -1,6 +1,6 @@
package org.onlab.onos.net.flow;
-import java.util.List;
+import java.util.Set;
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criterion;
@@ -18,7 +18,7 @@
*
-     * @return list of criteria
+     * @return set of criteria
*/
- List<Criterion> criteria();
+ Set<Criterion> criteria();
/**
* Builder of traffic selector entities.
diff --git a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
index 758c51c..a819bd3 100644
--- a/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
+++ b/core/api/src/main/java/org/onlab/onos/net/flow/criteria/Criteria.java
@@ -2,6 +2,8 @@
import static com.google.common.base.MoreObjects.toStringHelper;
+import java.util.Objects;
+
import org.onlab.onos.net.PortNumber;
import org.onlab.onos.net.flow.criteria.Criterion.Type;
import org.onlab.packet.IpPrefix;
@@ -137,6 +139,25 @@
return toStringHelper(type().toString())
.add("port", port).toString();
}
+
+        /**
+         * Hash derived from the matched port.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(port);
+        }
+
+        /**
+         * Equal to another PortCriterion that matches an equal port.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof PortCriterion)) {
+                return false;
+            }
+            PortCriterion other = (PortCriterion) obj;
+            return Objects.equals(port, other.port);
+        }
+
}
@@ -164,6 +185,27 @@
.add("mac", mac).toString();
}
+        /**
+         * Hash derived from the MAC address and the criterion type.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(mac, type);
+        }
+
+        /**
+         * Equal to another EthCriterion with an equal MAC address and an
+         * equal criterion type.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof EthCriterion)) {
+                return false;
+            }
+            EthCriterion other = (EthCriterion) obj;
+            boolean sameMac = Objects.equals(mac, other.mac);
+            return sameMac && Objects.equals(type, other.type);
+        }
+
+
}
public static final class EthTypeCriterion implements Criterion {
@@ -189,6 +231,25 @@
.add("ethType", Long.toHexString(ethType)).toString();
}
+        /**
+         * Hash derived from the matched Ethernet type.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(ethType);
+        }
+
+        /**
+         * Equal to another EthTypeCriterion that matches an equal Ethernet type.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof EthTypeCriterion)) {
+                return false;
+            }
+            EthTypeCriterion other = (EthTypeCriterion) obj;
+            return Objects.equals(ethType, other.ethType);
+        }
+
}
@@ -217,6 +278,26 @@
.add("ip", ip).toString();
}
+        /**
+         * Hash derived from the IP prefix and the criterion type.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(ip, type);
+        }
+
+        /**
+         * Equal to another IPCriterion with an equal IP prefix and an equal
+         * criterion type.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof IPCriterion)) {
+                return false;
+            }
+            IPCriterion other = (IPCriterion) obj;
+            boolean sameIp = Objects.equals(ip, other.ip);
+            return sameIp && Objects.equals(type, other.type);
+        }
+
}
@@ -243,6 +324,25 @@
.add("protocol", Long.toHexString(proto)).toString();
}
+        /**
+         * Hash derived from the matched IP protocol value.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(proto);
+        }
+
+        /**
+         * Equal to another IPProtocolCriterion that matches an equal protocol.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof IPProtocolCriterion)) {
+                return false;
+            }
+            IPProtocolCriterion other = (IPProtocolCriterion) obj;
+            return Objects.equals(proto, other.proto);
+        }
+
}
@@ -269,6 +369,25 @@
.add("pcp", Long.toHexString(vlanPcp)).toString();
}
+        /**
+         * Hash derived from the matched VLAN priority value.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(vlanPcp);
+        }
+
+        /**
+         * Equal to another VlanPcpCriterion that matches an equal priority.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof VlanPcpCriterion)) {
+                return false;
+            }
+            VlanPcpCriterion other = (VlanPcpCriterion) obj;
+            return Objects.equals(vlanPcp, other.vlanPcp);
+        }
+
}
@@ -296,6 +415,25 @@
.add("id", vlanId).toString();
}
+        /**
+         * Hash derived from the matched VLAN id.
+         */
+        @Override
+        public int hashCode() {
+            return Objects.hash(vlanId);
+        }
+
+        /**
+         * Equal to another VlanIdCriterion that matches an equal VLAN id.
+         */
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+            if (!(obj instanceof VlanIdCriterion)) {
+                return false;
+            }
+            VlanIdCriterion other = (VlanIdCriterion) obj;
+            return Objects.equals(vlanId, other.vlanId);
+        }
+
}
diff --git a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
index a6f5ebb..00619b3 100644
--- a/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/flow/impl/FlowRuleManager.java
@@ -5,6 +5,9 @@
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -59,6 +62,8 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected DeviceService deviceService;
+ private final Map<FlowRule, AtomicInteger> deadRounds = new ConcurrentHashMap<>();
+
@Activate
public void activate() {
store.setDelegate(delegate);
@@ -84,6 +89,7 @@
FlowRule f = flowRules[i];
final Device device = deviceService.getDevice(f.deviceId());
final FlowRuleProvider frp = getProvider(device.providerId());
+ deadRounds.put(f, new AtomicInteger(0));
store.storeFlowRule(f);
frp.applyFlowRule(f);
}
@@ -98,6 +104,7 @@
f = flowRules[i];
device = deviceService.getDevice(f.deviceId());
frp = getProvider(device.providerId());
+ deadRounds.remove(f);
store.deleteFlowRule(f);
frp.removeFlowRule(f);
}
@@ -161,11 +168,7 @@
switch (stored.state()) {
case ADDED:
case PENDING_ADD:
- if (flowRule.expired()) {
- event = store.removeFlowRule(flowRule);
- } else {
frp.applyFlowRule(stored);
- }
break;
case PENDING_REMOVE:
case REMOVED:
@@ -181,8 +184,8 @@
}
}
- @Override
- public void flowMissing(FlowRule flowRule) {
+
+ private void flowMissing(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
Device device = deviceService.getDevice(flowRule.deviceId());
@@ -209,29 +212,47 @@
}
- @Override
- public void extraneousFlow(FlowRule flowRule) {
+
+ private void extraneousFlow(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
removeFlowRules(flowRule);
log.debug("Flow {} is on switch but not in store.", flowRule);
}
- @Override
- public void flowAdded(FlowRule flowRule) {
+
+ private void flowAdded(FlowRule flowRule) {
checkNotNull(flowRule, FLOW_RULE_NULL);
checkValidity();
- FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
- if (event == null) {
- log.debug("No flow store event generated.");
+ if (deadRounds.containsKey(flowRule) &&
+ checkRuleLiveness(flowRule, store.getFlowRule(flowRule))) {
+
+ FlowRuleEvent event = store.addOrUpdateFlowRule(flowRule);
+ if (event == null) {
+ log.debug("No flow store event generated.");
+ } else {
+ log.debug("Flow {} {}", flowRule, event.type());
+ post(event);
+ }
} else {
- log.debug("Flow {} {}", flowRule, event.type());
- post(event);
+ removeFlowRules(flowRule);
}
}
+    /**
+     * Decides whether a flow rule reported back for a device is still alive.
+     * The rule is alive when its packet count differs from the stored rule's
+     * (the idle-round counter is then reset), or when the idle rounds counted
+     * so far, multiplied by the provider poll interval, do not exceed the
+     * stored rule's timeout.
+     *
+     * @param swRule the reported rule, used as the key into deadRounds
+     * @param storedRule the matching rule from the store; may be null
+     * @return true to keep the rule, false when the caller should remove it
+     */
+    private boolean checkRuleLiveness(FlowRule swRule, FlowRule storedRule) {
+        // Look the counter up exactly once: removeFlowRules() can drop the
+        // entry between the caller's containsKey() check and this point.
+        AtomicInteger rounds = deadRounds.get(swRule);
+        if (storedRule == null || rounds == null) {
+            // Not tracked or not stored any more; treat it as dead.
+            return false;
+        }
+        if (storedRule.packets() != swRule.packets()) {
+            rounds.set(0);
+            return true;
+        }
+
+        return (rounds.getAndIncrement() * FlowRuleProvider.POLL_INTERVAL)
+                <= storedRule.timeout();
+    }
+
// Posts the specified event to the local event dispatcher.
private void post(FlowRuleEvent event) {
if (event != null) {
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
index e3f53fe..88b6923 100644
--- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostManager.java
@@ -1,5 +1,10 @@
package org.onlab.onos.net.host.impl;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Set;
+
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
import org.apache.felix.scr.annotations.Deactivate;
@@ -12,6 +17,7 @@
import org.onlab.onos.net.DeviceId;
import org.onlab.onos.net.Host;
import org.onlab.onos.net.HostId;
+import org.onlab.onos.net.device.DeviceService;
import org.onlab.onos.net.host.HostAdminService;
import org.onlab.onos.net.host.HostDescription;
import org.onlab.onos.net.host.HostEvent;
@@ -23,6 +29,7 @@
import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.HostStoreDelegate;
import org.onlab.onos.net.host.PortAddresses;
+import org.onlab.onos.net.packet.PacketService;
import org.onlab.onos.net.provider.AbstractProviderRegistry;
import org.onlab.onos.net.provider.AbstractProviderService;
import org.onlab.packet.IpAddress;
@@ -31,11 +38,6 @@
import org.onlab.packet.VlanId;
import org.slf4j.Logger;
-import java.util.Set;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-import static org.slf4j.LoggerFactory.getLogger;
-
/**
* Provides basic implementation of the host SB & NB APIs.
*/
@@ -59,12 +61,22 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected EventDeliveryService eventDispatcher;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected PacketService packetService;
+
+ private HostMonitor monitor;
@Activate
public void activate() {
+ log.info("Started");
store.setDelegate(delegate);
eventDispatcher.addSink(HostEvent.class, listenerRegistry);
- log.info("Started");
+
+ monitor = new HostMonitor(deviceService, packetService, this);
+
}
@Deactivate
@@ -76,6 +88,8 @@
@Override
protected HostProviderService createProviderService(HostProvider provider) {
+ monitor.registerHostProvider(provider);
+
return new InternalHostProviderService(provider);
}
@@ -126,12 +140,12 @@
@Override
public void startMonitoringIp(IpAddress ip) {
- // TODO pass through to HostMonitor
+ monitor.addMonitoringFor(ip);
}
@Override
public void stopMonitoringIp(IpAddress ip) {
- // TODO pass through to HostMonitor
+ monitor.stopMonitoring(ip);
}
@Override
diff --git a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
index a5aa13e..9f8dd48 100644
--- a/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
+++ b/core/net/src/main/java/org/onlab/onos/net/host/impl/HostMonitor.java
@@ -2,10 +2,11 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.jboss.netty.util.Timeout;
@@ -21,19 +22,19 @@
import org.onlab.onos.net.flow.instructions.Instruction;
import org.onlab.onos.net.flow.instructions.Instructions;
import org.onlab.onos.net.host.HostProvider;
-import org.onlab.onos.net.host.HostService;
-import org.onlab.onos.net.host.HostStore;
import org.onlab.onos.net.host.PortAddresses;
import org.onlab.onos.net.packet.DefaultOutboundPacket;
import org.onlab.onos.net.packet.OutboundPacket;
import org.onlab.onos.net.packet.PacketService;
-import org.onlab.onos.net.topology.TopologyService;
+import org.onlab.onos.net.provider.ProviderId;
import org.onlab.packet.ARP;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IpAddress;
import org.onlab.packet.IpPrefix;
import org.onlab.packet.MacAddress;
import org.onlab.util.Timer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Monitors hosts on the dataplane to detect changes in host data.
@@ -43,9 +44,7 @@
* probe for hosts that have not yet been detected (specified by IP address).
*/
public class HostMonitor implements TimerTask {
-
- private static final byte[] DEFAULT_MAC_ADDRESS =
- MacAddress.valueOf("00:00:00:00:00:01").getAddress();
+ private static final Logger log = LoggerFactory.getLogger(HostMonitor.class);
private static final byte[] ZERO_MAC_ADDRESS =
MacAddress.valueOf("00:00:00:00:00:00").getAddress();
@@ -54,59 +53,77 @@
private static final byte[] BROADCAST_MAC =
MacAddress.valueOf("ff:ff:ff:ff:ff:ff").getAddress();
- private final HostService hostService;
- private final TopologyService topologyService;
- private final DeviceService deviceService;
- private final HostProvider hostProvider;
- private final PacketService packetService;
- private final HostStore hostStore;
+ private DeviceService deviceService;
+ private PacketService packetService;
+ private HostManager hostManager;
private final Set<IpAddress> monitoredAddresses;
+ private final Map<ProviderId, HostProvider> hostProviders;
+
private final long probeRate;
private final Timeout timeout;
- public HostMonitor(HostService hostService, TopologyService topologyService,
+ public HostMonitor(
DeviceService deviceService,
- HostProvider hostProvider, PacketService packetService,
- HostStore hostStore) {
- this.hostService = hostService;
- this.topologyService = topologyService;
+ PacketService packetService,
+ HostManager hostService) {
+
this.deviceService = deviceService;
- this.hostProvider = hostProvider;
this.packetService = packetService;
- this.hostStore = hostStore;
+ this.hostManager = hostService;
monitoredAddresses = new HashSet<>();
+ hostProviders = new ConcurrentHashMap<>();
probeRate = 30000; // milliseconds
timeout = Timer.getTimer().newTimeout(this, 0, TimeUnit.MILLISECONDS);
+
+ addDefaultAddresses();
}
- public void addMonitoringFor(IpAddress ip) {
+ // Placeholder invoked at the end of the constructor. It currently adds
+ // nothing: the only candidate address below is commented out.
+ private void addDefaultAddresses() {
+ //monitoredAddresses.add(IpAddress.valueOf("10.0.0.1"));
+ }
+
+ void addMonitoringFor(IpAddress ip) {
monitoredAddresses.add(ip);
}
- public void stopMonitoring(IpAddress ip) {
+ void stopMonitoring(IpAddress ip) {
monitoredAddresses.remove(ip);
}
- public void shutdown() {
+ void shutdown() {
timeout.cancel();
}
+    /**
+     * Records a host provider under its provider id so that run() can ask
+     * it to probe the hosts it owns.
+     *
+     * @param provider host provider to register
+     */
+    void registerHostProvider(HostProvider provider) {
+        hostProviders.put(provider.id(), provider);
+    }
+
+    /**
+     * Forgets a previously registered host provider so that it is no longer
+     * asked to probe hosts.
+     *
+     * @param provider host provider to unregister
+     */
+    void unregisterHostProvider(HostProvider provider) {
+        // TODO find out how to call this
+        hostProviders.remove(provider.id());
+    }
+
@Override
public void run(Timeout timeout) throws Exception {
for (IpAddress ip : monitoredAddresses) {
- Set<Host> hosts = Collections.emptySet(); //TODO hostService.getHostsByIp(ip);
+ // TODO have to convert right now because the HostService API uses IpPrefix
+ IpPrefix prefix = IpPrefix.valueOf(ip.toOctets());
+
+ Set<Host> hosts = hostManager.getHostsByIp(prefix);
if (hosts.isEmpty()) {
sendArpRequest(ip);
} else {
for (Host host : hosts) {
- hostProvider.triggerProbe(host);
+ HostProvider provider = hostProviders.get(host.providerId());
+ if (provider != null) {
+ provider.triggerProbe(host);
+ }
}
}
}
@@ -120,29 +137,26 @@
* @param targetIp IP address to ARP for
*/
private void sendArpRequest(IpAddress targetIp) {
-
// Find ports with an IP address in the target's subnet and sent ARP
// probes out those ports.
for (Device device : deviceService.getDevices()) {
for (Port port : deviceService.getPorts(device.id())) {
ConnectPoint cp = new ConnectPoint(device.id(), port.number());
-                PortAddresses addresses = hostStore.getAddressBindingsForPort(cp);
+                PortAddresses addresses = hostManager.getAddressBindingsForPort(cp);
+                // An ARP request needs a sender hardware address, and a port
+                // can be bound without a MAC (the config reader passes null
+                // when none is configured). Skip such ports instead of letting
+                // buildArpRequest dereference a null MAC in the timer task.
+                // NOTE(review): the null check on the binding itself is
+                // defensive; confirm whether the store can return null.
+                if (addresses == null || addresses.mac() == null) {
+                    continue;
+                }
-                /*for (IpPrefix prefix : addresses.ips()) {
+                for (IpPrefix prefix : addresses.ips()) {
                     if (prefix.contains(targetIp)) {
-                        sendProbe(device.id(), port, addresses, targetIp);
+                        sendProbe(device.id(), port, targetIp,
+                                prefix.toIpAddress(), addresses.mac());
                     }
-                }*/
+                }
}
}
-
- // TODO case where no address was found.
- // Broadcast out internal edge ports?
}
- private void sendProbe(DeviceId deviceId, Port port, PortAddresses portAddresses,
- IpAddress targetIp) {
- Ethernet arpPacket = createArpFor(targetIp, portAddresses);
+ private void sendProbe(DeviceId deviceId, Port port, IpAddress targetIp,
+ IpAddress sourceIp, MacAddress sourceMac) {
+ Ethernet arpPacket = buildArpRequest(targetIp, sourceIp, sourceMac);
List<Instruction> instructions = new ArrayList<>();
instructions.add(Instructions.createOutput(port.number()));
@@ -158,31 +172,26 @@
packetService.emit(outboundPacket);
}
- private Ethernet createArpFor(IpAddress targetIp, PortAddresses portAddresses) {
+ private Ethernet buildArpRequest(IpAddress targetIp, IpAddress sourceIp,
+ MacAddress sourceMac) {
ARP arp = new ARP();
arp.setHardwareType(ARP.HW_TYPE_ETHERNET)
- .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
- .setProtocolType(ARP.PROTO_TYPE_IP)
- .setProtocolAddressLength((byte) IpPrefix.INET_LEN);
+ .setHardwareAddressLength((byte) Ethernet.DATALAYER_ADDRESS_LENGTH)
+ .setProtocolType(ARP.PROTO_TYPE_IP)
+ .setProtocolAddressLength((byte) IpPrefix.INET_LEN)
+ .setOpCode(ARP.OP_REQUEST);
- byte[] sourceMacAddress;
- if (portAddresses.mac() == null) {
- sourceMacAddress = DEFAULT_MAC_ADDRESS;
- } else {
- sourceMacAddress = portAddresses.mac().getAddress();
- }
-
- arp.setSenderHardwareAddress(sourceMacAddress)
- //TODO .setSenderProtocolAddress(portAddresses.ips().toOctets())
- .setTargetHardwareAddress(ZERO_MAC_ADDRESS)
- .setTargetProtocolAddress(targetIp.toOctets());
+ arp.setSenderHardwareAddress(sourceMac.getAddress())
+ .setSenderProtocolAddress(sourceIp.toOctets())
+ .setTargetHardwareAddress(ZERO_MAC_ADDRESS)
+ .setTargetProtocolAddress(targetIp.toOctets());
Ethernet ethernet = new Ethernet();
ethernet.setEtherType(Ethernet.TYPE_ARP)
- .setDestinationMACAddress(BROADCAST_MAC)
- .setSourceMACAddress(sourceMacAddress)
- .setPayload(arp);
+ .setDestinationMACAddress(BROADCAST_MAC)
+ .setSourceMACAddress(sourceMac.getAddress())
+ .setPayload(arp);
return ethernet;
}
diff --git a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
index 5ff72a2..0b451c0 100644
--- a/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
+++ b/core/net/src/test/java/org/onlab/onos/net/flow/impl/FlowRuleManagerTest.java
@@ -9,7 +9,9 @@
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_UPDATED;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
+import java.util.Set;
import org.junit.After;
import org.junit.Before;
@@ -42,6 +44,7 @@
import org.onlab.onos.net.provider.ProviderId;
import org.onlab.onos.store.trivial.impl.SimpleFlowRuleStore;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
@@ -52,6 +55,7 @@
private static final ProviderId PID = new ProviderId("of", "foo");
private static final DeviceId DID = DeviceId.deviceId("of:001");
+ private static final int TIMEOUT = 10;
private static final Device DEV = new DefaultDevice(
PID, DID, Type.SWITCH, "", "", "", "");
@@ -96,7 +100,7 @@
private FlowRule flowRule(int tsval, int trval) {
TestSelector ts = new TestSelector(tsval);
TestTreatment tr = new TestTreatment(trval);
- return new DefaultFlowRule(DID, ts, tr, 0, appId);
+ return new DefaultFlowRule(DID, ts, tr, 0, appId, TIMEOUT);
}
private FlowRule flowRule(FlowRule rule, FlowRuleState state) {
@@ -105,7 +109,8 @@
private FlowRule addFlowRule(int hval) {
FlowRule rule = flowRule(hval, hval);
- providerService.flowAdded(rule);
+ service.applyFlowRules(rule);
+
assertNotNull("rule should be found", service.getFlowEntries(DID));
return rule;
}
@@ -135,13 +140,18 @@
public void getFlowEntries() {
assertTrue("store should be empty",
Sets.newHashSet(service.getFlowEntries(DID)).isEmpty());
- addFlowRule(1);
- addFlowRule(2);
+ FlowRule f1 = addFlowRule(1);
+ FlowRule f2 = addFlowRule(2);
+
assertEquals("2 rules should exist", 2, flowCount());
+
+ providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2));
validateEvents(RULE_ADDED, RULE_ADDED);
addFlowRule(1);
assertEquals("should still be 2 rules", 2, flowCount());
+
+ providerService.pushFlowMetrics(DID, ImmutableList.of(f1));
validateEvents(RULE_UPDATED);
}
@@ -179,8 +189,10 @@
public void removeFlowRules() {
FlowRule f1 = addFlowRule(1);
FlowRule f2 = addFlowRule(2);
- addFlowRule(3);
+ FlowRule f3 = addFlowRule(3);
assertEquals("3 rules should exist", 3, flowCount());
+
+ providerService.pushFlowMetrics(DID, ImmutableList.of(f1, f2, f3));
validateEvents(RULE_ADDED, RULE_ADDED, RULE_ADDED);
FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
@@ -200,8 +212,9 @@
@Test
public void flowRemoved() {
FlowRule f1 = addFlowRule(1);
+ FlowRule f2 = addFlowRule(2);
+ providerService.pushFlowMetrics(f1.deviceId(), ImmutableList.of(f1, f2));
service.removeFlowRules(f1);
- addFlowRule(2);
FlowRule rem1 = flowRule(f1, FlowRuleState.REMOVED);
providerService.flowRemoved(rem1);
validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
@@ -209,9 +222,11 @@
providerService.flowRemoved(rem1);
validateEvents();
- FlowRule f3 = flowRule(flowRule(3, 3), FlowRuleState.ADDED);
- providerService.flowAdded(f3);
+ FlowRule f3 = flowRule(3, 3);
+ service.applyFlowRules(f3);
+ providerService.pushFlowMetrics(f3.deviceId(), Collections.singletonList(f3));
validateEvents(RULE_ADDED);
+
providerService.flowRemoved(f3);
validateEvents();
}
@@ -223,9 +238,10 @@
FlowRule f3 = flowRule(3, 3);
+
+ mgr.applyFlowRules(f1, f2, f3);
FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
- mgr.applyFlowRules(f1, f2, f3);
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
@@ -233,7 +249,7 @@
validateState(FlowRuleState.PENDING_ADD, FlowRuleState.ADDED,
FlowRuleState.ADDED));
- validateEvents(RULE_UPDATED, RULE_UPDATED);
+ validateEvents(RULE_ADDED, RULE_ADDED);
}
@Test
@@ -241,15 +257,15 @@
FlowRule f1 = flowRule(1, 1);
FlowRule f2 = flowRule(2, 2);
FlowRule f3 = flowRule(3, 3);
+ mgr.applyFlowRules(f1, f2);
FlowRule updatedF1 = flowRule(f1, FlowRuleState.ADDED);
FlowRule updatedF2 = flowRule(f2, FlowRuleState.ADDED);
FlowRule updatedF3 = flowRule(f3, FlowRuleState.ADDED);
- mgr.applyFlowRules(f1, f2);
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2, updatedF3));
- validateEvents(RULE_UPDATED, RULE_UPDATED);
+ validateEvents(RULE_ADDED, RULE_ADDED);
}
@@ -271,7 +287,7 @@
providerService.pushFlowMetrics(DID, Lists.newArrayList(updatedF1, updatedF2));
- validateEvents(RULE_UPDATED, RULE_UPDATED, RULE_REMOVED);
+ validateEvents(RULE_ADDED, RULE_ADDED, RULE_REMOVED);
}
@@ -386,7 +402,7 @@
}
@Override
- public List<Criterion> criteria() {
+ public Set<Criterion> criteria() {
return null;
}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
index e25c964..9408cc9 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/impl/DistributedClusterStore.java
@@ -20,7 +20,7 @@
import org.onlab.onos.cluster.NodeId;
import org.onlab.onos.store.AbstractStore;
import org.onlab.onos.store.cluster.messaging.ClusterCommunicationAdminService;
-import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.packet.IpPrefix;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,7 +50,7 @@
private final Map<NodeId, State> states = new ConcurrentHashMap<>();
private final Cache<NodeId, ControllerNode> livenessCache = CacheBuilder.newBuilder()
.maximumSize(1000)
- .expireAfterWrite(OnosClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
+ .expireAfterWrite(ClusterCommunicationManager.HEART_BEAT_INTERVAL_MILLIS * 3, TimeUnit.MILLISECONDS)
.removalListener(new LivenessCacheRemovalListener()).build();
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
index 15e756d..7ec27ec 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/ClusterMessageHandler.java
@@ -1,5 +1,13 @@
package org.onlab.onos.store.cluster.messaging;
+/**
+ * Interface for handling cluster messages.
+ */
public interface ClusterMessageHandler {
+
+ /**
+ * Handles/Processes the cluster message.
+ * @param message cluster message.
+ */
public void handle(ClusterMessage message);
}
\ No newline at end of file
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
index 4c9eefa..ee8d9c1 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubject.java
@@ -2,6 +2,8 @@
/**
* Representation of a message subject.
+ * Cluster messages have associated subjects that dictate how they get handled
+ * on the receiving side.
*/
public class MessageSubject {
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
deleted file mode 100644
index 666ac6d..0000000
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/MessageSubscriber.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package org.onlab.onos.store.cluster.messaging;
-
-import org.onlab.onos.cluster.NodeId;
-
-/**
- * Represents a message consumer.
- */
-public interface MessageSubscriber {
-
- /**
- * Receives the specified cluster message.
- *
- * @param message message to be received
- * @param fromNodeId node from which the message was received
- */
- void receive(Object messagePayload, NodeId fromNodeId);
-
-}
diff --git a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
similarity index 99%
rename from core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java
rename to core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
index e6e4a4d..d4fd9c0 100644
--- a/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/OnosClusterCommunicationManager.java
+++ b/core/store/dist/src/main/java/org/onlab/onos/store/cluster/messaging/impl/ClusterCommunicationManager.java
@@ -32,7 +32,7 @@
@Component(immediate = true)
@Service
-public class OnosClusterCommunicationManager
+public class ClusterCommunicationManager
implements ClusterCommunicationService, ClusterCommunicationAdminService {
private final Logger log = LoggerFactory.getLogger(getClass());
diff --git a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
index 44e5421..bba12f2 100644
--- a/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
+++ b/core/store/dist/src/test/java/org/onlab/onos/store/cluster/impl/ClusterCommunicationManagerTest.java
@@ -6,7 +6,7 @@
import org.junit.Test;
import org.onlab.onos.cluster.DefaultControllerNode;
import org.onlab.onos.cluster.NodeId;
-import org.onlab.onos.store.cluster.messaging.impl.OnosClusterCommunicationManager;
+import org.onlab.onos.store.cluster.messaging.impl.ClusterCommunicationManager;
import org.onlab.netty.NettyMessagingService;
import org.onlab.packet.IpPrefix;
@@ -29,8 +29,8 @@
private static final IpPrefix IP = IpPrefix.valueOf("127.0.0.1");
- private OnosClusterCommunicationManager ccm1;
- private OnosClusterCommunicationManager ccm2;
+ private ClusterCommunicationManager ccm1;
+ private ClusterCommunicationManager ccm2;
private TestDelegate cnd1 = new TestDelegate();
private TestDelegate cnd2 = new TestDelegate();
@@ -46,11 +46,11 @@
NettyMessagingService messagingService = new NettyMessagingService();
messagingService.activate();
- ccm1 = new OnosClusterCommunicationManager();
+ ccm1 = new ClusterCommunicationManager();
// ccm1.serializationService = messageSerializer;
ccm1.activate();
- ccm2 = new OnosClusterCommunicationManager();
+ ccm2 = new ClusterCommunicationManager();
// ccm2.serializationService = messageSerializer;
ccm2.activate();
diff --git a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
index 2f43211..d12d00e 100644
--- a/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
+++ b/core/store/trivial/src/main/java/org/onlab/onos/store/trivial/impl/SimpleFlowRuleStore.java
@@ -1,6 +1,5 @@
package org.onlab.onos.store.trivial.impl;
-import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_ADDED;
import static org.onlab.onos.net.flow.FlowRuleEvent.Type.RULE_REMOVED;
import static org.slf4j.LoggerFactory.getLogger;
@@ -116,18 +115,21 @@
DeviceId did = rule.deviceId();
// check if this new rule is an update to an existing entry
- if (flowEntries.containsEntry(did, rule)) {
- //synchronized (flowEntries) {
+ FlowRule stored = getFlowRule(rule);
+ if (stored != null) {
// Multimaps support duplicates so we have to remove our rule
// and replace it with the current version.
flowEntries.remove(did, rule);
flowEntries.put(did, rule);
- //}
+
+ if (stored.state() == FlowRuleState.PENDING_ADD) {
+ return new FlowRuleEvent(Type.RULE_ADDED, rule);
+ }
return new FlowRuleEvent(Type.RULE_UPDATED, rule);
}
flowEntries.put(did, rule);
- return new FlowRuleEvent(RULE_ADDED, rule);
+ return null;
}
@Override
@@ -140,11 +142,4 @@
}
//}
}
-
-
-
-
-
-
-
}
diff --git a/features/features.xml b/features/features.xml
index f008c14..68fa8c3 100644
--- a/features/features.xml
+++ b/features/features.xml
@@ -11,7 +11,7 @@
<bundle>mvn:io.netty/netty/3.9.2.Final</bundle>
<bundle>mvn:com.hazelcast/hazelcast/3.3</bundle>
- <bundle>mvn:com.codahale.metrics/metrics-core/3.0.2</bundle>
+ <bundle>mvn:io.dropwizard.metrics/metrics-core/3.1.0</bundle>
<bundle>mvn:com.eclipsesource.minimal-json/minimal-json/0.9.1</bundle>
<bundle>mvn:com.esotericsoftware.kryo/kryo/2.24.0</bundle>
diff --git a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
index eb12286..e8ebcd1 100644
--- a/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onlab/onos/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -169,7 +169,12 @@
@Override
public void setRole(Dpid dpid, RoleState role) {
- getSwitch(dpid).setRole(role);
+ final OpenFlowSwitch sw = getSwitch(dpid);
+ if (sw == null) {
+ log.debug("Switch not connected. Ignoring setRole({}, {})", dpid, role);
+ return;
+ }
+ sw.setRole(role);
}
/**
diff --git a/pom.xml b/pom.xml
index 56bbd74..cb00f32 100644
--- a/pom.xml
+++ b/pom.xml
@@ -248,6 +248,11 @@
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>commons-pool</groupId>
+ <artifactId>commons-pool</artifactId>
+ <version>1.6</version>
+ </dependency>
</dependencies>
</dependencyManagement>
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
index 86ab701..ade651e 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowModBuilder.java
@@ -77,7 +77,6 @@
.setCookie(U64.of(cookie.value()))
.setBufferId(OFBufferId.NO_BUFFER)
.setActions(actions)
- .setIdleTimeout(10)
.setMatch(match)
.setFlags(Collections.singleton(OFFlowModFlags.SEND_FLOW_REM))
.setPriority(priority)
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
index ac00f05..eba2282 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/FlowRuleBuilder.java
@@ -71,7 +71,7 @@
buildSelector(), buildTreatment(), stat.getPriority(),
FlowRuleState.ADDED, stat.getDurationNsec() / 1000000,
stat.getPacketCount().getValue(), stat.getByteCount().getValue(),
- stat.getCookie().getValue(), false);
+ stat.getCookie().getValue(), false, stat.getIdleTimeout());
} else {
// TODO: revisit potentially.
return new DefaultFlowRule(DeviceId.deviceId(Dpid.uri(dpid)),
@@ -79,7 +79,8 @@
FlowRuleState.REMOVED, removed.getDurationNsec() / 1000000,
removed.getPacketCount().getValue(), removed.getByteCount().getValue(),
removed.getCookie().getValue(),
- removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal());
+ removed.getReason() == OFFlowRemovedReason.IDLE_TIMEOUT.ordinal(),
+ stat.getIdleTimeout());
}
}
diff --git a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
index bf29ae4..24a7ea8 100644
--- a/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
+++ b/providers/openflow/flow/src/main/java/org/onlab/onos/provider/of/flow/impl/OpenFlowRuleProvider.java
@@ -127,7 +127,7 @@
@Override
public void switchAdded(Dpid dpid) {
- FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), 5);
+ FlowStatsCollector fsc = new FlowStatsCollector(controller.getSwitch(dpid), POLL_INTERVAL);
fsc.start();
collectors.put(dpid, fsc);
}
diff --git a/tools/build/conf/src/main/resources/onos/checkstyle.xml b/tools/build/conf/src/main/resources/onos/checkstyle.xml
index 06413aa..dad602d 100644
--- a/tools/build/conf/src/main/resources/onos/checkstyle.xml
+++ b/tools/build/conf/src/main/resources/onos/checkstyle.xml
@@ -176,7 +176,7 @@
</module>
<module name="ParameterNumber">
- <property name="max" value="10"/>
+ <property name="max" value="15"/>
<property name="tokens" value="CTOR_DEF"/>
</module>
<!-- Checks for whitespace -->
diff --git a/utils/misc/pom.xml b/utils/misc/pom.xml
index bb25635..bd3cc08 100644
--- a/utils/misc/pom.xml
+++ b/utils/misc/pom.xml
@@ -56,9 +56,13 @@
<artifactId>objenesis</artifactId>
</dependency>
<dependency>
- <groupId>com.codahale.metrics</groupId>
+ <groupId>io.dropwizard.metrics</groupId>
<artifactId>metrics-core</artifactId>
- <version>3.0.2</version>
+ <version>3.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.felix</groupId>
+ <artifactId>org.apache.felix.scr.annotations</artifactId>
</dependency>
</dependencies>
diff --git a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
index 2b13efb..e07d3f9 100644
--- a/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
+++ b/utils/misc/src/main/java/org/onlab/metrics/MetricsManager.java
@@ -1,10 +1,18 @@
package org.onlab.metrics;
+import java.io.File;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
import com.codahale.metrics.Counter;
+import com.codahale.metrics.CsvReporter;
import com.codahale.metrics.Gauge;
import com.codahale.metrics.Histogram;
import com.codahale.metrics.Meter;
@@ -45,24 +53,44 @@
* </code>
* </pre>
*/
+@Component(immediate = true)
public final class MetricsManager implements MetricsService {
/**
* Registry to hold the Components defined in the system.
*/
- private ConcurrentMap<String, MetricsComponent> componentsRegistry =
- new ConcurrentHashMap<>();
+ private ConcurrentMap<String, MetricsComponent> componentsRegistry;
/**
* Registry for the Metrics objects created in the system.
*/
- private final MetricRegistry metricsRegistry = new MetricRegistry();
+ private final MetricRegistry metricsRegistry;
/**
- * Hide constructor. The only way to get the registry is through the
- * singleton getter.
+ * Default Reporter for this metrics manager.
*/
- private MetricsManager() {}
+ private final CsvReporter reporter;
+
+ public MetricsManager() {
+ this.componentsRegistry = new ConcurrentHashMap<>();
+ this.metricsRegistry = new MetricRegistry();
+
+ this.reporter = CsvReporter.forRegistry(metricsRegistry)
+ .formatFor(Locale.US)
+ .convertRatesTo(TimeUnit.SECONDS)
+ .convertDurationsTo(TimeUnit.MICROSECONDS)
+ .build(new File("/tmp/"));
+
+ reporter.start(10, TimeUnit.SECONDS);
+ }
+
+ @Activate
+ public void activate() {
+ }
+
+ @Deactivate
+ public void deactivate() {
+ }
/**
* Registers a component.
diff --git a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
index b205f90..84acb82 100644
--- a/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
+++ b/utils/misc/src/main/java/org/onlab/packet/IpPrefix.java
@@ -250,6 +250,17 @@
return new IpPrefix(version, host, netmask);
}
+ /**
+ * Returns an IpAddress of the bytes contained in this prefix.
+ * FIXME this is a hack for now and only works because IpPrefix doesn't
+ * mask the input bytes on creation.
+ *
+ * @return the IpAddress
+ */
+ public IpAddress toIpAddress() {
+ return IpAddress.valueOf(octets);
+ }
+
public boolean isMasked() {
return mask() != 0;
}
@@ -278,6 +289,17 @@
return false;
}
+ public boolean contains(IpAddress address) {
+ // Need to get the network address because prefixes aren't automatically
+ // masked on creation
+ IpPrefix meMasked = network();
+
+ IpPrefix otherMasked =
+ IpPrefix.valueOf(address.octets, netmask).network();
+
+ return Arrays.equals(meMasked.octets, otherMasked.octets);
+ }
+
@Override
public int hashCode() {
final int prime = 31;
@@ -303,6 +325,7 @@
if (netmask != other.netmask) {
return false;
}
+ // TODO not quite right until we mask the input
if (!Arrays.equals(octets, other.octets)) {
return false;
}
diff --git a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
index f6bf6f1..297a0f3 100644
--- a/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
+++ b/utils/misc/src/test/java/org/onlab/packet/IpPrefixTest.java
@@ -76,7 +76,7 @@
}
@Test
- public void testContains() {
+ public void testContainsIpPrefix() {
IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
IpPrefix slash32 = IpPrefix.valueOf(BYTES1, 32);
IpPrefix differentSlash32 = IpPrefix.valueOf(BYTES2, 32);
@@ -96,4 +96,17 @@
assertTrue(slash8.contains(slash31));
assertFalse(slash31.contains(slash8));
}
+
+ @Test
+ public void testContainsIpAddress() {
+ IpPrefix slash31 = IpPrefix.valueOf(BYTES1, 31);
+ IpAddress slash32 = IpAddress.valueOf(BYTES1, 32);
+
+ assertTrue(slash31.contains(slash32));
+
+ IpPrefix intf = IpPrefix.valueOf("192.168.10.101/24");
+ IpAddress addr = IpAddress.valueOf("192.168.10.1");
+
+ assertTrue(intf.contains(addr));
+ }
}
diff --git a/utils/netty/pom.xml b/utils/netty/pom.xml
index d335117..a980d1d 100644
--- a/utils/netty/pom.xml
+++ b/utils/netty/pom.xml
@@ -42,7 +42,6 @@
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
- <version>1.6</version>
</dependency>
</dependencies>
diff --git a/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
new file mode 100644
index 0000000..b2b490e
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/AsyncResponse.java
@@ -0,0 +1,68 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An asynchronous response.
+ * This class provides a base implementation of Response, with methods to retrieve the
+ * result and query to see if the result is ready. The result can only be retrieved when
+ * it is ready and the get methods will block if the result is not ready yet.
+ * @param <T> type of response.
+ */
+public class AsyncResponse<T> implements Response<T> {
+
+ private T value;
+ private boolean done = false;
+ private final long start = System.nanoTime();
+
+ @Override
+ public T get(long timeout, TimeUnit tu) throws TimeoutException {
+ timeout = tu.toNanos(timeout);
+ boolean interrupted = false;
+ try {
+ synchronized (this) {
+ while (!done) {
+ try {
+ long timeRemaining = timeout - (System.nanoTime() - start);
+ if (timeRemaining <= 0) {
+ throw new TimeoutException("Operation timed out.");
+ }
+ TimeUnit.NANOSECONDS.timedWait(this, timeRemaining);
+ } catch (InterruptedException e) {
+ interrupted = true;
+ }
+ }
+ }
+ } finally {
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ return value;
+ }
+
+ @Override
+ public T get() throws InterruptedException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isReady() {
+ return done;
+ }
+
+ /**
+ * Sets response value and unblocks any thread blocking on the response to become
+ * available.
+ * @param data response data.
+ */
+ @SuppressWarnings("unchecked")
+ public synchronized void setResponse(Object data) {
+ if (!done) {
+ done = true;
+ value = (T) data;
+ this.notifyAll();
+ }
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
new file mode 100644
index 0000000..313a448
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/EchoHandler.java
@@ -0,0 +1,15 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Message handler that echoes the message back to the sender.
+ */
+public class EchoHandler implements MessageHandler {
+
+ @Override
+ public void handle(Message message) throws IOException {
+ System.out.println("Received: " + message.payload() + ". Echoing it back to the sender.");
+ message.respond(message.payload());
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Endpoint.java b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
new file mode 100644
index 0000000..8681093
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Endpoint.java
@@ -0,0 +1,62 @@
+package org.onlab.netty;
+
+/**
+ * Representation of a TCP/UDP communication end point.
+ */
+public class Endpoint {
+
+ private final int port;
+ private final String host;
+
+ public Endpoint(String host, int port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public String host() {
+ return host;
+ }
+
+ public int port() {
+ return port;
+ }
+
+ @Override
+ public String toString() {
+ return "Endpoint [port=" + port + ", host=" + host + "]";
+ }
+
+ @Override
+ public int hashCode() {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((host == null) ? 0 : host.hashCode());
+ result = prime * result + port;
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null) {
+ return false;
+ }
+ if (getClass() != obj.getClass()) {
+ return false;
+ }
+ Endpoint other = (Endpoint) obj;
+ if (host == null) {
+ if (other.host != null) {
+ return false;
+ }
+ } else if (!host.equals(other.host)) {
+ return false;
+ }
+ if (port != other.port) {
+ return false;
+ }
+ return true;
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
new file mode 100644
index 0000000..bcf6f52
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/InternalMessage.java
@@ -0,0 +1,85 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Internal message representation with additional attributes
+ * for supporting synchronous request/reply behavior.
+ */
+public final class InternalMessage implements Message {
+
+ private long id;
+ private Endpoint sender;
+ private String type;
+ private Object payload;
+ private transient NettyMessagingService messagingService;
+ public static final String REPLY_MESSAGE_TYPE = "NETTY_MESSAGIG_REQUEST_REPLY";
+
+ // Must be created using the Builder.
+ private InternalMessage() {}
+
+ public long id() {
+ return id;
+ }
+
+ public String type() {
+ return type;
+ }
+
+ public Endpoint sender() {
+ return sender;
+ }
+
+ @Override
+ public Object payload() {
+ return payload;
+ }
+
+ @Override
+ public void respond(Object data) throws IOException {
+ Builder builder = new Builder(messagingService);
+ InternalMessage message = builder.withId(this.id)
+ // FIXME: Sender should be messagingService.localEp.
+ .withSender(this.sender)
+ .withPayload(data)
+ .withType(REPLY_MESSAGE_TYPE)
+ .build();
+ messagingService.sendAsync(sender, message);
+ }
+
+
+ /**
+ * Builder for InternalMessages.
+ */
+ public static class Builder {
+ private InternalMessage message;
+
+ public Builder(NettyMessagingService messagingService) {
+ message = new InternalMessage();
+ message.messagingService = messagingService;
+ }
+
+ public Builder withId(long id) {
+ message.id = id;
+ return this;
+ }
+
+ public Builder withType(String type) {
+ message.type = type;
+ return this;
+ }
+
+ public Builder withSender(Endpoint sender) {
+ message.sender = sender;
+ return this;
+ }
+ public Builder withPayload(Object payload) {
+ message.payload = payload;
+ return this;
+ }
+
+ public InternalMessage build() {
+ return message;
+ }
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
new file mode 100644
index 0000000..73c01a0
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/KryoSerializer.java
@@ -0,0 +1,47 @@
+package org.onlab.netty;
+
+import org.onlab.util.KryoPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+/**
+ * Kryo Serializer.
+ */
+public class KryoSerializer implements Serializer {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private KryoPool serializerPool;
+
+ public KryoSerializer() {
+ setupKryoPool();
+ }
+
+ /**
+ * Sets up the common serializers pool.
+ */
+ protected void setupKryoPool() {
+ // FIXME Slice out types used in common to separate pool/namespace.
+ serializerPool = KryoPool.newBuilder()
+ .register(ArrayList.class,
+ HashMap.class,
+ ArrayList.class
+ )
+ .build()
+ .populate(1);
+ }
+
+
+ @Override
+ public Object decode(byte[] data) {
+ return serializerPool.deserialize(data);
+ }
+
+ @Override
+ public byte[] encode(Object payload) {
+ return serializerPool.serialize(payload);
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
new file mode 100644
index 0000000..ed6cdb4
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/LoggingHandler.java
@@ -0,0 +1,12 @@
+package org.onlab.netty;
+
+/**
+ * A MessageHandler that simply logs the information.
+ */
+public class LoggingHandler implements MessageHandler {
+
+ @Override
+ public void handle(Message message) {
+ System.out.println("Received: " + message.payload());
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Message.java b/utils/netty/src/main/java/org/onlab/netty/Message.java
new file mode 100644
index 0000000..54b9526
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Message.java
@@ -0,0 +1,23 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * A unit of communication.
+ * Has a payload. Also supports a feature to respond back to the sender.
+ */
+public interface Message {
+
+ /**
+ * Returns the payload of this message.
+ * @return message payload.
+ */
+ public Object payload();
+
+ /**
+ * Sends a reply back to the sender of this message.
+ * @param data payload of the response.
+ * @throws IOException if there is a communication error.
+ */
+ public void respond(Object data) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
new file mode 100644
index 0000000..ecf2d62
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageDecoder.java
@@ -0,0 +1,58 @@
+package org.onlab.netty;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkState;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+/**
+ * Decode bytes into an InternalMessage.
+ */
+public class MessageDecoder extends ByteToMessageDecoder {
+
+ private final NettyMessagingService messagingService;
+ private final Serializer serializer;
+
+ public MessageDecoder(NettyMessagingService messagingService, Serializer serializer) {
+ this.messagingService = messagingService;
+ this.serializer = serializer;
+ }
+
+ @Override
+ protected void decode(ChannelHandlerContext context, ByteBuf in,
+ List<Object> messages) throws Exception {
+
+ byte[] preamble = in.readBytes(MessageEncoder.PREAMBLE.length).array();
+ checkState(Arrays.equals(MessageEncoder.PREAMBLE, preamble), "Message has wrong preamble");
+
+ // read message Id.
+ long id = in.readLong();
+
+ // read message type; first read size and then bytes.
+ String type = new String(in.readBytes(in.readInt()).array());
+
+ // read sender host name; first read size and then bytes.
+ String host = new String(in.readBytes(in.readInt()).array());
+
+ // read sender port.
+ int port = in.readInt();
+
+ Endpoint sender = new Endpoint(host, port);
+
+ // read message payload; first read size and then bytes.
+ Object payload = serializer.decode(in.readBytes(in.readInt()).array());
+
+ InternalMessage message = new InternalMessage.Builder(messagingService)
+ .withId(id)
+ .withSender(sender)
+ .withType(type)
+ .withPayload(payload)
+ .build();
+
+ messages.add(message);
+ }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
new file mode 100644
index 0000000..1b52a0f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageEncoder.java
@@ -0,0 +1,75 @@
+package org.onlab.netty;
+
+import java.nio.charset.StandardCharsets;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToByteEncoder;
+
+/**
+ * Encode InternalMessage out into a byte buffer.
+ *
+ * <p>Wire layout, in order: preamble, message id (long), type (int byte
+ * length + bytes), sender host (int byte length + bytes), sender port (int),
+ * payload (int byte length + bytes).
+ */
+public class MessageEncoder extends MessageToByteEncoder<InternalMessage> {
+
+    // onosiscool in ascii
+    public static final byte[] PREAMBLE = "onosiscool".getBytes(StandardCharsets.US_ASCII);
+
+    private final Serializer serializer;
+
+    /**
+     * Creates an encoder.
+     *
+     * @param serializer serializer used to encode the message payload
+     */
+    public MessageEncoder(Serializer serializer) {
+        this.serializer = serializer;
+    }
+
+    /**
+     * Writes the message to the outbound buffer.
+     *
+     * @param context channel handler context
+     * @param message message to encode
+     * @param out buffer the encoded message is written to
+     * @throws Exception any failure raised while serializing the payload
+     */
+    @Override
+    protected void encode(ChannelHandlerContext context, InternalMessage message,
+            ByteBuf out) throws Exception {
+
+        // Encode the strings once, with a fixed charset, so that the length
+        // prefix is a byte count and always matches the bytes that follow
+        // (String.length() counts chars and differs for non-ASCII text).
+        byte[] type = message.type().getBytes(StandardCharsets.UTF_8);
+        byte[] host = message.sender().host().getBytes(StandardCharsets.UTF_8);
+
+        // Serialize before touching the buffer; a failure propagates to the
+        // caller instead of being printed and then retried.
+        byte[] payload = serializer.encode(message.payload());
+
+        // write preamble
+        out.writeBytes(PREAMBLE);
+
+        // write id
+        out.writeLong(message.id());
+
+        // write type length and type
+        out.writeInt(type.length);
+        out.writeBytes(type);
+
+        // write sender host name size and host name
+        out.writeInt(host.length);
+        out.writeBytes(host);
+
+        // write port
+        out.writeInt(message.sender().port());
+
+        // write payload length and payload bytes
+        out.writeInt(payload.length);
+        out.writeBytes(payload);
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
new file mode 100644
index 0000000..7bd5a7f
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessageHandler.java
@@ -0,0 +1,17 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Handler for a message.
+ */
+public interface MessageHandler {
+
+    /**
+     * Handles the message.
+     *
+     * @param message message to handle
+     * @throws IOException declared so that implementations may report I/O failures
+     */
+    void handle(Message message) throws IOException;
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/MessagingService.java b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
new file mode 100644
index 0000000..ebad442
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/MessagingService.java
@@ -0,0 +1,48 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+
+/**
+ * Interface for low level messaging primitives.
+ */
+public interface MessagingService {
+
+    /**
+     * Sends a message asynchronously to the specified communication end point.
+     * The message is specified using the type and payload.
+     *
+     * @param ep end point to send the message to
+     * @param type type of message
+     * @param payload message payload
+     * @throws IOException if the message cannot be sent
+     */
+    void sendAsync(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Sends a message and returns a handle on which the caller can wait for
+     * the response.
+     *
+     * @param <T> type of the expected response
+     * @param ep end point to send the message to
+     * @param type type of message
+     * @param payload message payload
+     * @return a response future
+     * @throws IOException if the message cannot be sent
+     */
+    <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload) throws IOException;
+
+    /**
+     * Registers a new message handler for message type.
+     *
+     * @param type message type
+     * @param handler message handler
+     */
+    void registerHandler(String type, MessageHandler handler);
+
+    /**
+     * Unregisters the current handler, if one exists, for message type.
+     *
+     * @param type message type
+     */
+    void unregisterHandler(String type);
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
new file mode 100644
index 0000000..54da8cc
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/NettyMessagingService.java
@@ -0,0 +1,299 @@
+package org.onlab.netty;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.pool.KeyedObjectPool;
+import org.apache.commons.pool.KeyedPoolableObjectFactory;
+import org.apache.commons.pool.impl.GenericKeyedObjectPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+
+/**
+ * A Netty based implementation of MessagingService.
+ */
+public class NettyMessagingService implements MessagingService {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    // Outbound channels, pooled per remote end point.
+    private final KeyedObjectPool<Endpoint, Channel> channels =
+            new GenericKeyedObjectPool<Endpoint, Channel>(new OnosCommunicationChannelFactory());
+    private final int port;
+    private final EventLoopGroup bossGroup = new NioEventLoopGroup();
+    private final EventLoopGroup workerGroup = new NioEventLoopGroup();
+    private final ConcurrentMap<String, MessageHandler> handlers = new ConcurrentHashMap<>();
+    // Pending sendAndReceive requests keyed by message id; created in activate().
+    private Cache<Long, AsyncResponse<?>> responseFutures;
+    private final Endpoint localEp;
+
+    // Assigned by subclasses; handed to the encoder and decoder of every channel.
+    protected Serializer serializer;
+
+    /**
+     * Creates a messaging service that uses port 8080.
+     */
+    public NettyMessagingService() {
+        // TODO: Default port should be configurable.
+        this(8080);
+    }
+
+    /**
+     * Creates a messaging service that uses the given port.
+     *
+     * @param port port to accept connections on once activated
+     * @throws RuntimeException if the local host cannot be resolved
+     */
+    // FIXME: Constructor should not throw exceptions.
+    public NettyMessagingService(int port) {
+        this.port = port;
+        try {
+            localEp = new Endpoint(java.net.InetAddress.getLocalHost().getHostName(), port);
+        } catch (UnknownHostException e) {
+            // bailing out.
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Creates the pending-response cache and starts accepting connections.
+     *
+     * @throws Exception if the server socket cannot be bound
+     */
+    public void activate() throws Exception {
+        responseFutures = CacheBuilder.newBuilder()
+                .maximumSize(100000)
+                .weakValues()
+                // TODO: Once the entry expires, notify blocking threads (if any).
+                .expireAfterWrite(10, TimeUnit.MINUTES)
+                .build();
+        startAcceptingConnections();
+    }
+
+    /**
+     * Closes the channel pool and shuts down both event loop groups.
+     *
+     * @throws Exception if closing the channel pool fails
+     */
+    public void deactivate() throws Exception {
+        channels.close();
+        bossGroup.shutdownGracefully();
+        workerGroup.shutdownGracefully();
+    }
+
+    @Override
+    public void sendAsync(Endpoint ep, String type, Object payload) throws IOException {
+        InternalMessage message = new InternalMessage.Builder(this)
+                .withId(RandomUtils.nextLong())
+                .withSender(localEp)
+                .withType(type)
+                .withPayload(payload)
+                .build();
+        sendAsync(ep, message);
+    }
+
+    /**
+     * Schedules the message to be written on a pooled channel to the end point.
+     *
+     * @param ep end point to send the message to
+     * @param message message to send
+     * @throws IOException if a channel cannot be obtained or the write
+     *         cannot be scheduled
+     */
+    protected void sendAsync(Endpoint ep, InternalMessage message) throws IOException {
+        Channel channel = null;
+        try {
+            channel = channels.borrowObject(ep);
+            channel.eventLoop().execute(new WriteTask(channel, message));
+        } catch (Exception e) {
+            throw new IOException(e);
+        } finally {
+            // When borrowing failed there is no channel to hand back.
+            if (channel != null) {
+                try {
+                    channels.returnObject(ep, channel);
+                } catch (Exception e) {
+                    log.warn("Error returning object back to the pool", e);
+                    // ignored.
+                }
+            }
+        }
+    }
+
+    @Override
+    public <T> Response<T> sendAndReceive(Endpoint ep, String type, Object payload)
+            throws IOException {
+        AsyncResponse<T> futureResponse = new AsyncResponse<T>();
+        Long messageId = RandomUtils.nextLong();
+        // Record the pending request before sending so the reply can find it.
+        responseFutures.put(messageId, futureResponse);
+        InternalMessage message = new InternalMessage.Builder(this)
+                .withId(messageId)
+                .withSender(localEp)
+                .withType(type)
+                .withPayload(payload)
+                .build();
+        sendAsync(ep, message);
+        return futureResponse;
+    }
+
+    @Override
+    public void registerHandler(String type, MessageHandler handler) {
+        // TODO: Is this the right semantics for handler registration?
+        handlers.putIfAbsent(type, handler);
+    }
+
+    @Override
+    public void unregisterHandler(String type) {
+        handlers.remove(type);
+    }
+
+    private MessageHandler getMessageHandler(String type) {
+        return handlers.get(type);
+    }
+
+    private void startAcceptingConnections() throws InterruptedException {
+        ServerBootstrap b = new ServerBootstrap();
+        b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+        b.group(bossGroup, workerGroup)
+            .channel(NioServerSocketChannel.class)
+            .childHandler(new OnosCommunicationChannelInitializer())
+            .option(ChannelOption.SO_BACKLOG, 128)
+            .childOption(ChannelOption.SO_KEEPALIVE, true);
+
+        // Bind and start to accept incoming connections.
+        b.bind(port).sync();
+    }
+
+    /**
+     * Pool factory that opens a client channel to an end point on demand.
+     */
+    private class OnosCommunicationChannelFactory
+            implements KeyedPoolableObjectFactory<Endpoint, Channel> {
+
+        @Override
+        public void activateObject(Endpoint endpoint, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public void destroyObject(Endpoint ep, Channel channel) throws Exception {
+            channel.close();
+        }
+
+        @Override
+        public Channel makeObject(Endpoint ep) throws Exception {
+            Bootstrap b = new Bootstrap();
+            b.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
+            b.group(workerGroup);
+            // TODO: Make this faster:
+            // http://normanmaurer.me/presentations/2014-facebook-eng-netty/slides.html#37.0
+            b.channel(NioSocketChannel.class);
+            b.option(ChannelOption.SO_KEEPALIVE, true);
+            b.handler(new OnosCommunicationChannelInitializer());
+
+            // Start the client.
+            ChannelFuture f = b.connect(ep.host(), ep.port()).sync();
+            return f.channel();
+        }
+
+        @Override
+        public void passivateObject(Endpoint ep, Channel channel)
+                throws Exception {
+        }
+
+        @Override
+        public boolean validateObject(Endpoint ep, Channel channel) {
+            return channel.isOpen();
+        }
+    }
+
+    /**
+     * Installs the encoder, decoder and dispatcher on every new channel.
+     */
+    private class OnosCommunicationChannelInitializer extends ChannelInitializer<SocketChannel> {
+
+        @Override
+        protected void initChannel(SocketChannel channel) throws Exception {
+            channel.pipeline()
+                .addLast(new MessageEncoder(serializer))
+                .addLast(new MessageDecoder(NettyMessagingService.this, serializer))
+                .addLast(new NettyMessagingService.InboundMessageDispatcher());
+        }
+    }
+
+    /**
+     * Writes and flushes one message on a channel when run.
+     */
+    private static class WriteTask implements Runnable {
+
+        private final Object message;
+        private final Channel channel;
+
+        public WriteTask(Channel channel, Object message) {
+            this.message = message;
+            this.channel = channel;
+        }
+
+        @Override
+        public void run() {
+            channel.writeAndFlush(message);
+        }
+    }
+
+    /**
+     * Completes pending responses for reply messages and hands every other
+     * message to the handler registered for its type.
+     */
+    private class InboundMessageDispatcher extends SimpleChannelInboundHandler<InternalMessage> {
+
+        @Override
+        protected void channelRead0(ChannelHandlerContext ctx, InternalMessage message) throws Exception {
+            String type = message.type();
+            if (type.equals(InternalMessage.REPLY_MESSAGE_TYPE)) {
+                try {
+                    AsyncResponse<?> futureResponse =
+                            NettyMessagingService.this.responseFutures.getIfPresent(message.id());
+                    if (futureResponse != null) {
+                        futureResponse.setResponse(message.payload());
+                    } else {
+                        // Warn only when no pending request matches the reply.
+                        log.warn("Received a reply. But was unable to locate the request handle");
+                    }
+                } finally {
+                    NettyMessagingService.this.responseFutures.invalidate(message.id());
+                }
+                return;
+            }
+            MessageHandler handler = NettyMessagingService.this.getMessageHandler(type);
+            if (handler == null) {
+                // Without this guard an unregistered type ends in a NullPointerException.
+                log.warn("No handler registered for message type {}", type);
+                return;
+            }
+            handler.handle(message);
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Response.java b/utils/netty/src/main/java/org/onlab/netty/Response.java
new file mode 100644
index 0000000..04675ce
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Response.java
@@ -0,0 +1,39 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Response object returned when making synchronous requests.
+ * Can be used to check if a response is ready and/or wait for a response
+ * to become available.
+ *
+ * @param <T> type of response.
+ */
+public interface Response<T> {
+
+    /**
+     * Gets the response, waiting up to the designated timeout period.
+     *
+     * @param timeout timeout period (since request was sent out)
+     * @param tu unit of time
+     * @return response
+     * @throws TimeoutException if the timeout expires before the response arrives
+     */
+    T get(long timeout, TimeUnit tu) throws TimeoutException;
+
+    /**
+     * Gets the response, waiting indefinitely for it to arrive.
+     *
+     * @return response
+     * @throws InterruptedException if the thread is interrupted before the response arrives
+     */
+    T get() throws InterruptedException;
+
+    /**
+     * Checks if the response is ready without blocking.
+     *
+     * @return true if response is ready, false otherwise
+     */
+    boolean isReady();
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/Serializer.java b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
new file mode 100644
index 0000000..ac55f5a
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/Serializer.java
@@ -0,0 +1,23 @@
+package org.onlab.netty;
+
+/**
+ * Interface for encoding/decoding message payloads.
+ */
+public interface Serializer {
+
+    /**
+     * Encodes the specified POJO into a byte array.
+     *
+     * @param message POJO to be encoded
+     * @return byte array
+     */
+    byte[] encode(Object message);
+
+    /**
+     * Decodes the specified byte array to a POJO.
+     *
+     * @param data byte array
+     * @return POJO
+     */
+    Object decode(byte[] data);
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
new file mode 100644
index 0000000..1573780
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleClient.java
@@ -0,0 +1,42 @@
+package org.onlab.netty;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Sample client that sends a one-way message and an echo request to a server
+ * on localhost:8080 and prints the reply to the echo request.
+ */
+public final class SimpleClient {
+    private SimpleClient() {}
+
+    /**
+     * Runs the client.
+     *
+     * @param args ignored
+     * @throws Exception if messaging fails or no reply arrives within 2 seconds
+     */
+    public static void main(String... args) throws Exception {
+        NettyMessagingService messaging = new TestNettyMessagingService(9081);
+        try {
+            messaging.activate();
+            Endpoint server = new Endpoint("localhost", 8080);
+            messaging.sendAsync(server, "simple", "Hello World");
+            Response<String> response = messaging.sendAndReceive(server, "echo", "Hello World");
+            System.out.println("Got back:" + response.get(2, TimeUnit.SECONDS));
+        } finally {
+            // Shut the channels and event loops down; otherwise the event
+            // loop threads can keep the JVM alive after main returns.
+            messaging.deactivate();
+        }
+    }
+
+    /**
+     * Messaging service that uses a KryoSerializer.
+     */
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        public TestNettyMessagingService(int port) throws Exception {
+            super(port);
+            this.serializer = new KryoSerializer();
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
new file mode 100644
index 0000000..12fa025
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/SimpleServer.java
@@ -0,0 +1,33 @@
+package org.onlab.netty;
+
+/**
+ * Sample server that registers a LoggingHandler for "simple" messages and
+ * an EchoHandler for "echo" messages.
+ */
+public final class SimpleServer {
+    private SimpleServer() {}
+
+    /**
+     * Starts the server on the messaging service's default port.
+     *
+     * @param args ignored
+     * @throws Exception if the service cannot be activated
+     */
+    public static void main(String... args) throws Exception {
+        NettyMessagingService server = new TestNettyMessagingService();
+        // Register handlers before accepting connections so that no message
+        // can arrive while its type still has no handler.
+        server.registerHandler("simple", new LoggingHandler());
+        server.registerHandler("echo", new EchoHandler());
+        server.activate();
+    }
+
+    /**
+     * Messaging service that uses a KryoSerializer.
+     */
+    public static class TestNettyMessagingService extends NettyMessagingService {
+        protected TestNettyMessagingService() {
+            this.serializer = new KryoSerializer();
+        }
+    }
+}
diff --git a/utils/netty/src/main/java/org/onlab/netty/package-info.java b/utils/netty/src/main/java/org/onlab/netty/package-info.java
new file mode 100644
index 0000000..b1b90a3
--- /dev/null
+++ b/utils/netty/src/main/java/org/onlab/netty/package-info.java
@@ -0,0 +1,4 @@
+/**
+ * Asynchronous messaging APIs implemented using the Netty framework.
+ */
+package org.onlab.netty;
\ No newline at end of file