Began implementing IPC with Hazelcast in order to broadcast ARP via other ONOS nodes
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index d9fb7c3..7f8a7a7 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -5,12 +5,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import com.esotericsoftware.kryo2.Kryo;
-import com.esotericsoftware.kryo2.io.Input;
-import com.esotericsoftware.kryo2.io.Output;
+import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -18,9 +16,9 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-
import net.onrc.onos.datagrid.web.DatagridWebRoutable;
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -28,9 +26,13 @@
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.EntryEvent;
@@ -76,6 +78,12 @@
private IMap<String, byte[]> mapTopology = null;
private MapTopologyListener mapTopologyListener = null;
private String mapTopologyListenerId = null;
+
+ // State related to the ARP map
+ protected static final String arpMapName = "arpMap";
+ private IMap<byte[], byte[]> arpMap = null;
+ private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>();
+ private final byte[] dummyByte = {0};
/**
* Class for receiving notifications for Flow state.
@@ -322,6 +330,91 @@
// NOTE: We don't use eviction for this map
}
}
+
+ /**
+ * Class for receiving notifications for ARP requests.
+ *
+ * The datagrid map is:
+ * - Key: Request ID (String)
+ * - Value: ARP request packet (byte[])
+ */
+ class ArpMapListener implements EntryListener<byte[], byte[]> {
+ /**
+ * Receive a notification that an entry is added.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryAdded(EntryEvent<byte[], byte[]> event) {
+ for (IArpEventHandler arpEventHandler : arpEventHandlers) {
+ arpEventHandler.arpRequestNotification(event.getKey());
+ }
+
+ //
+ // Decode the value and deliver the notification
+ //
+ /*
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ TopologyElement topologyElement =
+ kryo.readObject(input, TopologyElement.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvTopologyElementAdded(topologyElement);
+ */
+ }
+
+ /**
+ * Receive a notification that an entry is removed.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+ /*
+ String keyString = (String)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ TopologyElement topologyElement =
+ kryo.readObject(input, TopologyElement.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvTopologyElementRemoved(topologyElement);
+ */
+ }
+
+ /**
+ * Receive a notification that an entry is updated.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+ /*
+ String keyString = (String)event.getKey();
+ byte[] valueBytes = (byte[])event.getValue();
+
+ //
+ // Decode the value and deliver the notification
+ //
+ Kryo kryo = kryoFactory.newKryo();
+ Input input = new Input(valueBytes);
+ TopologyElement topologyElement =
+ kryo.readObject(input, TopologyElement.class);
+ kryoFactory.deleteKryo(kryo);
+ flowEventHandlerService.notificationRecvTopologyElementUpdated(topologyElement);
+ */
+ }
+
+ /**
+ * Receive a notification that an entry is evicted.
+ *
+ * @param event the notification event for the entry.
+ */
+ public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+ // NOTE: We don't use eviction for this map
+ }
+ }
/**
* Initialize the Hazelcast Datagrid operation.
@@ -437,6 +530,9 @@
hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
restApi.addRestletRoutable(new DatagridWebRoutable());
+
+ arpMap = hazelcastInstance.getMap(arpMapName);
+ arpMap.addEntryListener(new ArpMapListener(), true);
}
/**
@@ -495,7 +591,19 @@
this.flowEventHandlerService = null;
}
-
+
+ @Override
+ public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
+ if (arpEventHandler != null) {
+ arpEventHandlers.add(arpEventHandler);
+ }
+ }
+
+ @Override
+ public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
+ arpEventHandlers.remove(arpEventHandler);
+ }
+
/**
* Get all Flows that are currently in the datagrid.
*
@@ -782,4 +890,10 @@
mapTopology.removeAsync(key);
}
}
+
+ @Override
+ public void sendArpRequest(byte[] arpRequest) {
+ log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
+ arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
+ }
}
diff --git a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
index 1bcf601..3f40480 100644
--- a/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
+++ b/src/main/java/net/onrc/onos/datagrid/IDatagridService.java
@@ -3,8 +3,8 @@
import java.util.Collection;
import net.floodlightcontroller.core.module.IFloodlightService;
-
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -37,6 +37,20 @@
void deregisterFlowEventHandlerService(IFlowEventHandlerService flowEventHandlerService);
/**
+ * Register event handler for ARP events.
+ *
+ * @param arpEventHandler The ARP event handler to register.
+ */
+ public void registerArpEventHandler(IArpEventHandler arpEventHandler);
+
+ /**
+ * De-register event handler service for ARP events.
+ *
+ * @param arpEventHandler The ARP event handler to de-register.
+ */
+ public void deregisterArpEventHandler(IArpEventHandler arpEventHandler);
+
+ /**
* Get all Flows that are currently in the datagrid.
*
* @return all Flows that are currently in the datagrid.
@@ -134,4 +148,10 @@
* Send a notification that all Topology Elements are removed.
*/
void notificationSendAllTopologyElementsRemoved();
+
+ /**
+ * Send an ARP request to other ONOS instances
+ * @param arpRequest The request packet to send
+ */
+ public void sendArpRequest(byte[] arpRequest);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java b/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
index f3d0da5..b14dc12 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/core/module/OnosModuleLoader.java
@@ -11,9 +11,9 @@
import net.floodlightcontroller.core.module.FloodlightModuleException;
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
-import net.floodlightcontroller.devicemanager.IDeviceService;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.topology.ITopologyService;
+import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.core.config.DefaultConfiguration;
import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
import net.onrc.onos.ofcontroller.flowmanager.IFlowService;
@@ -24,10 +24,10 @@
public class OnosModuleLoader implements IFloodlightModule {
private IFloodlightProviderService floodlightProvider;
private ITopologyService topology;
- private IDeviceService deviceService;
private IConfigInfoService config;
private IRestApiService restApi;
private IFlowService flowService;
+ private IDatagridService datagrid;
private ProxyArpManager arpManager;
private Forwarding forwarding;
@@ -59,10 +59,9 @@
new ArrayList<Class<? extends IFloodlightService>>();
dependencies.add(IFloodlightProviderService.class);
dependencies.add(ITopologyService.class);
- dependencies.add(IDeviceService.class);
- //dependencies.add(IConfigInfoService.class);
dependencies.add(IRestApiService.class);
dependencies.add(IFlowService.class);
+ dependencies.add(IDatagridService.class);
return dependencies;
}
@@ -71,9 +70,9 @@
throws FloodlightModuleException {
floodlightProvider = context.getServiceImpl(IFloodlightProviderService.class);
topology = context.getServiceImpl(ITopologyService.class);
- deviceService = context.getServiceImpl(IDeviceService.class);
restApi = context.getServiceImpl(IRestApiService.class);
flowService = context.getServiceImpl(IFlowService.class);
+ datagrid = context.getServiceImpl(IDatagridService.class);
//This could be null because it's not mandatory to have an
//IConfigInfoService loaded.
@@ -82,7 +81,7 @@
config = new DefaultConfiguration();
}
- arpManager.init(floodlightProvider, topology, deviceService, config, restApi);
+ arpManager.init(floodlightProvider, topology, datagrid, config, restApi);
forwarding.init(floodlightProvider, flowService);
}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
new file mode 100644
index 0000000..51d92bd
--- /dev/null
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/IArpEventHandler.java
@@ -0,0 +1,11 @@
+package net.onrc.onos.ofcontroller.proxyarp;
+
+public interface IArpEventHandler {
+
+ /**
+ * Notify the ARP event handler that an ARP request has been received.
+ * @param id The string ID of the ARP request
+ * @param arpRequest The ARP request packet
+ */
+ public void arpRequestNotification(byte[] arpRequest);
+}
diff --git a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
index a5dabc9..4d261a2 100644
--- a/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
+++ b/src/main/java/net/onrc/onos/ofcontroller/proxyarp/ProxyArpManager.java
@@ -17,16 +17,18 @@
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.IOFMessageListener;
import net.floodlightcontroller.core.IOFSwitch;
-import net.floodlightcontroller.devicemanager.IDeviceService;
import net.floodlightcontroller.packet.ARP;
import net.floodlightcontroller.packet.Ethernet;
import net.floodlightcontroller.packet.IPv4;
import net.floodlightcontroller.restserver.IRestApiService;
import net.floodlightcontroller.topology.ITopologyService;
import net.floodlightcontroller.util.MACAddress;
+import net.onrc.onos.datagrid.IDatagridService;
import net.onrc.onos.ofcontroller.bgproute.Interface;
import net.onrc.onos.ofcontroller.core.IDeviceStorage;
import net.onrc.onos.ofcontroller.core.INetMapTopologyObjects.IDeviceObject;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoLinkService;
+import net.onrc.onos.ofcontroller.core.INetMapTopologyService.ITopoSwitchService;
import net.onrc.onos.ofcontroller.core.config.IConfigInfoService;
import net.onrc.onos.ofcontroller.core.internal.DeviceStorageImpl;
@@ -46,7 +48,8 @@
import com.google.common.collect.SetMultimap;
import com.google.common.net.InetAddresses;
-public class ProxyArpManager implements IProxyArpService, IOFMessageListener {
+public class ProxyArpManager implements IProxyArpService, IOFMessageListener,
+ IArpEventHandler {
private final static Logger log = LoggerFactory.getLogger(ProxyArpManager.class);
private final long ARP_TIMER_PERIOD = 60000; //ms (== 1 min)
@@ -55,11 +58,13 @@
private IFloodlightProviderService floodlightProvider;
private ITopologyService topology;
- private IDeviceService deviceService;
+ private IDatagridService datagrid;
private IConfigInfoService configService;
private IRestApiService restApi;
private IDeviceStorage deviceStorage;
+ private ITopoSwitchService topoSwitchService;
+ private ITopoLinkService topoLinkService;
private short vlan;
private static final short NO_VLAN = 0;
@@ -124,11 +129,11 @@
*/
public void init(IFloodlightProviderService floodlightProvider,
- ITopologyService topology, IDeviceService deviceService,
+ ITopologyService topology, IDatagridService datagrid,
IConfigInfoService config, IRestApiService restApi){
this.floodlightProvider = floodlightProvider;
this.topology = topology;
- this.deviceService = deviceService;
+ this.datagrid = datagrid;
this.configService = config;
this.restApi = restApi;
@@ -145,6 +150,8 @@
restApi.addRestletRoutable(new ArpWebRoutable());
floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
+ datagrid.registerArpEventHandler(this);
+
deviceStorage = new DeviceStorageImpl();
deviceStorage.init("");
@@ -232,9 +239,10 @@
public Command receive(
IOFSwitch sw, OFMessage msg, FloodlightContext cntx) {
- if (msg.getType() != OFType.PACKET_IN){
- return Command.CONTINUE;
- }
+ //if (msg.getType() != OFType.PACKET_IN){
+ //return Command.CONTINUE;
+ //}
+ log.debug("received packet");
OFPacketIn pi = (OFPacketIn) msg;
@@ -243,13 +251,13 @@
if (eth.getEtherType() == Ethernet.TYPE_ARP){
ARP arp = (ARP) eth.getPayload();
-
+ log.debug("etharp {}", arp);
if (arp.getOpCode() == ARP.OP_REQUEST) {
//TODO check what the DeviceManager does about propagating
//or swallowing ARPs. We want to go after DeviceManager in the
//chain but we really need it to CONTINUE ARP packets so we can
//get them.
- handleArpRequest(sw, pi, arp);
+ handleArpRequest(sw, pi, arp, eth);
}
else if (arp.getOpCode() == ARP.OP_REPLY) {
//handleArpReply(sw, pi, arp);
@@ -261,7 +269,7 @@
return Command.CONTINUE;
}
- private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp) {
+ private void handleArpRequest(IOFSwitch sw, OFPacketIn pi, ARP arp, Ethernet eth) {
if (log.isTraceEnabled()) {
log.trace("ARP request received for {}",
inetAddressToString(arp.getTargetProtocolAddress()));
@@ -302,8 +310,10 @@
IDeviceObject targetDevice =
deviceStorage.getDeviceByIP(InetAddresses.coerceToInteger(target));
+ log.debug("targetDevice: {}", targetDevice);
+
if (targetDevice != null) {
- //We have the device in our database, so send a reply
+ // We have the device in our database, so send a reply
MACAddress macAddress = MACAddress.valueOf(targetDevice.getMACAddress());
if (log.isTraceEnabled()) {
@@ -315,6 +325,16 @@
sendArpReply(arp, sw.getId(), pi.getInPort(), macAddress);
}
+ else {
+ // We don't know the device so broadcast the request out
+ // the edge of the network
+
+ //Record where the request came from so we know where to send the reply
+ arpRequests.put(target, new ArpRequest(
+ new HostArpRequester(arp, sw.getId(), pi.getInPort()), false));
+
+ sendArpRequestToEdge(eth, pi);
+ }
/*if (macAddress == null){
//MAC address is not in our ARP cache.
@@ -459,6 +479,27 @@
}
}
+ private void sendArpRequestToEdge(Ethernet arpRequest, OFPacketIn pi) {
+ // Pull the topology from the network map and find edge ports
+ /*for (ISwitchObject swObject : topoSwitchService.getActiveSwitches()) {
+ //List<Link> linksOnSwitch = topoLinkService.getLinksOnSwitch(swObject.getDPID());
+
+ Set<IPortObject> edgePorts = new HashSet<IPortObject>();
+
+ for (IPortObject portObject : swObject.getPorts()) {
+ if (!portObject.getLinkedPorts().iterator().hasNext()) {
+ edgePorts.add(portObject);
+ }
+ }
+
+
+ }*/
+ //log.debug("Sending ARP request for {} to other ONOS instances",
+ //HexString.toHexString(arpRequest.getTargetProtocolAddress()));
+ //log.debug("Sent request bytes: {}", );
+ datagrid.sendArpRequest(arpRequest.serialize());
+ }
+
private void broadcastArpRequestOutEdge(byte[] arpRequest, long inSwitch, short inPort) {
for (IOFSwitch sw : floodlightProvider.getSwitches().values()){
Collection<Short> enabledPorts = sw.getEnabledPortNumbers();
@@ -637,4 +678,14 @@
public List<String> getMappings() {
return arpCache.getMappings();
}
+
+ /*
+ * IArpEventHandler methods
+ */
+
+ @Override
+ public void arpRequestNotification(byte[] arpRequest) {
+ log.debug("Received ARP notification from other instances");
+ broadcastArpRequestOutEdge(arpRequest, Long.MAX_VALUE, Short.MAX_VALUE);
+ }
}