[ONOS-7262] Cfm improvements to allow RMeps and Mds and Mas to be added and deleted

Change-Id: Ibffb13d046bfb29dbe88de7b558c95fbf9db046d
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMdManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMdManager.java
index 5a125f5..b18d2f6 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMdManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMdManager.java
@@ -65,10 +65,8 @@
     @Activate
     public void activate() {
         appId = coreService.registerApplication(APP_ID);
-
         eventDispatcher.addSink(MdEvent.class, listenerRegistry);
         store.setDelegate(delegate);
-
         log.info("CFM Service Started");
     }
 
@@ -163,9 +161,8 @@
     private class InternalStoreDelegate implements MdStoreDelegate {
         @Override
         public void notify(MdEvent event) {
-            log.debug("New MD event: {}", event.subject());
+            log.debug("New MD event: {}", event);
             eventDispatcher.post(event);
         }
     }
-
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMepManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMepManager.java
index 5723466..e58a570 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMepManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/CfmMepManager.java
@@ -15,10 +15,21 @@
  */
 package org.onosproject.incubator.net.l2monitoring.cfm.impl;
 
+import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
@@ -29,6 +40,8 @@
 import org.onosproject.core.CoreService;
 import org.onosproject.core.IdGenerator;
 import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.event.Event;
+import org.onosproject.incubator.net.l2monitoring.cfm.MaintenanceDomain;
 import org.onosproject.incubator.net.l2monitoring.cfm.Mep;
 import org.onosproject.incubator.net.l2monitoring.cfm.MepEntry;
 import org.onosproject.incubator.net.l2monitoring.cfm.MepLbCreate;
@@ -36,12 +49,17 @@
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdShort;
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdId;
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MepId;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MepKeyId;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmConfigException;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMdService;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMepEvent;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMepListener;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMepProgrammable;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMepService;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MdEvent;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MdListener;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MepStore;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MepStoreDelegate;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
@@ -61,7 +79,8 @@
 
     private final Logger log = getLogger(getClass());
 
-    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private InternalDeviceListener deviceListener = null;
+    private InternalMdListener mdListener = null;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected DeviceService deviceService;
@@ -78,22 +97,39 @@
     private static final int DEFAULT_POLL_FREQUENCY = 30;
     private int fallbackMepPollFrequency = DEFAULT_POLL_FREQUENCY;
 
+    private InternalEventHandler eventHandler = new InternalEventHandler();
+    private static final Object THREAD_SCHED_LOCK = new Object();
+    private static int numOfEventsQueued = 0;
+    private static int numOfEventsExecuted = 0;
+    private static int numOfHandlerExecution = 0;
+    private static int numOfHandlerScheduled = 0;
+
+    private ScheduledExecutorService executorService = Executors
+            .newScheduledThreadPool(1,
+                    groupedThreads("CfmMepManager", "event-%d", log));
+
+    @SuppressWarnings("unused")
+    private static ScheduledFuture<?> eventHandlerFuture = null;
+    @SuppressWarnings("rawtypes")
+    private ConcurrentLinkedQueue<Event> eventQueue = new ConcurrentLinkedQueue<>();
+
+
     private IdGenerator idGenerator;
 
-    //FIXME Get rid of this hack - we will use this in memory to emulate
-    // a store for the short term.
-    //Note: This is not distributed and will not work in a clustered system
-    //TODO Create a MepStore for this
-    private Collection<Mep> mepCollection;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected MepStore mepStore;
 
+    protected final MepStoreDelegate delegate = new InternalStoreDelegate();
 
     @Activate
     public void activate() {
-        //FIXME Get rid of this local list
-        mepCollection = new ArrayList<>();
+        mepStore.setDelegate(delegate);
 
-        eventDispatcher.addSink(CfmMepEvent.class, listenerRegistry);
+        deviceListener = new InternalDeviceListener();
         deviceService.addListener(deviceListener);
+        mdListener = new InternalMdListener();
+        cfmMdService.addListener(mdListener);
+        eventDispatcher.addSink(CfmMepEvent.class, listenerRegistry);
         idGenerator = coreService.getIdGenerator("mep-ids");
         log.info("CFM MEP Manager Started");
     }
@@ -101,9 +137,12 @@
     @Deactivate
     public void deactivate() {
         deviceService.removeListener(deviceListener);
+        cfmMdService.removeListener(mdListener);
         eventDispatcher.removeSink(CfmMepEvent.class);
         log.info("CFM MEP Manager Stopped");
-        mepCollection.clear();
+        mepStore.unsetDelegate(delegate);
+        deviceListener = null;
+        mdListener = null;
     }
 
     @Override
@@ -112,27 +151,28 @@
         //Will throw IllegalArgumentException if ma does not exist
         cfmMdService.getMaintenanceAssociation(mdName, maName);
 
+        Collection<Mep> mepStoreCollection = mepStore.getAllMeps();
         Collection<MepEntry> mepEntryCollection = new ArrayList<>();
 
