Initial implementation of Meter Service (needs testing)

Change-Id: Ie07bd3e2bd7c67a6499c965d8926eb361ad16462

store impl started

Change-Id: Ib8b474f40dcecff335a421c12ad149fe9830c427

full implementation

Change-Id: Ie59fd61d02972bd04d887bdcca9745793b880681
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
index ecd6504..1342a22 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Band.java
@@ -69,6 +69,20 @@
      */
     Type type();
 
+    /**
+     * Returns the packets seen by this band.
+     *
+     * @return a long value
+     */
+    long packets();
+
+    /**
+     * Return the bytes seen by this band.
+     *
+     * @return a byte counter
+     */
+    long bytes();
+
     interface Builder {
 
         /**
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java
new file mode 100644
index 0000000..369e1cc
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/BandEntry.java
@@ -0,0 +1,37 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents a stored band.
+ */
+public interface BandEntry extends Band {
+
+    /**
+     * Sets the number of packets seen by this band.
+     *
+     * @param packets a packet count
+     */
+    void setPackets(long packets);
+
+    /**
+     * Sets the number of bytes seen by this band.
+     *
+     * @param bytes a byte counter
+     */
+    void setBytes(long bytes);
+
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
index bff757b..669dc0d 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultBand.java
@@ -20,12 +20,14 @@
 /**
  * A default implementation for a Band.
  */
-public final class DefaultBand implements Band {
+public final class DefaultBand implements Band, BandEntry {
 
     private final Type type;
     private final long rate;
     private final long burstSize;
     private final short prec;
+    private long packets;
+    private long bytes;
 
     public DefaultBand(Type type, long rate,
                        long burstSize, short prec) {
@@ -55,6 +57,26 @@
         return type;
     }
 
+    @Override
+    public long packets() {
+        return packets;
+    }
+
+    @Override
+    public long bytes() {
+        return bytes;
+    }
+
+    @Override
+    public void setPackets(long packets) {
+        this.packets = packets;
+    }
+
+    @Override
+    public void setBytes(long bytes) {
+        this.bytes = bytes;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -91,7 +113,7 @@
         }
 
         @Override
-        public Band build() {
+        public DefaultBand build() {
             checkArgument(prec != null && type == Type.REMARK,
                           "Only REMARK bands can have a precendence.");
 
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
index 953feb2..138bb18 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/DefaultMeter.java
@@ -28,7 +28,7 @@
 /**
  * A default implementation of a meter.
  */
-public final class DefaultMeter implements Meter {
+public final class DefaultMeter implements Meter, MeterEntry  {
 
 
     private final MeterId id;
@@ -39,6 +39,12 @@
     private final DeviceId deviceId;
     private final Optional<MeterContext> context;
 
+    private MeterState state;
+    private long life;
+    private long refCount;
+    private long packets;
+    private long bytes;
+
     private DefaultMeter(DeviceId deviceId, MeterId id, ApplicationId appId,
                         Unit unit, boolean burst,
                         Collection<Band> bands, Optional<MeterContext> context) {
@@ -86,10 +92,60 @@
         return null;
     }
 
+    @Override
+    public MeterState state() {
+        return state;
+    }
+
+    @Override
+    public long life() {
+        return life;
+    }
+
+    @Override
+    public long referenceCount() {
+        return refCount;
+    }
+
+    @Override
+    public long packetsSeen() {
+        return packets;
+    }
+
+    @Override
+    public long bytesSeen() {
+        return bytes;
+    }
+
     public static Builder builder() {
         return new Builder();
     }
 
+    @Override
+    public void setState(MeterState state) {
+        this.state = state;
+    }
+
+    @Override
+    public void setLife(long life) {
+        this.life = life;
+    }
+
+    @Override
+    public void setReferenceCount(long count) {
+        this.refCount = count;
+    }
+
+    @Override
+    public void setProcessedPackets(long packets) {
+        this.packets = packets;
+    }
+
+    @Override
+    public void setProcessedBytes(long bytes) {
+        this.bytes = bytes;
+    }
+
     public static final class Builder implements Meter.Builder {
 
         private MeterId id;
@@ -108,7 +164,7 @@
         }
 
         @Override
-        public Meter.Builder withId(int id) {
+        public Meter.Builder withId(long id) {
             this.id = MeterId.meterId(id);
             return this;
         }
@@ -144,7 +200,7 @@
         }
 
         @Override
-        public Meter build() {
+        public DefaultMeter build() {
             checkNotNull(deviceId, "Must specify a device");
             checkNotNull(bands, "Must have bands.");
             checkArgument(bands.size() > 0, "Must have at least one band.");
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
index e70dd88..14593df 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/Meter.java
@@ -89,6 +89,41 @@
     Optional<MeterContext> context();
 
     /**
+     * Fetches the state of this meter.
+     *
+     * @return a meter state
+     */
+    MeterState state();
+
+    /**
+     * The lifetime in seconds of this meter.
+     *
+     * @return number of seconds
+     */
+    long life();
+
+    /**
+     * The number of flows pointing to this meter.
+     *
+     * @return a reference count
+     */
+    long referenceCount();
+
+    /**
+     * Number of packets processed by this meter.
+     *
+     * @return a packet count
+     */
+    long packetsSeen();
+
+    /**
+     * Number of bytes processed by this meter.
+     *
+     * @return a byte count
+     */
+    long bytesSeen();
+
+    /**
      * A meter builder.
      */
     interface Builder {
@@ -104,10 +139,10 @@
         /**
          * Assigns the id to this meter.
          *
-         * @param id an integer
+         * @param id a long
          * @return this
          */
-        Builder withId(int id);
+        Builder withId(long id);
 
         /**
          * Assigns the application that built this meter.
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
index 4c814f8..919cf63 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterContext.java
@@ -34,6 +34,5 @@
      * @param op a meter operation
      * @param reason the reason why it failed
      */
-    default void onError(MeterOperation op, MeterFailReason reason) {
-    }
+    default void onError(MeterOperation op, MeterFailReason reason) {}
 }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java
new file mode 100644
index 0000000..6157578
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEntry.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents a stored meter.
+ */
+public interface MeterEntry extends Meter {
+
+    /**
+     * Updates the state of this meter.
+     *
+     * @param state a meter state
+     */
+    void setState(MeterState state);
+
+    /**
+     * Set the amount of time the meter has existed in seconds.
+     *
+     * @param life number of seconds
+     */
+    void setLife(long life);
+
+    /**
+     * Sets the number of flows which are using this meter.
+     *
+     * @param count a reference count.
+     */
+    void setReferenceCount(long count);
+
+    /**
+     * Updates the number of packets seen by this meter.
+     *
+     * @param packets a packet count.
+     */
+    void setProcessedPackets(long packets);
+
+    /**
+     * Updates the number of bytes seen by the meter.
+     *
+     * @param bytes a byte counter.
+     */
+    void setProcessedBytes(long bytes);
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
index a952a14..9439145 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterEvent.java
@@ -20,20 +20,12 @@
 /**
  * Entity that represents Meter events.
  */
-public class MeterEvent extends AbstractEvent<MeterEvent.Type, Meter> {
+public class MeterEvent extends AbstractEvent<MeterEvent.Type, MeterOperation> {
 
 
-    enum Type {
+    private final MeterFailReason reason;
 
-        /**
-         * Signals that a new meter has been added.
-         */
-        METER_ADDED,
-
-        /**
-         * Signals that a meter has been removed.
-         */
-        METER_REMOVED,
+    public enum Type {
 
         /**
          * Signals that a meter has been added.
@@ -41,19 +33,15 @@
         METER_UPDATED,
 
         /**
-         * Signals that a meter addition failed.
-         */
-        METER_ADD_FAILED,
-
-        /**
-         * Signals that a meter removal failed.
-         */
-        METER_REMOVE_FAILED,
-
-        /**
          * Signals that a meter update failed.
          */
-        METER_UPDATE_FAILED
+        METER_OP_FAILED,
+
+        /**
+         * A meter operation was requested.
+         */
+        METER_OP_REQ,
+
     }
 
 
@@ -62,21 +50,32 @@
      * current time.
      *
      * @param type  meter event type
-     * @param meter event subject
+     * @param op event subject
      */
-    public MeterEvent(Type type, Meter meter) {
-        super(type, meter);
+    public MeterEvent(Type type, MeterOperation op) {
+        super(type, op);
+        this.reason = null;
     }
 
     /**
      * Creates an event of a given type and for the specified meter and time.
      *
      * @param type  meter event type
-     * @param meter event subject
+     * @param op event subject
      * @param time  occurrence time
      */
-    public MeterEvent(Type type, Meter meter, long time) {
-        super(type, meter, time);
+    public MeterEvent(Type type, MeterOperation op, long time) {
+        super(type, op, time);
+        this.reason = null;
+    }
+
+    public MeterEvent(Type type, MeterOperation op, MeterFailReason reason) {
+        super(type, op);
+        this.reason = reason;
+    }
+
+    public MeterFailReason reason() {
+        return reason;
     }
 
 }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
index ea7cf36..8619e14 100644
--- a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterId.java
@@ -19,19 +19,19 @@
 
 /**
  * A representation of a meter id.
- * Uniquely identifies a meter for a given device.
+ * Uniquely identifies a meter system wide.
  */
 public final class MeterId {
 
     static final long MAX = 0xFFFF0000;
 
-    private final int id;
+    private final long id;
 
     public static final MeterId SLOWPATH = new MeterId(0xFFFFFFFD);
     public static final MeterId CONTROLLER = new MeterId(0xFFFFFFFE);
     public static final MeterId ALL = new MeterId(0xFFFFFFFF);
 
-    private MeterId(int id) {
+    private MeterId(long id) {
         checkArgument(id <= MAX, "id cannot be larger than 0xFFFF0000");
         this.id = id;
     }
@@ -39,9 +39,9 @@
     /**
      * The integer representation of the meter id.
      *
-     * @return an integer
+     * @return a long
      */
-    public int id() {
+    public long id() {
         return id;
     }
 
@@ -62,10 +62,10 @@
 
     @Override
     public int hashCode() {
-        return id;
+        return Long.hashCode(id);
     }
 
-    public static MeterId meterId(int id) {
+    public static MeterId meterId(long id) {
         return new MeterId(id);
 
     }
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java
new file mode 100644
index 0000000..a910d6d
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterState.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+/**
+ * Represents the state of the meter as seen by the store.
+ */
+public enum MeterState {
+
+    /**
+     * The meter is in the process of being added.
+     */
+    PENDING_ADD,
+
+    /**
+     * THe meter has been added.
+     */
+    ADDED,
+
+    /**
+     * The meter is in the process of being removed.
+     */
+    PENDING_REMOVE,
+
+    /**
+     * The meter has been removed.
+     */
+    REMOVED,
+
+}
diff --git a/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java
new file mode 100644
index 0000000..232eedd
--- /dev/null
+++ b/incubator/api/src/main/java/org/onosproject/incubator/net/meter/MeterStore.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.net.meter;
+
+import org.onosproject.store.Store;
+
+import java.util.Collection;
+
+/**
+ * Entity that stores and distributed meter objects.
+ */
+public interface MeterStore extends Store<MeterEvent, MeterStoreDelegate> {
+
+    /**
+     * Adds a meter to the store.
+     *
+     * @param meter a meter
+     */
+    void storeMeter(Meter meter);
+
+    /**
+     * Deletes a meter from the store.
+     *
+     * @param meter a meter
+     */
+    void deleteMeter(Meter meter);
+
+    /**
+     * Updates a meter whose meter id is the same as the passed meter.
+     *
+     * @param meter a new meter
+     */
+    void updateMeter(Meter meter);
+
+    /**
+     * Updates a given meter's state with the provided state.
+     * @param meter a meter
+     */
+    void updateMeterState(Meter meter);
+
+    /**
+     * Obtains a meter matching the given meter id.
+     *
+     * @param meterId a meter id
+     * @return a meter
+     */
+    Meter getMeter(MeterId meterId);
+
+    /**
+     * Returns all meters stored in the store.
+     *
+     * @return a collection of meters
+     */
+    Collection<Meter> getAllMeters();
+
+    /**
+     * Update the store by deleting the failed meter.
+     * Notifies the delegate that the meter failed to allow it
+     * to nofity the app.
+     *
+     * @param op a failed meter operation
+     * @param reason a failure reason
+     */
+    void failedMeter(MeterOperation op, MeterFailReason reason);
+}
diff --git a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
index 6039247..b0726d6 100644
--- a/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
+++ b/incubator/net/src/main/java/org/onosproject/incubator/net/meter/impl/MeterManager.java
@@ -21,6 +21,7 @@
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
 import org.apache.felix.scr.annotations.Service;
+import org.onosproject.incubator.net.meter.DefaultMeter;
 import org.onosproject.incubator.net.meter.Meter;
 import org.onosproject.incubator.net.meter.MeterEvent;
 import org.onosproject.incubator.net.meter.MeterFailReason;
@@ -31,6 +32,8 @@
 import org.onosproject.incubator.net.meter.MeterProviderRegistry;
 import org.onosproject.incubator.net.meter.MeterProviderService;
 import org.onosproject.incubator.net.meter.MeterService;
+import org.onosproject.incubator.net.meter.MeterState;
+import org.onosproject.incubator.net.meter.MeterStore;
 import org.onosproject.incubator.net.meter.MeterStoreDelegate;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.provider.AbstractListenerProviderRegistry;
@@ -41,6 +44,7 @@
 
 import java.util.Collection;
 
+import static com.google.common.base.Preconditions.checkNotNull;
 import static org.slf4j.LoggerFactory.getLogger;
 
 
@@ -60,6 +64,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    MeterStore store;
+
     private AtomicCounter meterIdCounter;
 
     @Activate
@@ -82,27 +89,35 @@
 
     @Override
     public void addMeter(Meter meter) {
-
+        DefaultMeter m = (DefaultMeter) meter;
+        m.setState(MeterState.PENDING_ADD);
+        store.storeMeter(m);
     }
 
     @Override
     public void updateMeter(Meter meter) {
-
+        DefaultMeter m = (DefaultMeter) meter;
+        m.setState(MeterState.PENDING_ADD);
+        store.updateMeter(m);
     }
 
     @Override
     public void removeMeter(Meter meter) {
-
+        DefaultMeter m = (DefaultMeter) meter;
+        m.setState(MeterState.PENDING_REMOVE);
+        store.deleteMeter(m);
     }
 
     @Override
     public void removeMeter(MeterId id) {
-
+        DefaultMeter meter = (DefaultMeter) store.getMeter(id);
+        checkNotNull(meter, "No such meter {}", id);
+        removeMeter(meter);
     }
 
     @Override
     public Meter getMeter(MeterId id) {
-        return null;
+        return store.getMeter(id);
     }
 
     @Override
@@ -125,13 +140,14 @@
         }
 
         @Override
-        public void meterOperationFailed(MeterOperation operation, MeterFailReason reason) {
-
+        public void meterOperationFailed(MeterOperation operation,
+                                         MeterFailReason reason) {
+            store.failedMeter(operation, reason);
         }
 
         @Override
         public void pushMeterMetrics(DeviceId deviceId, Collection<Meter> meterEntries) {
-
+            meterEntries.forEach(m -> store.updateMeterState(m));
         }
     }
 
@@ -139,6 +155,21 @@
 
         @Override
         public void notify(MeterEvent event) {
+            DeviceId deviceId = event.subject().meter().deviceId();
+            MeterProvider p = getProvider(event.subject().meter().deviceId());
+            switch (event.type()) {
+                case METER_UPDATED:
+                    break;
+                case METER_OP_FAILED:
+                    event.subject().meter().context().ifPresent(c ->
+                        c.onError(event.subject(), event.reason()));
+                    break;
+                case METER_OP_REQ:
+                    p.performMeterOperation(deviceId, event.subject());
+                    break;
+                default:
+                    log.warn("Unknown meter event {}", event.type());
+            }
 
         }
     }
diff --git a/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
new file mode 100644
index 0000000..b8f2080
--- /dev/null
+++ b/incubator/store/src/main/java/org/onosproject/incubator/store/meter/impl/DistributedMeterStore.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright 2015 Open Networking Laboratory
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.incubator.store.meter.impl;
+
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.incubator.net.meter.DefaultBand;
+import org.onosproject.incubator.net.meter.DefaultMeter;
+import org.onosproject.incubator.net.meter.Meter;
+import org.onosproject.incubator.net.meter.MeterEvent;
+import org.onosproject.incubator.net.meter.MeterFailReason;
+import org.onosproject.incubator.net.meter.MeterId;
+import org.onosproject.incubator.net.meter.MeterOperation;
+import org.onosproject.incubator.net.meter.MeterStore;
+import org.onosproject.incubator.net.meter.MeterStoreDelegate;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.slf4j.Logger;
+
+import java.util.Collection;
+import java.util.Objects;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * A distributed meter store implementation. Meters are stored consistently
+ * across the cluster.
+ */
+public class DistributedMeterStore extends AbstractStore<MeterEvent, MeterStoreDelegate>
+                    implements MeterStore {
+
+    private Logger log = getLogger(getClass());
+
+    private static final String METERSTORE = "onos-meter-store";
+    private static final int MESSAGE_HANDLER_THREAD_POOL_SIZE = 8;
+
+    private static final MessageSubject UPDATE_METER = new MessageSubject("peer-mod-meter");
+
+
+    @Property(name = "msgHandlerPoolSize", intValue = MESSAGE_HANDLER_THREAD_POOL_SIZE,
+            label = "Number of threads in the message handler pool")
+    private int msgPoolSize;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterCommunicationService clusterCommunicationService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    private ClusterService clusterService;
+
+    private ConsistentMap<MeterId, Meter> meters;
+    private NodeId local;
+    private KryoNamespace kryoNameSpace;
+
+    private Serializer serializer;
+
+    @Activate
+    public void activate() {
+
+        local = clusterService.getLocalNode().id();
+
+        kryoNameSpace =
+                KryoNamespace.newBuilder()
+                                .register(DefaultMeter.class)
+                                .register(DefaultBand.class)
+                                .build();
+
+        serializer = Serializer.using(kryoNameSpace);
+
+        meters = storageService.<MeterId, Meter>consistentMapBuilder()
+                    .withName(METERSTORE)
+                    .withSerializer(serializer)
+                    .build();
+
+        ExecutorService executors = Executors.newFixedThreadPool(
+                msgPoolSize, Tools.groupedThreads("onos/store/meter", "message-handlers"));
+        registerMessageHandlers(executors);
+
+        log.info("Started");
+    }
+
+    @Deactivate
+    public void deactivate() {
+
+
+        log.info("Stopped");
+    }
+
+    private void registerMessageHandlers(ExecutorService executor) {
+        clusterCommunicationService.<MeterEvent>addSubscriber(UPDATE_METER, kryoNameSpace::deserialize,
+                                                              this::notifyDelegate, executor);
+
+    }
+
+
+    @Override
+    public void storeMeter(Meter meter) {
+        NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+        meters.put(meter.id(), meter);
+
+        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+                                          new MeterOperation(meter, MeterOperation.Type.ADD));
+        if (Objects.equals(local, master)) {
+            notifyDelegate(event);
+        } else {
+            clusterCommunicationService.unicast(
+                    event,
+                    UPDATE_METER,
+                    serializer::encode,
+                    master
+            ).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to install meter {} because {} on {}",
+                             meter, error, master);
+
+                    // notify app of failure
+                    meter.context().ifPresent(c -> c.onError(
+                            event.subject(), MeterFailReason.UNKNOWN));
+                }
+            });
+        }
+
+    }
+
+    @Override
+    public void deleteMeter(Meter meter) {
+
+        NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+        // update the state of the meter. It will be pruned by observing
+        // that it has been removed from the dataplane.
+        meters.put(meter.id(), meter);
+
+        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+                                          new MeterOperation(meter, MeterOperation.Type.REMOVE));
+        if (Objects.equals(local, master)) {
+            notifyDelegate(event);
+        } else {
+            clusterCommunicationService.unicast(
+                    event,
+                    UPDATE_METER,
+                    serializer::encode,
+                    master
+            ).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to delete meter {} because {} on {}",
+                             meter, error, master);
+
+                    // notify app of failure
+                    meter.context().ifPresent(c -> c.onError(
+                            event.subject(), MeterFailReason.UNKNOWN));
+                }
+            });
+        }
+
+    }
+
+    @Override
+    public void updateMeter(Meter meter) {
+
+        NodeId master = mastershipService.getMasterFor(meter.deviceId());
+
+        meters.put(meter.id(), meter);
+
+        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_REQ,
+                                          new MeterOperation(meter, MeterOperation.Type.MODIFY));
+        if (Objects.equals(local, master)) {
+            notifyDelegate(event);
+        } else {
+            clusterCommunicationService.unicast(
+                    event,
+                    UPDATE_METER,
+                    serializer::encode,
+                    master
+            ).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to update meter {} because {} on {}",
+                             meter, error, master);
+
+                    // notify app of failure
+                    meter.context().ifPresent(c -> c.onError(
+                            event.subject(), MeterFailReason.UNKNOWN));
+                }
+            });
+        }
+
+    }
+
+    @Override
+    public void updateMeterState(Meter meter) {
+        meters.compute(meter.id(), (id, v) -> {
+            DefaultMeter m = (DefaultMeter) v;
+            m.setState(meter.state());
+            m.setProcessedPackets(meter.packetsSeen());
+            m.setProcessedBytes(meter.bytesSeen());
+            m.setLife(meter.life());
+            m.setReferenceCount(meter.referenceCount());
+            return m;
+        });
+    }
+
+    @Override
+    public Meter getMeter(MeterId meterId) {
+        return meters.get(meterId).value();
+    }
+
+    @Override
+    public Collection<Meter> getAllMeters() {
+        return meters.values().stream()
+                .map(v -> v.value()).collect(Collectors.toSet());
+    }
+
+    @Override
+    public void failedMeter(MeterOperation op, MeterFailReason reason) {
+        NodeId master = mastershipService.getMasterFor(op.meter().deviceId());
+        meters.remove(op.meter().id());
+
+        MeterEvent event = new MeterEvent(MeterEvent.Type.METER_OP_FAILED, op, reason);
+        if (Objects.equals(local, master)) {
+            notifyDelegate(event);
+        } else {
+            clusterCommunicationService.unicast(
+                    event,
+                    UPDATE_METER,
+                    serializer::encode,
+                    master
+            ).whenComplete((result, error) -> {
+                if (error != null) {
+                    log.warn("Failed to delete failed meter {} because {} on {}",
+                             op.meter(), error, master);
+
+                    // Can't do any more...
+                }
+            });
+        }
+
+    }
+
+}
diff --git a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
index a87250e..00ad8ef 100644
--- a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
+++ b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/MeterModBuilder.java
@@ -46,7 +46,7 @@
     private final OFFactory factory;
     private Meter.Unit unit = Meter.Unit.KB_PER_SEC;
     private boolean burst = false;
-    private Integer id;
+    private Long id;
     private Collection<Band> bands;
 
     public MeterModBuilder(long xid, OFFactory factory) {
diff --git a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
index dabeda0..e56a54c 100644
--- a/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
+++ b/providers/openflow/meter/src/main/java/org/onosproject/provider/of/meter/impl/OpenFlowMeterProvider.java
@@ -20,7 +20,6 @@
 import com.google.common.cache.Cache;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.RemovalCause;
-
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Maps;
 import org.apache.felix.scr.annotations.Activate;
@@ -28,7 +27,10 @@
 import org.apache.felix.scr.annotations.Deactivate;
 import org.apache.felix.scr.annotations.Reference;
 import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.onosproject.net.DeviceId;
+import org.onosproject.core.CoreService;
+import org.onosproject.incubator.net.meter.Band;
+import org.onosproject.incubator.net.meter.DefaultBand;
+import org.onosproject.incubator.net.meter.DefaultMeter;
 import org.onosproject.incubator.net.meter.Meter;
 import org.onosproject.incubator.net.meter.MeterFailReason;
 import org.onosproject.incubator.net.meter.MeterOperation;
@@ -36,6 +38,8 @@
 import org.onosproject.incubator.net.meter.MeterProvider;
 import org.onosproject.incubator.net.meter.MeterProviderRegistry;
 import org.onosproject.incubator.net.meter.MeterProviderService;
+import org.onosproject.incubator.net.meter.MeterState;
+import org.onosproject.net.DeviceId;
 import org.onosproject.net.provider.AbstractProvider;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.openflow.controller.Dpid;
@@ -47,15 +51,23 @@
 import org.projectfloodlight.openflow.protocol.OFErrorMsg;
 import org.projectfloodlight.openflow.protocol.OFErrorType;
 import org.projectfloodlight.openflow.protocol.OFMessage;
+import org.projectfloodlight.openflow.protocol.OFMeterBandStats;
+import org.projectfloodlight.openflow.protocol.OFMeterConfigStatsReply;
+import org.projectfloodlight.openflow.protocol.OFMeterStats;
+import org.projectfloodlight.openflow.protocol.OFMeterStatsReply;
 import org.projectfloodlight.openflow.protocol.OFPortStatus;
 import org.projectfloodlight.openflow.protocol.OFStatsReply;
+import org.projectfloodlight.openflow.protocol.OFStatsType;
 import org.projectfloodlight.openflow.protocol.OFVersion;
 import org.projectfloodlight.openflow.protocol.errormsg.OFMeterModFailedErrorMsg;
 import org.slf4j.Logger;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 import static org.slf4j.LoggerFactory.getLogger;
 
@@ -74,6 +86,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected MeterProviderRegistry providerRegistry;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected CoreService coreService;
+
     private MeterProviderService providerService;
 
     private static final AtomicLong XID_COUNTER = new AtomicLong(1);
@@ -81,8 +96,7 @@
     static final int POLL_INTERVAL = 10;
     static final long TIMEOUT = 30;
 
-    private Cache<Integer, MeterOperation> pendingOperations;
-    private Cache<Long, MeterOperation> pendingXid;
+    private Cache<Long, MeterOperation> pendingOperations;
 
 
     private InternalMeterListener listener = new InternalMeterListener();
@@ -101,7 +115,7 @@
 
         pendingOperations = CacheBuilder.newBuilder()
                 .expireAfterWrite(TIMEOUT, TimeUnit.SECONDS)
-                .removalListener((RemovalNotification<Integer, MeterOperation> notification) -> {
+                .removalListener((RemovalNotification<Long, MeterOperation> notification) -> {
                     if (notification.getCause() == RemovalCause.EXPIRED) {
                         providerService.meterOperationFailed(notification.getValue(),
                                                              MeterFailReason.TIMEOUT);
@@ -149,6 +163,8 @@
             return;
         }
 
+        performOperation(sw, meterOp);
+
     }
 
     private void performOperation(OpenFlowSwitch sw, MeterOperation op) {
@@ -203,7 +219,57 @@
     }
 
     private void pushMeterStats(Dpid dpid, OFStatsReply msg) {
+        DeviceId deviceId = DeviceId.deviceId(Dpid.uri(dpid));
 
+        if (msg.getStatsType() == OFStatsType.METER) {
+            OFMeterStatsReply reply = (OFMeterStatsReply) msg;
+            Collection<Meter> meters = buildMeters(deviceId, reply.getEntries());
+            //TODO do meter accounting here.
+            providerService.pushMeterMetrics(deviceId, meters);
+        } else if (msg.getStatsType() == OFStatsType.METER_CONFIG) {
+            OFMeterConfigStatsReply reply  = (OFMeterConfigStatsReply) msg;
+            // FIXME: Map<Long, Meter> meters = collectMeters(deviceId, reply);
+        }
+
+    }
+
+    private Map<Long, Meter> collectMeters(DeviceId deviceId,
+                                           OFMeterConfigStatsReply reply) {
+        return Maps.newHashMap();
+        //TODO: Needs a fix to be applied to loxi MeterConfig stat is incorrect
+    }
+
+    private Collection<Meter> buildMeters(DeviceId deviceId,
+                                          List<OFMeterStats> entries) {
+        return entries.stream().map(stat -> {
+            DefaultMeter.Builder builder = DefaultMeter.builder();
+            Collection<Band> bands = buildBands(stat.getBandStats());
+            builder.forDevice(deviceId)
+                    .withId(stat.getMeterId())
+                    //FIXME: need to encode appId in meter id, but that makes
+                    // things a little annoying for debugging
+                    .fromApp(coreService.getAppId("org.onosproject.core"))
+                    .withBands(bands);
+            DefaultMeter meter = builder.build();
+            meter.setState(MeterState.ADDED);
+            meter.setLife(stat.getDurationSec());
+            meter.setProcessedBytes(stat.getByteInCount().getValue());
+            meter.setProcessedPackets(stat.getPacketInCount().getValue());
+            meter.setReferenceCount(stat.getFlowCount());
+
+            // marks the meter as seen on the dataplane
+            pendingOperations.invalidate(stat.getMeterId());
+            return meter;
+        }).collect(Collectors.toSet());
+    }
+
+    private Collection<Band> buildBands(List<OFMeterBandStats> bandStats) {
+        return bandStats.stream().map(stat -> {
+            DefaultBand band = DefaultBand.builder().build();
+            band.setBytes(stat.getByteBandCount().getValue());
+            band.setPackets(stat.getPacketBandCount().getValue());
+            return band;
+        }).collect(Collectors.toSet());
     }
 
     private void signalMeterError(OFMeterModFailedErrorMsg meterError,
diff --git a/tools/test/bin/onos-form-cluster b/tools/test/bin/onos-form-cluster
index b6823b1..daca34c 100755
--- a/tools/test/bin/onos-form-cluster
+++ b/tools/test/bin/onos-form-cluster
@@ -29,4 +29,4 @@
 
 set -x
 
-ssh $ONOS_USER@$node /tmp/$ONOS_BITS/bin/onos-form-cluster -u $user -p $password $nodes
\ No newline at end of file
+ssh $ONOS_USER@$node $ONOS_INSTALL_DIR/bin/onos-form-cluster -u $user -p $password $nodes