Implemented global packet-out functionality in the PacketModule.

Updated with javadoc for the interfaces, and fixes to bugs pointed out by
Pavlin and Yuta regarding the implementation of calculateOutPorts in
BroadcastPacketOutNotification.

Change-Id: I6174d27877972cf437955ef8d82e9a02b36d0b5f
diff --git a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
index b804a82..27bfbb5 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
@@ -17,23 +17,71 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.api.packet.IPacketListener;
 import net.onrc.onos.api.packet.IPacketService;
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.core.datagrid.IEventChannelListener;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.core.packet.Ethernet;
 import net.onrc.onos.core.topology.INetworkGraphService;
 import net.onrc.onos.core.topology.NetworkGraph;
 import net.onrc.onos.core.topology.Port;
 import net.onrc.onos.core.topology.Switch;
+import net.onrc.onos.core.util.SwitchPort;
 
 import org.openflow.protocol.OFMessage;
 import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPhysicalPort;
+import org.openflow.protocol.OFPort;
 import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 public class PacketModule implements IOFMessageListener, IPacketService,
                                      IFloodlightModule {
+    private static final Logger log = LoggerFactory.getLogger(PacketModule.class);
 
     private final CopyOnWriteArrayList<IPacketListener> listeners;
 
     private IFloodlightProviderService floodlightProvider;
     private NetworkGraph networkGraph;
+    private IDatagridService datagrid;
+    private IFlowPusherService flowPusher;
+
+    private IEventChannel<Long, PacketOutNotification>
+            packetOutEventChannel;
+
+    private static final String PACKET_OUT_CHANNEL_NAME =
+            "onos.packet_out";
+
+    private PacketOutEventHandler packetOutEventHandler =
+            new PacketOutEventHandler();
+
+    /** Sends out the packets described by entries on the packet-out channel. */
+    private class PacketOutEventHandler
+            implements IEventChannelListener<Long, PacketOutNotification> {
+        @Override
+        public void entryAdded(PacketOutNotification value) {
+            Multimap<Long, Short> localEdgePorts = findLocalEdgePorts();
+            sendPacketToSwitches(value.calculateOutPorts(localEdgePorts),
+                    value.getPacketData());
+        }
+
+        @Override
+        public void entryUpdated(PacketOutNotification value) {
+            entryAdded(value); // an update is handled exactly like a new entry
+        }
+
+        @Override
+        public void entryRemoved(PacketOutNotification value) {
+            // Removals need no action.
+        }
+    }
 
     public PacketModule() {
         listeners = new CopyOnWriteArrayList<>();
@@ -45,13 +93,18 @@
     }
 
     @Override
-    public void sendPacket(Port port, Ethernet eth) {
-        // TODO Auto-generated method stub
+    public void sendPacket(SwitchPort switchPort, Ethernet eth) {
+        SinglePacketOutNotification notification =
+                new SinglePacketOutNotification(eth.serialize(), 0,
+                switchPort.dpid().value(), switchPort.port().value());
 
+        // TODO We shouldn't care what the destination MAC is
+        long dstMac = eth.getDestinationMAC().toLong();
+        packetOutEventChannel.addTransientEntry(dstMac, notification);
     }
 
     @Override
-    public void sendPacket(List<Port> ports, Ethernet eth) {
+    public void sendPacket(List<SwitchPort> switchPorts, Ethernet eth) {
         // TODO Auto-generated method stub
 
     }
@@ -63,15 +116,18 @@
     }
 
     @Override
-    public void broadcastPacket(Ethernet eth, Port inPort) {
-        // TODO Auto-generated method stub
+    public void broadcastPacket(Ethernet eth, SwitchPort inSwitchPort) {
+        BroadcastPacketOutNotification notification =
+                new BroadcastPacketOutNotification(eth.serialize(), 0,
+                inSwitchPort.dpid().value(), inSwitchPort.port().value());
 
+        long dstMac = eth.getDestinationMAC().toLong();
+        packetOutEventChannel.addTransientEntry(dstMac, notification);
     }
 
     @Override
     public String getName() {
-        // TODO Auto-generated method stub
-        return null;
+        return "packetmodule";
     }
 
     @Override
@@ -82,7 +138,6 @@
 
     @Override
     public boolean isCallbackOrderingPostreq(OFType type, String name) {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -128,9 +183,9 @@
 
     @Override
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
-            getServiceImpls() {
+    getServiceImpls() {
         Map<Class<? extends IFloodlightService>, IFloodlightService>
-                serviceImpls = new HashMap<>();
+        serviceImpls = new HashMap<>();
         serviceImpls.put(IPacketService.class, this);
         return serviceImpls;
     }
@@ -140,6 +195,8 @@
         List<Class<? extends IFloodlightService>> dependencies = new ArrayList<>();
         dependencies.add(IFloodlightProviderService.class);
         dependencies.add(INetworkGraphService.class);
+        dependencies.add(IDatagridService.class);
+        dependencies.add(IFlowPusherService.class);
         return dependencies;
     }
 
@@ -150,10 +207,63 @@
                 context.getServiceImpl(IFloodlightProviderService.class);
         networkGraph = context.getServiceImpl(INetworkGraphService.class)
                 .getNetworkGraph();
+        datagrid = context.getServiceImpl(IDatagridService.class);
+        flowPusher = context.getServiceImpl(IFlowPusherService.class);
     }
 
     @Override
     public void startUp(FloodlightModuleContext context) {
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
+
+        packetOutEventChannel = datagrid.addListener(PACKET_OUT_CHANNEL_NAME,
+                packetOutEventHandler,
+                Long.class,
+                PacketOutNotification.class);
+    }
+
+    /** Finds enabled local ports with no outgoing link, skipping ports not in the graph. */
+    private Multimap<Long, Short> findLocalEdgePorts() {
+        Multimap<Long, Short> edgePorts = HashMultimap.create();
+        for (IOFSwitch sw : floodlightProvider.getSwitches().values()) {
+            for (OFPhysicalPort localPort : sw.getEnabledPorts()) {
+                Port globalPort =
+                        networkGraph.getPort(sw.getId(), (long) localPort.getPortNumber());
+                if (globalPort != null && globalPort.getOutgoingLink() == null) {
+                    edgePorts.put(sw.getId(), localPort.getPortNumber());
+                }
+            }
+        }
+        return edgePorts;
+    }
+
+    /** Sends packetData to each switch in outPorts, out of all ports listed for it. */
+    private void sendPacketToSwitches(Multimap<Long, Short> outPorts,
+            byte[] packetData) {
+        for (Long dpid : outPorts.keySet()) {
+            IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+            if (sw == null) {
+                // Skip just this switch; the remaining ones must still be served.
+                log.warn("Switch not found when sending packet");
+                continue;
+            }
+
+            // A single packet-out carries one output action per port.
+            List<OFAction> actions = new ArrayList<OFAction>();
+            for (Short port : outPorts.get(dpid)) {
+                actions.add(new OFActionOutput(port));
+            }
+            short actionsLength = (short)
+                    (actions.size() * OFActionOutput.MINIMUM_LENGTH);
+
+            OFPacketOut po = new OFPacketOut();
+            po.setInPort(OFPort.OFPP_NONE)
+              .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+              .setPacketData(packetData);
+            po.setActions(actions);
+            po.setActionsLength(actionsLength);
+            po.setLengthU(OFPacketOut.MINIMUM_LENGTH + actionsLength
+                    + packetData.length);
+            flowPusher.add(sw, po);
+        }
     }
 }