-        for (Mep mep:mepCollection) {
+        for (Mep mep : mepStoreCollection) {
             if (mep.mdId().equals(mdName) && mep.maId().equals(maName)) {
                 DeviceId mepDeviceId = mep.deviceId();
                 if (deviceService.getDevice(mepDeviceId) == null) {
                     log.warn("Device not found/available " + mepDeviceId +
-                        " for MEP: " + mdName + "/" + maName + "/" + mep.mepId());
+                            " for MEP: " + mdName + "/" + maName + "/" + mep.mepId());
                     continue;
                 } else if (!deviceService.getDevice(mepDeviceId)
-                                        .is(CfmMepProgrammable.class)) {
+                        .is(CfmMepProgrammable.class)) {
                     throw new CfmConfigException("Device " + mepDeviceId +
                             " does not support CfmMepProgrammable behaviour.");
                 }
 
                 log.debug("Retrieving MEP results for Mep {} in MD {}, MA {} "
-                    + "on Device {}", mep.mepId(), mdName, maName, mepDeviceId);
+                        + "on Device {}", mep.mepId(), mdName, maName, mepDeviceId);
                 mepEntryCollection.add(deviceService
-                                        .getDevice(mepDeviceId)
-                                        .as(CfmMepProgrammable.class)
-                                        .getMep(mdName, maName, mep.mepId()));
+                        .getDevice(mepDeviceId)
+                        .as(CfmMepProgrammable.class)
+                        .getMep(mdName, maName, mep.mepId()));
             }
         }
 
@@ -140,75 +180,116 @@
     }
 
     @Override
