Refactor OpenstackTelemetry App for better readability
Change-Id: I93353de31fb9671d8670ee44fc248fe7f36ac12b
diff --git a/apps/openstacktelemetry/app/BUCK b/apps/openstacktelemetry/app/BUCK
index 39561d6..97bc0f9 100644
--- a/apps/openstacktelemetry/app/BUCK
+++ b/apps/openstacktelemetry/app/BUCK
@@ -10,6 +10,7 @@
'//lib:jersey-client',
'//cli:onos-cli',
'//lib:org.apache.karaf.shell.console',
+ '//apps/openstacknetworking/api:onos-apps-openstacknetworking-api',
'//apps/openstacktelemetry/api:onos-apps-openstacktelemetry-api',
'//lib:kafka-clients',
'//lib:influxdb-java',
diff --git a/apps/openstacktelemetry/app/app.xml b/apps/openstacktelemetry/app/app.xml
index 0e31e0e..75a5b3f 100644
--- a/apps/openstacktelemetry/app/app.xml
+++ b/apps/openstacktelemetry/app/app.xml
@@ -14,8 +14,9 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-<app name="org.onosproject.openstacktelemetry" origin="Open Networking Foundation" version="${project.version}"
- category="Utility" title="OpenStack Telemetry App" features="${project.artifactId}"
+<app name="org.onosproject.openstacktelemetry" origin="Open Networking Foundation"
+ version="${project.version}" category="Utility" title="OpenStack Telemetry App"
+ features="${project.artifactId}" apps="org.onosproject.openstacknetworking"
featuresRepo="mvn:${project.groupId}/${project.artifactId}/${project.version}/xml/features">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/onos-apps-openstacktelemetry-api/${project.version}</artifact>
diff --git a/apps/openstacktelemetry/app/pom.xml b/apps/openstacktelemetry/app/pom.xml
index 841ce2e..7d81b2a 100644
--- a/apps/openstacktelemetry/app/pom.xml
+++ b/apps/openstacktelemetry/app/pom.xml
@@ -81,6 +81,12 @@
<dependency>
<groupId>org.onosproject</groupId>
+ <artifactId>onos-apps-openstacknetworking-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.onosproject</groupId>
<artifactId>onlab-osgi</artifactId>
<version>${project.version}</version>
</dependency>
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
index 35547b2..8b28797 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodec.java
@@ -35,14 +35,19 @@
public class TinaFlowInfoByteBufferCodec extends ByteBufferCodec<FlowInfo> {
private static final int MESSAGE_SIZE = 88;
+ private static final String OF_PREFIX = "of:";
@Override
public ByteBuffer encode(FlowInfo flowInfo) {
ByteBuffer byteBuffer = ByteBuffer.allocate(MESSAGE_SIZE);
+ String deviceId = flowInfo.deviceId().toString();
+ short switchId = (short) Integer.parseInt(deviceId.substring(3,
+ deviceId.length()), 16);
+
byteBuffer.put(flowInfo.flowType())
- .putShort(Short.valueOf(flowInfo.deviceId().toString()))
+ .putShort(switchId)
.putInt(flowInfo.inputInterfaceId())
.putInt(flowInfo.outputInterfaceId())
.putShort(flowInfo.vlanId().toShort())
@@ -67,7 +72,8 @@
public FlowInfo decode(ByteBuffer byteBuffer) {
byte flowType = byteBuffer.get();
- DeviceId deviceId = DeviceId.deviceId(String.valueOf(byteBuffer.getShort()));
+ String deviceIdStr = String.format("%016x", byteBuffer.getShort());
+ DeviceId deviceId = DeviceId.deviceId(OF_PREFIX + deviceIdStr);
int inputInterfaceId = byteBuffer.getInt();
int outputInterfaceId = byteBuffer.getInt();
VlanId vlanId = VlanId.vlanId(byteBuffer.getShort());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
new file mode 100644
index 0000000..3b849d1
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.codec;
+
+import org.onosproject.openstacktelemetry.api.FlowInfo;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+/**
+ * Tina Message ByteBuffer Codec.
+ */
+public class TinaMessageByteBufferCodec {
+
+    // Header layout written by buildMessageHeader: entry count (2 bytes),
+    // message type (2 bytes), timestamp in seconds (4 bytes).
+    private static final int HEADER_SIZE = 8;
+    // Size in bytes of one encoded flow info entry; TinaFlowInfoByteBufferCodec
+    // allocates its per-entry buffer with the same size.
+    private static final int ENTRY_SIZE = 88;
+    // Divisor used to turn System.currentTimeMillis() into seconds.
+    private static final int MILLISECONDS = 1000;
+    private static final short KAFKA_MESSAGE_TYPE = 1;
+
+    /**
+     * Encodes a collection of flow infos into a byte buffer.
+     * The message is an 8-byte header followed by one fixed-size entry per
+     * flow info. The returned buffer is not flipped: its position sits at
+     * the end of the written data, so read the content through
+     * {@link ByteBuffer#array()}.
+     *
+     * @param flowInfos a collection of flow info
+     * @return encoded byte buffer
+     */
+    public ByteBuffer encode(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer =
+                ByteBuffer.allocate(HEADER_SIZE + flowInfos.size() * ENTRY_SIZE);
+
+        byteBuffer.put(buildMessageHeader(flowInfos));
+        byteBuffer.put(buildMessageBody(flowInfos));
+
+        return byteBuffer;
+    }
+
+    /**
+     * Builds the message header: number of entries, message type and the
+     * current time in seconds.
+     *
+     * @param flowInfos a collection of flow info
+     * @return header bytes
+     */
+    private byte[] buildMessageHeader(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+        byteBuffer.putShort((short) flowInfos.size());
+        byteBuffer.putShort(KAFKA_MESSAGE_TYPE);
+        byteBuffer.putInt((int) (System.currentTimeMillis() / MILLISECONDS));
+
+        return byteBuffer.array();
+    }
+
+    /**
+     * Builds the message body by concatenating the encoded flow infos.
+     *
+     * @param flowInfos a collection of flow info
+     * @return body bytes
+     */
+    private byte[] buildMessageBody(Set<FlowInfo> flowInfos) {
+        ByteBuffer byteBuffer = ByteBuffer.allocate(flowInfos.size() * ENTRY_SIZE);
+
+        TinaFlowInfoByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
+        for (FlowInfo flowInfo : flowInfos) {
+            // Copy through the backing array rather than put(ByteBuffer):
+            // a relative buffer-to-buffer put only transfers the bytes
+            // between the source's position and limit, which is nothing
+            // when the entry codec hands back a buffer it has just filled
+            // without rewinding it.
+            byteBuffer.put(codec.encode(flowInfo).array());
+        }
+
+        return byteBuffer.array();
+    }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
index 74a5016..3f56362 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultFlowInfo.java
@@ -141,6 +141,16 @@
}
@Override
+    public boolean roughEquals(FlowInfo flowInfo) {
+        // Only the fields below take part in the comparison: device,
+        // IP pair, port pair and protocol, checked in that order.
+        if (!deviceId.equals(flowInfo.deviceId())) {
+            return false;
+        }
+        if (!srcIp.equals(flowInfo.srcIp()) || !dstIp.equals(flowInfo.dstIp())) {
+            return false;
+        }
+        if (!srcPort.equals(flowInfo.srcPort()) || !dstPort.equals(flowInfo.dstPort())) {
+            return false;
+        }
+        return protocol == flowInfo.protocol();
+    }
+
+ @Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 4629a4e..da492a9 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -21,7 +21,6 @@
import org.apache.felix.scr.annotations.Deactivate;
import org.apache.felix.scr.annotations.Service;
import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.openstacktelemetry.api.ByteBufferCodec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -29,12 +28,13 @@
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaFlowInfoByteBufferCodec;
+import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.nio.ByteBuffer;
import java.util.List;
+import java.util.Set;
/**
* Openstack telemetry manager.
@@ -45,6 +45,9 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String KAFKA_TOPIC = "sona.flow";
+ private static final String KAFKA_KEY = "flowdata";
+
private List<TelemetryService> telemetryServices = Lists.newArrayList();
@Activate
@@ -68,45 +71,42 @@
}
@Override
- public void publish(FlowInfo flowInfo) {
+ public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
-
if (service instanceof GrpcTelemetryManager) {
- invokeGrpcPublisher((GrpcTelemetryService) service, flowInfo);
+ invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
if (service instanceof InfluxDbTelemetryManager) {
- invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfo);
+ invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
}
if (service instanceof KafkaTelemetryManager) {
- invokeKafkaPublisher((KafkaTelemetryService) service, flowInfo);
+ invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
}
if (service instanceof RestTelemetryManager) {
- invokeRestPublisher((RestTelemetryService) service, flowInfo);
+ invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
-
});
}
- private void invokeGrpcPublisher(GrpcTelemetryService service, FlowInfo flowInfo) {
+ private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, FlowInfo flowInfo) {
+ private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeKafkaPublisher(KafkaTelemetryService service, FlowInfo flowInfo) {
- ByteBufferCodec codec = new TinaFlowInfoByteBufferCodec();
- ByteBuffer buffer = codec.encode(flowInfo);
- service.publish(new ProducerRecord<>("sona.flow", "flowdata", buffer.array()));
+ private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
+ TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
+ ByteBuffer buffer = codec.encode(flowInfos);
+ service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
}
- private void invokeRestPublisher(RestTelemetryService service, FlowInfo flowInfo) {
+ private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
-
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
index c0646ad..e2f505c 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/StatsFlowRuleManager.java
@@ -15,6 +15,7 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.felix.scr.annotations.Activate;
import org.apache.felix.scr.annotations.Component;
@@ -22,9 +23,6 @@
import org.apache.felix.scr.annotations.Reference;
import org.apache.felix.scr.annotations.ReferenceCardinality;
import org.apache.felix.scr.annotations.Service;
-import org.onlab.osgi.DefaultServiceDirectory;
-import org.onlab.packet.Ethernet;
-import org.onlab.packet.IPv4;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
import org.onlab.packet.VlanId;
@@ -45,27 +43,39 @@
import org.onosproject.net.flow.IndexTableId;
import org.onosproject.net.flow.TrafficSelector;
import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
import org.onosproject.net.flow.criteria.IPCriterion;
import org.onosproject.net.flow.criteria.IPProtocolCriterion;
import org.onosproject.net.flow.criteria.TcpPortCriterion;
import org.onosproject.net.flow.criteria.UdpPortCriterion;
import org.onosproject.net.host.HostService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.StatsFlowRule;
import org.onosproject.openstacktelemetry.api.StatsFlowRuleAdminService;
import org.onosproject.openstacktelemetry.api.StatsInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashSet;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
+import static org.onlab.packet.Ethernet.TYPE_IPV4;
+import static org.onlab.packet.IPv4.PROTOCOL_TCP;
+import static org.onlab.packet.IPv4.PROTOCOL_UDP;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IPV4_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.IP_PROTO;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.TCP_SRC;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_DST;
+import static org.onosproject.net.flow.criteria.Criterion.Type.UDP_SRC;
+import static org.onosproject.openstacknetworking.api.Constants.DHCP_ARP_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.FORWARDING_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_INBOUND_TABLE;
+import static org.onosproject.openstacknetworking.api.Constants.STAT_OUTBOUND_TABLE;
import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
-
/**
* Flow rule manager for network statistics of a VM.
*/
@@ -77,7 +87,7 @@
private static final byte FLOW_TYPE_SONA = 1; // VLAN
- public static final int MILLISECONDS = 1000;
+ private static final int MILLISECONDS = 1000;
private static final int REFRESH_INTERVAL = 5;
private ApplicationId appId;
@@ -91,11 +101,16 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected HostService hostService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected OpenstackTelemetryService telemetryService;
+
private Timer timer;
private TimerTask task;
- private OpenstackTelemetryManager osTelemetryManager;
- Set<FlowInfo> gFlowInfoSet = new HashSet<>();
+ private final Set<FlowInfo> gFlowInfoSet = Sets.newHashSet();
private int loopCount = 0;
private static final int SOURCE_ID = 1;
@@ -104,51 +119,29 @@
private static final int METRIC_PRIORITY_SOURCE = SOURCE_ID * PRIORITY_BASE;
private static final int METRIC_PRIORITY_TARGET = TARGET_ID * PRIORITY_BASE;
- public static final int FLOW_TABLE_VM_SOURCE = 0; // STAT_INBOUND_TABLE
- public static final int FLOW_TABLE_DHCP_ARP = 1; // DHCP_ARP_TABLE
- public static final int FLOW_TABLE_VM_TARGET = 49; // STAT_OUTBOUND_TABLE
- public static final int FLOW_TABLE_FORWARDING = 50; // FORWARDING_TABLE
-
- static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
+ private static final MacAddress NO_HOST_MAC = MacAddress.valueOf("00:00:00:00:00:00");
public StatsFlowRuleManager() {
- log.info("Object is instantiated");
this.timer = new Timer("openstack-telemetry-sender");
}
@Activate
protected void activate() {
appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
- log.info("Application is activated");
- osTelemetryManager = new OpenstackTelemetryManager();
+
this.start();
+
+ log.info("Started");
}
@Deactivate
protected void deactivate() {
- log.info("Application is deactivated");
- }
-
- private class InternalTimerTask extends TimerTask {
- @Override
- public void run() {
- log.debug("Timger Task Thread Starts ({})", loopCount++);
- try {
- Set<FlowInfo> flowInfoSet = getFlowRule();
- for (FlowInfo flowInfo: flowInfoSet) {
- log.info("Publish FlowInfo to NMS: {}", flowInfo.toString());
- osTelemetryManager.publish(flowInfo);
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- }
+ log.info("Stopped");
}
@Override
public void start() {
log.info("Start publishing thread");
- Set<FlowInfo> gFlowInfoSet = getFlowRule();
task = new InternalTimerTask();
timer.scheduleAtFixedRate(task, MILLISECONDS * REFRESH_INTERVAL,
MILLISECONDS * REFRESH_INTERVAL);
@@ -161,65 +154,86 @@
task = null;
}
- public void connectTables(
- DeviceId deviceId,
- int fromTable,
- int toTable,
- StatsFlowRule statsFlowRule,
- int rulePriority,
- boolean installFlag) {
- try {
- log.debug("Table Transition: {} -> {}", fromTable, toTable);
- int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
- int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
- int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+ @Override
+ public void createStatFlowRule(StatsFlowRule statsFlowRule) {
- TrafficSelector.Builder selector;
- if (statsFlowRule == null) {
- selector = DefaultTrafficSelector.builder();
- } else {
- selector = DefaultTrafficSelector.builder()
- .matchEthType(Ethernet.TYPE_IPV4)
- .matchIPSrc(statsFlowRule.srcIpPrefix())
- .matchIPDst(statsFlowRule.dstIpPrefix());
- if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_TCP) {
- selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
- .matchTcpSrc(statsFlowRule.srcTpPort())
- .matchTcpDst(statsFlowRule.dstTpPort());
- } else if (statsFlowRule.ipProtocol() == IPv4.PROTOCOL_UDP) {
- selector = selector.matchIPProtocol(statsFlowRule.ipProtocol())
- .matchUdpSrc(statsFlowRule.srcTpPort())
- .matchUdpDst(statsFlowRule.dstTpPort());
- }
- }
+ setStatFlowRule(statsFlowRule, true);
- TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
- treatment.transition(toTable);
- FlowRule flowRule = DefaultFlowRule.builder()
- .forDevice(deviceId)
- .withSelector(selector.build())
- .withTreatment(treatment.build())
- .withPriority(prefixLength)
- .fromApp(appId)
- .makePermanent()
- .forTable(fromTable)
- .build();
- applyRule(flowRule, installFlag);
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ log.info("Install stat flow rule for SrcIp:{} DstIp:{}",
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
+ }
+
+ @Override
+ public void deleteStatFlowRule(StatsFlowRule statsFlowRule) {
+ // FIXME: following code might not be necessary
+ // NOTE(review): this call is keyed only by appId, so it presumably
+ // withdraws every flow rule registered by this app, not just the one
+ // described by statsFlowRule -- confirm against FlowRuleService.
+ flowRuleService.removeFlowRulesById(appId);
+
+ // createStatFlowRule passes true here; false presumably selects removal
+ // of the rules derived from statsFlowRule -- verify in setStatFlowRule.
+ setStatFlowRule(statsFlowRule, false);
+
+ log.info("Remove stat flow rule for SrcIp:{} DstIp:{}",
+ statsFlowRule.srcIpPrefix().toString(),
+ statsFlowRule.dstIpPrefix().toString());
+ }
+
+ private void connectTables(DeviceId deviceId, int fromTable, int toTable,
+ StatsFlowRule statsFlowRule, int rulePriority,
+ boolean install) {
+
+ log.debug("Table Transition: {} -> {}", fromTable, toTable);
+ int srcPrefixLength = statsFlowRule.srcIpPrefix().prefixLength();
+ int dstPrefixLength = statsFlowRule.dstIpPrefix().prefixLength();
+ int prefixLength = rulePriority + srcPrefixLength + dstPrefixLength;
+ byte protocol = statsFlowRule.ipProtocol();
+
+ TrafficSelector.Builder selectorBuilder =
+ DefaultTrafficSelector.builder()
+ .matchEthType(TYPE_IPV4)
+ .matchIPSrc(statsFlowRule.srcIpPrefix())
+ .matchIPDst(statsFlowRule.dstIpPrefix());
+
+ if (protocol == PROTOCOL_TCP) {
+ selectorBuilder = selectorBuilder
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchTcpSrc(statsFlowRule.srcTpPort())
+ .matchTcpDst(statsFlowRule.dstTpPort());
+
+ } else if (protocol == PROTOCOL_UDP) {
+ selectorBuilder = selectorBuilder
+ .matchIPProtocol(statsFlowRule.ipProtocol())
+ .matchUdpSrc(statsFlowRule.srcTpPort())
+ .matchUdpDst(statsFlowRule.dstTpPort());
+ } else {
+ log.warn("Unsupported protocol {}", statsFlowRule.ipProtocol());
}
+
+ TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+ treatmentBuilder.transition(toTable);
+
+ FlowRule flowRule = DefaultFlowRule.builder()
+ .forDevice(deviceId)
+ .withSelector(selectorBuilder.build())
+ .withTreatment(treatmentBuilder.build())
+ .withPriority(prefixLength)
+ .fromApp(appId)
+ .makePermanent()
+ .forTable(fromTable)
+ .build();
+
+ applyRule(flowRule, install);
}
/**
- * Apply FlowRule to switch.
+ * Installs stats related flow rule to switch.
*
- * @param flowRule FlowRule
- * @param install Flag to install or not
+ * @param flowRule flow rule
+ * @param install flag to install or not
*/
private void applyRule(FlowRule flowRule, boolean install) {
- log.debug("Apply flow rule to bridge device");
FlowRuleOperations.Builder flowOpsBuilder = FlowRuleOperations.builder();
- flowOpsBuilder = install ? flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
+ flowOpsBuilder = install ?
+ flowOpsBuilder.add(flowRule) : flowOpsBuilder.remove(flowRule);
flowRuleService.apply(flowOpsBuilder.build(new FlowRuleOperationsContext() {
@Override
@@ -235,184 +249,97 @@
}
/**
- * Craete a flow rule.
+ * Gets a set of the flow infos.
*
- * @param flowRule flow rule for Openstack VMs
+ * @return a set of flow infos
*/
- @Override
- public void createFlowRule(StatsFlowRule flowRule) {
- try {
- log.debug("Create Flow Rule. SrcIp:{} DstIp:{}",
- flowRule.srcIpPrefix().toString(),
- flowRule.dstIpPrefix().toString());
+ public Set<FlowInfo> getFlowInfo() {
+ Set<FlowInfo> flowInfos = Sets.newConcurrentHashSet();
- // To make a inversed flow rule.
- DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
- = DefaultStatsFlowRule
- .builder()
- .srcIpPrefix(flowRule.dstIpPrefix())
- .dstIpPrefix(flowRule.srcIpPrefix())
- .ipProtocol(flowRule.ipProtocol())
- .srcTpPort(flowRule.dstTpPort())
- .dstTpPort(flowRule.srcTpPort());
- StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
- DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
- Iterable<Device> devices = deviceService.getDevices();
- for (Device d : devices) {
- log.debug("Device: {}", d.toString());
- if (d.type() == Device.Type.CONTROLLER) {
- log.info("Don't create flow rule for 'DeviceType=CONTROLLER' ({})",
- d.id().toString());
- continue;
- }
- connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
- flowRule, METRIC_PRIORITY_SOURCE, true);
- connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
- inverseFlowRule, METRIC_PRIORITY_TARGET, true);
+ // obtain all flow rule entries installed by telemetry app
+ for (FlowEntry entry : flowRuleService.getFlowEntriesById(appId)) {
+ FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
+ TrafficSelector selector = entry.selector();
+
+ IPCriterion srcIp = (IPCriterion) selector.getCriterion(IPV4_SRC);
+ IPCriterion dstIp = (IPCriterion) selector.getCriterion(IPV4_DST);
+ IPProtocolCriterion ipProtocol =
+ (IPProtocolCriterion) selector.getCriterion(IP_PROTO);
+
+ log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
+ ((IndexTableId) entry.table()).id(),
+ srcIp.ip().toString(),
+ dstIp.ip().toString(),
+ entry.packets(),
+ entry.bytes());
+
+ fBuilder.withFlowType(FLOW_TYPE_SONA)
+ .withSrcIp(srcIp.ip())
+ .withDstIp(dstIp.ip())
+ .withProtocol((byte) ipProtocol.protocol());
+
+ if (ipProtocol.protocol() == PROTOCOL_TCP) {
+ TcpPortCriterion tcpSrc =
+ (TcpPortCriterion) selector.getCriterion(TCP_SRC);
+ TcpPortCriterion tcpDst =
+ (TcpPortCriterion) selector.getCriterion(TCP_DST);
+
+ log.debug("TCP SRC Port: {}, DST Port: {}",
+ tcpSrc.tcpPort().toInt(),
+ tcpDst.tcpPort().toInt());
+
+ fBuilder.withSrcPort(tcpSrc.tcpPort());
+ fBuilder.withDstPort(tcpDst.tcpPort());
+
+ } else if (ipProtocol.protocol() == PROTOCOL_UDP) {
+
+ UdpPortCriterion udpSrc =
+ (UdpPortCriterion) selector.getCriterion(UDP_SRC);
+ UdpPortCriterion udpDst =
+ (UdpPortCriterion) selector.getCriterion(UDP_DST);
+
+ log.debug("UDP SRC Port: {}, DST Port: {}",
+ udpSrc.udpPort().toInt(),
+ udpDst.udpPort().toInt());
+
+ fBuilder.withSrcPort(udpSrc.udpPort());
+ fBuilder.withDstPort(udpDst.udpPort());
+ } else {
+ log.debug("Other protocol: {}", ipProtocol.protocol());
}
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+
+ fBuilder.withSrcMac(getMacAddress(srcIp.ip().address()))
+ .withDstMac(getMacAddress(dstIp.ip().address()))
+ .withInputInterfaceId(getInterfaceId(srcIp.ip().address()))
+ .withOutputInterfaceId(getInterfaceId(dstIp.ip().address()))
+ .withVlanId(getVlanId(srcIp.ip().address()))
+ .withDeviceId(entry.deviceId());
+
+ StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
+
+ // TODO: need to collect error and drop packets stats
+ // TODO: need to make the refresh interval configurable
+ sBuilder.withStartupTime(0)
+ .withCurrAccPkts((int) entry.packets())
+ .withCurrAccBytes(entry.bytes())
+ .withErrorPkts((short) 0)
+ .withDropPkts((short) 0)
+ .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
+
+ fBuilder.withStatsInfo(sBuilder.build());
+
+ FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
+
+ flowInfos.add(flowInfo);
+
+ log.debug("FlowInfo: \n{}", flowInfo.toString());
}
+
+ return flowInfos;
}
/**
- * Get FlowRule.
- *
- * @param flowRule Flow rule for a VM
- * @return Set of FlowInfo
- */
- public Set<FlowInfo> getFlowRule(StatsFlowRule flowRule) {
- Set<FlowInfo> flowInfoSet = new HashSet<>();
- log.info("Get flow rule: {}", flowRule.toString());
- // TODO Make a implementation here.
- return flowInfoSet;
- }
-
- /**
- * Delete FlowRule for StatsInfo.
- *
- * @param flowRule Flow rule for Openstack VM
- */
- @Override
- public void deleteFlowRule(StatsFlowRule flowRule) {
- log.debug("Delete Flow Rule: {}", flowRule.toString());
- flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
- flowRuleService.removeFlowRulesById(appId);
- // TODO Write a implementation code here
-
- try {
- log.debug("Delete Flow Rule. SrcIp:{} DstIp:{}",
- flowRule.srcIpPrefix().toString(),
- flowRule.dstIpPrefix().toString());
-
- // To make a inversed flow rule.
- DefaultStatsFlowRule.Builder inverseFlowRuleBuilder
- = DefaultStatsFlowRule
- .builder()
- .srcIpPrefix(flowRule.dstIpPrefix())
- .dstIpPrefix(flowRule.srcIpPrefix())
- .ipProtocol(flowRule.ipProtocol())
- .srcTpPort(flowRule.dstTpPort())
- .dstTpPort(flowRule.srcTpPort());
- StatsFlowRule inverseFlowRule = inverseFlowRuleBuilder.build();
- DeviceService deviceService = DefaultServiceDirectory.getService(DeviceService.class);
- Iterable<Device> devices = deviceService.getDevices();
- for (Device d : devices) {
- log.debug("Device: {}", d.toString());
- if (d.type() == Device.Type.CONTROLLER) {
- log.info("Don't care for 'DeviceType=CONTROLLER' ({})",
- d.id().toString());
- continue;
- }
- connectTables(d.id(), FLOW_TABLE_VM_SOURCE, FLOW_TABLE_DHCP_ARP,
- flowRule, METRIC_PRIORITY_SOURCE, false);
- connectTables(d.id(), FLOW_TABLE_VM_TARGET, FLOW_TABLE_FORWARDING,
- inverseFlowRule, METRIC_PRIORITY_TARGET, false);
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- }
-
- /**
- * Get a list of the FlowRule Store.
- *
- * @return list of Flow Rule
- */
- public Set<FlowInfo> getFlowRule() {
- log.debug("Get Flow Information List");
- Set<FlowInfo> flowInfoSet = new HashSet<>();
- try {
- flowRuleService = DefaultServiceDirectory.getService(FlowRuleService.class);
- Iterable<FlowEntry> flowEntries = flowRuleService.getFlowEntriesById(appId);
-
- for (FlowEntry entry : flowEntries) {
- FlowInfo.Builder fBuilder = new DefaultFlowInfo.DefaultBuilder();
- IPCriterion srcIpCriterion =
- (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_SRC);
- IPCriterion dstIpCriterion =
- (IPCriterion) entry.selector().getCriterion(Criterion.Type.IPV4_DST);
- IPProtocolCriterion ipProtocolCriterion =
- (IPProtocolCriterion) entry.selector().getCriterion(Criterion.Type.IP_PROTO);
-
- log.debug("[FlowInfo] TableID:{} SRC_IP:{} DST_IP:{} Pkt:{} Byte:{}",
- ((IndexTableId) entry.table()).id(),
- srcIpCriterion.ip().toString(), dstIpCriterion.ip().toString(),
- entry.packets(), entry.bytes());
-
- fBuilder.withFlowType(FLOW_TYPE_SONA).withSrcIp(srcIpCriterion.ip())
- .withDstIp(dstIpCriterion.ip())
- .withProtocol((byte) ipProtocolCriterion.protocol());
-
- if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_TCP) {
- TcpPortCriterion tcpSrcCriterion =
- (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_SRC);
- TcpPortCriterion tcpDstCriterion =
- (TcpPortCriterion) entry.selector().getCriterion(Criterion.Type.TCP_DST);
- log.debug("TCP SRC Port: {} Dst Port: {}",
- tcpSrcCriterion.tcpPort().toInt(), tcpDstCriterion.tcpPort().toInt());
- fBuilder.withSrcPort(tcpSrcCriterion.tcpPort());
- fBuilder.withDstPort(tcpDstCriterion.tcpPort());
- } else if (ipProtocolCriterion.protocol() == IPv4.PROTOCOL_UDP) {
- UdpPortCriterion udpSrcCriterion =
- (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_SRC);
- UdpPortCriterion udpDstCriterion =
- (UdpPortCriterion) entry.selector().getCriterion(Criterion.Type.UDP_DST);
- log.debug("UDP SRC Port: {} Dst Port: {}",
- udpSrcCriterion.udpPort().toInt(), udpDstCriterion.udpPort().toInt());
- fBuilder.withSrcPort(udpSrcCriterion.udpPort());
- fBuilder.withDstPort(udpDstCriterion.udpPort());
- } else {
- log.debug("Other protocol: {}", ipProtocolCriterion.protocol());
- }
-
- fBuilder.withSrcMac(getMacAddress(srcIpCriterion.ip().address()))
- .withDstMac(getMacAddress(dstIpCriterion.ip().address()))
- .withInputInterfaceId(getInterfaceId(srcIpCriterion.ip().address()))
- .withOutputInterfaceId(getInterfaceId(dstIpCriterion.ip().address()))
- .withVlanId(getVlanId(srcIpCriterion.ip().address()))
- .withDeviceId(entry.deviceId());
-
- StatsInfo.Builder sBuilder = new DefaultStatsInfo.DefaultBuilder();
- sBuilder.withStartupTime(0)
- .withCurrAccPkts((int) entry.packets()).withCurrAccBytes(entry.bytes())
- .withErrorPkts((short) 0).withDropPkts((short) 0)
- .withLstPktOffset(REFRESH_INTERVAL * MILLISECONDS);
-
- fBuilder.withStatsInfo(sBuilder.build());
-
- FlowInfo flowInfo = mergeFlowInfo(fBuilder.build(), fBuilder, sBuilder);
- flowInfoSet.add(flowInfo);
- log.debug("FlowInfo: \n{}", flowInfo.toString());
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
- }
- return flowInfoSet;
- }
-
- /**
- * Merge old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
+ * Merges old FlowInfo.StatsInfo and current FlowInfo.StatsInfo.
*
* @param flowInfo current FlowInfo object
* @param fBuilder Builder for FlowInfo
@@ -422,39 +349,53 @@
private FlowInfo mergeFlowInfo(FlowInfo flowInfo,
FlowInfo.Builder fBuilder,
StatsInfo.Builder sBuilder) {
- try {
- log.debug("Current FlowInfo:\n{}", flowInfo.toString());
- for (FlowInfo gFlowInfo: gFlowInfoSet) {
- log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
- if (gFlowInfo.deviceId().equals(flowInfo.deviceId()) &&
- gFlowInfo.srcIp().equals(flowInfo.srcIp()) &&
- gFlowInfo.dstIp().equals(flowInfo.dstIp()) &&
- gFlowInfo.srcPort().equals(flowInfo.srcPort()) &&
- gFlowInfo.dstPort().equals(flowInfo.dstPort()) &&
- (gFlowInfo.protocol() == flowInfo.protocol())
- ) {
- // Get old StatsInfo object and merge the value to current object.
- StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
- sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
- sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
- FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
- gFlowInfoSet.remove(gFlowInfo);
- gFlowInfoSet.add(newFlowInfo);
- log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
- return newFlowInfo;
- }
+ for (FlowInfo gFlowInfo : gFlowInfoSet) {
+ log.debug("Old FlowInfo:\n{}", gFlowInfo.toString());
+ if (gFlowInfo.roughEquals(flowInfo)) {
+
+ // Carry the old StatsInfo's current counters over as the previous counters of the new object.
+ StatsInfo oldStatsInfo = gFlowInfo.statsInfo();
+ sBuilder.withPrevAccPkts(oldStatsInfo.currAccPkts());
+ sBuilder.withPrevAccBytes(oldStatsInfo.currAccBytes());
+ FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build())
+ .build();
+
+ gFlowInfoSet.remove(gFlowInfo);
+ gFlowInfoSet.add(newFlowInfo);
+ log.info("Old FlowInfo found, Merge this {}", newFlowInfo.toString());
+ return newFlowInfo;
}
- // No such record, then build the FlowInfo object and return this object.
- log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
- FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
- gFlowInfoSet.add(newFlowInfo);
- return newFlowInfo;
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
}
- log.debug("Add this FlowInfo {}", flowInfo.toString());
- gFlowInfoSet.add(flowInfo);
- return flowInfo;
+
+ // No matching record: build the FlowInfo object, store it, and return it.
+ log.info("No FlowInfo found, add new FlowInfo {}", flowInfo.toString());
+ FlowInfo newFlowInfo = fBuilder.withStatsInfo(sBuilder.build()).build();
+ gFlowInfoSet.add(newFlowInfo);
+ return newFlowInfo;
+ }
+
+ private void setStatFlowRule(StatsFlowRule statsFlowRule, boolean install) {
+ StatsFlowRule inverseFlowRule = DefaultStatsFlowRule.builder()
+ .srcIpPrefix(statsFlowRule.dstIpPrefix())
+ .dstIpPrefix(statsFlowRule.srcIpPrefix())
+ .ipProtocol(statsFlowRule.ipProtocol())
+ .srcTpPort(statsFlowRule.dstTpPort())
+ .dstTpPort(statsFlowRule.srcTpPort())
+ .build();
+
+ // FIXME: install stat flow rules for all devices for now;
+ // need to query the device where the host with the given IP is located
+ for (Device d : deviceService.getDevices()) {
+ if (d.type() == Device.Type.CONTROLLER) {
+ log.info("Not provide stats for 'CONTROLLER' ({})", d.id().toString());
+ continue;
+ }
+
+ connectTables(d.id(), STAT_INBOUND_TABLE, DHCP_ARP_TABLE,
+ statsFlowRule, METRIC_PRIORITY_SOURCE, install);
+ connectTables(d.id(), STAT_OUTBOUND_TABLE, FORWARDING_TABLE,
+ inverseFlowRule, METRIC_PRIORITY_TARGET, install);
+ }
}
/**
@@ -463,14 +404,10 @@
* @param ipAddress IP Address of host
* @return VLAN ID
*/
- public VlanId getVlanId(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return host.vlan();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private VlanId getVlanId(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return host.vlan();
}
return VlanId.vlanId();
}
@@ -481,14 +418,10 @@
* @param ipAddress IP Address of host
* @return Interface ID of Switch
*/
- public int getInterfaceId(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return (int) host.location().port().toLong();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private int getInterfaceId(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return (int) host.location().port().toLong();
}
return -1;
}
@@ -499,15 +432,24 @@
* @param ipAddress IP Address of host
* @return MAC Address of host
*/
- public MacAddress getMacAddress(IpAddress ipAddress) {
- try {
- if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
- Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
- return host.mac();
- }
- } catch (Exception ex) {
- log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ private MacAddress getMacAddress(IpAddress ipAddress) {
+ if (!hostService.getHostsByIp(ipAddress).isEmpty()) {
+ Host host = hostService.getHostsByIp(ipAddress).stream().findAny().get();
+ return host.mac();
}
+
return NO_HOST_MAC;
}
+
+ private class InternalTimerTask extends TimerTask {
+ @Override
+ public void run() {
+ log.debug("Timer Task Thread Starts ({})", loopCount++);
+ try {
+ telemetryService.publish(getFlowInfo());
+ } catch (Exception ex) {
+ log.error("Exception Stack:\n{}", ExceptionUtils.getStackTrace(ex));
+ }
+ }
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
index 9b5209b..2efab8b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/web/OpenstackTelemetryWebResource.java
@@ -61,7 +61,8 @@
private static final String JSON_NODE_FLOW_RULE = "rules";
private static final String FLOW_RULE_ID = "STATS_FLOW_RULE_ID";
- private final StatsFlowRuleAdminService statsFlowRuleService = get(StatsFlowRuleAdminService.class);
+ private final StatsFlowRuleAdminService
+ statsFlowRuleService = get(StatsFlowRuleAdminService.class);
@Context
private UriInfo uriInfo;
@@ -81,7 +82,7 @@
readNodeConfiguration(input).forEach(flowRule -> {
log.debug("FlowRule: {}", flowRule.toString());
- statsFlowRuleService.createFlowRule(flowRule);
+ statsFlowRuleService.createStatFlowRule(flowRule);
});
UriBuilder locationBuilder = uriInfo.getBaseUriBuilder()
@@ -102,13 +103,12 @@
readNodeConfiguration(input).forEach(flowRule -> {
log.debug("FlowRule: {}", flowRule.toString());
- statsFlowRuleService.deleteFlowRule(flowRule);
+ statsFlowRuleService.deleteStatFlowRule(flowRule);
});
return ok(root).build();
}
-
/**
* Get flow rules which is installed on ONOS.
*
@@ -120,7 +120,6 @@
return ok(root).build();
}
-
/**
* Get flow information list.
*
@@ -133,7 +132,7 @@
log.info("GET BULK FLOW RULE");
Set<FlowInfo> flowInfoSet;
- flowInfoSet = statsFlowRuleService.getFlowRule();
+ flowInfoSet = statsFlowRuleService.getFlowInfo();
log.info("\n\n======================================================\n" +
"FlowInfo Set: \n{}" +
@@ -160,7 +159,6 @@
return ok(root).build();
}
-
private Set<StatsFlowRule> readNodeConfiguration(InputStream input) {
log.info("Input JSON Data: \n\t\t{}", input.toString());
Set<StatsFlowRule> flowRuleSet = Sets.newHashSet();
@@ -187,17 +185,4 @@
return flowRuleSet;
}
-
- /**
- * OpenstackTelemetryImpl method.
- *
- * @return 200 OK
- *
- * @onos.rsModel dummy
- */
- @GET
- @Produces(MediaType.APPLICATION_JSON)
- public Response dummy() {
- return ok(root).build();
- }
}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java
index aed2b76..24fe947 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/FlowInfoJsonCodecTest.java
@@ -61,7 +61,7 @@
private static final int VLAN_ID = 1;
private static final int PROTOCOL = 1;
private static final int FLOW_TYPE = 1;
- private static final String DEVICE_ID = "foo";
+ private static final String DEVICE_ID = "of:00000000000000a1";
private static final String SRC_IP_ADDRESS = "10.10.10.1";
private static final int SRC_IP_PREFIX = 24;
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodecTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodecTest.java
index f8bc900..9e3294d 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodecTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/codec/TinaFlowInfoByteBufferCodecTest.java
@@ -37,7 +37,7 @@
public final class TinaFlowInfoByteBufferCodecTest {
private static final byte FLOW_TYPE = 1;
- private static final DeviceId DEVICE_ID = DeviceId.deviceId("1234");
+ private static final DeviceId DEVICE_ID = DeviceId.deviceId("of:00000000000000a1");
private static final int INPUT_INTERFACE_ID = 10;
private static final int OUTPUT_INTERFACE_ID = 10;
private static final VlanId VLAN_ID = VlanId.vlanId("100");