Implemented global packet-out functionality in the PacketModule.

Updated with javadoc for the interfaces, and fixes to bugs pointed out by
Pavlin and Yuta regarding the implementation of calculateOutPorts in
BroadcastPacketOutNotification.

Change-Id: I6174d27877972cf437955ef8d82e9a02b36d0b5f
diff --git a/src/main/java/net/onrc/onos/core/packetservice/BroadcastPacketOutNotification.java b/src/main/java/net/onrc/onos/core/packetservice/BroadcastPacketOutNotification.java
index cea27ae..3e098ed 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/BroadcastPacketOutNotification.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/BroadcastPacketOutNotification.java
@@ -1,7 +1,10 @@
 package net.onrc.onos.core.packetservice;
 
+import java.util.Map;
 
-// TODO This class is too generic to be handled by ProxyArpService.
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
+
 // TODO The generic broadcast packet shouldn't contain an IP address which is
 // only for ARP packets.
 
@@ -70,4 +73,18 @@
     public int getTargetAddress() {
         return address;
     }
+
+    @Override
+    public Multimap<Long, Short> calculateOutPorts(Multimap<Long, Short> localSwitches) {
+        Multimap<Long, Short> outPorts = HashMultimap.create();
+
+        for (Map.Entry<Long, Short> entry : localSwitches.entries()) {
+            if (!entry.getKey().equals(inSwitch) ||
+                    !entry.getValue().equals(inPort)) {
+                outPorts.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        return outPorts;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
index b804a82..27bfbb5 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/PacketModule.java
@@ -17,23 +17,71 @@
 import net.floodlightcontroller.core.module.IFloodlightService;
 import net.onrc.onos.api.packet.IPacketListener;
 import net.onrc.onos.api.packet.IPacketService;
+import net.onrc.onos.core.datagrid.IDatagridService;
+import net.onrc.onos.core.datagrid.IEventChannel;
+import net.onrc.onos.core.datagrid.IEventChannelListener;
+import net.onrc.onos.core.flowprogrammer.IFlowPusherService;
 import net.onrc.onos.core.packet.Ethernet;
 import net.onrc.onos.core.topology.INetworkGraphService;
 import net.onrc.onos.core.topology.NetworkGraph;
 import net.onrc.onos.core.topology.Port;
 import net.onrc.onos.core.topology.Switch;
+import net.onrc.onos.core.util.SwitchPort;
 
 import org.openflow.protocol.OFMessage;
 import org.openflow.protocol.OFPacketIn;
+import org.openflow.protocol.OFPacketOut;
+import org.openflow.protocol.OFPhysicalPort;
+import org.openflow.protocol.OFPort;
 import org.openflow.protocol.OFType;
+import org.openflow.protocol.action.OFAction;
+import org.openflow.protocol.action.OFActionOutput;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 public class PacketModule implements IOFMessageListener, IPacketService,
                                      IFloodlightModule {
+    private static final Logger log = LoggerFactory.getLogger(PacketModule.class);
 
     private final CopyOnWriteArrayList<IPacketListener> listeners;
 
     private IFloodlightProviderService floodlightProvider;
     private NetworkGraph networkGraph;
+    private IDatagridService datagrid;
+    private IFlowPusherService flowPusher;
+
+    private IEventChannel<Long, PacketOutNotification>
+            packetOutEventChannel;
+
+    private static final String PACKET_OUT_CHANNEL_NAME =
+            "onos.packet_out";
+
+    private PacketOutEventHandler packetOutEventHandler =
+            new PacketOutEventHandler();
+
+    private class PacketOutEventHandler implements
+            IEventChannelListener<Long, PacketOutNotification> {
+
+        @Override
+        public void entryAdded(PacketOutNotification value) {
+            Multimap<Long, Short> outPorts = value.calculateOutPorts(
+                    findLocalEdgePorts());
+            sendPacketToSwitches(outPorts, value.getPacketData());
+        }
+
+        @Override
+        public void entryUpdated(PacketOutNotification value) {
+            entryAdded(value);
+        }
+
+        @Override
+        public void entryRemoved(PacketOutNotification value) {
+            // Not used
+        }
+    }
 
     public PacketModule() {
         listeners = new CopyOnWriteArrayList<>();
@@ -45,13 +93,18 @@
     }
 
     @Override
-    public void sendPacket(Port port, Ethernet eth) {
-        // TODO Auto-generated method stub
+    public void sendPacket(SwitchPort switchPort, Ethernet eth) {
+        SinglePacketOutNotification notification =
+                new SinglePacketOutNotification(eth.serialize(), 0,
+                switchPort.dpid().value(), switchPort.port().value());
 
+        // TODO We shouldn't care what the destination MAC is
+        long dstMac = eth.getDestinationMAC().toLong();
+        packetOutEventChannel.addTransientEntry(dstMac, notification);
     }
 
     @Override
-    public void sendPacket(List<Port> ports, Ethernet eth) {
+    public void sendPacket(List<SwitchPort> switchPorts, Ethernet eth) {
         // TODO Auto-generated method stub
 
     }
@@ -63,15 +116,18 @@
     }
 
     @Override
-    public void broadcastPacket(Ethernet eth, Port inPort) {
-        // TODO Auto-generated method stub
+    public void broadcastPacket(Ethernet eth, SwitchPort inSwitchPort) {
+        BroadcastPacketOutNotification notification =
+                new BroadcastPacketOutNotification(eth.serialize(), 0,
+                inSwitchPort.dpid().value(), inSwitchPort.port().value());
 
+        long dstMac = eth.getDestinationMAC().toLong();
+        packetOutEventChannel.addTransientEntry(dstMac, notification);
     }
 
     @Override
     public String getName() {
-        // TODO Auto-generated method stub
-        return null;
+        return "packetmodule";
     }
 
     @Override
@@ -82,7 +138,6 @@
 
     @Override
     public boolean isCallbackOrderingPostreq(OFType type, String name) {
-        // TODO Auto-generated method stub
         return false;
     }
 
@@ -128,9 +183,9 @@
 
     @Override
     public Map<Class<? extends IFloodlightService>, IFloodlightService>
-            getServiceImpls() {
+    getServiceImpls() {
         Map<Class<? extends IFloodlightService>, IFloodlightService>
-                serviceImpls = new HashMap<>();
+        serviceImpls = new HashMap<>();
         serviceImpls.put(IPacketService.class, this);
         return serviceImpls;
     }
@@ -140,6 +195,8 @@
         List<Class<? extends IFloodlightService>> dependencies = new ArrayList<>();
         dependencies.add(IFloodlightProviderService.class);
         dependencies.add(INetworkGraphService.class);
+        dependencies.add(IDatagridService.class);
+        dependencies.add(IFlowPusherService.class);
         return dependencies;
     }
 
@@ -150,10 +207,80 @@
                 context.getServiceImpl(IFloodlightProviderService.class);
         networkGraph = context.getServiceImpl(INetworkGraphService.class)
                 .getNetworkGraph();
+        datagrid = context.getServiceImpl(IDatagridService.class);
+        flowPusher = context.getServiceImpl(IFlowPusherService.class);
     }
 
     @Override
     public void startUp(FloodlightModuleContext context) {
         floodlightProvider.addOFMessageListener(OFType.PACKET_IN, this);
+
+        packetOutEventChannel = datagrid.addListener(PACKET_OUT_CHANNEL_NAME,
+                packetOutEventHandler,
+                Long.class,
+                PacketOutNotification.class);
+    }
+
+    /**
+     * Finds the enabled ports of the switches known to the Floodlight
+     * provider that have no outgoing link in the network graph.
+     *
+     * @return a multimap of DPID to edge port numbers
+     */
+    private Multimap<Long, Short> findLocalEdgePorts() {
+        Multimap<Long, Short> edgePorts = HashMultimap.create();
+        Map<Long, IOFSwitch> localSwitches = floodlightProvider.getSwitches();
+        for (IOFSwitch sw : localSwitches.values()) {
+            for (OFPhysicalPort localPort : sw.getEnabledPorts()) {
+                Port globalPort =
+                        networkGraph.getPort(sw.getId(), (long) localPort.getPortNumber());
+                // The port may be missing from the network graph; skip it
+                // instead of failing with a NullPointerException.
+                if (globalPort != null
+                        && globalPort.getOutgoingLink() == null) {
+                    edgePorts.put(sw.getId(), localPort.getPortNumber());
+                }
+            }
+        }
+        return edgePorts;
+    }
+
+    /**
+     * Sends the packet to each switch in the multimap, with one output
+     * action per port listed for that switch.
+     *
+     * @param outPorts multimap of DPID to the port numbers to send out of
+     * @param packetData the serialized packet to send
+     */
+    private void sendPacketToSwitches(Multimap<Long, Short> outPorts,
+            byte[] packetData) {
+        for (Long dpid : outPorts.keySet()) {
+            IOFSwitch sw = floodlightProvider.getSwitches().get(dpid);
+            if (sw == null) {
+                // Skip only this switch so that the remaining switches
+                // still receive the packet.
+                log.warn("Switch not found when sending packet");
+                continue;
+            }
+
+            OFPacketOut po = new OFPacketOut();
+            po.setInPort(OFPort.OFPP_NONE)
+              .setBufferId(OFPacketOut.BUFFER_ID_NONE)
+              .setPacketData(packetData);
+
+            List<OFAction> actions = new ArrayList<OFAction>();
+            for (Short port : outPorts.get(dpid)) {
+                actions.add(new OFActionOutput(port));
+            }
+
+            po.setActions(actions);
+            short actionsLength = (short)
+                    (actions.size() * OFActionOutput.MINIMUM_LENGTH);
+            po.setActionsLength(actionsLength);
+            po.setLengthU(OFPacketOut.MINIMUM_LENGTH + actionsLength
+                    + packetData.length);
+
+            flowPusher.add(sw, po);
+        }
     }
 }
diff --git a/src/main/java/net/onrc/onos/core/packetservice/PacketOutNotification.java b/src/main/java/net/onrc/onos/core/packetservice/PacketOutNotification.java
index 30e3986..8fe8efe 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/PacketOutNotification.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/PacketOutNotification.java
@@ -3,6 +3,8 @@
 import java.io.Serializable;
 import java.util.Arrays;
 
+import com.google.common.collect.Multimap;
+
 /**
  * A PacketOutNotification contains data sent between ONOS instances that
  * directs other instances to send a packet out a set of ports. This is an
@@ -16,19 +18,44 @@
     private final byte[] packet;
 
     /**
+     * Default constructor.
+     */
+    protected PacketOutNotification() {
+        packet = null;
+    }
+
+    /**
      * Class constructor.
      *
      * @param packet the packet data to send in the packet-out
      */
-    public PacketOutNotification() {
-        packet = null;
-    }
-
     public PacketOutNotification(byte[] packet) {
         this.packet = Arrays.copyOf(packet, packet.length);
     }
 
+    /**
+     * Gets the packet that needs to be sent into the network.
+     *
+     * @return the packet data as a serialized byte array
+     */
     public byte[] getPacketData() {
         return Arrays.copyOf(packet, packet.length);
     }
+
+    /**
+     * Calculate a list of local ports that the packet should be sent out.
+     * <p/>
+     * A {@link PacketOutNotification} contains a high-level description of
+     * where to send the packet. The receiver of the notification needs to know
+     * an explicit list of ports to send the packet out. This function will
+     * calculate that list, given the list of edge ports controlled by this
+     * instance.
+     *
+     * @param localEdgePorts the map of locally-controlled edge ports
+     * @return a multimap of ports that the packet should be sent out,
+     * in the form
+     * {@code {dpid1 => {portnum1, portnum2, ...}, dpid2 => {portnum1}, ...}}
+     */
+    public abstract Multimap<Long, Short> calculateOutPorts(
+            Multimap<Long, Short> localEdgePorts);
 }
diff --git a/src/main/java/net/onrc/onos/core/packetservice/SinglePacketOutNotification.java b/src/main/java/net/onrc/onos/core/packetservice/SinglePacketOutNotification.java
index 4e9f47d..e6260e0 100644
--- a/src/main/java/net/onrc/onos/core/packetservice/SinglePacketOutNotification.java
+++ b/src/main/java/net/onrc/onos/core/packetservice/SinglePacketOutNotification.java
@@ -1,7 +1,7 @@
 package net.onrc.onos.core.packetservice;
 
-
-// TODO This class is too generic to be handled by ProxyArpService.
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 
 /**
  * Notification to another ONOS instance to send a packet out a single port.
@@ -58,4 +58,15 @@
     public int getTargetAddress() {
         return address;
     }
+
+    @Override
+    public Multimap<Long, Short> calculateOutPorts(Multimap<Long, Short> localSwitches) {
+        Multimap<Long, Short> outPorts = HashMultimap.create();
+
+        if (localSwitches.containsEntry(outSwitch, outPort)) {
+            outPorts.put(outSwitch, outPort);
+        }
+
+        return outPorts;
+    }
 }
diff --git a/src/main/java/net/onrc/onos/core/util/SwitchPort.java b/src/main/java/net/onrc/onos/core/util/SwitchPort.java
index 9178267..edb9f77 100644
--- a/src/main/java/net/onrc/onos/core/util/SwitchPort.java
+++ b/src/main/java/net/onrc/onos/core/util/SwitchPort.java
@@ -30,6 +30,17 @@
     }
 
     /**
+     * Constructor for the specified primitive values of a DPID and port.
+     *
+     * @param dpid the long DPID to use
+     * @param port the short port number to use
+     */
+    public SwitchPort(long dpid, short port) {
+        this.dpid = new Dpid(dpid);
+        this.port = new Port(port);
+    }
+
+    /**
      * Get the DPID value of the Switch-Port.
      *
      * @return the DPID value of the Switch-Port.
diff --git a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
index d1794d1..a1c4728 100644
--- a/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
+++ b/src/main/java/net/onrc/onos/core/util/serializers/KryoFactory.java
@@ -19,6 +19,7 @@
 import net.onrc.onos.core.intent.ShortestPathIntent;
 import net.onrc.onos.core.intent.runtime.IntentStateList;
 import net.onrc.onos.core.packetservice.BroadcastPacketOutNotification;
+import net.onrc.onos.core.packetservice.PacketOutNotification;
 import net.onrc.onos.core.packetservice.SinglePacketOutNotification;
 import net.onrc.onos.core.topology.DeviceEvent;
 import net.onrc.onos.core.topology.LinkEvent;
@@ -51,7 +52,6 @@
 import net.onrc.onos.core.util.Switch;
 // import net.onrc.onos.core.util.SwitchPort;
 
-
 import com.esotericsoftware.kryo.Kryo;
 
 /**
@@ -217,6 +217,7 @@
         kryo.register(Date.class);
 
         // ProxyArp-related classes
+        kryo.register(PacketOutNotification.class);
         kryo.register(BroadcastPacketOutNotification.class);
         kryo.register(SinglePacketOutNotification.class);
         kryo.register(ArpReplyNotification.class);