-    public MepEntry getMep(MdId mdName, MaIdShort maName, MepId mepId) throws CfmConfigException {
-        //Will throw IllegalArgumentException if ma does not exist
-        cfmMdService.getMaintenanceAssociation(mdName, maName);
-
-        for (Mep mep : mepCollection) {
-            if (mep.mdId().equals(mdName) && mep.maId().equals(maName)
-                    && mep.mepId().equals(mepId)) {
-
-                DeviceId mepDeviceId = mep.deviceId();
-                if (deviceService.getDevice(mepDeviceId) == null) {
-                    throw new CfmConfigException("Device not found " + mepDeviceId);
-                } else if (!deviceService.getDevice(mepDeviceId).is(CfmMepProgrammable.class)) {
-                    throw new CfmConfigException("Device " + mepDeviceId +
-                            " does not support CfmMepProgrammable behaviour.");
-                }
-
-                log.debug("Retrieving MEP reults for Mep {} in MD {}, MA {} on Device {}",
-                        mep.mepId(), mdName, maName, mepDeviceId);
-
-                return deviceService.getDevice(mepDeviceId)
-                        .as(CfmMepProgrammable.class).getMep(mdName, maName, mepId);
-            }
-        }
-        return null;
+    public Collection<Mep> getAllMepsByDevice(DeviceId deviceId) throws CfmConfigException {
+        return mepStore.getMepsByDeviceId(deviceId);
     }
 
     @Override
-    public boolean deleteMep(MdId mdName, MaIdShort maName, MepId mepId) throws CfmConfigException {
+    public MepEntry getMep(MdId mdName, MaIdShort maName, MepId mepId) throws CfmConfigException {
+        MepKeyId key = new MepKeyId(mdName, maName, mepId);
+
         //Will throw IllegalArgumentException if ma does not exist
         cfmMdService.getMaintenanceAssociation(mdName, maName);
 
-        for (Mep mep : mepCollection) {
-            if (mep.mdId().equals(mdName) && mep.maId().equals(maName)
-                    && mep.mepId().equals(mepId)) {
-                Device mepDevice = deviceService.getDevice(mep.deviceId());
-                if (mepDevice == null || !mepDevice.is(CfmMepProgrammable.class)) {
-                    throw new CfmConfigException("Unexpeced fault on device drier for "
-                            + mep.deviceId());
-                }
-                boolean deleted = false;
-                try {
-                     deleted = mepDevice.as(CfmMepProgrammable.class)
-                            .deleteMep(mdName, maName, mepId);
-                } catch (CfmConfigException e) {
-                    log.warn("MEP could not be deleted on device - perhaps it "
-                            + "does not exist. Continuing");
-                    mepCollection.remove(mep);
-                    return false;
-                }
-                if (deleted) {
-                    mepCollection.remove(mep);
-                    return true;
-                } else {
-                    return false;
+        Optional<Mep> mepOptional = mepStore.getMep(key);
+        if (mepOptional.isPresent()) {
+            Mep mep = mepOptional.get();
+            DeviceId mepDeviceId = mep.deviceId();
+            if (deviceService.getDevice(mepDeviceId) == null) {
+                throw new CfmConfigException("Device not found " + mepDeviceId);
+            } else if (!deviceService.getDevice(mepDeviceId).is(CfmMepProgrammable.class)) {
+                throw new CfmConfigException("Device " + mepDeviceId +
+                        " does not support CfmMepProgrammable behaviour.");
+            }
+
+            log.debug("Retrieving MEP reults for Mep {} in MD {}, MA {} on Device {}",
+                    mep.mepId(), mdName, maName, mepDeviceId);
+
+            return deviceService.getDevice(mepDeviceId)
+                    .as(CfmMepProgrammable.class).getMep(mdName, maName, mepId);
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Deletes a MEP from the store and from its device, removes it as a Remote
+     * MEP from the other devices in the same MA, and finally deletes the MD or MA
+     * from the MEP's device when no other MEP on that device still uses it.
+     */
+    @Override
+    public boolean deleteMep(MdId mdName, MaIdShort maName, MepId mepId,
+                             Optional<MaintenanceDomain> oldMd) throws CfmConfigException {
+        MepKeyId key = new MepKeyId(mdName, maName, mepId);
+
+        //Will throw IllegalArgumentException if ma does not exist
+        cfmMdService.getMaintenanceAssociation(mdName, maName);
+
+        //Get the device ID from the MEP
+        Optional<Mep> deletedMep = mepStore.getMep(key);
+        if (!deletedMep.isPresent()) {
+            log.warn("MEP {} not found when deleting Mep", key);
+            return false;
+        }
+
+        DeviceId mepDeviceId = deletedMep.get().deviceId();
+        boolean deleted = mepStore.deleteMep(key);
+
+        Device mepDevice = deviceService.getDevice(mepDeviceId);
+        if (mepDevice == null || !mepDevice.is(CfmMepProgrammable.class)) {
+            throw new CfmConfigException("Unexpeced fault on device driver for " + mepDeviceId);
+        }
+        try {
+            deleted = mepDevice.as(CfmMepProgrammable.class).deleteMep(mdName, maName, mepId, oldMd);
+        } catch (CfmConfigException e) {
+            log.warn("MEP could not be deleted on device - perhaps it does not exist. Continuing");
+        }
+
+        //Iterate through all other devices and remove as a Remote Mep.
+        //Handled devices are tracked across the whole loop so each one is only updated once.
+        List<DeviceId> alreadyHandledDevices = new ArrayList<>();
+        int mepsOnMdCount = 0;
+        int mepsOnMaCount = 0;
+        for (Mep mep : mepStore.getAllMeps()) {
+            if (mep.deviceId().equals(mepDeviceId) && mdName.equals(mep.mdId())) {
+                mepsOnMdCount++;
+                if (maName.equals(mep.maId())) {
+                    mepsOnMaCount++;
                 }
             }
+            if (mep.deviceId().equals(mepDeviceId) || !mep.mdId().equals(mdName) ||
+                    !mep.maId().equals(maName) ||
+                    alreadyHandledDevices.contains(mep.deviceId())) {
+                continue;
+            }
+            deviceService.getDevice(mep.deviceId()).as(CfmMepProgrammable.class)
+                    .deleteMaRemoteMepOnDevice(mdName, maName, mepId);
+            alreadyHandledDevices.add(mep.deviceId());
+            log.info("Deleted RMep entry on {} on device {}",
+                    mdName.mdName() + "/" + maName.maName(), mep.deviceId());
         }
-        return false;
+
+        //Also if this is the last MEP in this MA then delete this MA from device
+        //If this is the last MA in this MD on device, then delete the MD from the device
+        if (mepsOnMdCount == 0) {
+            mepDevice.as(CfmMepProgrammable.class).deleteMdOnDevice(mdName, oldMd);
+            log.info("Deleted MD {} from Device {}", mdName.mdName(), mepDeviceId);
+        } else if (mepsOnMaCount == 0) {
+            mepDevice.as(CfmMepProgrammable.class).deleteMaOnDevice(mdName, maName, oldMd);
+            log.info("Deleted MA {} from Device {}",
+                    mdName.mdName() + "/" + maName.maName(), mepDeviceId);
+        }
+
+        return deleted;
     }
 
     @Override
     public boolean createMep(MdId mdName, MaIdShort maName, Mep newMep) throws CfmConfigException {
+        MepKeyId key = new MepKeyId(mdName, maName, newMep.mepId());
         log.debug("Creating MEP " + newMep.mepId() + " on MD {}, MA {} on Device {}",
                 mdName, maName, newMep.deviceId().toString());
-        for (Mep mep : mepCollection) {
-            if (mep.mdId().equals(mdName) && mep.maId().equals(maName)
-                    && mep.mepId().equals(newMep.mepId())) {
-                return false;
-            }
+        if (mepStore.getMep(key).isPresent()) {
+            return false;
         }
 
         //Will throw IllegalArgumentException if ma does not exist
@@ -225,7 +306,24 @@
                 deviceService.getDevice(mepDeviceId).as(CfmMepProgrammable.class).createMep(mdName, maName, newMep);
         log.debug("MEP created on {}", mepDeviceId);
         if (deviceResult) {
-            return mepCollection.add(newMep);
+            boolean alreadyExisted = mepStore.createUpdateMep(key, newMep);
+
+            //Add to other Remote Mep List on other devices. Handled devices are
+            //tracked across the whole loop so each one is only programmed once.
+            List<DeviceId> alreadyHandledDevices = new ArrayList<>();
+            for (Mep mep : mepStore.getMepsByMdMa(mdName, maName)) {
+                if (mep.deviceId().equals(mepDeviceId) ||
+                        alreadyHandledDevices.contains(mep.deviceId())) {
+                    continue;
+                }
+                deviceService.getDevice(mep.deviceId()).as(CfmMepProgrammable.class)
+                        .createMaRemoteMepOnDevice(mdName, maName, newMep.mepId());
+                alreadyHandledDevices.add(mep.deviceId());
+                log.info("Created RMep entry on {} on device {}",
+                        mdName.mdName() + "/" + maName.maName(), mep.deviceId());
+            }
+
+            return !alreadyExisted;
         } else {
             return deviceResult;
         }
@@ -233,60 +331,253 @@
 
     @Override
     public void transmitLoopback(MdId mdName, MaIdShort maName,
-            MepId mepId, MepLbCreate lbCreate) throws CfmConfigException {
-        for (Mep mep : mepCollection) {
-            if (mep.mdId().equals(mdName) && mep.maId().equals(maName)
-                    && mep.mepId().equals(mepId)) {
-                log.debug("Transmitting Loopback on MEP {}/{}/{} on Device {}",
-                        mdName, maName, mepId, mep.deviceId());
-                deviceService.getDevice(mep.deviceId())
-                    .as(CfmMepProgrammable.class)
-                    .transmitLoopback(mdName, maName, mepId, lbCreate);
-                return;
-            }
-        }
-        throw new CfmConfigException("Mep " + mdName + "/" + maName + "/"
-                + mepId + " not found when calling Transmit Loopback");
+                                 MepId mepId, MepLbCreate lbCreate) throws CfmConfigException {
+        MepKeyId key = new MepKeyId(mdName, maName, mepId);
+        Mep mep = mepStore.getMep(key)
+                .orElseThrow(() -> new CfmConfigException("Mep " + mdName + "/" + maName + "/"
+                + mepId + " not found when calling Transmit Loopback"));
+
+        log.debug("Transmitting Loopback on MEP {} on Device {}",
+                key, mep.deviceId());
+        deviceService.getDevice(mep.deviceId())
+                .as(CfmMepProgrammable.class)
+                .transmitLoopback(mdName, maName, mepId, lbCreate);
     }
 
     @Override
     public void abortLoopback(MdId mdName, MaIdShort maName, MepId mepId)
             throws CfmConfigException {
-        for (Mep mep : mepCollection) {
-            if (mep.mdId().equals(mdName) && mep.maId().equals(maName)
-                    && mep.mepId().equals(mepId)) {
-                log.debug("Aborting Loopback on MEP {}/{}/{} on Device {}",
-                        mdName, maName, mepId, mep.deviceId());
-                deviceService.getDevice(mep.deviceId())
-                    .as(CfmMepProgrammable.class)
-                    .abortLoopback(mdName, maName, mepId);
-                return;
-            }
-        }
-        throw new CfmConfigException("Mep " + mdName + "/" + maName + "/"
-                + mepId + " not found when calling Transmit Loopback");
+
+        MepKeyId key = new MepKeyId(mdName, maName, mepId);
+        Mep mep = mepStore.getMep(key)
+                .orElseThrow(() -> new CfmConfigException("Mep " + mdName + "/" + maName + "/"
+                        + mepId + " not found when calling Aborting Loopback"));
+
+        log.debug("Aborting Loopback on MEP {} on Device {}",
+                key, mep.deviceId());
+        deviceService.getDevice(mep.deviceId())
+                .as(CfmMepProgrammable.class)
+                .abortLoopback(mdName, maName, mepId);
     }
 
     @Override
     public void transmitLinktrace(MdId mdName, MaIdShort maName, MepId mepId,
-            MepLtCreate ltCreate) {
+                                  MepLtCreate ltCreate) {
         throw new UnsupportedOperationException("Not yet implemented");
     }
 
+    /** Hands MD and MA removal events from the MD service to the queued event handler. */
+    private class InternalMdListener implements MdListener {
+        @Override
+        public boolean isRelevant(MdEvent event) {
+            MdEvent.Type eventType = event.type();
+            return eventType.equals(MdEvent.Type.MD_REMOVED) || eventType.equals(MdEvent.Type.MA_REMOVED);
+        }
+
+        @Override
+        public void event(MdEvent event) {
+            switch (event.type()) {
+                case MA_REMOVED:
+                case MD_REMOVED:
+                    log.trace("Event {} receieved from MD Service for {}", event.type(), event.subject());
+                    scheduleEventHandlerIfNotScheduled(event);
+                    break;
+                default:
+                    log.warn("Unhandled Event {} received from MD Service", event.type());
+                    break;
+            }
+        }
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
+        public boolean isRelevant(DeviceEvent event) {
+            return event.type().equals(DeviceEvent.Type.DEVICE_REMOVED);
+        }
+
+        @Override
         public void event(DeviceEvent event) {
+            DeviceId deviceId = event.subject().id();
             switch (event.type()) {
+                case DEVICE_ADDED:
+                case PORT_UPDATED:
+                case PORT_ADDED:
+                case DEVICE_UPDATED:
                 case DEVICE_REMOVED:
                 case DEVICE_AVAILABILITY_CHANGED:
-                    DeviceId deviceId = event.subject().id();
-                    if (!deviceService.isAvailable(deviceId)) {
-                        log.warn("Device {} has been removed or changed", deviceId);
-                    }
+                    log.trace("Event {} received from Device Service", event.type());
+                    scheduleEventHandlerIfNotScheduled(event);
                     break;
                 default:
+                    log.warn("Unhandled Event {} received from Device Service", event.type());
                     break;
             }
         }
     }
+
+    /** Queues the event and schedules the handler unless a handler run is already pending. */
+    @SuppressWarnings("rawtypes")
+    private void scheduleEventHandlerIfNotScheduled(Event event) {
+        synchronized (THREAD_SCHED_LOCK) {
+            eventQueue.add(event);
+            numOfEventsQueued++;
+
+            //The counters match only when no scheduled handler run is outstanding. So start a new one.
+            if (numOfHandlerScheduled == numOfHandlerExecution) {
+                eventHandlerFuture = executorService.schedule(eventHandler, 100, TimeUnit.MILLISECONDS);
+                numOfHandlerScheduled++;
+            }
+            log.trace("numOfEventsQueued {}, numOfEventHandlerScheduled {}",
+                    numOfEventsQueued,
+                    numOfHandlerScheduled);
+        }
+    }
+
+    /**
+     * Drains the event queue, processing each removal event in turn. A failure
+     * while processing one event is logged and the remaining events still run.
+     */
+    private class InternalEventHandler implements Runnable {
+        @Override
+        public void run() {
+            while (true) {
+                @SuppressWarnings("rawtypes")
+                Event event;
+                synchronized (THREAD_SCHED_LOCK) {
+                    if (!eventQueue.isEmpty()) {
+                        event = eventQueue.poll();
+                        numOfEventsExecuted++;
+                    } else {
+                        numOfHandlerExecution++;
+                        log.debug("numOfHandlerExecution {} numOfEventsExecuted {}",
+                                numOfHandlerExecution, numOfEventsExecuted);
+                        break;
+                    }
+                }
+                try {
+                    if (event.type() == DeviceEvent.Type.DEVICE_REMOVED) {
+                        Device removedDevice = (Device) event.subject();
+                        log.info("Processing device removal event for unavailable device {}", removedDevice.id());
+                        processDeviceRemoved(removedDevice);
+                    } else if (event.type() == MdEvent.Type.MD_REMOVED) {
+                        MdId mdName = (MdId) event.subject();
+                        log.info("Processing MD removal event for MD {}", mdName);
+                        processMdRemoved(mdName, ((MdEvent) event).md().get());
+                    } else if (event.type() == MdEvent.Type.MA_REMOVED) {
+                        MdId mdName = (MdId) event.subject();
+                        MaIdShort maName = ((MdEvent) event).maId().get();
+                        log.info("Processing MA removal event for MA {}", mdName.mdName() + "/" + maName.maName());
+                        processMaRemoved(mdName, maName, ((MdEvent) event).md().get());
+                    }
+                } catch (Exception e) {
+                    log.error("CfmMepService event handler thread thrown an exception: {}", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Removes the MEPs of a removed device from the store, and removes each of them
+     * as a Remote Mep from the other devices that have a Mep in the same MD/MA.
+     * Note: This does not call the removed device's CfmMepProgrammable, because the
+     * device has already been removed from ONOS. The configuration for these MEPs may
+     * still be present on the actual device, so any future config should wipe it first.
+     * @param removedDevice The device that has been removed
+     */
+    protected void processDeviceRemoved(Device removedDevice) {
+        log.warn("Remove Mep(s) associated with Device: " + removedDevice.id());
+        Collection<Mep> mepListForDevice = mepStore.getMepsByDeviceId(removedDevice.id());
+
+        for (Mep mep : mepStore.getAllMeps()) {
+            Device mepDevice = deviceService.getDevice(mep.deviceId());
+            // Skip the removed device's own MEPs and any MEP whose device is not available
+            if (mepDevice == null || mep.deviceId().equals(removedDevice.id())) {
+                continue;
+            }
+            for (Mep mepForDevice : mepListForDevice) {
+                if (mep.mdId().equals(mepForDevice.mdId()) && mep.maId().equals(mepForDevice.maId())) {
+                    String maName = mep.mdId().mdName() + "/" + mep.maId().maName();
+                    log.info("Removing Remote Mep {} from MA{} on device {}",
+                            mepForDevice.mepId(), maName, mepDevice.id());
+                    try {
+                        mepDevice.as(CfmMepProgrammable.class)
+                                .deleteMaRemoteMepOnDevice(mep.mdId(), mep.maId(), mepForDevice.mepId());
+                    } catch (CfmConfigException e) {
+                        log.error("Error when removing Remote Mep {} from MA {}. Continuing.",
+                                mepForDevice.mepId(), maName, e);
+                    }
+                }
+            }
+        }
+
+        for (Mep mepForDevice : mepListForDevice) {
+            mepStore.deleteMep(new MepKeyId(mepForDevice));
+        }
+    }
+
+    /** Deletes the MEPs of a removed MA from their devices, then the MA from those devices. */
+    protected void processMaRemoved(MdId mdId, MaIdShort maId, MaintenanceDomain oldMd) {
+        Set<DeviceId> deviceIdsRemoved = new HashSet<>();
+        for (Iterator<Mep> iter = mepStore.getMepsByMdMa(mdId, maId).iterator(); iter.hasNext();) {
+            Mep mepForMdMa = iter.next();
+            DeviceId mepDeviceId = mepForMdMa.deviceId();
+            try {
+                deviceService.getDevice(mepDeviceId).as(CfmMepProgrammable.class)
+                        .deleteMep(mdId, maId, mepForMdMa.mepId(), Optional.of(oldMd));
+                deviceIdsRemoved.add(mepDeviceId);
+            } catch (CfmConfigException e) {
+                log.warn("Could not delete MEP {} from Device {}", mepForMdMa.mepId(), mepDeviceId, e);
+            }
+            // NOTE(review): removes from the collection the store returned - confirm the store is updated too
+            iter.remove();
+            log.info("Removed MEP {} from Device {} because MA {} was deleted",
+                    mepForMdMa.mepId(), mepDeviceId, mdId.mdName() + "/" + maId.maName());
+        }
+        // Only devices on which a MEP delete did not fail have the MA deleted
+        deviceIdsRemoved.forEach(deviceId -> {
+            try {
+                deviceService.getDevice(deviceId).as(CfmMepProgrammable.class)
+                        .deleteMaOnDevice(mdId, maId, Optional.of(oldMd));
+            } catch (CfmConfigException e) {
+                log.warn("Could not delete MA {} from Device {}",
+                        mdId.mdName() + "/" + maId.maName(), deviceId, e);
+            }
+        });
+    }
+
+    /** Deletes the MEPs of a removed MD from their devices, then the MD from those devices. */
+    protected void processMdRemoved(MdId mdId, MaintenanceDomain oldMd) {
+        Set<DeviceId> deviceIdsRemoved = new HashSet<>();
+        for (Iterator<Mep> iter = mepStore.getMepsByMd(mdId).iterator(); iter.hasNext();) {
+            Mep mep = iter.next();
+            DeviceId mepDeviceId = mep.deviceId();
+            try {
+                deviceService.getDevice(mepDeviceId).as(CfmMepProgrammable.class)
+                        .deleteMep(mdId, mep.maId(), mep.mepId(), Optional.of(oldMd));
+                deviceIdsRemoved.add(mepDeviceId);
+            } catch (CfmConfigException e) {
+                log.warn("Could not delete MEP {} from Device {}", mep.mepId(), mepDeviceId, e);
+            }
+            iter.remove(); // NOTE(review): confirm this also removes the MEP from the store
+            log.info("Removed MEP {} from Device {} because MD {} was deleted",
+                    mep.mepId(), mepDeviceId, mdId.mdName());
+        }
+        // Only devices on which a MEP delete did not fail have the MD deleted
+        deviceIdsRemoved.forEach(deviceId -> {
+            try {
+                deviceService.getDevice(deviceId).as(CfmMepProgrammable.class)
+                        .deleteMdOnDevice(mdId, Optional.of(oldMd));
+            } catch (CfmConfigException e) {
+                log.warn("Could not delete MD {} from Device {}",
+                        mdId.mdName(), deviceId, e);
+            }
+        });
+    }
+
+    private class InternalStoreDelegate implements MepStoreDelegate {
+        @Override
+        public void notify(CfmMepEvent event) {
+            log.debug("New Mep event: {}", event);
+            eventDispatcher.post(event);
+        }
+    }
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMdStore.java b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMdStore.java
index 964a56a..82c550d 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMdStore.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMdStore.java
@@ -19,6 +19,7 @@
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.felix.scr.annotations.Activate;
 import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
@@ -42,6 +43,7 @@
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdMacUint;
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdNone;
 import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MepId;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmConfigException;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.MdEvent;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.MdStore;
 import org.onosproject.incubator.net.l2monitoring.cfm.service.MdStoreDelegate;
@@ -58,6 +60,8 @@
 import java.util.Collection;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Maintenance Domain Store implementation backed by consistent map.
@@ -75,7 +79,7 @@
     private ConsistentMap<MdId, MaintenanceDomain> maintenanceDomainConsistentMap;
     private Map<MdId, MaintenanceDomain> maintenanceDomainMap;
 
-    private final InternalMdListener listener = new InternalMdListener();
+    private MapEventListener<MdId, MaintenanceDomain> mapListener = null;
 
     @Activate
     public void activate() {
@@ -112,8 +116,17 @@
                                             .cfm.Component.TagType.class)
                         .build("md")))
                 .build();
+        mapListener = new InternalMdListener();
+        maintenanceDomainConsistentMap.addListener(mapListener);
 
         maintenanceDomainMap = maintenanceDomainConsistentMap.asJavaMap();
+        log.info("MDStore started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        maintenanceDomainConsistentMap.removeListener(mapListener); // listener was added in activate()
+        log.info("Stopped");
     }
 
     @Override
@@ -141,19 +154,65 @@
         @Override
         public void event(MapEvent<MdId, MaintenanceDomain> mapEvent) {
             final MdEvent.Type type;
+            MaIdShort maId = null; // stays null unless an added or removed MA is identified below
             switch (mapEvent.type()) {
                 case INSERT:
                     type = MdEvent.Type.MD_ADDED;
                     break;
                 case UPDATE:
-                    type = MdEvent.Type.MD_UPDATED;
+                    // An UPDATE is reported as MA_REMOVED or MA_ADDED when the number of MAs in the MD
+                    // shrank or grew; maId is then the first MA id that exists on one side only.
+                    // An unchanged number of MAs is reported as a plain MD_UPDATED.
+                    if (mapEvent.oldValue().value().maintenanceAssociationList().size() >
+                            mapEvent.newValue().value().maintenanceAssociationList().size()) {
+                        Set<MaIdShort> newMaIds = mapEvent.newValue().value().maintenanceAssociationList()
+                                .stream()
+                                .map(MaintenanceAssociation::maId)
+                                .collect(Collectors.toSet());
+                        // first MA of the old MD whose id is missing from the new MD (null if none)
+                        maId = mapEvent.oldValue().value().maintenanceAssociationList()
+                                .stream()
+                                .filter(maOld -> !newMaIds.contains(maOld.maId()))
+                                .findFirst().map(MaintenanceAssociation::maId).orElse(null);
+                        type = MdEvent.Type.MA_REMOVED;
+                    } else if (mapEvent.oldValue().value().maintenanceAssociationList().size() <
+                            mapEvent.newValue().value().maintenanceAssociationList().size()) {
+                        Set<MaIdShort> oldMaIds = mapEvent.oldValue().value().maintenanceAssociationList()
+                                .stream()
+                                .map(MaintenanceAssociation::maId)
+                                .collect(Collectors.toSet());
+                        // first MA of the new MD whose id was not part of the old MD (null if none)
+                        maId = mapEvent.newValue().value().maintenanceAssociationList()
+                                .stream()
+                                .filter(maNew -> !oldMaIds.contains(maNew.maId()))
+                                .findFirst().map(MaintenanceAssociation::maId).orElse(null);
+                        type = MdEvent.Type.MA_ADDED;
+                    } else {
+                        type = MdEvent.Type.MD_UPDATED;
+                    }
                     break;
                 case REMOVE:
                 default:
                     type = MdEvent.Type.MD_REMOVED;
                     break;
             }
-            notifyDelegate(new MdEvent(type, mapEvent.key()));
+            // Pass the previous MD (and the MA id, if one was identified) along with the event when
+            // the map event has an old value; otherwise, or on failure, send a key-only event.
+            if (mapEvent.oldValue() != null && mapEvent.oldValue().value() != null) {
+                MaintenanceDomain oldMd = mapEvent.oldValue().value();
+                try {
+                    if (maId != null) {
+                        notifyDelegate(new MdEvent(type, mapEvent.key(), oldMd, maId));
+                    } else {
+                        notifyDelegate(new MdEvent(type, mapEvent.key(), oldMd));
+                    }
+                } catch (CfmConfigException e) {
+                    log.warn("Unable to copy MD {}", oldMd, e);
+                    notifyDelegate(new MdEvent(type, mapEvent.key()));
+                }
+            } else {
+                notifyDelegate(new MdEvent(type, mapEvent.key()));
+            }
         }
     }
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMepStore.java b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMepStore.java
new file mode 100644
index 0000000..c21166a
--- /dev/null
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/l2monitoring/cfm/impl/DistributedMepStore.java
@@ -0,0 +1,193 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.l2monitoring.cfm.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.incubator.net.l2monitoring.cfm.DefaultMep;
+import org.onosproject.incubator.net.l2monitoring.cfm.Mep;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaId2Octet;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdCharStr;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdIccY1731;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdPrimaryVid;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdRfc2685VpnId;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MaIdShort;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdId;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdCharStr;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdDomainName;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdMacUint;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MdIdNone;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MepId;
+import org.onosproject.incubator.net.l2monitoring.cfm.identifier.MepKeyId;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.CfmMepEvent;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MepStore;
+import org.onosproject.incubator.net.l2monitoring.cfm.service.MepStoreDelegate;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.NetworkResource;
+import org.onosproject.net.PortNumber;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * MEP Store implementation backed by consistent map.
+ * Changes of the map are passed to the store delegate as {@link CfmMepEvent}s holding the MEP key.
+ */
+@Component(immediate = true)
+@Service
+public class DistributedMepStore extends AbstractStore<CfmMepEvent, MepStoreDelegate> implements MepStore {
+
+    private final Logger log = LoggerFactory.getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected StorageService storageService;
+
+    private ConsistentMap<MepKeyId, Mep> mepConsistentMap;
+    private Map<MepKeyId, Mep> mepMap; // Map view of mepConsistentMap, assigned in activate()
+
+    private MapEventListener<MepKeyId, Mep> mapListener = null;
+
+    @Activate
+    public void activate() {
+        mepConsistentMap = storageService
+                .<MepKeyId, Mep>consistentMapBuilder()
+                .withName("onos-cfm-mep-map")
+                .withSerializer(Serializer.using(new KryoNamespace.Builder()
+                        .register(KryoNamespaces.API)
+                        .register(DefaultMep.class)
+                        .register(MepId.class)
+                        .register(MepKeyId.class)
+                        .register(NetworkResource.class)
+                        .register(DeviceId.class)
+                        .register(PortNumber.class)
+                        .register(Mep.MepDirection.class)
+                        .register(VlanId.class)
+                        .register(Mep.Priority.class)
+                        .register(Mep.FngAddress.class)
+                        .register(Mep.FngAddressType.class)
+                        .register(IpAddress.class)
+                        .register(Mep.LowestFaultDefect.class)
+                        .register(Duration.class)
+                        .register(MdIdCharStr.class)
+                        .register(MdIdDomainName.class)
+                        .register(MdIdMacUint.class)
+                        .register(MdIdNone.class)
+                        .register(MaIdCharStr.class)
+                        .register(MaIdShort.class)
+                        .register(MaId2Octet.class)
+                        .register(MaIdIccY1731.class)
+                        .register(MaIdPrimaryVid.class)
+                        .register(MaIdRfc2685VpnId.class)
+                        .build("mep")))
+                .build();
+        mapListener = new InternalMepListener();
+        mepConsistentMap.addListener(mapListener);
+
+        mepMap = mepConsistentMap.asJavaMap();
+        log.info("MepStore started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+        mepConsistentMap.removeListener(mapListener);
+        log.info("MepStore stopped");
+    }
+
+    @Override
+    public Collection<Mep> getAllMeps() {
+        return mepMap.values();
+    }
+
+    @Override
+    public Collection<Mep> getMepsByMd(MdId mdName) {
+        return mepMap.values().stream()
+                .filter(mep -> mep.mdId().equals(mdName))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<Mep> getMepsByMdMa(MdId mdName, MaIdShort maName) {
+        return mepMap.values().stream()
+                .filter(mep -> mep.mdId().equals(mdName) && mep.maId().equals(maName))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Collection<Mep> getMepsByDeviceId(DeviceId deviceId) {
+        return mepMap.values().stream()
+                .filter(mep -> mep.deviceId().equals(deviceId))
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public Optional<Mep> getMep(MepKeyId mepKeyId) {
+        // MEPs are stored, replaced and deleted by their MepKeyId, so look the entry up by that
+        // key directly instead of reading every value of the map and comparing the identifiers.
+        return Optional.ofNullable(mepMap.get(mepKeyId));
+    }
+
+    @Override
+    public boolean deleteMep(MepKeyId mepKeyId) {
+        return mepMap.remove(mepKeyId) != null; // true only if an entry was actually removed
+    }
+
+    @Override
+    public boolean createUpdateMep(MepKeyId mepKeyId, Mep mep) {
+        return mepMap.put(mepKeyId, mep) != null; // true when an existing entry was replaced
+    }
+
+    /** Converts events of the consistent map into CfmMepEvents for the store delegate. */
+    private class InternalMepListener implements MapEventListener<MepKeyId, Mep> {
+
+        @Override
+        public void event(MapEvent<MepKeyId, Mep> mapEvent) {
+            final CfmMepEvent.Type type;
+
+            // anything that is neither INSERT nor UPDATE is reported as a removal
+            switch (mapEvent.type()) {
+                case INSERT:
+                    type = CfmMepEvent.Type.MEP_ADDED;
+                    break;
+                case UPDATE:
+                    type = CfmMepEvent.Type.MEP_UPDATED;
+                    break;
+                case REMOVE:
+                default:
+                    type = CfmMepEvent.Type.MEP_REMOVED;
+            }
+            notifyDelegate(new CfmMepEvent(type, mapEvent.key()));
+        }
+    }
+}