Began implementing IPC with Hazelcast in order to broadcast ARP via other ONOS nodes
diff --git a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
index d9fb7c3..7f8a7a7 100644
--- a/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
+++ b/src/main/java/net/onrc/onos/datagrid/HazelcastDatagrid.java
@@ -5,12 +5,10 @@
import java.util.Collection;
import java.util.HashMap;
import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import com.esotericsoftware.kryo2.Kryo;
-import com.esotericsoftware.kryo2.io.Input;
-import com.esotericsoftware.kryo2.io.Output;
+import java.util.concurrent.TimeUnit;
import net.floodlightcontroller.core.IFloodlightProviderService;
import net.floodlightcontroller.core.module.FloodlightModuleContext;
@@ -18,9 +16,9 @@
import net.floodlightcontroller.core.module.IFloodlightModule;
import net.floodlightcontroller.core.module.IFloodlightService;
import net.floodlightcontroller.restserver.IRestApiService;
-
import net.onrc.onos.datagrid.web.DatagridWebRoutable;
import net.onrc.onos.ofcontroller.flowmanager.IFlowEventHandlerService;
+import net.onrc.onos.ofcontroller.proxyarp.IArpEventHandler;
import net.onrc.onos.ofcontroller.topology.TopologyElement;
import net.onrc.onos.ofcontroller.util.FlowEntry;
import net.onrc.onos.ofcontroller.util.FlowEntryId;
@@ -28,9 +26,13 @@
import net.onrc.onos.ofcontroller.util.FlowPath;
import net.onrc.onos.ofcontroller.util.serializers.KryoFactory;
+import org.openflow.util.HexString;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.esotericsoftware.kryo2.Kryo;
+import com.esotericsoftware.kryo2.io.Input;
+import com.esotericsoftware.kryo2.io.Output;
import com.hazelcast.config.Config;
import com.hazelcast.config.FileSystemXmlConfig;
import com.hazelcast.core.EntryEvent;
@@ -76,6 +78,12 @@
private IMap<String, byte[]> mapTopology = null;
private MapTopologyListener mapTopologyListener = null;
private String mapTopologyListenerId = null;
+
+ // State related to the ARP map (key: ARP request bytes, value: placeholder byte array)
+ protected static final String arpMapName = "arpMap"; // name passed to hazelcastInstance.getMap()
+ private IMap<byte[], byte[]> arpMap = null; // assigned from hazelcastInstance.getMap(arpMapName)
+ private List<IArpEventHandler> arpEventHandlers = new ArrayList<IArpEventHandler>(); // notified on entry-added events; NOTE(review): plain ArrayList, confirm registration cannot race with listener callbacks
+ private final byte[] dummyByte = {0}; // placeholder value stored alongside each ARP request key
/**
* Class for receiving notifications for Flow state.
@@ -322,6 +330,91 @@
// NOTE: We don't use eviction for this map
}
}
+
+    /**
+     * Class for receiving notifications for ARP requests.
+     *
+     * The datagrid map is:
+     *  - Key: ARP request packet (byte[])
+     *  - Value: Placeholder byte array (byte[]); ignored by this listener
+     */
+    class ArpMapListener implements EntryListener<byte[], byte[]> {
+        /**
+         * Receive a notification that an entry is added.
+         *
+         * The entry key is passed to every registered ARP event handler.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryAdded(EntryEvent<byte[], byte[]> event) {
+            for (IArpEventHandler arpEventHandler : arpEventHandlers) {
+                arpEventHandler.arpRequestNotification(event.getKey());
+            }
+        }
+
+        /**
+         * Receive a notification that an entry is removed.
+         *
+         * Currently a no-op.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryRemoved(EntryEvent<byte[], byte[]> event) {
+            // Nothing to do: only entry additions are delivered to handlers
+        }
+
+        /**
+         * Receive a notification that an entry is updated.
+         *
+         * Currently a no-op.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryUpdated(EntryEvent<byte[], byte[]> event) {
+            // Nothing to do: only entry additions are delivered to handlers
+        }
+
+        /**
+         * Receive a notification that an entry is evicted.
+         *
+         * Currently a no-op.
+         *
+         * @param event the notification event for the entry.
+         */
+        @Override
+        public void entryEvicted(EntryEvent<byte[], byte[]> event) {
+            // NOTE(review): entries are written with a TTL, so expiry may be
+            // reported here - confirm. No action is taken either way.
+        }
+    }
/**
* Initialize the Hazelcast Datagrid operation.
@@ -437,6 +530,9 @@
hazelcastInstance = Hazelcast.newHazelcastInstance(hazelcastConfig);
restApi.addRestletRoutable(new DatagridWebRoutable());
+
+ arpMap = hazelcastInstance.getMap(arpMapName);
+ arpMap.addEntryListener(new ArpMapListener(), true);
}
/**
@@ -495,7 +591,19 @@
this.flowEventHandlerService = null;
}
-
+
+    /**
+     * Add a handler to the list that is notified when an entry is added
+     * to the ARP map.
+     *
+     * A null handler is ignored.
+     *
+     * @param arpEventHandler the handler to register.
+     */
+    @Override
+    public void registerArpEventHandler(IArpEventHandler arpEventHandler) {
+        if (arpEventHandler == null) {
+            return;
+        }
+        arpEventHandlers.add(arpEventHandler);
+    }
+
+    /**
+     * Remove a handler from the list that is notified when an entry is
+     * added to the ARP map.
+     *
+     * @param arpEventHandler the handler to deregister.
+     */
+    @Override
+    public void deregisterArpEventHandler(IArpEventHandler arpEventHandler) {
+        this.arpEventHandlers.remove(arpEventHandler);
+    }
+
/**
* Get all Flows that are currently in the datagrid.
*
@@ -782,4 +890,10 @@
mapTopology.removeAsync(key);
}
}
+
+    /**
+     * Publish an ARP request to the ARP map in the datagrid.
+     *
+     * The request bytes are used as the map key with a placeholder value,
+     * and the entry is written asynchronously with a 1 ms time-to-live.
+     *
+     * @param arpRequest the ARP request packet bytes.
+     */
+    @Override
+    public void sendArpRequest(byte[] arpRequest) {
+        // Build the hex dump only when it will actually be logged
+        if (log.isDebugEnabled()) {
+            log.debug("ARP bytes: {}", HexString.toHexString(arpRequest));
+        }
+        arpMap.putAsync(arpRequest, dummyByte, 1L, TimeUnit.MILLISECONDS);
+    }
}