Added Meter object accounting

Change-Id: I2a3b88ffd1756b738e197943b3e02771f5729e45
diff --git a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
index c5d57d5..5112a4a 100644
--- a/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
+++ b/core/api/src/main/java/org/onosproject/net/meter/MeterStore.java
@@ -80,4 +80,11 @@
      * @param reason a failure reason
      */
     void failedMeter(MeterOperation op, MeterFailReason reason);
+
+    /**
+     * Delete this meter immediately.
+     * @param m a meter
+     */
+    void deleteMeterNow(Meter m);
+
 }
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 4e75301..b48a25b 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -45,6 +45,8 @@
 import org.slf4j.Logger;
 
 import java.util.Collection;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -170,7 +172,25 @@
 
         @Override
         public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
-            meterEntries.forEach(m -> store.updateMeterState(m));
+            //FIXME: FOLLOWING CODE CANNOT BE TESTED UNTIL SOMETHING THAT
+            //FIXME: IMPLEMENTS METERS EXISTS
+            Map<MeterId, Meter> storedMeterMap = store.getAllMeters().stream()
+                    .collect(Collectors.toMap(Meter::id, m -> m));
+
+            meterEntries.stream()
+                    .filter(m -> storedMeterMap.remove(m.id()) != null)
+                    .forEach(m -> store.updateMeterState(m));
+
+            storedMeterMap.values().stream().forEach(m -> {
+                if (m.state() == MeterState.PENDING_ADD) {
+                    provider().performMeterOperation(m.deviceId(),
+                                                     new MeterOperation(m,
+                                                                        MeterOperation.Type.ADD,
+                                                                        null));
+                } else {
+                    store.deleteMeterNow(m);
+                }
+            });
         }
     }
 
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
index bc8456e..f343d83 100644
--- a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -198,6 +198,12 @@
                 new MeterData(v.meter(), reason, v.origin()));
     }
 
+    @Override
+    public void deleteMeterNow(Meter m) {
+        futures.remove(m.id());
+        meters.remove(m.id());
+    }
+
     private class InternalMapEventListener implements MapEventListener<MeterId, MeterData> {
         @Override
         public void event(MapEvent<MeterId, MeterData> event) {
@@ -217,12 +223,12 @@
                                 } else if (data.reason().isPresent() && local.equals(data.origin())) {
                                     MeterStoreResult msr = MeterStoreResult.fail(data.reason().get());
                                     //TODO: No future -> no friend
-                                    futures.remove(data.meter().id()).complete(msr);
+                                    futures.get(data.meter().id()).complete(msr);
                                 }
                                 break;
                             case ADDED:
                             case REMOVED:
-                                if (local.equals(data.origin())) {
+                                if (local.equals(data.origin()) && data.meter().state() == MeterState.PENDING_REMOVE) {
                                     futures.remove(data.meter().id()).complete(MeterStoreResult.success());
                                 }
                                 break;
diff --git a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
index 4ef4d81..26cfb6d 100644
--- a/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
+++ b/openflow/ctl/src/main/java/org/onosproject/openflow/controller/impl/OpenFlowControllerImpl.java
@@ -306,6 +306,9 @@
                 case PORT:
                     executorMsgs.submit(new OFMessageHandler(dpid, reply));
                     break;
+                case METER:
+                    executorMsgs.submit(new OFMessageHandler(dpid, reply));
+                    break;
                 case EXPERIMENTER:
                     if (reply instanceof OFCalientFlowStatsReply) {
                         // Convert Calient flow statistics to regular flow stats
@@ -353,6 +356,7 @@
                     }
                     break;
                 default:
+                    log.warn("Discarding unknown stats reply type {}", reply.getStatsType());
                     break;
             }
             break;