[ONOS-6810] Implement Mastership handling in general DeviceProvider

Change-Id: I14b706d364cf5124da248230fbcda65d0bd284ce
diff --git a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
index fb4a97a..8693eff 100644
--- a/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
+++ b/providers/p4runtime/packet/src/main/java/org/onosproject/provider/p4runtime/packet/impl/P4RuntimePacketProvider.java
@@ -22,6 +22,7 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceService;
@@ -67,6 +68,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MastershipService mastershipService;
+
     private PacketProviderService providerService;
 
     private InternalPacketListener packetListener = new InternalPacketListener();
@@ -98,7 +102,7 @@
         if (packet != null) {
             DeviceId deviceId = packet.sendThrough();
             Device device = deviceService.getDevice(deviceId);
-            if (device.is(PacketProgrammable.class)) {
+            if (device.is(PacketProgrammable.class) && mastershipService.isLocalMaster(deviceId)) {
                 PacketProgrammable packetProgrammable = device.as(PacketProgrammable.class);
                 packetProgrammable.emit(packet);
             } else {
@@ -148,7 +152,10 @@
 
         @Override
         public void event(P4RuntimeEvent event) {
-            if (event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+            // Mastership message is sent to everybody but picked up only by master.
+            // FIXME: we need the device ID in P4RuntimeEventSubject to check for mastership.
+            if (!(event.subject() instanceof P4RuntimePacketIn) || event.type() != P4RuntimeEvent.Type.PACKET_IN) {
+                log.debug("Event type {}", event.type());
                 // Not a packet-in event, ignore it.
                 return;
             }
@@ -163,7 +170,7 @@
 
             if (!device.is(PiPipelineInterpreter.class)) {
                 log.warn("Unable to process packet-in from {}, device has no PiPipelineInterpreter behaviour",
-                         deviceId);
+                        deviceId);
                 return;
             }
 
@@ -184,7 +191,7 @@
             log.debug("Processing inbound packet: {}", inPkt.toString());
 
             OutboundPacket outPkt = new DefaultOutboundPacket(eventSubject.deviceId(), null,
-                                                              operation.data().asReadOnlyBuffer());
+                    operation.data().asReadOnlyBuffer());
             PacketContext pktCtx = new P4RuntimePacketContext(System.currentTimeMillis(), inPkt, outPkt, false);
 
             // Pushing the packet context up for processing.