Refactor: move telemetry config from componentCfg to configs.xml
1. Support to export the metrics to multiple targets
2. Add a set of properties to kafka config (key, topic, etc.)
3. Add distributedStore to manage telemetry configs
4. Add CLI to query stored telemetry configs
5. Add a set of telemetry loaders to import xml definitions
6. Add unit tests for telemetry cfg, xml cfg loader and dist store
7. Add missing javadoc for a set of implementation classes
Change-Id: I39480c9a6ac07357184d2e1094b9c9f4d36fd8b1
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
index bf0e013..3ebf105 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/Constants.java
@@ -32,6 +32,12 @@
public static final String VLAN = "VLAN";
public static final String FLAT = "FLAT";
+ public static final String GRPC_SCHEME = "grpc";
+ public static final String KAFKA_SCHEME = "kafka";
+ public static final String INFLUXDB_SCHEME = "influxdb";
+ public static final String PROMETHEUS_SCHEME = "prometheus";
+ public static final String REST_SCHEME = "rest";
+
// default configuration variables for ONOS GUI
public static final int DEFAULT_DATA_POINT_SIZE = 17280;
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryConfigService.java
deleted file mode 100644
index d95d752..0000000
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryConfigService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.api;
-
-/**
- * Configuration service API for publishing openstack telemetry through gRPC producer.
- */
-public interface GrpcTelemetryConfigService extends TelemetryConfigService {
-}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryService.java
index e885b7e..fd4e08b 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/GrpcTelemetryService.java
@@ -27,4 +27,14 @@
* @return a response from gRPC server
*/
Object publish(Object record);
+
+ /**
+ * Returns GRPC telemetry service type.
+ *
+ * @return GRPC telemetry service type
+ */
+ @Override
+ default ServiceType type() {
+ return ServiceType.GRPC;
+ }
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryConfigService.java
deleted file mode 100644
index f525d01..0000000
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryConfigService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.api;
-
-/**
- * Configuration service API for publishing openstack telemetry through InfluxDB producer.
- */
-public interface InfluxDbTelemetryConfigService extends TelemetryConfigService {
-}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
index 4a1bd5c..868c3e4 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/InfluxDbTelemetryService.java
@@ -28,4 +28,14 @@
* @param record a network metric to be published
*/
void publish(InfluxRecord<String, Set<FlowInfo>> record);
+
+ /**
+ * Returns influxDB telemetry service type.
+ *
+ * @return influxDB telemetry service type
+ */
+ @Override
+ default ServiceType type() {
+ return ServiceType.INFLUXDB;
+ }
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryConfigService.java
deleted file mode 100644
index 9b8e827..0000000
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryConfigService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.api;
-
-/**
- * Configuration service API for publishing openstack telemetry through Kafka producer.
- */
-public interface KafkaTelemetryConfigService extends TelemetryConfigService {
-}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryService.java
index caa1f28..8135221 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/KafkaTelemetryService.java
@@ -15,9 +15,9 @@
*/
package org.onosproject.openstacktelemetry.api;
-import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import java.util.Set;
import java.util.concurrent.Future;
/**
@@ -28,8 +28,18 @@
/**
* Publishes openstack telemetry to Kafka server.
*
- * @param record a network metric to be published
+ * @param flowInfos network metrics to be published
* @return metadata for a record that has been acknowledged
*/
- Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record);
+ Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos);
+
+ /**
+ * Returns kafka telemetry service type.
+ *
+ * @return kafka telemetry service type
+ */
+ @Override
+ default ServiceType type() {
+ return ServiceType.KAFKA;
+ }
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/OpenstackTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/OpenstackTelemetryService.java
index 8d5df0b..850e2e7 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/OpenstackTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/OpenstackTelemetryService.java
@@ -27,14 +27,14 @@
*
* @param telemetryService telemetry service
*/
- void addTelemetryService(TelemetryService telemetryService);
+ void addTelemetryService(TelemetryAdminService telemetryService);
/**
* Unregisters an existing northbound telemetry service.
*
* @param telemetryService telemetry service
*/
- void removeTelemetryService(TelemetryService telemetryService);
+ void removeTelemetryService(TelemetryAdminService telemetryService);
/**
* Publishes new flow information to off-platform application through
@@ -49,5 +49,13 @@
*
* @return telemetry services
*/
- Set<TelemetryService> telemetryServices();
+ Set<TelemetryAdminService> telemetryServices();
+
+ /**
+ * Obtains a specific openstack telemetry service.
+ *
+ * @param type telemetry type
+ * @return telemetry service instance
+ */
+ TelemetryAdminService telemetryService(String type);
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryService.java
index 2680ab1..23c2192 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryService.java
@@ -27,4 +27,14 @@
* @param flowInfos a network metric to be published
*/
void publish(Set<FlowInfo> flowInfos);
+
+ /**
+ * Returns prometheus telemetry service type.
+ *
+ * @return prometheus telemetry service type
+ */
+ @Override
+ default ServiceType type() {
+ return ServiceType.PROMETHEUS;
+ }
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryConfigService.java
deleted file mode 100644
index f3b60ee..0000000
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryConfigService.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.api;
-
-/**
- * Configuration service API for publishing openstack telemetry through REST producer.
- */
-public interface RestTelemetryConfigService extends TelemetryConfigService {
-}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryService.java
index ab3755e..e0fca83 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/RestTelemetryService.java
@@ -16,6 +16,7 @@
package org.onosproject.openstacktelemetry.api;
import javax.ws.rs.core.Response;
+import java.util.Set;
/**
* Service API for publishing openstack telemetry through REST producer.
@@ -24,30 +25,21 @@
/**
* Publishes openstack telemetry to REST server.
- *
- * @param endpoint an endpoint URL
- * @param method HTTP method
- * @param record network metrics
- * @return response from REST server
- */
- Response publish(String endpoint, String method, String record);
-
- /**
- * Publishes openstack telemetry to REST server.
- *
- * @param method HTTP method
- * @param record network metrics
- * @return response from REST server
- */
- Response publish(String method, String record);
-
- /**
- * Publishes openstack telemetry to REST server.
* By default, the client sends record using the HTTP method configured in
* RestTelemetryConfig.
*
* @param record network metrics
* @return response from REST server
*/
- Response publish(String record);
+ Set<Response> publish(String record);
+
+ /**
+ * Returns REST telemetry service type.
+ *
+ * @return REST telemetry service type
+ */
+ @Override
+ default ServiceType type() {
+ return ServiceType.REST;
+ }
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
index d5ee6ad..62398db 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryAdminService.java
@@ -15,8 +15,6 @@
*/
package org.onosproject.openstacktelemetry.api;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-
/**
* Admin service API for publishing openstack telemetry.
*/
@@ -24,10 +22,8 @@
/**
* Prepares and launches the telemetry producer.
- *
- * @param config telemetry server config
*/
- void start(TelemetryConfig config);
+ void start();
/**
* Terminates the telemetry producer.
@@ -36,8 +32,6 @@
/**
* Restarts the telemetry producer.
- *
- * @param config telemetry server config
*/
- void restart(TelemetryConfig config);
+ void restart();
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryCodec.java
similarity index 67%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryCodec.java
index 4e115de..1f0f737 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryCodec.java
@@ -15,8 +15,19 @@
*/
package org.onosproject.openstacktelemetry.api;
+import java.nio.ByteBuffer;
+import java.util.Set;
+
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Telemetry codec interface.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+public interface TelemetryCodec {
+
+ /**
+ * Encodes a collection flow infos into byte buffer.
+ *
+ * @param flowInfos a collection of flow info
+ * @return encoded byte buffer
+ */
+ ByteBuffer encode(Set<FlowInfo> flowInfos);
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigAdminService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigAdminService.java
new file mode 100644
index 0000000..18ee89b
--- /dev/null
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigAdminService.java
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.api;
+
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Set;
+
+/**
+ * Service for mapping telemetry configurations implementations.
+ */
+public interface TelemetryConfigAdminService extends TelemetryConfigService {
+
+ /**
+ * Returns the set of telemetry configurations currently registered.
+ *
+ * @return registered telemetry configurations
+ */
+ Set<TelemetryConfigProvider> getProviders();
+
+ /**
+ * Registers the specified telemetry configuration provider.
+ *
+ * @param provider configuration provider to register
+ */
+ void registerProvider(TelemetryConfigProvider provider);
+
+ /**
+ * Unregisters the specified telemetry configuration provider.
+ *
+ * @param provider configuration provider to unregister
+ */
+ void unregisterProvider(TelemetryConfigProvider provider);
+
+ /**
+ * Updates an existing telemetry configuration.
+ *
+ * @param config telemetry configuration
+ */
+ void updateTelemetryConfig(TelemetryConfig config);
+}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
new file mode 100644
index 0000000..a0c2714
--- /dev/null
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigEvent.java
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.api;
+
+import org.onosproject.event.AbstractEvent;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+/**
+ * Describes telemetry config event.
+ */
+public class TelemetryConfigEvent extends AbstractEvent<TelemetryConfigEvent.Type, TelemetryConfig> {
+
+ /**
+ * Telemetry config event type.
+ */
+ public enum Type {
+ /**
+ * Signifies that a new telemetry config is added.
+ */
+ CONFIG_ADDED,
+ /**
+ * Signifies that an existing telemetry config is updated.
+ */
+ CONFIG_UPDATED,
+ /**
+ * Signifies that an existing telemetry config is removed.
+ */
+ CONFIG_DELETED
+ }
+
+ /**
+ * Creates an event of a given type for the specified telemetry config.
+ *
+ * @param type telemetry config type
+ * @param config telemetry config
+ */
+ public TelemetryConfigEvent(Type type, TelemetryConfig config) {
+ super(type, config);
+ }
+}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigListener.java
similarity index 79%
rename from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
rename to apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigListener.java
index 4e115de..cef78af 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigListener.java
@@ -15,8 +15,10 @@
*/
package org.onosproject.openstacktelemetry.api;
+import org.onosproject.event.EventListener;
+
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Listener for telemetry config event.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+public interface TelemetryConfigListener extends EventListener<TelemetryConfigEvent> {
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigProvider.java
similarity index 62%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigProvider.java
index 4e115de..57b148e 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigProvider.java
@@ -15,8 +15,19 @@
*/
package org.onosproject.openstacktelemetry.api;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Set;
+
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Represents entity capable of providing telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+public interface TelemetryConfigProvider {
+
+ /**
+ * Returns the set of telemetry configuration to be made available by this provider.
+ *
+ * @return set of telemetry configurations
+ */
+ Set<TelemetryConfig> getTelemetryConfigs();
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigService.java
index 51e4cea..76076d2 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigService.java
@@ -15,17 +15,39 @@
*/
package org.onosproject.openstacktelemetry.api;
+import org.onosproject.event.ListenerService;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+
+import java.util.Set;
/**
* Telemetry configuration service interface.
*/
-public interface TelemetryConfigService {
+public interface TelemetryConfigService
+ extends ListenerService<TelemetryConfigEvent, TelemetryConfigListener> {
/**
- * Obtains the telemetry configuration.
+ * Obtains the telemetry configuration with the given telemetry
+ * configuration name.
*
- * @return telemetry configuration
+ * @param name telemetry configuration name
+ * @return provided telemetry configuration
*/
- TelemetryConfig getConfig();
+ TelemetryConfig getConfig(String name);
+
+ /**
+ * Obtains the telemetry configuration with the given telemetry config type.
+ *
+ * @param type telemetry configuration type
+ * @return provided telemetry configurations
+ */
+ Set<TelemetryConfig> getConfigsByType(ConfigType type);
+
+ /**
+ * Returns the overall set of telemetry configurations being provided.
+ *
+ * @return provided telemetry configurations
+ */
+ Set<TelemetryConfig> getConfigs();
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStore.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStore.java
new file mode 100644
index 0000000..42148f6
--- /dev/null
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStore.java
@@ -0,0 +1,79 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.api;
+
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.Store;
+
+import java.util.Set;
+
+/**
+ * Manages inventory of telemetry config; not intended for direct use.
+ */
+public interface TelemetryConfigStore
+ extends Store<TelemetryConfigEvent, TelemetryConfigStoreDelegate> {
+
+ /**
+ * Creates a new telemetry config.
+ *
+ * @param config a telemetry config
+ */
+ void createTelemetryConfig(TelemetryConfig config);
+
+ /**
+ * Updates the existing telemetry config.
+ *
+ * @param config the existing telemetry config
+ */
+ void updateTelemetryConfig(TelemetryConfig config);
+
+ /**
+ * Removes the existing telemetry config.
+ *
+ * @param name telemetry config name
+ * @return the removed telemetry config
+ */
+ TelemetryConfig removeTelemetryConfig(String name);
+
+ /**
+ * Obtains the existing telemetry config.
+ *
+ * @param name telemetry config name
+ * @return queried telemetry config
+ */
+ TelemetryConfig telemetryConfig(String name);
+
+ /**
+ * Obtains a collection of all of telemetry configs.
+ *
+ * @return a collection of all of telemetry configs
+ */
+ Set<TelemetryConfig> telemetryConfigs();
+
+ /**
+ * Obtains a collection of telemetry configs by config type.
+ *
+ * @param type config type
+ * @return a collection of telemetry configs by config type
+ */
+ Set<TelemetryConfig> telemetryConfigsByType(ConfigType type);
+
+ /**
+ * Removes all telemetry configs.
+ */
+ void clear();
+}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStoreDelegate.java
similarity index 78%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStoreDelegate.java
index 4e115de..8fa7720 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryConfigStoreDelegate.java
@@ -15,8 +15,10 @@
*/
package org.onosproject.openstacktelemetry.api;
+import org.onosproject.store.StoreDelegate;
+
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Telemetry config store delegate abstraction.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+public interface TelemetryConfigStoreDelegate extends StoreDelegate<TelemetryConfigEvent> {
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryService.java
index 86ebb65..7806ff7 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/TelemetryService.java
@@ -21,9 +21,51 @@
public interface TelemetryService {
/**
+ * Telemetry service type.
+ */
+ enum ServiceType {
+ /**
+ * Indicates KAFKA telemetry service.
+ */
+ KAFKA,
+
+ /**
+ * Indicates GRPC telemetry service.
+ */
+ GRPC,
+
+ /**
+ * Indicates REST telemetry service.
+ */
+ REST,
+
+ /**
+ * Indicates InfluxDB telemetry service.
+ */
+ INFLUXDB,
+
+ /**
+ * Indicates prometheus telemetry service.
+ */
+ PROMETHEUS,
+
+ /**
+ * Indicates unknown telemetry service.
+ */
+ UNKNOWN
+ }
+
+ /**
* Checks whether the telemetry service is running or not.
*
* @return telemetry service running status
*/
boolean isRunning();
+
+ /**
+ * Obtains the service type.
+ *
+ * @return service type
+ */
+ ServiceType type();
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/GrpcTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/GrpcTelemetryConfig.java
index d7c8f62..7b4589d 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/GrpcTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/GrpcTelemetryConfig.java
@@ -20,7 +20,7 @@
/**
* Configuration API of gRPC for publishing openstack telemetry.
*/
-public interface GrpcTelemetryConfig extends TelemetryConfig {
+public interface GrpcTelemetryConfig extends TelemetryConfigProperties {
/**
* Obtains gRPC server IP address.
@@ -60,7 +60,7 @@
/**
* Builder class of GrpcTelemetryConfig.
*/
- interface Builder extends TelemetryConfig.Builder {
+ interface Builder extends TelemetryConfigProperties.Builder {
/**
* Sets gRPC server IP address.
@@ -109,4 +109,4 @@
*/
GrpcTelemetryConfig build();
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
index 32c150f..38017d3 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/InfluxDbTelemetryConfig.java
@@ -20,7 +20,7 @@
/**
* Configuration API of InfluxDB for publishing openstack telemetry.
*/
-public interface InfluxDbTelemetryConfig extends TelemetryConfig {
+public interface InfluxDbTelemetryConfig extends TelemetryConfigProperties {
/**
* Obtains InfluxDB server IP address.
@@ -81,7 +81,7 @@
/**
* Builder class of InfluxDbTelemetryConfig.
*/
- interface Builder extends TelemetryConfig.Builder {
+ interface Builder extends TelemetryConfigProperties.Builder {
/**
* Sets InfluxDB server IP address.
@@ -115,7 +115,7 @@
*/
Builder withPassword(String password);
- /**
+ /**
* Sets InfluxDB measurement.
*
* @param measurement InfluxDB measurement
@@ -154,4 +154,4 @@
*/
InfluxDbTelemetryConfig build();
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/KafkaTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/KafkaTelemetryConfig.java
index 078433f..80c7737 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/KafkaTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/KafkaTelemetryConfig.java
@@ -20,7 +20,7 @@
/**
* Configuration API of Kafka for publishing openstack telemetry.
*/
-public interface KafkaTelemetryConfig extends TelemetryConfig {
+public interface KafkaTelemetryConfig extends TelemetryConfigProperties {
/**
* Obtains kafka IP address.
@@ -86,6 +86,27 @@
String valueSerializer();
/**
+ * Obtains kafka key.
+ *
+ * @return kafka key
+ */
+ String key();
+
+ /**
+ * Obtains kafka topic.
+ *
+ * @return kafka topic
+ */
+ String topic();
+
+ /**
+ * Obtains kafka message codec.
+ *
+ * @return kafka message codec
+ */
+ String codec();
+
+ /**
* Obtains kafka config maps.
*
* @return kafka config map
@@ -95,7 +116,7 @@
/**
* Builder class of KafkaTelemetryConfig.
*/
- interface Builder extends TelemetryConfig.Builder {
+ interface Builder extends TelemetryConfigProperties.Builder {
/**
* Sets kafka IP address.
@@ -170,6 +191,30 @@
Builder withValueSerializer(String valueSerializer);
/**
+ * Sets kafka key.
+ *
+ * @param key kafka key
+ * @return builder instance
+ */
+ Builder withKey(String key);
+
+ /**
+ * Sets kafka topic.
+ *
+ * @param topic kafka topic
+ * @return builder instance
+ */
+ Builder withTopic(String topic);
+
+ /**
+ * Sets kafka message codec.
+ *
+ * @param codec kafka message codec
+ * @return builder instance
+ */
+ Builder withCodec(String codec);
+
+ /**
* Sets other kafka configuration map.
*
* @param configMap kafka configuration map
@@ -184,4 +229,4 @@
*/
KafkaTelemetryConfig build();
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/PrometheusTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/PrometheusTelemetryConfig.java
index 8a74929..de47a7e 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/PrometheusTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/PrometheusTelemetryConfig.java
@@ -17,7 +17,7 @@
import java.util.Map;
-public interface PrometheusTelemetryConfig extends TelemetryConfig {
+public interface PrometheusTelemetryConfig extends TelemetryConfigProperties {
/**
* Obtains prometheus exporter IP address.
@@ -44,7 +44,7 @@
/**
* Builder class of PrometheusTelemetryConfig.
*/
- interface Builder extends TelemetryConfig.Builder {
+ interface Builder extends TelemetryConfigProperties.Builder {
/**
* Sets prometheus exporter IP address.
@@ -77,4 +77,4 @@
*/
PrometheusTelemetryConfig build();
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/RestTelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/RestTelemetryConfig.java
index 0e9add5..781b0c9 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/RestTelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/RestTelemetryConfig.java
@@ -20,7 +20,7 @@
/**
* Configuration API of REST for publishing openstack telemetry.
*/
-public interface RestTelemetryConfig extends TelemetryConfig {
+public interface RestTelemetryConfig extends TelemetryConfigProperties {
/**
* Obtains REST IP address.
@@ -74,7 +74,7 @@
/**
* Builder class for RestTelemetryConfig.
*/
- interface Builder extends TelemetryConfig.Builder {
+ interface Builder extends TelemetryConfigProperties.Builder {
/**
* Sets REST server IP address.
@@ -139,4 +139,4 @@
*/
RestTelemetryConfig build();
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfig.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfig.java
index 1d31c86..a5d20cb 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfig.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfig.java
@@ -15,13 +15,132 @@
*/
package org.onosproject.openstacktelemetry.api.config;
+import org.onosproject.net.Annotations;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
/**
- * An interface for telemetryconfig.
+ * An interface for telemetry config.
*/
-public interface TelemetryConfig {
+public interface TelemetryConfig extends Annotations {
- Builder createBuilder();
+ /**
+ * Telemetry configuration type.
+ */
+ enum ConfigType {
+ /**
+ * Indicates KAFKA telemetry config.
+ */
+ KAFKA,
- interface Builder {
+ /**
+ * Indicates GRPC telemetry config.
+ */
+ GRPC,
+
+ /**
+ * Indicates REST telemetry config.
+ */
+ REST,
+
+ /**
+ * Indicates InfluxDB telemetry config.
+ */
+ INFLUXDB,
+
+ /**
+ * Indicates prometheus telemetry config.
+ */
+ PROMETHEUS,
+
+ /**
+ * Indicates unknown telemetry config.
+ */
+ UNKNOWN
}
+
+ /**
+ * Returns the telemetry configuration name.
+ *
+ * @return configuration name
+ */
+ String name();
+
+ /**
+ * Returns the telemetry configuration type.
+ *
+ * @return configuration type
+ */
+ ConfigType type();
+
+ /**
+ * Returns all the parent configurations from which this configuration inherits
+ * properties.
+ *
+ * @return list of parent configurations
+ */
+ List<TelemetryConfig> parents();
+
+ /**
+ * Returns the off-platform application manufacturer name.
+ *
+ * @return manufacturer name
+ */
+ String manufacturer();
+
+ /**
+ * Returns the off-platform application software version.
+ *
+ * @return software version
+ */
+ String swVersion();
+
+ /**
+ * Returns the service enable flag.
+ *
+ * @return enable flag
+ */
+ boolean enabled();
+
+ /**
+ * Returns the set of annotations as map of key/value properties.
+ *
+ * @return map of properties
+ */
+ Map<String, String> properties();
+
+ /**
+ * Gets the value of the given property name.
+ *
+ * @param name property name
+ * @return the value of the property,
+ * or null if the property is not defined in this configuration nor
+ * in any of its ancestors
+ */
+ String getProperty(String name);
+
+ /**
+ * Get the value of the given property name.
+ *
+ * @param name property name
+ * @param defaultValue to use if the property is not defined in this configuration
+ * nor in any of its ancestors
+ * @return the value of the property,
+ * or null if the property is not defined in this configuration nor
+ * in any of its ancestors
+ */
+ default String getProperty(String name, String defaultValue) {
+ return Optional.ofNullable(getProperty(name)).orElse(defaultValue);
+ }
+
+ /**
+ * Merges the specified config properties into this one, giving preference to
+ * the other config when dealing with conflicts.
+ *
+ * @param other other configuration
+ * @return merged configuration
+ */
+ TelemetryConfig merge(TelemetryConfig other);
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfigProperties.java
similarity index 63%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfigProperties.java
index 4e115de..d3ca479 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/config/TelemetryConfigProperties.java
@@ -13,10 +13,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.api.config;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * An interface for telemetry config properties.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+public interface TelemetryConfigProperties {
+
+ /**
+ * Creates a telemetry config properties builder.
+ *
+ * @return builder
+ */
+ Builder createBuilder();
+
+ /**
+ * Builder interface.
+ */
+ interface Builder {
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/cli/TelemetryConfigListCommand.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/cli/TelemetryConfigListCommand.java
new file mode 100644
index 0000000..992c93a
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/cli/TelemetryConfigListCommand.java
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Lists Telemetry configurations.
+ */
+@Service
+@Command(scope = "onos", name = "telemetry-configs",
+ description = "Lists all Telemetry configurations")
+public class TelemetryConfigListCommand extends AbstractShellCommand {
+
+ private static final String FORMAT = "%-30s%-15s%-15s%-30s%-15s";
+ private static final String MASTER = "master";
+
+ @Override
+ protected void doExecute() {
+ TelemetryConfigService service = get(TelemetryConfigService.class);
+ List<TelemetryConfig> configs = service.getConfigs().stream()
+ .filter(c -> !c.swVersion().equals(MASTER))
+ .sorted(Comparator.comparing(TelemetryConfig::type))
+ .collect(Collectors.toList());
+
+ print(FORMAT, "Name", "Type", "Enabled", "Manufacturer", "swVersion");
+ for (TelemetryConfig config : configs) {
+ print(FORMAT, config.name(),
+ config.type(),
+ config.enabled() ? "ENABLED" : "DISABLED",
+ config.manufacturer(),
+ config.swVersion());
+ }
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java
index 2e84b79..384987b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/StatsFlowRuleJsonCodec.java
@@ -30,6 +30,9 @@
import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getProtocolTypeFromString;
import static org.slf4j.LoggerFactory.getLogger;
+/**
+ * JSON codec for StatsFlowRule.
+ */
public class StatsFlowRuleJsonCodec extends JsonCodec<StatsFlowRule> {
private final Logger log = getLogger(getClass());
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
index 2ced23b..2be5f3e 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/codec/TinaMessageByteBufferCodec.java
@@ -16,6 +16,7 @@
package org.onosproject.openstacktelemetry.codec;
import org.onosproject.openstacktelemetry.api.FlowInfo;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
import java.nio.ByteBuffer;
import java.util.Set;
@@ -23,10 +24,7 @@
/**
* Tina Message ByteBuffer Codec.
*/
-public class TinaMessageByteBufferCodec {
-
- public static final String KAFKA_TOPIC = "sona.flow";
- public static final String KAFKA_KEY = "flowdata";
+public class TinaMessageByteBufferCodec implements TelemetryCodec {
private static final int HEADER_SIZE = 8;
private static final int ENTRY_SIZE = 88;
@@ -39,6 +37,7 @@
* @param flowInfos a collection of flow info
* @return encoded byte buffer
*/
+ @Override
public ByteBuffer encode(Set<FlowInfo> flowInfos) {
ByteBuffer byteBuffer =
ByteBuffer.allocate(HEADER_SIZE + flowInfos.size() * ENTRY_SIZE);
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfig.java
index 5ece99b..8d2f0fd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfig.java
@@ -19,18 +19,26 @@
import com.google.common.collect.Maps;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
import java.util.Map;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
/**
* A configuration file contains gRPC telemetry parameters.
*/
public final class DefaultGrpcTelemetryConfig implements GrpcTelemetryConfig {
+ protected static final String ADDRESS = "address";
+ protected static final String PORT = "port";
+ protected static final String USE_PLAINTEXT = "usePlaintext";
+ protected static final String MAX_INBOUND_MSG_SIZE = "maxInboundMsgSize";
+ protected static final String CONFIG_MAP = "configMap";
+
private final String address;
private final int port;
private final boolean usePlaintext;
@@ -101,20 +109,39 @@
@Override
public String toString() {
return toStringHelper(this)
- .add("address", address)
- .add("port", port)
- .add("usePlaintext", usePlaintext)
- .add("maxInboundMsgSize", maxInboundMsgSize)
- .add("configMap", configMap)
+ .add(ADDRESS, address)
+ .add(PORT, port)
+ .add(USE_PLAINTEXT, usePlaintext)
+ .add(MAX_INBOUND_MSG_SIZE, maxInboundMsgSize)
+ .add(CONFIG_MAP, configMap)
.toString();
}
@Override
- public TelemetryConfig.Builder createBuilder() {
+ public TelemetryConfigProperties.Builder createBuilder() {
return new DefaultBuilder();
}
/**
+ * Builds a gRPC telemetry config from telemetry config instance.
+ *
+ * @param config telemetry config
+ * @return gRPC telemetry config
+ */
+ public static GrpcTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
+ if (config.type() != GRPC) {
+ return null;
+ }
+
+ return new DefaultBuilder()
+ .withAddress(config.getProperty(ADDRESS))
+ .withPort(Integer.valueOf(config.getProperty(PORT)))
+ .withUsePlaintext(Boolean.valueOf(config.getProperty(USE_PLAINTEXT)))
+ .withMaxInboundMsgSize(Integer.valueOf(config.getProperty(MAX_INBOUND_MSG_SIZE)))
+ .build();
+ }
+
+ /**
* Builder class of DefaultKafkaTelemetryConfig.
*/
public static final class DefaultBuilder implements Builder {
@@ -163,4 +190,4 @@
maxInboundMsgSize, configMap);
}
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
index c63f2bc..db60cb8 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfig.java
@@ -15,10 +15,12 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
import java.util.Map;
import java.util.Objects;
@@ -31,6 +33,15 @@
*/
public final class DefaultInfluxDbTelemetryConfig implements InfluxDbTelemetryConfig {
+ protected static final String ADDRESS = "address";
+ protected static final String PORT = "port";
+ protected static final String USERNAME = "username";
+ protected static final String PASSWORD = "password";
+ protected static final String DATABASE = "database";
+ protected static final String MEASUREMENT = "measurement";
+ protected static final String ENABLE_BATCH = "enableBatch";
+ protected static final String CONFIG_MAP = "configMap";
+
private final String address;
private final int port;
private final String username;
@@ -122,29 +133,54 @@
@Override
public int hashCode() {
return Objects.hash(address, port, username, password, database,
- measurement, enableBatch, configMap);
+ measurement, enableBatch, configMap);
}
@Override
public String toString() {
return toStringHelper(this)
- .add("address", address)
- .add("port", port)
- .add("username", username)
- .add("password", password)
- .add("database", database)
- .add("measurement", measurement)
- .add("enableBatch", enableBatch)
- .add("configMap", configMap)
+ .add(ADDRESS, address)
+ .add(PORT, port)
+ .add(USERNAME, username)
+ .add(PASSWORD, password)
+ .add(DATABASE, database)
+ .add(MEASUREMENT, measurement)
+ .add(ENABLE_BATCH, enableBatch)
+ .add(CONFIG_MAP, configMap)
.toString();
}
@Override
- public TelemetryConfig.Builder createBuilder() {
+ public TelemetryConfigProperties.Builder createBuilder() {
return new DefaultBuilder();
}
/**
+ * Builds an influxDB telemetry config from telemetry config instance.
+ *
+ * @param config telemetry config
+ * @return influxDB telemetry config
+ */
+ public static InfluxDbTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
+ if (config.type() != TelemetryConfig.ConfigType.INFLUXDB) {
+ return null;
+ }
+
+ boolean enableBatch = Strings.isNullOrEmpty(config.getProperty(ENABLE_BATCH)) ? false :
+ Boolean.valueOf(config.getProperty(ENABLE_BATCH));
+
+ return new DefaultBuilder()
+ .withAddress(config.getProperty(ADDRESS))
+ .withPort(Integer.valueOf(config.getProperty(PORT)))
+ .withUsername(config.getProperty(USERNAME))
+ .withPassword(config.getProperty(PASSWORD))
+ .withDatabase(config.getProperty(DATABASE))
+ .withMeasurement(config.getProperty(MEASUREMENT))
+ .withEnableBatch(enableBatch)
+ .build();
+ }
+
+ /**
* Builder class of DefaultInfluxDbTelemetryConfig.
*/
public static final class DefaultBuilder implements Builder {
@@ -211,11 +247,10 @@
checkNotNull(address, "InfluxDB server address cannot be null");
checkNotNull(username, "InfluxDB server username cannot be null");
checkNotNull(password, "InfluxDB server password cannot be null");
- checkNotNull(database, "InfluxDB server database cannot be null");
- checkNotNull(measurement, "InfluxDB server measurement cannot be null");
return new DefaultInfluxDbTelemetryConfig(address, port, username,
password, database, measurement, enableBatch, configMap);
}
+
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfig.java
index 6fc90fd..1cd52ef 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfig.java
@@ -15,22 +15,39 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
import java.util.Map;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
/**
* A configuration file contains Kafka telemetry parameters.
*/
public final class DefaultKafkaTelemetryConfig implements KafkaTelemetryConfig {
+ protected static final String ADDRESS = "address";
+ protected static final String PORT = "port";
+ protected static final String RETRIES = "retries";
+ protected static final String REQUIRED_ACKS = "requiredAcks";
+ protected static final String BATCH_SIZE = "batchSize";
+ protected static final String LINGER_MS = "lingerMs";
+ protected static final String MEMORY_BUFFER = "memoryBuffer";
+ protected static final String KEY_SERIALIZER = "keySerializer";
+ protected static final String VALUE_SERIALIZER = "valueSerializer";
+ protected static final String KEY = "key";
+ protected static final String TOPIC = "topic";
+ protected static final String CODEC = "codec";
+ protected static final String CONFIG_MAP = "configMap";
+
private final String address;
private final int port;
private final int retries;
@@ -40,6 +57,9 @@
private final int memoryBuffer;
private final String keySerializer;
private final String valueSerializer;
+ private final String key;
+ private final String topic;
+ private final String codec;
private final Map<String, Object> configMap;
private DefaultKafkaTelemetryConfig(String address, int port, int retries,
@@ -47,6 +67,7 @@
int lingerMs, int memoryBuffer,
String keySerializer,
String valueSerializer,
+ String key, String topic, String codec,
Map<String, Object> configMap) {
this.address = address;
this.port = port;
@@ -57,6 +78,9 @@
this.memoryBuffer = memoryBuffer;
this.keySerializer = keySerializer;
this.valueSerializer = valueSerializer;
+ this.key = key;
+ this.topic = topic;
+ this.codec = codec;
this.configMap = configMap;
}
@@ -106,6 +130,21 @@
}
@Override
+ public String key() {
+ return key;
+ }
+
+ @Override
+ public String topic() {
+ return topic;
+ }
+
+ @Override
+ public String codec() {
+ return codec;
+ }
+
+ @Override
public Map<String, Object> configMap() {
if (configMap != null) {
return ImmutableMap.copyOf(configMap);
@@ -131,6 +170,9 @@
Objects.equals(this.memoryBuffer, other.memoryBuffer) &&
Objects.equals(this.keySerializer, other.keySerializer) &&
Objects.equals(this.valueSerializer, other.valueSerializer) &&
+ Objects.equals(this.key, other.key) &&
+ Objects.equals(this.topic, other.topic) &&
+ Objects.equals(this.codec, other.codec) &&
Objects.equals(this.configMap, other.configMap);
}
return false;
@@ -139,31 +181,71 @@
@Override
public int hashCode() {
return Objects.hash(address, port, retries, requiredAcks, batchSize,
- lingerMs, memoryBuffer, keySerializer, valueSerializer, configMap);
+ lingerMs, memoryBuffer, keySerializer, valueSerializer,
+ key, topic, codec, configMap);
}
@Override
public String toString() {
return toStringHelper(this)
- .add("address", address)
- .add("port", port)
- .add("retries", retries)
- .add("requiredAcks", requiredAcks)
- .add("batchSize", batchSize)
- .add("lingerMs", lingerMs)
- .add("memoryBuffer", memoryBuffer)
- .add("keySerializer", keySerializer)
- .add("valueSerializer", valueSerializer)
- .add("configMap", configMap)
+ .add(ADDRESS, address)
+ .add(PORT, port)
+ .add(RETRIES, retries)
+ .add(REQUIRED_ACKS, requiredAcks)
+ .add(BATCH_SIZE, batchSize)
+ .add(LINGER_MS, lingerMs)
+ .add(MEMORY_BUFFER, memoryBuffer)
+ .add(KEY_SERIALIZER, keySerializer)
+ .add(VALUE_SERIALIZER, valueSerializer)
+ .add(KEY, key)
+ .add(TOPIC, topic)
+ .add(CODEC, codec)
+ .add(CONFIG_MAP, configMap)
.toString();
}
@Override
- public TelemetryConfig.Builder createBuilder() {
+ public TelemetryConfigProperties.Builder createBuilder() {
return new DefaultBuilder();
}
/**
+ * Builds a kafka telemetry config from telemetry config instance.
+ *
+ * @param config telemetry config
+ * @return kafka telemetry config
+ */
+ public static KafkaTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
+ if (config.type() != KAFKA) {
+ return null;
+ }
+
+ int retries = Strings.isNullOrEmpty(config.getProperty(RETRIES)) ? 0 :
+ Integer.valueOf(config.getProperty(RETRIES));
+ int batchSize = Strings.isNullOrEmpty(config.getProperty(BATCH_SIZE)) ? 0 :
+ Integer.valueOf(config.getProperty(BATCH_SIZE));
+ int lingerMs = Strings.isNullOrEmpty(config.getProperty(LINGER_MS)) ? 0 :
+ Integer.valueOf(config.getProperty(LINGER_MS));
+ int memoryBuffer = Strings.isNullOrEmpty(config.getProperty(MEMORY_BUFFER)) ? 0 :
+ Integer.valueOf(config.getProperty(MEMORY_BUFFER));
+
+ return new DefaultBuilder()
+ .withAddress(config.getProperty(ADDRESS))
+ .withPort(Integer.valueOf(config.getProperty(PORT)))
+ .withRetries(retries)
+ .withRequiredAcks(config.getProperty(REQUIRED_ACKS))
+ .withBatchSize(batchSize)
+ .withLingerMs(lingerMs)
+ .withMemoryBuffer(memoryBuffer)
+ .withKeySerializer(config.getProperty(KEY_SERIALIZER))
+ .withValueSerializer(config.getProperty(VALUE_SERIALIZER))
+ .withKey(config.getProperty(KEY))
+ .withTopic(config.getProperty(TOPIC))
+ .withCodec(config.getProperty(CODEC))
+ .build();
+ }
+
+ /**
* Builder class of DefaultKafkaTelemetryConfig.
*/
public static final class DefaultBuilder implements Builder {
@@ -176,6 +258,9 @@
private int memoryBuffer;
private String keySerializer;
private String valueSerializer;
+ private String key;
+ private String topic;
+ private String codec;
private Map<String, Object> configMap;
@Override
@@ -233,6 +318,24 @@
}
@Override
+ public Builder withKey(String key) {
+ this.key = key;
+ return this;
+ }
+
+ @Override
+ public Builder withTopic(String topic) {
+ this.topic = topic;
+ return this;
+ }
+
+ @Override
+ public Builder withCodec(String codec) {
+ this.codec = codec;
+ return this;
+ }
+
+ @Override
public Builder withConfigMap(Map<String, Object> configMap) {
this.configMap = configMap;
return this;
@@ -241,12 +344,10 @@
@Override
public KafkaTelemetryConfig build() {
checkNotNull(address, "Kafka server address cannot be null");
- checkNotNull(keySerializer, "Kafka key serializer cannot be null");
- checkNotNull(valueSerializer, "Kafka value serializer cannot be null");
return new DefaultKafkaTelemetryConfig(address, port, retries,
- requiredAcks, batchSize, lingerMs, memoryBuffer,
- keySerializer, valueSerializer, configMap);
+ requiredAcks, batchSize, lingerMs, memoryBuffer, keySerializer,
+ valueSerializer, key, topic, codec, configMap);
}
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfig.java
index 33d1d2e..7dc5a5a 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfig.java
@@ -19,18 +19,24 @@
import com.google.common.collect.Maps;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
import java.util.Map;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
/**
* A configuration file contains Prometheus telemetry parameters.
*/
public final class DefaultPrometheusTelemetryConfig implements PrometheusTelemetryConfig {
+ protected static final String ADDRESS = "address";
+ protected static final String PORT = "port";
+ protected static final String CONFIG_MAP = "configMap";
+
private final String address;
private final int port;
private final Map<String, Object> configMap;
@@ -85,18 +91,35 @@
@Override
public String toString() {
return toStringHelper(this)
- .add("address", address)
- .add("port", port)
- .add("configMap", configMap)
+ .add(ADDRESS, address)
+ .add(PORT, port)
+ .add(CONFIG_MAP, configMap)
.toString();
}
@Override
- public TelemetryConfig.Builder createBuilder() {
+ public TelemetryConfigProperties.Builder createBuilder() {
return new DefaultBuilder();
}
/**
+ * Builds a prometheus telemetry config from telemetry config instance.
+ *
+ * @param config telemetry config
+ * @return prometheus telemetry config
+ */
+ public static PrometheusTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
+ if (config.type() != PROMETHEUS) {
+ return null;
+ }
+
+ return new DefaultBuilder()
+ .withAddress(config.getProperty(ADDRESS))
+ .withPort(Integer.valueOf(config.getProperty(PORT)))
+ .build();
+ }
+
+ /**
* Builder class of DefaultPrometheusTelemetryConfig.
*/
public static final class DefaultBuilder implements Builder {
@@ -121,10 +144,11 @@
this.configMap = configMap;
return this;
}
+
@Override
public PrometheusTelemetryConfig build() {
checkNotNull(address, "Prometheus exporter binding address cannot be null");
return new DefaultPrometheusTelemetryConfig(address, port, configMap);
}
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfig.java
index df9d6ce..a5977a5 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfig.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfig.java
@@ -19,18 +19,28 @@
import com.google.common.collect.Maps;
import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfigProperties;
import java.util.Map;
import java.util.Objects;
import static com.google.common.base.MoreObjects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
/**
* A configuration file contains REST telemetry parameters.
*/
public final class DefaultRestTelemetryConfig implements RestTelemetryConfig {
+ protected static final String ADDRESS = "address";
+ protected static final String PORT = "port";
+ protected static final String ENDPOINT = "endpoint";
+ protected static final String METHOD = "method";
+ protected static final String REQUEST_MEDIA_TYPE = "requestMediaType";
+ protected static final String RESPONSE_MEDIA_TYPE = "responseMediaType";
+ protected static final String CONFIG_MAP = "configMap";
+
private final String address;
private final int port;
private final String endpoint;
@@ -119,22 +129,43 @@
@Override
public String toString() {
return toStringHelper(this)
- .add("address", address)
- .add("port", port)
- .add("endpoint", endpoint)
- .add("method", method)
- .add("requestMediaType", requestMediaType)
- .add("responseMediaType", responseMediaType)
- .add("configMap", configMap)
+ .add(ADDRESS, address)
+ .add(PORT, port)
+ .add(ENDPOINT, endpoint)
+ .add(METHOD, method)
+ .add(REQUEST_MEDIA_TYPE, requestMediaType)
+ .add(RESPONSE_MEDIA_TYPE, responseMediaType)
+ .add(CONFIG_MAP, configMap)
.toString();
}
@Override
- public TelemetryConfig.Builder createBuilder() {
+ public TelemetryConfigProperties.Builder createBuilder() {
return new DefaultBuilder();
}
/**
+ * Builds a REST telemetry config from telemetry config instance.
+ *
+ * @param config telemetry config
+ * @return REST telemetry config
+ */
+ public static RestTelemetryConfig fromTelemetryConfig(TelemetryConfig config) {
+ if (config.type() != REST) {
+ return null;
+ }
+
+ return new DefaultBuilder()
+ .withAddress(config.getProperty(ADDRESS))
+ .withPort(Integer.valueOf(config.getProperty(PORT)))
+ .withEndpoint(config.getProperty(ENDPOINT))
+ .withMethod(config.getProperty(METHOD))
+ .withRequestMediaType(config.getProperty(REQUEST_MEDIA_TYPE))
+ .withResponseMediaType(config.getProperty(RESPONSE_MEDIA_TYPE))
+ .build();
+ }
+
+ /**
* Builder class of DefaultRestTelemetryConfig.
*/
public static final class DefaultBuilder implements Builder {
@@ -198,4 +229,4 @@
method, requestMediaType, responseMediaType, configMap);
}
}
-}
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/gui/OpenstackTelemetryUiTopovOverlay.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/gui/OpenstackTelemetryUiTopovOverlay.java
index f1575bc..597588b 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/gui/OpenstackTelemetryUiTopovOverlay.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/gui/OpenstackTelemetryUiTopovOverlay.java
@@ -17,6 +17,9 @@
import org.onosproject.ui.UiTopoOverlay;
+/**
+ * Openstack telemetry UI topology implementation.
+ */
public class OpenstackTelemetryUiTopovOverlay extends UiTopoOverlay {
private static final String OVERLAY_ID = "ostelemetry-overlay";
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
new file mode 100644
index 0000000..486cea5
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/AbstractTelemetryConfigLoader.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Abstract bootstrapper for loading and registering telemetry configurations
+ * that are independent from the default telemetry configurations.
+ */
+public abstract class AbstractTelemetryConfigLoader {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private TelemetryConfigProvider provider;
+ private final String path;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigAdminService configAdminService;
+
+ /**
+ * Creates a new loader for resource with the specified path.
+ *
+ * @param path configurations definition XML resource path
+ */
+ protected AbstractTelemetryConfigLoader(String path) {
+ this.path = path;
+ }
+
+ @Activate
+ protected void activate() {
+ try {
+ provider = new XmlTelemetryConfigLoader().loadTelemetryConfigs(
+ getClass().getResourceAsStream(path));
+ configAdminService.registerProvider(provider);
+ } catch (Exception e) {
+ log.error("Unable to load {} telemetry configuration definitions", path, e);
+ }
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ configAdminService.unregisterProvider(provider);
+ log.info("Stopped");
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
index 2b2d7b6..9ee84f8 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultStatsFlowRule.java
@@ -24,6 +24,9 @@
import static com.google.common.base.Preconditions.checkArgument;
+/**
+ * Implementation of StatsFlowRule.
+ */
public final class DefaultStatsFlowRule implements StatsFlowRule {
private final IpPrefix srcIpPrefix;
private final IpPrefix dstIpPrefix;
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
new file mode 100644
index 0000000..368b36b
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfig.java
@@ -0,0 +1,203 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Queue;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.collect.ImmutableMap.copyOf;
+
+/**
+ * Implementation of TelemetryConfig.
+ */
+public final class DefaultTelemetryConfig implements TelemetryConfig {
+ private final String name;
+ private final ConfigType type;
+ private final List<TelemetryConfig> parents;
+
+ private final String manufacturer;
+ private final String swVersion;
+ private final boolean enabled;
+
+ private final Map<String, String> properties;
+
+ /**
+ * Creates a configuration with the specified name.
+ *
+ * @param name configuration name
+ * @param type configuration type
+ * @param parents optional parent configurations
+ * @param manufacturer off-platform application manufacturer
+ * @param swVersion off-platform application software version
+ * @param enabled service enable flag
+ * @param properties properties for telemetry configuration
+ */
+ public DefaultTelemetryConfig(String name, ConfigType type,
+ List<TelemetryConfig> parents,
+ String manufacturer, String swVersion,
+ boolean enabled, Map<String, String> properties) {
+ this.name = checkNotNull(name, "Name cannot be null");
+ this.type = checkNotNull(type, "type cannot be null");
+ this.parents = parents == null ? ImmutableList.of() : ImmutableList.copyOf(parents);
+ this.manufacturer = checkNotNull(manufacturer, "Manufacturer cannot be null");
+ this.swVersion = checkNotNull(swVersion, "SW version cannot be null");
+ this.properties = copyOf(checkNotNull(properties, "Properties cannot be null"));
+ this.enabled = enabled;
+ }
+
+ @Override
+ public String name() {
+ return name;
+ }
+
+ @Override
+ public ConfigType type() {
+ return type;
+ }
+
+ @Override
+ public List<TelemetryConfig> parents() {
+ if (parents == null) {
+ return ImmutableList.of();
+ } else {
+ return ImmutableList.copyOf(parents);
+ }
+ }
+
+ @Override
+ public String manufacturer() {
+ return manufacturer;
+ }
+
+ @Override
+ public String swVersion() {
+ return swVersion;
+ }
+
+ @Override
+ public boolean enabled() {
+ return enabled;
+ }
+
+ @Override
+ public Map<String, String> properties() {
+ if (properties == null) {
+ return ImmutableMap.of();
+ } else {
+ return ImmutableMap.copyOf(properties);
+ }
+ }
+
+ @Override
+ public String getProperty(String name) {
+ Queue<TelemetryConfig> queue = new LinkedList<>();
+ queue.add(this);
+ while (!queue.isEmpty()) {
+ TelemetryConfig config = queue.remove();
+ String property = config.properties().get(name);
+ if (property != null) {
+ return property;
+ } else if (config.parents() != null) {
+ queue.addAll(config.parents());
+ }
+ }
+ return null;
+ }
+
+ @Override
+ public TelemetryConfig merge(TelemetryConfig other) {
+ // merge the properties
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+ properties.putAll(other.properties());
+
+ // remove duplicated properties from this configuration and merge
+ this.properties().entrySet().stream()
+ .filter(e -> !other.properties().containsKey(e.getKey()))
+ .forEach(properties::put);
+
+ List<TelemetryConfig> completeParents = new ArrayList<>();
+
+ if (parents != null) {
+ parents.forEach(parent -> other.parents().forEach(otherParent -> {
+ if (otherParent.name().equals(parent.name())) {
+ completeParents.add(parent.merge(otherParent));
+ } else if (!completeParents.contains(otherParent)) {
+ completeParents.add(otherParent);
+ } else if (!completeParents.contains(parent)) {
+ completeParents.add(parent);
+ }
+ }));
+ }
+
+ return new DefaultTelemetryConfig(name, type,
+ !completeParents.isEmpty() ? completeParents : other.parents(),
+ manufacturer, swVersion, enabled, properties.build());
+ }
+
+ @Override
+ public Set<String> keys() {
+ return properties.keySet();
+ }
+
+ @Override
+ public String value(String key) {
+ return properties.get(key);
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this)
+ .add("name", name)
+ .add("type", type)
+ .add("parents", parents)
+ .add("manufacturer", manufacturer)
+ .add("swVersion", swVersion)
+ .add("enabled", enabled)
+ .add("properties", properties)
+ .toString();
+ }
+
+ @Override
+ public boolean equals(Object configToBeCompared) {
+ if (this == configToBeCompared) {
+ return true;
+ }
+
+ if (configToBeCompared == null || getClass() != configToBeCompared.getClass()) {
+ return false;
+ }
+
+ DefaultTelemetryConfig telemetryConfig =
+ (DefaultTelemetryConfig) configToBeCompared;
+ return name.equals(telemetryConfig.name());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(name);
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
new file mode 100644
index 0000000..5fb1e70
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigProvider.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.util.Map;
+import java.util.Set;
+
+import static com.google.common.base.MoreObjects.toStringHelper;
+
+/**
+ * Default telemetry configuration provider implementation.
+ */
+public class DefaultTelemetryConfigProvider implements TelemetryConfigProvider {
+
+ protected final Map<String, TelemetryConfig> configs = Maps.newConcurrentMap();
+
+ @Override
+ public Set<TelemetryConfig> getTelemetryConfigs() {
+ return ImmutableSet.copyOf(configs.values());
+ }
+
+ /**
+ * Adds the specified configuration to the provider. If a configuration with
+ * the name does not exist yet, the specified one will be added. Otherwise,
+ * the existing configuration will be merged with the new one and the result will
+ * be registered.
+ *
+ * @param config telemetry configuration to be provided
+ * @return registered configuration
+ */
+ public TelemetryConfig addConfig(TelemetryConfig config) {
+ return configs.compute(config.name(), (name, oldConfig) ->
+ oldConfig == null ? config : oldConfig.merge(config));
+ }
+
+ /**
+ * Removes the specified configuration from the provider.
+ *
+ * @param config telemetry configuration
+ */
+ public void removeConfig(TelemetryConfig config) {
+ configs.remove(config.name());
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("configs", configs).toString();
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
new file mode 100644
index 0000000..b1d6133
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStore.java
@@ -0,0 +1,202 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_ADDED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_DELETED;
+import static org.onosproject.openstacktelemetry.api.TelemetryConfigEvent.Type.CONFIG_UPDATED;
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Manages the inventory of telemetry configurations using a {@code ConsistentMap}.
+ */
+@Component(immediate = true, service = TelemetryConfigStore.class)
+public class DistributedTelemetryConfigStore
+ extends AbstractStore<TelemetryConfigEvent, TelemetryConfigStoreDelegate>
+ implements TelemetryConfigStore {
+
+ protected final Logger log = getLogger(getClass());
+
+ private static final String ERR_NOT_FOUND = " does not exist";
+ private static final String ERR_DUPLICATE = " already exists";
+
+ private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(TelemetryConfigProvider.class)
+ .register(DefaultTelemetryConfigProvider.class)
+ .register(TelemetryConfig.class)
+ .register(TelemetryConfig.ConfigType.class)
+ .register(DefaultTelemetryConfig.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ private final ExecutorService eventExecutor = newSingleThreadExecutor(
+ groupedThreads(this.getClass().getSimpleName(), "event-handler", log));
+
+ private final MapEventListener<String, TelemetryConfig>
+ telemetryConfigMapListener = new TelemetryConfigMapListener();
+
+ private ConsistentMap<String, TelemetryConfig> telemetryConfigStore;
+
+ @Activate
+ protected void activate() {
+ ApplicationId appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+
+ telemetryConfigStore = storageService.<String, TelemetryConfig>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_TELEMETRY_CONFIG))
+ .withName("telemetry-config-store")
+ .withApplicationId(appId)
+ .build();
+ telemetryConfigStore.addListener(telemetryConfigMapListener);
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ telemetryConfigStore.removeListener(telemetryConfigMapListener);
+ eventExecutor.shutdown();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public void createTelemetryConfig(TelemetryConfig config) {
+ telemetryConfigStore.compute(config.name(), (name, existing) -> {
+ final String error = config.name() + ERR_DUPLICATE;
+ checkArgument(existing == null, error);
+ return config;
+ });
+ }
+
+ @Override
+ public void updateTelemetryConfig(TelemetryConfig config) {
+ telemetryConfigStore.compute(config.name(), (name, existing) -> {
+ final String error = config.name() + ERR_NOT_FOUND;
+ checkArgument(existing != null, error);
+ return config;
+ });
+ }
+
+ @Override
+ public TelemetryConfig removeTelemetryConfig(String name) {
+ Versioned<TelemetryConfig> config = telemetryConfigStore.remove(name);
+ return config == null ? null : config.value();
+ }
+
+ @Override
+ public TelemetryConfig telemetryConfig(String name) {
+ return telemetryConfigStore.asJavaMap().get(name);
+ }
+
+ @Override
+ public Set<TelemetryConfig> telemetryConfigs() {
+ return ImmutableSet.copyOf(telemetryConfigStore.asJavaMap().values());
+ }
+
+ @Override
+ public Set<TelemetryConfig> telemetryConfigsByType(ConfigType type) {
+ return ImmutableSet.copyOf(telemetryConfigStore.asJavaMap().values()
+ .stream().filter(c -> c.type() == type).collect(Collectors.toSet()));
+ }
+
+ @Override
+ public void clear() {
+ telemetryConfigStore.clear();
+ }
+
+ private class TelemetryConfigMapListener
+ implements MapEventListener<String, TelemetryConfig> {
+
+ @Override
+ public void event(MapEvent<String, TelemetryConfig> event) {
+ switch (event.type()) {
+ case INSERT:
+ eventExecutor.execute(() -> processTelemetryConfigMapInsertion(event));
+ break;
+ case UPDATE:
+ eventExecutor.execute(() -> processTelemetryConfigMapUpdate(event));
+ break;
+ case REMOVE:
+ eventExecutor.execute(() -> processTelemetryConfigMapRemoval(event));
+ break;
+ default:
+ log.error("Unsupported telemetry config event type");
+ break;
+ }
+ }
+
+ private void processTelemetryConfigMapInsertion(MapEvent<String,
+ TelemetryConfig> event) {
+ log.debug("Telemetry config created");
+ notifyDelegate(new TelemetryConfigEvent(
+ CONFIG_ADDED, event.newValue().value()));
+ }
+
+ private void processTelemetryConfigMapUpdate(MapEvent<String,
+ TelemetryConfig> event) {
+ log.debug("Telemetry config updated");
+ notifyDelegate(new TelemetryConfigEvent(
+ CONFIG_UPDATED, event.newValue().value()));
+ }
+
+ private void processTelemetryConfigMapRemoval(MapEvent<String,
+ TelemetryConfig> event) {
+ log.debug("Telemetry config removed");
+ notifyDelegate(new TelemetryConfigEvent(
+ CONFIG_DELETED, event.oldValue().value()));
+ }
+ }
+}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
similarity index 65%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
index 4e115de..f7e4693 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcConfigsLoader.java
@@ -13,10 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Loader for gRPC telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+@Component(immediate = true)
+public class GrpcConfigsLoader extends AbstractTelemetryConfigLoader {
+ public GrpcConfigsLoader() {
+ super("grpc-configs.xml");
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
deleted file mode 100644
index 70b1d70..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryConfigManager.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.GrpcTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.GRPC_USE_PLAINTEXT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_MAX_INBOUND_MSG_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_GRPC_USE_PLAINTEXT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * gRPC server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = GrpcTelemetryConfigService.class,
- property = {
- PROP_GRPC_ENABLE_SERVICE + ":Boolean=" + GRPC_ENABLE_SERVICE_DEFAULT,
- PROP_GRPC_SERVER_ADDRESS + "=" + GRPC_SERVER_ADDRESS_DEFAULT,
- PROP_GRPC_SERVER_PORT + ":Integer=" + GRPC_SERVER_PORT_DEFAULT,
- PROP_GRPC_USE_PLAINTEXT + ":Boolean=" + GRPC_USE_PLAINTEXT_DEFAULT,
- PROP_GRPC_MAX_INBOUND_MSG_SIZE + ":Integer=" + GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT
- }
-)
-public class GrpcTelemetryConfigManager implements GrpcTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected GrpcTelemetryAdminService grpcTelemetryAdminService;
-
- /** Default IP address to establish initial connection to gRPC server. */
- protected String address = GRPC_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to gRPC server. */
- protected Integer port = GRPC_SERVER_PORT_DEFAULT;
-
- /** UsePlaintext flag value used for connecting to gRPC server. */
- protected Boolean usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
-
- /** Maximum inbound message size used for communicating with gRPC server. */
- protected Integer maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = GRPC_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- grpcTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- grpcTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(grpcTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultGrpcTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withUsePlaintext(usePlaintext)
- .withMaxInboundMsgSize(maxInboundMsgSize)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = get(properties, PROP_GRPC_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : GRPC_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. gRPC server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_GRPC_SERVER_PORT);
- if (portConfigured == null) {
- port = GRPC_SERVER_PORT_DEFAULT;
- log.info("gRPC server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. gRPC server port is {}", port);
- }
-
- Boolean usePlaintextConfigured =
- getBooleanProperty(properties, PROP_GRPC_USE_PLAINTEXT);
- if (usePlaintextConfigured == null) {
- usePlaintext = GRPC_USE_PLAINTEXT_DEFAULT;
- log.info("gRPC server use plaintext flag is NOT " +
- "configured, default value is {}", usePlaintext);
- } else {
- usePlaintext = usePlaintextConfigured;
- log.info("Configured. gRPC server use plaintext flag is {}", usePlaintext);
- }
-
- Integer maxInboundMsgSizeConfigured =
- getIntegerProperty(properties, PROP_GRPC_MAX_INBOUND_MSG_SIZE);
- if (maxInboundMsgSizeConfigured == null) {
- maxInboundMsgSize = GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT;
- log.info("gRPC server max inbound message size is NOT " +
- "configured, default value is {}", maxInboundMsgSize);
- } else {
- maxInboundMsgSize = maxInboundMsgSizeConfigured;
- log.info("Configured. gRPC server max inbound message size is {}", maxInboundMsgSize);
- }
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_GRPC_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = GRPC_ENABLE_SERVICE_DEFAULT;
- log.info("gRPC service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. gRPC service enable flag is {}", enableService);
- }
- }
-
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
index 391b1d8..f487504 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/GrpcTelemetryManager.java
@@ -15,12 +15,13 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -29,6 +30,12 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.GRPC_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
+
/**
* gRPC telemetry manager.
*/
@@ -40,7 +47,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private ManagedChannel channel = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Set<ManagedChannel> channels = Sets.newConcurrentHashSet();
@Activate
protected void activate() {
@@ -60,43 +70,41 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (channel != null) {
- log.info("gRPC producer has already been started");
- return;
- }
+ public void start() {
+ telemetryConfigService.getConfigsByType(GRPC).forEach(c -> {
+ GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(c);
- GrpcTelemetryConfig grpcConfig = (GrpcTelemetryConfig) config;
- channel = ManagedChannelBuilder
- .forAddress(grpcConfig.address(), grpcConfig.port())
- .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
- .usePlaintext(grpcConfig.usePlaintext())
- .build();
+ if (grpcConfig != null && !c.name().equals(GRPC_SCHEME) && c.enabled()) {
+ ManagedChannel channel = ManagedChannelBuilder
+ .forAddress(grpcConfig.address(), grpcConfig.port())
+ .maxInboundMessageSize(grpcConfig.maxInboundMsgSize())
+ .usePlaintext(grpcConfig.usePlaintext())
+ .build();
+
+ channels.add(channel);
+ }
+ });
log.info("gRPC producer has Started");
}
@Override
public void stop() {
- if (channel != null) {
- channel.shutdown();
- channel = null;
- }
-
+ channels.forEach(ManagedChannel::shutdown);
log.info("gRPC producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public Object publish(Object record) {
// TODO: need to find a way to invoke gRPC endpoint using channel
- if (channel == null) {
+ if (channels.isEmpty()) {
log.debug("gRPC telemetry service has not been enabled!");
}
@@ -105,6 +113,6 @@
@Override
public boolean isRunning() {
- return channel != null;
+ return !channels.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
similarity index 64%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
index 4e115de..215e1f6 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbConfigsLoader.java
@@ -13,10 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Loader for InfluxDB telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+@Component(immediate = true)
+public class InfluxDbConfigsLoader extends AbstractTelemetryConfigLoader {
+ public InfluxDbConfigsLoader() {
+ super("influxdb-configs.xml");
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
deleted file mode 100644
index d9107ca..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryConfigManager.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_DATABASE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_PASSWORD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_INFLUXDB_USERNAME_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * InfluxDB server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = InfluxDbTelemetryConfigService.class,
- property = {
- PROP_INFLUXDB_ENABLE_SERVICE + ":Boolean=" + PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT,
- PROP_INFLUXDB_SERVER_ADDRESS + "=" + PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT,
- PROP_INFLUXDB_SERVER_PORT + ":Integer=" + PROP_INFLUXDB_SERVER_PORT_DEFAULT,
- PROP_INFLUXDB_USERNAME + "=" + PROP_INFLUXDB_USERNAME_DEFAULT,
- PROP_INFLUXDB_PASSWORD + "=" + PROP_INFLUXDB_PASSWORD_DEFAULT,
- PROP_INFLUXDB_DATABASE + "=" + PROP_INFLUXDB_DATABASE_DEFAULT,
- PROP_INFLUXDB_MEASUREMENT + "=" + PROP_INFLUXDB_MEASUREMENT_DEFAULT,
- PROP_INFLUXDB_ENABLE_BATCH + ":Boolean=" + PROP_INFLUXDB_ENABLE_BATCH_DEFAULT
- }
-)
-public class InfluxDbTelemetryConfigManager implements InfluxDbTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected InfluxDbTelemetryAdminService influxDbTelemetryAdminService;
-
- /** Default IP address to establish initial connection to InfluxDB server. */
- protected String address = PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to InfluxDB server. */
- protected Integer port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
-
- /** Username used for authenticating against InfluxDB server. */
- protected String username = PROP_INFLUXDB_USERNAME_DEFAULT;
-
- /** Password used for authenticating against InfluxDB server. */
- protected String password = PROP_INFLUXDB_PASSWORD_DEFAULT;
-
- /** Database of InfluxDB server. */
- protected String database = PROP_INFLUXDB_DATABASE_DEFAULT;
-
- /** Measurement of InfluxDB server. */
- protected String measurement = PROP_INFLUXDB_MEASUREMENT_DEFAULT;
-
- /** Flag value of enabling batch mode of InfluxDB server. */
- protected Boolean enableBatch = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- influxDbTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- influxDbTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(influxDbTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultInfluxDbTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withUsername(username)
- .withPassword(password)
- .withDatabase(database)
- .withMeasurement(measurement)
- .withEnableBatch(enableBatch)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_INFLUXDB_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. InfluxDB server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_INFLUXDB_SERVER_PORT);
- if (portConfigured == null) {
- port = PROP_INFLUXDB_SERVER_PORT_DEFAULT;
- log.info("InfluxDB server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. InfluxDB server port is {}", port);
- }
-
- String usernameStr = Tools.get(properties, PROP_INFLUXDB_USERNAME);
- username = usernameStr != null ? usernameStr : PROP_INFLUXDB_USERNAME_DEFAULT;
- log.info("Configured. InfluxDB server username is {}", username);
-
- String passwordStr = Tools.get(properties, PROP_INFLUXDB_PASSWORD);
- password = passwordStr != null ? passwordStr : PROP_INFLUXDB_PASSWORD_DEFAULT;
- log.info("Configured. InfluxDB server password is {}", password);
-
- String databaseStr = Tools.get(properties, PROP_INFLUXDB_DATABASE);
- database = databaseStr != null ? databaseStr : PROP_INFLUXDB_DATABASE_DEFAULT;
- log.info("Configured. InfluxDB server database is {}", database);
-
- String measurementStr = Tools.get(properties, PROP_INFLUXDB_MEASUREMENT);
- measurement = measurementStr != null ? measurementStr : PROP_INFLUXDB_MEASUREMENT_DEFAULT;
- log.info("Configured. InfluxDB server measurement is {}", measurement);
-
- Boolean enableBatchConfigured = getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_BATCH);
- if (enableBatchConfigured == null) {
- enableBatch = PROP_INFLUXDB_ENABLE_BATCH_DEFAULT;
- log.info("InfluxDB server enable batch flag is " +
- "NOT configured, default value is {}", enableBatch);
- } else {
- enableBatch = enableBatchConfigured;
- log.info("Configured. InfluxDB server enable batch is {}", enableBatch);
- }
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_INFLUXDB_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT;
- log.info("InfluxDB service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. InfluxDB service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
index 40528d4..796da19 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/InfluxDbTelemetryManager.java
@@ -24,6 +24,7 @@
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.InfluxRecord;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -34,8 +35,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.Map;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.api.Constants.INFLUXDB_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
+
/**
* InfluxDB telemetry manager.
*/
@@ -70,10 +76,11 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
private static final String INFLUX_PROTOCOL = "http";
- private InfluxDB producer = null;
- private String database = null;
- private String measurement = null;
+ private Map<String, InfluxDB> producers = null;
@Activate
protected void activate() {
@@ -93,51 +100,49 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("InfluxDB producer has already been started");
- return;
- }
+ public void start() {
- InfluxDbTelemetryConfig influxDbConfig = (InfluxDbTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(INFLUXDB).forEach(c -> {
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(c);
- StringBuilder influxDbServerBuilder = new StringBuilder();
- influxDbServerBuilder.append(INFLUX_PROTOCOL);
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append("//");
- influxDbServerBuilder.append(influxDbConfig.address());
- influxDbServerBuilder.append(":");
- influxDbServerBuilder.append(influxDbConfig.port());
+ if (influxDbConfig != null && !c.name().equals(INFLUXDB_SCHEME) && c.enabled()) {
+ StringBuilder influxDbServerBuilder = new StringBuilder();
+ influxDbServerBuilder.append(INFLUX_PROTOCOL);
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append("//");
+ influxDbServerBuilder.append(influxDbConfig.address());
+ influxDbServerBuilder.append(":");
+ influxDbServerBuilder.append(influxDbConfig.port());
- producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
- influxDbConfig.username(), influxDbConfig.password());
- database = influxDbConfig.database();
- measurement = influxDbConfig.measurement();
+ InfluxDB producer = InfluxDBFactory.connect(influxDbServerBuilder.toString(),
+ influxDbConfig.username(), influxDbConfig.password());
+ producers.put(c.name(), producer);
+
+ createDB(producer, influxDbConfig.database());
+ }
+ });
log.info("InfluxDB producer has Started");
-
- createDB();
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (producers != null) {
+ producers.values().forEach(InfluxDB::close);
}
log.info("InfluxDB producer has stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
public void publish(InfluxRecord<String, Set<FlowInfo>> record) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("InfluxDB telemetry service has not been enabled!");
return;
}
@@ -149,53 +154,61 @@
log.debug("Publish {} stats records to InfluxDB", record.flowInfos().size());
- BatchPoints batchPoints = BatchPoints.database(database).build();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
- for (FlowInfo flowInfo: record.flowInfos()) {
- Point.Builder pointBuilder = Point
- .measurement((measurement == null) ? record.measurement() : measurement)
- .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
- .tag(DEVICE_ID, flowInfo.deviceId().toString())
- .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
- .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
- .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
- .tag(SRC_IP, flowInfo.srcIp().toString())
- .tag(DST_IP, flowInfo.dstIp().toString())
- .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
- .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
- .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
- .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
- .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
- .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
- .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
- .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
- .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
- .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
- .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+ String database = influxDbConfig.database();
+ String measurement = influxDbConfig.measurement();
- if (flowInfo.vlanId() != null) {
- pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ BatchPoints batchPoints = BatchPoints.database(database).build();
+
+ for (FlowInfo flowInfo: record.flowInfos()) {
+ Point.Builder pointBuilder = Point
+ .measurement((measurement == null) ? record.measurement() : measurement)
+ .tag(FLOW_TYPE, String.valueOf(flowInfo.flowType()))
+ .tag(DEVICE_ID, flowInfo.deviceId().toString())
+ .tag(INPUT_INTERFACE_ID, String.valueOf(flowInfo.inputInterfaceId()))
+ .tag(OUTPUT_INTERFACE_ID, String.valueOf(flowInfo.outputInterfaceId()))
+ .tag(VXLAN_ID, String.valueOf(flowInfo.vxlanId()))
+ .tag(SRC_IP, flowInfo.srcIp().toString())
+ .tag(DST_IP, flowInfo.dstIp().toString())
+ .tag(DST_PORT, getTpPort(flowInfo.dstPort()))
+ .tag(PROTOCOL, String.valueOf(flowInfo.protocol()))
+ .addField(STARTUP_TIME, flowInfo.statsInfo().startupTime())
+ .addField(FST_PKT_ARR_TIME, flowInfo.statsInfo().fstPktArrTime())
+ .addField(LST_PKT_OFFSET, flowInfo.statsInfo().lstPktOffset())
+ .addField(PREV_ACC_BYTES, flowInfo.statsInfo().prevAccBytes())
+ .addField(PREV_ACC_PKTS, flowInfo.statsInfo().prevAccPkts())
+ .addField(CURR_ACC_BYTES, flowInfo.statsInfo().currAccBytes())
+ .addField(CURR_ACC_PKTS, flowInfo.statsInfo().currAccPkts())
+ .addField(ERROR_PKTS, flowInfo.statsInfo().errorPkts())
+ .addField(DROP_PKTS, flowInfo.statsInfo().dropPkts());
+
+ if (flowInfo.vlanId() != null) {
+ pointBuilder.tag(VLAN_ID, flowInfo.vlanId().toString());
+ }
+
+ if (flowInfo.srcPort() != null) {
+ pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
+ }
+
+ if (flowInfo.dstPort() != null) {
+ pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
+ }
+
+ batchPoints.point(pointBuilder.build());
}
-
- if (flowInfo.srcPort() != null) {
- pointBuilder.tag(SRC_PORT, getTpPort(flowInfo.srcPort()));
- }
-
- if (flowInfo.dstPort() != null) {
- pointBuilder.tag(DST_PORT, getTpPort(flowInfo.dstPort()));
- }
-
- batchPoints.point(pointBuilder.build());
- }
- producer.write(batchPoints);
+ v.write(batchPoints);
+ });
}
@Override
public boolean isRunning() {
- return producer != null;
+ return !producers.isEmpty();
}
- private void createDB() {
+ private void createDB(InfluxDB producer, String database) {
if (producer.databaseExists(database)) {
log.debug("Database {} is already created", database);
} else {
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
similarity index 65%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
index 4e115de..1eca176 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaConfigsLoader.java
@@ -13,10 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Loader for kafka telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+@Component(immediate = true)
+public class KafkaConfigsLoader extends AbstractTelemetryConfigLoader {
+ public KafkaConfigsLoader() {
+ super("kafka-configs.xml");
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
deleted file mode 100644
index 6621e0b..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryConfigManager.java
+++ /dev/null
@@ -1,240 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.KafkaTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_BATCH_SIZE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_LINGER_MS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_RETRIES_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Kafka server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = KafkaTelemetryConfigService.class,
- property = {
- PROP_KAFKA_ADDRESS + "=" + PROP_KAFKA_ADDRESS_DEFAULT,
- PROP_KAFKA_PORT + ":Integer=" + PROP_KAFKA_PORT_DEFAULT,
- PROP_KAFKA_RETRIES + ":Integer=" + PROP_KAFKA_RETRIES_DEFAULT,
- PROP_KAFKA_REQUIRED_ACKS + "=" + PROP_KAFKA_REQUIRED_ACKS_DEFAULT,
- PROP_KAFKA_BATCH_SIZE + ":Integer=" + PROP_KAFKA_BATCH_SIZE_DEFAULT,
- PROP_KAFKA_LINGER_MS + ":Integer=" + PROP_KAFKA_LINGER_MS_DEFAULT,
- PROP_KAFKA_MEMORY_BUFFER + ":Integer=" + PROP_KAFKA_MEMORY_BUFFER_DEFAULT,
- PROP_KAFKA_KEY_SERIALIZER + "=" + PROP_KAFKA_KEY_SERIALIZER_DEFAULT,
- PROP_KAFKA_VALUE_SERIALIZER + "=" + PROP_KAFKA_VALUE_SERIALIZER_DEFAULT,
- PROP_KAFKA_ENABLE_SERVICE + ":Boolean=" + PROP_KAFKA_ENABLE_SERVICE_DEFAULT
- }
-)
-public class KafkaTelemetryConfigManager implements KafkaTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected KafkaTelemetryAdminService kafkaTelemetryAdminService;
-
- /** Default IP address to establish initial connection to Kafka server. */
- protected String address = PROP_KAFKA_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to Kafka server. */
- protected Integer port = PROP_KAFKA_PORT_DEFAULT;
-
- /** Number of times the producer can retry to send after first failure. */
- protected int retries = PROP_KAFKA_RETRIES_DEFAULT;
-
- /** Producer will get an acknowledgement after the leader has replicated the data. */
- protected String requiredAcks = PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
-
- /** The largest record batch size allowed by Kafka. */
- protected Integer batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
-
- /** The producer groups together any records that arrive between request transmissions into a single batch. */
- protected Integer lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
-
- /** The total memory used for log cleaner I/O buffers across all cleaner threads. */
- protected Integer memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
-
- /** Serializer class for key that implements the Serializer interface. */
- protected String keySerializer = PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
-
- /** Serializer class for value that implements the Serializer interface. */
- protected String valueSerializer = PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- kafkaTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- kafkaTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(kafkaTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultKafkaTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withRetries(retries)
- .withRequiredAcks(requiredAcks)
- .withBatchSize(batchSize)
- .withLingerMs(lingerMs)
- .withMemoryBuffer(memoryBuffer)
- .withKeySerializer(keySerializer)
- .withValueSerializer(valueSerializer)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_KAFKA_ADDRESS);
- address = addressStr != null ? addressStr : PROP_KAFKA_ADDRESS_DEFAULT;
- log.info("Configured. Kafka server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_PORT);
- if (portConfigured == null) {
- port = PROP_KAFKA_PORT_DEFAULT;
- log.info("Kafka server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. Kafka server port is {}", port);
- }
-
- Integer retriesConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_RETRIES);
- if (retriesConfigured == null) {
- retries = PROP_KAFKA_RETRIES_DEFAULT;
- log.info("Kafka number of retries property is NOT configured, default value is {}", retries);
- } else {
- retries = retriesConfigured;
- log.info("Configured. Kafka number of retries is {}", retries);
- }
-
- String requiredAcksStr = Tools.get(properties, PROP_KAFKA_REQUIRED_ACKS);
- requiredAcks = requiredAcksStr != null ? requiredAcksStr : PROP_KAFKA_REQUIRED_ACKS_DEFAULT;
- log.info("Configured, Kafka required acknowledgement is {}", requiredAcks);
-
- Integer batchSizeConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_BATCH_SIZE);
- if (batchSizeConfigured == null) {
- batchSize = PROP_KAFKA_BATCH_SIZE_DEFAULT;
- log.info("Kafka batch size property is NOT configured, default value is {}", batchSize);
- } else {
- batchSize = batchSizeConfigured;
- log.info("Configured. Kafka batch size is {}", batchSize);
- }
-
- Integer lingerMsConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_LINGER_MS);
- if (lingerMsConfigured == null) {
- lingerMs = PROP_KAFKA_LINGER_MS_DEFAULT;
- log.info("Kafka lingerMs property is NOT configured, default value is {}", lingerMs);
- } else {
- lingerMs = lingerMsConfigured;
- log.info("Configured. Kafka lingerMs is {}", lingerMs);
- }
-
- Integer memoryBufferConfigured = Tools.getIntegerProperty(properties, PROP_KAFKA_MEMORY_BUFFER);
- if (memoryBufferConfigured == null) {
- memoryBuffer = PROP_KAFKA_MEMORY_BUFFER_DEFAULT;
- log.info("Kafka memory buffer property is NOT configured, default value is {}", memoryBuffer);
- } else {
- memoryBuffer = memoryBufferConfigured;
- log.info("Configured. Kafka memory buffer is {}", memoryBuffer);
- }
-
- String keySerializerStr = Tools.get(properties, PROP_KAFKA_KEY_SERIALIZER);
- keySerializer = keySerializerStr != null ? keySerializerStr : PROP_KAFKA_KEY_SERIALIZER_DEFAULT;
- log.info("Configured, Kafka key serializer is {}", keySerializer);
-
- String valueSerializerStr = Tools.get(properties, PROP_KAFKA_VALUE_SERIALIZER);
- valueSerializer = valueSerializerStr != null ? valueSerializerStr : PROP_KAFKA_VALUE_SERIALIZER_DEFAULT;
- log.info("Configured, Kafka value serializer is {}", valueSerializer);
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_KAFKA_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_KAFKA_ENABLE_SERVICE_DEFAULT;
- log.info("Kafka service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. Kafka service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
index e895e46..85f36e3 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/KafkaTelemetryManager.java
@@ -15,12 +15,17 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
+import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.KafkaTelemetryAdminService;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -31,9 +36,16 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
+import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.Future;
+import static org.onosproject.openstacktelemetry.api.Constants.KAFKA_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
+
/**
* Kafka telemetry manager.
*/
@@ -42,6 +54,8 @@
private final Logger log = LoggerFactory.getLogger(getClass());
+ private static final String CODEC_PREFIX = "org.onosproject.openstacktelemetry.codec.";
+
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String RETRIES = "retries";
private static final String ACKS = "acks";
@@ -54,7 +68,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private Producer<String, byte[]> producer = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, Producer<String, byte[]>> producers = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -74,64 +91,83 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (producer != null) {
- log.info("Kafka producer has already been started");
- return;
- }
+ public void start() {
+ telemetryConfigService.getConfigsByType(KAFKA).forEach(c -> {
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(c);
- KafkaTelemetryConfig kafkaConfig = (KafkaTelemetryConfig) config;
+ if (kafkaConfig != null && !c.name().equals(KAFKA_SCHEME) && c.enabled()) {
+ StringBuilder kafkaServerBuilder = new StringBuilder();
+ kafkaServerBuilder.append(kafkaConfig.address());
+ kafkaServerBuilder.append(":");
+ kafkaServerBuilder.append(kafkaConfig.port());
- StringBuilder kafkaServerBuilder = new StringBuilder();
- kafkaServerBuilder.append(kafkaConfig.address());
- kafkaServerBuilder.append(":");
- kafkaServerBuilder.append(kafkaConfig.port());
+ // Configure Kafka server properties
+ Properties prop = new Properties();
+ prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
+ prop.put(RETRIES, kafkaConfig.retries());
+ prop.put(ACKS, kafkaConfig.requiredAcks());
+ prop.put(BATCH_SIZE, kafkaConfig.batchSize());
+ prop.put(LINGER_MS, kafkaConfig.lingerMs());
+ prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
+ prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
+ prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
- // Configure Kafka server properties
- Properties prop = new Properties();
- prop.put(BOOTSTRAP_SERVERS, kafkaServerBuilder.toString());
- prop.put(RETRIES, kafkaConfig.retries());
- prop.put(ACKS, kafkaConfig.requiredAcks());
- prop.put(BATCH_SIZE, kafkaConfig.batchSize());
- prop.put(LINGER_MS, kafkaConfig.lingerMs());
- prop.put(MEMORY_BUFFER, kafkaConfig.memoryBuffer());
- prop.put(KEY_SERIALIZER, kafkaConfig.keySerializer());
- prop.put(VALUE_SERIALIZER, kafkaConfig.valueSerializer());
+ producers.put(c.name(), new KafkaProducer<>(prop));
+ }
+ });
- producer = new KafkaProducer<>(prop);
log.info("Kafka producer has Started");
}
@Override
public void stop() {
- if (producer != null) {
- producer.close();
- producer = null;
+ if (!producers.isEmpty()) {
+ producers.values().forEach(Producer::close);
}
+ producers.clear();
+
log.info("Kafka producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
- public Future<RecordMetadata> publish(ProducerRecord<String, byte[]> record) {
+ public Set<Future<RecordMetadata>> publish(Set<FlowInfo> flowInfos) {
- if (producer == null) {
+ if (producers == null || producers.isEmpty()) {
log.debug("Kafka telemetry service has not been enabled!");
return null;
}
log.debug("Send telemetry record to kafka server...");
- return producer.send(record);
+ Set<Future<RecordMetadata>> futureSet = Sets.newHashSet();
+ producers.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ KafkaTelemetryConfig kafkaConfig =
+ fromTelemetryConfig(config);
+
+ try {
+ Class codecClazz = Class.forName(CODEC_PREFIX + kafkaConfig.codec());
+ TelemetryCodec codec = (TelemetryCodec) codecClazz.newInstance();
+
+ ByteBuffer buffer = codec.encode(flowInfos);
+ ProducerRecord record = new ProducerRecord<>(
+ kafkaConfig.topic(), kafkaConfig.key(), buffer.array());
+ futureSet.add(v.send(record));
+ } catch (ClassNotFoundException | IllegalAccessException | InstantiationException e) {
+ log.warn("Failed to send telemetry record due to {}", e);
+ }
+ });
+ return futureSet;
}
@Override
public boolean isRunning() {
- return producer != null;
+ return !producers.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
index 783dbd6..ce325cd 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManager.java
@@ -17,8 +17,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.GrpcTelemetryService;
import org.onosproject.openstacktelemetry.api.InfluxDbTelemetryService;
@@ -26,8 +24,10 @@
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryService;
-import org.onosproject.openstacktelemetry.api.TelemetryService;
-import org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec;
+import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -36,14 +36,10 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import static org.onosproject.openstacktelemetry.api.Constants.DEFAULT_INFLUXDB_MEASUREMENT;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_KEY;
-import static org.onosproject.openstacktelemetry.codec.TinaMessageByteBufferCodec.KAFKA_TOPIC;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getPropertyValueAsBoolean;
/**
* Openstack telemetry manager.
@@ -53,63 +49,57 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private static final String ENABLE_SERVICE = "enableService";
-
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
+ protected TelemetryConfigService telemetryConfigService;
- private List<TelemetryService> telemetryServices = Lists.newArrayList();
+ private List<TelemetryAdminService> telemetryServices = Lists.newArrayList();
+ private InternalTelemetryConfigListener
+ configListener = new InternalTelemetryConfigListener();
@Activate
protected void activate() {
+ telemetryConfigService.addListener(configListener);
+
log.info("Started");
}
@Deactivate
protected void deactivate() {
+ telemetryConfigService.removeListener(configListener);
+
log.info("Stopped");
}
@Override
- public void addTelemetryService(TelemetryService telemetryService) {
+ public void addTelemetryService(TelemetryAdminService telemetryService) {
telemetryServices.add(telemetryService);
}
@Override
- public void removeTelemetryService(TelemetryService telemetryService) {
+ public void removeTelemetryService(TelemetryAdminService telemetryService) {
telemetryServices.remove(telemetryService);
}
@Override
public void publish(Set<FlowInfo> flowInfos) {
telemetryServices.forEach(service -> {
- if (service instanceof GrpcTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- GrpcTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof GrpcTelemetryManager) {
invokeGrpcPublisher((GrpcTelemetryService) service, flowInfos);
}
- if (service instanceof InfluxDbTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- InfluxDbTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof InfluxDbTelemetryManager) {
invokeInfluxDbPublisher((InfluxDbTelemetryService) service, flowInfos);
}
- if (service instanceof PrometheusTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- PrometheusTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof PrometheusTelemetryManager) {
invokePrometheusPublisher((PrometheusTelemetryService) service, flowInfos);
}
- if (service instanceof KafkaTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- KafkaTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof KafkaTelemetryManager) {
invokeKafkaPublisher((KafkaTelemetryService) service, flowInfos);
}
- if (service instanceof RestTelemetryManager &&
- getPropertyValueAsBoolean(componentConfigService.getProperties(
- RestTelemetryConfigManager.class.getName()), ENABLE_SERVICE)) {
+ if (service instanceof RestTelemetryManager) {
invokeRestPublisher((RestTelemetryService) service, flowInfos);
}
@@ -118,31 +108,55 @@
}
@Override
- public Set<TelemetryService> telemetryServices() {
+ public Set<TelemetryAdminService> telemetryServices() {
return ImmutableSet.copyOf(telemetryServices);
}
- private void invokeGrpcPublisher(GrpcTelemetryService service, Set<FlowInfo> flowInfos) {
+ @Override
+ public TelemetryAdminService telemetryService(String type) {
+ return telemetryServices.stream()
+ .filter(s -> s.type().name().equalsIgnoreCase(type))
+ .findFirst()
+ .orElse(null);
+ }
+
+ private void invokeGrpcPublisher(GrpcTelemetryService service,
+ Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
- private void invokeInfluxDbPublisher(InfluxDbTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokeInfluxDbPublisher(InfluxDbTelemetryService service,
+ Set<FlowInfo> flowInfos) {
DefaultInfluxRecord<String, Set<FlowInfo>> influxRecord
= new DefaultInfluxRecord<>(DEFAULT_INFLUXDB_MEASUREMENT, flowInfos);
service.publish(influxRecord);
}
- private void invokePrometheusPublisher(PrometheusTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokePrometheusPublisher(PrometheusTelemetryService service,
+ Set<FlowInfo> flowInfos) {
service.publish(flowInfos);
}
- private void invokeKafkaPublisher(KafkaTelemetryService service, Set<FlowInfo> flowInfos) {
- TinaMessageByteBufferCodec codec = new TinaMessageByteBufferCodec();
- ByteBuffer buffer = codec.encode(flowInfos);
- service.publish(new ProducerRecord<>(KAFKA_TOPIC, KAFKA_KEY, buffer.array()));
+ private void invokeKafkaPublisher(KafkaTelemetryService service,
+ Set<FlowInfo> flowInfos) {
+ service.publish(flowInfos);
}
- private void invokeRestPublisher(RestTelemetryService service, Set<FlowInfo> flowInfos) {
+ private void invokeRestPublisher(RestTelemetryService service,
+ Set<FlowInfo> flowInfos) {
// TODO: need provide implementation
}
+
+ private class InternalTelemetryConfigListener implements TelemetryConfigListener {
+
+ @Override
+ public void event(TelemetryConfigEvent event) {
+ TelemetryAdminService service =
+ telemetryService(event.subject().type().name());
+
+ if (service != null) {
+ service.restart();
+ }
+ }
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
index 325bce2..cc754f5 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/OsgiPropertyConstants.java
@@ -23,65 +23,7 @@
private OsgiPropertyConstants() {
}
- // REST telemetry
-
- static final String PROP_REST_ENABLE_SERVICE = "enableService";
- static final boolean PROP_REST_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_REST_SERVER_ADDRESS = "address";
- static final String PROP_REST_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_REST_SERVER_PORT = "port";
- static final int PROP_REST_SERVER_PORT_DEFAULT = 80;
-
- static final String PROP_REST_ENDPOINT = "endpoint";
- static final String PROP_REST_ENDPOINT_DEFAULT = "telemetry";
-
- static final String PROP_REST_METHOD = "method";
- static final String PROP_REST_METHOD_DEFAULT = "POST";
-
- static final String PROP_REST_REQUEST_MEDIA_TYPE = "requestMediaType";
- static final String PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT = "application/json";
-
- static final String PROP_REST_RESPONSE_MEDIA_TYPE = "responseMediaType";
- static final String PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT = "application/json";
-
- // Kafka telemetry
-
- static final String PROP_KAFKA_ENABLE_SERVICE = "enableService";
- static final boolean PROP_KAFKA_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_KAFKA_ADDRESS = "address";
- static final String PROP_KAFKA_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_KAFKA_PORT = "port";
- static final int PROP_KAFKA_PORT_DEFAULT = 9092;
-
- static final String PROP_KAFKA_RETRIES = "retries";
- static final int PROP_KAFKA_RETRIES_DEFAULT = 0;
-
- static final String PROP_KAFKA_REQUIRED_ACKS = "requiredAcks";
- static final String PROP_KAFKA_REQUIRED_ACKS_DEFAULT = "all";
-
- static final String PROP_KAFKA_BATCH_SIZE = "batchSize";
- static final int PROP_KAFKA_BATCH_SIZE_DEFAULT = 16384;
-
- static final String PROP_KAFKA_LINGER_MS = "lingerMs";
- static final int PROP_KAFKA_LINGER_MS_DEFAULT = 1;
-
- static final String PROP_KAFKA_MEMORY_BUFFER = "memoryBuffer";
- static final int PROP_KAFKA_MEMORY_BUFFER_DEFAULT = 33554432;
-
- static final String PROP_KAFKA_KEY_SERIALIZER = "keySerializer";
- static final String PROP_KAFKA_KEY_SERIALIZER_DEFAULT =
- "org.apache.kafka.common.serialization.StringSerializer";
-
- static final String PROP_KAFKA_VALUE_SERIALIZER = "valueSerializer";
- static final String PROP_KAFKA_VALUE_SERIALIZER_DEFAULT =
- "org.apache.kafka.common.serialization.ByteArraySerializer";
-
// Stats flow rule manager
-
static final String PROP_REVERSE_PATH_STATS = "reversePathStats";
static final boolean PROP_REVERSE_PATH_STATS_DEFAULT = false;
@@ -96,56 +38,4 @@
static final String PROP_MONITOR_UNDERLAY = "monitorUnderlay";
static final boolean PROP_MONITOR_UNDERLAY_DEFAULT = true;
-
- // Influx DB Telemetry config manager
-
- static final String PROP_INFLUXDB_ENABLE_SERVICE = "enableService";
- static final boolean PROP_INFLUXDB_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_INFLUXDB_SERVER_ADDRESS = "address";
- static final String PROP_INFLUXDB_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_INFLUXDB_SERVER_PORT = "port";
- static final int PROP_INFLUXDB_SERVER_PORT_DEFAULT = 8086;
-
- static final String PROP_INFLUXDB_USERNAME = "username";
- static final String PROP_INFLUXDB_USERNAME_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_PASSWORD = "password";
- static final String PROP_INFLUXDB_PASSWORD_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_DATABASE = "database";
- static final String PROP_INFLUXDB_DATABASE_DEFAULT = "onos";
-
- static final String PROP_INFLUXDB_MEASUREMENT = "measurement";
- static final String PROP_INFLUXDB_MEASUREMENT_DEFAULT = "sonaflow";
-
- static final String PROP_INFLUXDB_ENABLE_BATCH = "enableBatch";
- static final boolean PROP_INFLUXDB_ENABLE_BATCH_DEFAULT = true;
-
- // GRPC Telemetry config manager
- static final String PROP_GRPC_ENABLE_SERVICE = "enableService";
- static final boolean GRPC_ENABLE_SERVICE_DEFAULT = false;
-
- static final String PROP_GRPC_SERVER_ADDRESS = "address";
- static final String GRPC_SERVER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_GRPC_SERVER_PORT = "port";
- static final int GRPC_SERVER_PORT_DEFAULT = 50051;
-
- static final String PROP_GRPC_USE_PLAINTEXT = "usePlaintext";
- static final boolean GRPC_USE_PLAINTEXT_DEFAULT = true;
-
- static final String PROP_GRPC_MAX_INBOUND_MSG_SIZE = "maxInboundMsgSize";
- static final int GRPC_MAX_INBOUND_MSG_SIZE_DEFAULT = 4194304; //4 * 1024 * 1024;
-
- // Prometheus Telemetry config manager
- static final String PROP_PROMETHEUS_ENABLE_SERVICE = "enableService";
- static final boolean PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT = true;
-
- static final String PROP_PROMETHEUS_EXPORTER_ADDRESS = "address";
- public static final String PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT = "localhost";
-
- static final String PROP_PROMETHEUS_EXPORTER_PORT = "port";
- public static final int PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT = 9555;
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
similarity index 63%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
index 4e115de..a7b740d 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusConfigsLoader.java
@@ -13,10 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Loader for prometheus telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+@Component(immediate = true)
+public class PrometheusConfigsLoader extends AbstractTelemetryConfigLoader {
+ public PrometheusConfigsLoader() {
+ super("prometheus-configs.xml");
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
deleted file mode 100644
index dcc7023..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryConfigManager.java
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.PrometheusTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * Prometheus exporter configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = PrometheusTelemetryConfigService.class,
- property = {
- PROP_PROMETHEUS_ENABLE_SERVICE + ":Boolean=" + PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT,
- PROP_PROMETHEUS_EXPORTER_ADDRESS + "=" + PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT,
- PROP_PROMETHEUS_EXPORTER_PORT + ":Integer=" + PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT
- }
-)
-public class PrometheusTelemetryConfigManager implements PrometheusTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected PrometheusTelemetryAdminService prometheusTelemetryAdminService;
-
- /** Default IP address of prometheus exporter. */
- protected String address = PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
-
- /** Default port number of prometheus exporter. */
- protected Integer port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
- if (enableService) {
- prometheusTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
- if (enableService) {
- prometheusTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(prometheusTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultPrometheusTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_PROMETHEUS_EXPORTER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
- log.info("Configured. Prometheus exporter address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_PROMETHEUS_EXPORTER_PORT);
- if (portConfigured == null) {
- port = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
- log.info("Prometheus exporter port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. Prometheus exporter port is {}", port);
- }
-
- Boolean enableServiceConfigured = getBooleanProperty(properties, PROP_PROMETHEUS_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_PROMETHEUS_ENABLE_SERVICE_DEFAULT;
- log.info("Prometheus service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. Prometheus service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
index 69423ca..7111fc0 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/PrometheusTelemetryManager.java
@@ -15,13 +15,19 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Sets;
+import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
+import io.prometheus.client.exporter.MetricsServlet;
+import org.eclipse.jetty.server.Server;
+import org.eclipse.jetty.servlet.ServletContextHandler;
+import org.eclipse.jetty.servlet.ServletHolder;
import org.onlab.packet.TpPort;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.PrometheusTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
@@ -30,15 +36,13 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import io.prometheus.client.Counter;
-import io.prometheus.client.exporter.MetricsServlet;
-import org.eclipse.jetty.server.Server;
-import org.eclipse.jetty.servlet.ServletContextHandler;
-import org.eclipse.jetty.servlet.ServletHolder;
-
import java.util.Arrays;
import java.util.Set;
+import static org.onosproject.openstacktelemetry.api.Constants.PROMETHEUS_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
+
/**
* Prometheus telemetry manager.
*/
@@ -47,7 +51,7 @@
private final Logger log = LoggerFactory.getLogger(getClass());
- private Server prometheusExporter;
+ private Set<Server> prometheusExporters = Sets.newConcurrentHashSet();
private static final String FLOW_TYPE = "flowType";
private static final String DEVICE_ID = "deviceId";
@@ -134,6 +138,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
@Activate
protected void activate() {
openstackTelemetryService.addTelemetryService(this);
@@ -148,38 +155,51 @@
}
@Override
- public void start(TelemetryConfig config) {
+ public void start() {
log.info("Prometheus exporter starts.");
- PrometheusTelemetryConfig prometheusConfig = (PrometheusTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(PROMETHEUS).forEach(c -> {
+ PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(c);
- try {
- prometheusExporter = new Server(prometheusConfig.port());
- ServletContextHandler context = new ServletContextHandler();
- context.setContextPath("/");
- prometheusExporter.setHandler(context);
- context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
- log.info("Prometeus server start (Server port:{})", prometheusConfig.port());
- prometheusExporter.start();
- } catch (Exception ex) {
- log.warn("Exception: {}", ex.toString());
- }
+ if (prometheusConfig != null &&
+ !c.name().equals(PROMETHEUS_SCHEME) && c.enabled()) {
+ try {
+ // TODO Offer a 'Authentication'
+ Server prometheusExporter = new Server(prometheusConfig.port());
+ ServletContextHandler context = new ServletContextHandler();
+ context.setContextPath("/");
+ prometheusExporter.setHandler(context);
+ context.addServlet(new ServletHolder(new MetricsServlet()), "/metrics");
+
+ log.info("Prometheus server start");
+
+ prometheusExporter.start();
+
+ prometheusExporters.add(prometheusExporter);
+
+ } catch (Exception ex) {
+ log.warn("Exception: {}", ex);
+ }
+ }
+ });
}
@Override
public void stop() {
- try {
- prometheusExporter.stop();
- } catch (Exception ex) {
- log.warn("Exception: {}", ex.toString());
- }
+ prometheusExporters.forEach(pe -> {
+ try {
+ pe.stop();
+ } catch (Exception e) {
+ log.warn("Failed to stop prometheus server due to {}", e);
+ }
+ });
log.info("Prometheus exporter has stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
@@ -247,6 +267,6 @@
@Override
public boolean isRunning() {
- return prometheusExporter.isRunning();
+ return !prometheusExporters.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
similarity index 65%
copy from apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
copy to apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
index 4e115de..ffe6cbc 100644
--- a/apps/openstacktelemetry/api/src/main/java/org/onosproject/openstacktelemetry/api/PrometheusTelemetryConfigService.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestConfigsLoader.java
@@ -13,10 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.onosproject.openstacktelemetry.api;
+package org.onosproject.openstacktelemetry.impl;
+
+import org.osgi.service.component.annotations.Component;
/**
- * Configuration service API for publishing openstack telemetry through Prometheus producer.
+ * Loader for REST telemetry configurations.
*/
-public interface PrometheusTelemetryConfigService extends TelemetryConfigService {
+@Component(immediate = true)
+public class RestConfigsLoader extends AbstractTelemetryConfigLoader {
+ public RestConfigsLoader() {
+ super("rest-configs.xml");
+ }
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
deleted file mode 100644
index 16b62cb..0000000
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryConfigManager.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.onosproject.openstacktelemetry.impl;
-
-import org.onlab.util.Tools;
-import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.RestTelemetryConfigService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
-import org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig;
-import org.osgi.service.component.ComponentContext;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Modified;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Dictionary;
-
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENABLE_SERVICE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_ENDPOINT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_METHOD_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_ADDRESS_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_REST_SERVER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.getBooleanProperty;
-import static org.onosproject.openstacktelemetry.util.OpenstackTelemetryUtil.initTelemetryService;
-
-/**
- * REST server configuration manager for publishing openstack telemetry.
- */
-@Component(
- immediate = true,
- service = RestTelemetryConfigService.class,
- property = {
- PROP_REST_ENABLE_SERVICE + ":Boolean=" + PROP_REST_ENABLE_SERVICE_DEFAULT,
- PROP_REST_SERVER_ADDRESS + "=" + PROP_REST_SERVER_ADDRESS_DEFAULT,
- PROP_REST_SERVER_PORT + ":Integer=" + PROP_REST_SERVER_PORT_DEFAULT,
- PROP_REST_ENDPOINT + "=" + PROP_REST_ENDPOINT_DEFAULT,
- PROP_REST_METHOD + "=" + PROP_REST_METHOD_DEFAULT,
- PROP_REST_REQUEST_MEDIA_TYPE + "=" + PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT,
- PROP_REST_RESPONSE_MEDIA_TYPE + "=" + PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT
- }
-)
-public class RestTelemetryConfigManager implements RestTelemetryConfigService {
-
- private final Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected RestTelemetryAdminService restTelemetryAdminService;
-
- /** Default IP address to establish initial connection to REST server. */
- protected String address = PROP_REST_SERVER_ADDRESS_DEFAULT;
-
- /** Default port number to establish initial connection to REST server. */
- protected Integer port = PROP_REST_SERVER_PORT_DEFAULT;
-
- /** Endpoint of REST server. */
- protected String endpoint = PROP_REST_ENDPOINT_DEFAULT;
-
- /** HTTP method of REST server. */
- protected String method = PROP_REST_METHOD_DEFAULT;
-
- /** Request media type of REST server. */
- protected String requestMediaType = PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
-
- /** Response media type of REST server. */
- protected String responseMediaType = PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
-
- /** Specify the default behavior of telemetry service. */
- protected Boolean enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
-
- @Activate
- protected void activate(ComponentContext context) {
- componentConfigService.registerProperties(getClass());
-
- if (enableService) {
- restTelemetryAdminService.start(getConfig());
- }
- log.info("Started");
- }
-
- @Deactivate
- protected void deactivate() {
- componentConfigService.unregisterProperties(getClass(), false);
-
- if (enableService) {
- restTelemetryAdminService.stop();
- }
- log.info("Stopped");
- }
-
- @Modified
- private void modified(ComponentContext context) {
- readComponentConfiguration(context);
- initTelemetryService(restTelemetryAdminService, getConfig(), enableService);
- log.info("Modified");
- }
-
- @Override
- public TelemetryConfig getConfig() {
- return new DefaultRestTelemetryConfig.DefaultBuilder()
- .withAddress(address)
- .withPort(port)
- .withEndpoint(endpoint)
- .withMethod(method)
- .withRequestMediaType(requestMediaType)
- .withResponseMediaType(responseMediaType)
- .build();
- }
-
- /**
- * Extracts properties from the component configuration context.
- *
- * @param context the component context
- */
- private void readComponentConfiguration(ComponentContext context) {
- Dictionary<?, ?> properties = context.getProperties();
-
- String addressStr = Tools.get(properties, PROP_REST_SERVER_ADDRESS);
- address = addressStr != null ? addressStr : PROP_REST_SERVER_ADDRESS_DEFAULT;
- log.info("Configured. REST server address is {}", address);
-
- Integer portConfigured = Tools.getIntegerProperty(properties, PROP_REST_SERVER_PORT);
- if (portConfigured == null) {
- port = PROP_REST_SERVER_PORT_DEFAULT;
- log.info("REST server port is NOT configured, default value is {}", port);
- } else {
- port = portConfigured;
- log.info("Configured. REST server port is {}", port);
- }
-
- String endpointStr = Tools.get(properties, PROP_REST_ENDPOINT);
- endpoint = endpointStr != null ? endpointStr : PROP_REST_ENDPOINT_DEFAULT;
- log.info("Configured. REST server endpoint is {}", endpoint);
-
- String methodStr = Tools.get(properties, PROP_REST_METHOD);
- method = methodStr != null ? methodStr : PROP_REST_METHOD_DEFAULT;
- log.info("Configured. REST server default HTTP method is {}", method);
-
- String requestMediaTypeStr = Tools.get(properties, PROP_REST_REQUEST_MEDIA_TYPE);
- requestMediaType = requestMediaTypeStr != null ?
- requestMediaTypeStr : PROP_REST_REQUEST_MEDIA_TYPE_DEFAULT;
- log.info("Configured. REST server request media type is {}", requestMediaType);
-
- String responseMediaTypeStr = Tools.get(properties, PROP_REST_RESPONSE_MEDIA_TYPE);
- responseMediaType = responseMediaTypeStr != null ?
- responseMediaTypeStr : PROP_REST_RESPONSE_MEDIA_TYPE_DEFAULT;
- log.info("Configured. REST server response media type is {}", responseMediaType);
-
- Boolean enableServiceConfigured =
- getBooleanProperty(properties, PROP_REST_ENABLE_SERVICE);
- if (enableServiceConfigured == null) {
- enableService = PROP_REST_ENABLE_SERVICE_DEFAULT;
- log.info("REST service enable flag is NOT " +
- "configured, default value is {}", enableService);
- } else {
- enableService = enableServiceConfigured;
- log.info("Configured. REST service enable flag is {}", enableService);
- }
- }
-}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
index 9b4aaf9..48cef34 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/RestTelemetryManager.java
@@ -15,8 +15,11 @@
*/
package org.onosproject.openstacktelemetry.impl;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
import org.onosproject.openstacktelemetry.api.RestTelemetryAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import org.osgi.service.component.annotations.Activate;
@@ -32,6 +35,12 @@
import javax.ws.rs.client.Entity;
import javax.ws.rs.client.WebTarget;
import javax.ws.rs.core.Response;
+import java.util.Map;
+import java.util.Set;
+
+import static org.onosproject.openstacktelemetry.api.Constants.REST_SCHEME;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.fromTelemetryConfig;
/**
* REST telemetry manager.
@@ -48,8 +57,10 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected OpenstackTelemetryService openstackTelemetryService;
- private WebTarget target = null;
- private RestTelemetryConfig restConfig = null;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigService telemetryConfigService;
+
+ private Map<String, WebTarget> targets = Maps.newConcurrentMap();
@Activate
protected void activate() {
@@ -69,85 +80,71 @@
}
@Override
- public void start(TelemetryConfig config) {
- if (target != null) {
- log.info("REST producer has already been started");
- return;
- }
+ public void start() {
- restConfig = (RestTelemetryConfig) config;
+ telemetryConfigService.getConfigsByType(REST).forEach(c -> {
+ RestTelemetryConfig restConfig = fromTelemetryConfig(c);
- StringBuilder restServerBuilder = new StringBuilder();
- restServerBuilder.append(PROTOCOL);
- restServerBuilder.append(":");
- restServerBuilder.append("//");
- restServerBuilder.append(restConfig.address());
- restServerBuilder.append(":");
- restServerBuilder.append(restConfig.port());
- restServerBuilder.append("/");
+ if (restConfig != null && !c.name().equals(REST_SCHEME) && c.enabled()) {
+ StringBuilder restServerBuilder = new StringBuilder();
+ restServerBuilder.append(PROTOCOL);
+ restServerBuilder.append(":");
+ restServerBuilder.append("//");
+ restServerBuilder.append(restConfig.address());
+ restServerBuilder.append(":");
+ restServerBuilder.append(restConfig.port());
+ restServerBuilder.append("/");
- Client client = ClientBuilder.newBuilder().build();
+ Client client = ClientBuilder.newBuilder().build();
- target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+ WebTarget target = client.target(restServerBuilder.toString()).path(restConfig.endpoint());
+
+ targets.put(c.name(), target);
+ }
+ });
log.info("REST producer has Started");
}
@Override
public void stop() {
- if (target != null) {
- target = null;
- }
-
+ targets.values().forEach(t -> t = null);
log.info("REST producer has Stopped");
}
@Override
- public void restart(TelemetryConfig config) {
+ public void restart() {
stop();
- start(config);
+ start();
}
@Override
- public Response publish(String endpoint, String method, String record) {
- // TODO: need to find a way to invoke REST endpoint using target
- return null;
- }
+ public Set<Response> publish(String record) {
- @Override
- public Response publish(String method, String record) {
- switch (method) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
- }
+ Set<Response> responses = Sets.newConcurrentHashSet();
- @Override
- public Response publish(String record) {
+ targets.forEach((k, v) -> {
+ TelemetryConfig config = telemetryConfigService.getConfig(k);
+ RestTelemetryConfig restConfig = fromTelemetryConfig(config);
- if (target == null) {
- log.debug("REST telemetry service has not been enabled!");
- return null;
- }
+ switch (restConfig.method()) {
+ case POST_METHOD:
+ responses.add(v.request(restConfig.requestMediaType())
+ .post(Entity.json(record)));
+ break;
+ case GET_METHOD:
+ responses.add(v.request(restConfig.requestMediaType()).get());
+ break;
+ default:
+ break;
+ }
+ });
- switch (restConfig.method()) {
- case POST_METHOD:
- return target.request(restConfig.requestMediaType())
- .post(Entity.json(record));
- case GET_METHOD:
- return target.request(restConfig.requestMediaType()).get();
- default:
- return null;
- }
+ return responses;
}
@Override
public boolean isRunning() {
- return target != null;
+ return !targets.isEmpty();
}
}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
new file mode 100644
index 0000000..3ae2504
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/TelemetryConfigManager.java
@@ -0,0 +1,206 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableSet;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.event.ListenerRegistry;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigAdminService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigEvent;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigListener;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigService;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStore;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigStoreDelegate;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.onlab.util.Tools.nullIsNotFound;
+import static org.onosproject.openstacktelemetry.api.Constants.OPENSTACK_TELEMETRY_APP_ID;
+
+/**
+ * Provides implementation of administering and interfacing telemetry configs.
+ * It also provides telemetry config events for the various exporters or connectors.
+ */
+@Component(
+ immediate = true,
+ service = {
+ TelemetryConfigService.class, TelemetryConfigAdminService.class
+ }
+)
+public class TelemetryConfigManager
+ extends ListenerRegistry<TelemetryConfigEvent, TelemetryConfigListener>
+ implements TelemetryConfigService, TelemetryConfigAdminService {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+
+ private static final String MSG_TELEMETRY_CONFIG = "Telemetry config %s %s";
+ private static final String MSG_CREATED = "created";
+ private static final String MSG_UPDATED = "updated";
+ private static final String MSG_REMOVED = "removed";
+
+ private static final String ERR_NULL_CONFIG = "Telemetry config cannot be null";
+ private static final String NO_CONFIG = "Telemetry config not found";
+
+ private static final KryoNamespace SERIALIZER_TELEMETRY_CONFIG =
+ KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(TelemetryConfigProvider.class)
+ .register(DefaultTelemetryConfigProvider.class)
+ .register(TelemetryConfig.class)
+ .register(TelemetryConfig.ConfigType.class)
+ .register(DefaultTelemetryConfig.class)
+ .build();
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected TelemetryConfigStore telemetryConfigStore;
+
+ private final TelemetryConfigStoreDelegate
+ delegate = new InternalTelemetryConfigStoreDelegate();
+
+ private ConsistentMap<String, TelemetryConfigProvider> providerMap;
+
+ private ApplicationId appId;
+ private NodeId localNodeId;
+
+ @Activate
+ protected void activate() {
+ appId = coreService.registerApplication(OPENSTACK_TELEMETRY_APP_ID);
+ localNodeId = clusterService.getLocalNode().id();
+ telemetryConfigStore.setDelegate(delegate);
+ leadershipService.runForLeadership(appId.name());
+
+ providerMap = storageService.<String, TelemetryConfigProvider>consistentMapBuilder()
+ .withSerializer(Serializer.using(SERIALIZER_TELEMETRY_CONFIG))
+ .withName("openstack-telemetry-config-provider")
+ .withApplicationId(appId)
+ .build();
+
+ log.info("Started");
+ }
+
+ @Deactivate
+ protected void deactivate() {
+ telemetryConfigStore.unsetDelegate(delegate);
+ leadershipService.withdraw(appId.name());
+ providerMap.clear();
+
+ log.info("Stopped");
+ }
+
+ @Override
+ public Set<TelemetryConfigProvider> getProviders() {
+ ImmutableSet.Builder<TelemetryConfigProvider> builder = ImmutableSet.builder();
+ providerMap.asJavaMap().values().forEach(builder::add);
+ return builder.build();
+ }
+
+ @Override
+ public void registerProvider(TelemetryConfigProvider provider) {
+ if (isLeader()) {
+ StringBuilder nameBuilder = new StringBuilder();
+ provider.getTelemetryConfigs().forEach(config -> {
+ nameBuilder.append(config.name());
+ telemetryConfigStore.createTelemetryConfig(config);
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_CREATED));
+ });
+ providerMap.put(nameBuilder.toString(), provider);
+ }
+ }
+
+ @Override
+ public void unregisterProvider(TelemetryConfigProvider provider) {
+ if (isLeader()) {
+ StringBuilder nameBuilder = new StringBuilder();
+ provider.getTelemetryConfigs().forEach(config -> {
+ nameBuilder.append(config.name());
+ telemetryConfigStore.removeTelemetryConfig(config.name());
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_REMOVED));
+ });
+ providerMap.remove(nameBuilder.toString());
+ }
+ }
+
+ @Override
+ public void updateTelemetryConfig(TelemetryConfig config) {
+ checkNotNull(config, ERR_NULL_CONFIG);
+
+ telemetryConfigStore.updateTelemetryConfig(config);
+ log.info(String.format(MSG_TELEMETRY_CONFIG, config.name(), MSG_UPDATED));
+ }
+
+ @Override
+ public TelemetryConfig getConfig(String name) {
+ return nullIsNotFound(telemetryConfigStore.telemetryConfig(name), NO_CONFIG);
+ }
+
+ @Override
+ public Set<TelemetryConfig> getConfigsByType(ConfigType type) {
+ return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigsByType(type));
+ }
+
+ @Override
+ public Set<TelemetryConfig> getConfigs() {
+ return ImmutableSet.copyOf(telemetryConfigStore.telemetryConfigs());
+ }
+
+ private boolean isLeader() {
+ return Objects.equals(localNodeId, leadershipService.getLeader(appId.name()));
+ }
+
+ private class InternalTelemetryConfigStoreDelegate implements TelemetryConfigStoreDelegate {
+
+ @Override
+ public void notify(TelemetryConfigEvent event) {
+ if (event != null) {
+ log.trace("send telemetry config event {}", event);
+ process(event);
+ }
+ }
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
new file mode 100644
index 0000000..39a5df0
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoader.java
@@ -0,0 +1,219 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.commons.configuration.XMLConfiguration;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.UNKNOWN;
+
+/**
+ * Utility capable of reading telemetry configuration XML resources and producing
+ * a telemetry config as a result.
+ * <p>
+ * The telemetry configurations stream structure is as follows:
+ * </p>
+ * <pre>
+ * <configs>
+ * <config name=“...” [manufacturer="..." swVersion="..."]>
+ * [<property name=“key”>value</key>]
+ * ...
+ * </config>
+ * ...
+ * </configs>
+ * </pre>
+ */
+public class XmlTelemetryConfigLoader {
+
+ private static final String CONFIGS = "configs";
+ private static final String CONFIG = "config";
+
+ private static final String PROPERTY = "property";
+
+ private static final String TRUE = "true";
+
+ private static final String NAME = "[@name]";
+ private static final String TYPE = "[@type]";
+ private static final String EXTENDS = "[@extends]";
+ private static final String MFG = "[@manufacturer]";
+ private static final String SW = "[@swVersion]";
+ private static final String ENABLED = "[@enabled]";
+
+ private Map<String, TelemetryConfig> configs = Maps.newHashMap();
+
+ /**
+ * Creates a new config loader capable of loading configs from the supplied
+ * class loader.
+ */
+ public XmlTelemetryConfigLoader() {
+ }
+
+ /**
+ * Loads the specified telemetry configs resource as an XML stream and parses
+ * it to produce a ready-to-register config provider.
+ *
+ * @param configsStream stream containing the telemetry configs definition
+ * @return telemetry configuration provider
+ * @throws IOException if issues are encountered reading the stream
+ * or parsing the telemetry config definition within
+ */
+ public DefaultTelemetryConfigProvider
+ loadTelemetryConfigs(InputStream configsStream) throws IOException {
+ try {
+ XMLConfiguration cfg = new XMLConfiguration();
+ cfg.setRootElementName(CONFIGS);
+ cfg.setAttributeSplittingDisabled(true);
+
+ cfg.load(configsStream);
+ return loadTelemetryConfigs(cfg);
+ } catch (ConfigurationException e) {
+ throw new IOException("Unable to load telemetry configs", e);
+ }
+ }
+
+ /**
+ * Loads a telemetry config provider from the supplied hierarchical configuration.
+ *
+ * @param telemetryConfig hierarchical configuration containing the configs definition
+ * @return telemetry configuration provider
+ */
+ public DefaultTelemetryConfigProvider
+ loadTelemetryConfigs(HierarchicalConfiguration telemetryConfig) {
+ DefaultTelemetryConfigProvider provider = new DefaultTelemetryConfigProvider();
+ for (HierarchicalConfiguration cfg : telemetryConfig.configurationsAt(CONFIG)) {
+ DefaultTelemetryConfig config = loadTelemetryConfig(cfg);
+ configs.put(config.name(), config);
+ provider.addConfig(config);
+ }
+ configs.clear();
+ return provider;
+ }
+
+ /**
+ * Loads a telemetry configuration from the supplied hierarchical configuration.
+ *
+ * @param telemetryCfg hierarchical configuration containing the telemetry config definition
+ * @return telemetry configuration
+ */
+ public DefaultTelemetryConfig loadTelemetryConfig(HierarchicalConfiguration telemetryCfg) {
+ String name = telemetryCfg.getString(NAME);
+ String parentsString = telemetryCfg.getString(EXTENDS, "");
+ List<TelemetryConfig> parents = Lists.newArrayList();
+
+ if (!"".equals(parentsString)) {
+ List<String> parentsNames;
+ if (parentsString.contains(",")) {
+ parentsNames = Arrays.asList(
+ parentsString.replace(" ", "").split(","));
+ } else {
+ parentsNames = Lists.newArrayList(parentsString);
+ }
+ parents = parentsNames.stream().map(parent -> (parent != null) ?
+ configs.get(parent) : null).collect(Collectors.toList());
+ }
+
+ String typeStr = telemetryCfg.getString(TYPE, getParentAttribute(parents, TYPE));
+ String manufacturer = telemetryCfg.getString(MFG, getParentAttribute(parents, MFG));
+ String swVersion = telemetryCfg.getString(SW, getParentAttribute(parents, SW));
+
+ // note that we do not inherits enabled property from parent
+ String enabledStr = telemetryCfg.getString(ENABLED);
+
+ boolean enabled = enabledStr != null && enabledStr.equalsIgnoreCase(TRUE);
+
+ TelemetryConfig.ConfigType type = type(typeStr);
+
+ if (type == null) {
+ return null;
+ }
+
+ return new DefaultTelemetryConfig(name, type, parents, manufacturer,
+ swVersion, enabled, parseProperties(parents, telemetryCfg));
+ }
+
+ private TelemetryConfig.ConfigType type(String typeStr) {
+ switch (typeStr.toUpperCase()) {
+ case "GRPC" :
+ return GRPC;
+ case "KAFKA":
+ return KAFKA;
+ case "REST":
+ return REST;
+ case "INFLUXDB":
+ return INFLUXDB;
+ case "PROMETHEUS":
+ return PROMETHEUS;
+ case "UNKNOWN":
+ default:
+ return UNKNOWN;
+ }
+ }
+
+ // Returns the specified property from the highest priority parent
+ private String getParentAttribute(List<TelemetryConfig> parents, String attribute) {
+ if (!parents.isEmpty()) {
+ TelemetryConfig parent = parents.get(0);
+ switch (attribute) {
+ case TYPE:
+ return parent.type().name().toLowerCase();
+ case MFG:
+ return parent.manufacturer();
+ case SW:
+ return parent.swVersion();
+ default:
+ throw new IllegalArgumentException("Unsupported attribute");
+ }
+ }
+ return "";
+ }
+
+ // Parses the properties section.
+ private Map<String, String> parseProperties(List<TelemetryConfig> parents,
+ HierarchicalConfiguration config) {
+ ImmutableMap.Builder<String, String> properties = ImmutableMap.builder();
+
+ // note that, we only allow the inheritance from single source
+ Map<String, String> parentConfigs = Maps.newHashMap();
+ if (!parents.isEmpty()) {
+ TelemetryConfig parent = parents.get(0);
+ parentConfigs = parent.properties();
+ }
+
+ properties.putAll(parentConfigs);
+
+ for (HierarchicalConfiguration b : config.configurationsAt(PROPERTY)) {
+ properties.put(b.getString(NAME), (String) b.getRootNode().getValue());
+ }
+ return properties.build();
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
index 860da11..39b1fdb 100644
--- a/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
+++ b/apps/openstacktelemetry/app/src/main/java/org/onosproject/openstacktelemetry/util/OpenstackTelemetryUtil.java
@@ -18,8 +18,6 @@
import com.google.common.base.Strings;
import org.onlab.packet.IPv4;
import org.onosproject.cfg.ConfigProperty;
-import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
-import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
import java.util.Dictionary;
import java.util.Optional;
@@ -112,27 +110,4 @@
return PROTOCOL_NAME_ANY;
}
}
-
- /**
- * Initializes the telemetry service due tue configuration changes.
- *
- *
- * @param adminService telemetry admin service
- * @param config telemetry configuration
- * @param enable service enable flag
- */
- public static void initTelemetryService(TelemetryAdminService adminService,
- TelemetryConfig config, boolean enable) {
- if (enable) {
- if (adminService.isRunning()) {
- adminService.restart(config);
- } else {
- adminService.start(config);
- }
- } else {
- if (adminService.isRunning()) {
- adminService.stop();
- }
- }
- }
}
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/grpc-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/grpc-configs.xml
new file mode 100644
index 0000000..681dfc7
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/grpc-configs.xml
@@ -0,0 +1,24 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="grpc" type="grpc" manufacturer="grpc.io" swVersion="master">
+ <property name="address">127.0.0.1</property>
+ <property name="port">50051</property>
+ <property name="usePlaintext">true</property>
+ <property name="maxInboundMsgSize">4194304</property>
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/influxdb-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/influxdb-configs.xml
new file mode 100644
index 0000000..d702a44
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/influxdb-configs.xml
@@ -0,0 +1,30 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="influxdb" type="influxdb" manufacturer="influxdata.com" swVersion="master">
+ <property name="address">127.0.0.1</property>
+ <property name="port">8086</property>
+ <property name="username">onos</property>
+ <property name="password">onos</property>
+ </config>
+ <config name="sona-influxdb-connector" manufacturer="SK Telecom"
+ swVersion="1.0" extends="influxdb" enabled="false">
+ <property name="database">ost</property>
+ <property name="measurement">sonaflow</property>
+ <property name="enableBatch">true</property>
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
new file mode 100644
index 0000000..7429ac2
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/kafka-configs.xml
@@ -0,0 +1,39 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="kafka" type="kafka" manufacturer="kafka.apache.org" swVersion="master">
+ <property name="address">127.0.0.1</property>
+ <property name="port">9092</property>
+ <property name="retries">0</property>
+ </config>
+ <config name="tina-kafka-exporter" manufacturer="SK Telecom"
+ swVersion="1.0" extends="kafka" enabled="false">
+ <property name="batchSize">16384</property>
+ <property name="lingerMs">1</property>
+ <property name="memoryBuffer">33554432</property>
+ <property name="requiredAcks">all</property>
+ <property name="keySerializer">
+ org.apache.kafka.common.serialization.StringSerializer
+ </property>
+ <property name="valueSerializer">
+ org.apache.kafka.common.serialization.ByteArraySerializer
+ </property>
+ <property name="topic">sona.flow</property>
+ <property name="key">flowdata</property>
+ <property name="codec">TinaMessageByteBufferCodec</property>
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/prometheus-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/prometheus-configs.xml
new file mode 100644
index 0000000..0459f6a
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/prometheus-configs.xml
@@ -0,0 +1,26 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="prometheus" type="prometheus" manufacturer="prometheus.io" swVersion="master">
+ <property name="address">127.0.0.1</property>
+ <property name="port">50051</property>
+ </config>
+
+ <config name="sona-prometheus-exporter" manufacturer="SK Telecom"
+ swVersion="1.0" extends="prometheus" enabled="false">
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/rest-configs.xml b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/rest-configs.xml
new file mode 100644
index 0000000..ca1dc57
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/main/resources/org/onosproject/openstacktelemetry/impl/rest-configs.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="rest" type="rest" manufacturer="N/A" swVersion="master">
+ <property name="address">127.0.0.1</property>
+ <property name="port">80</property>
+ <property name="requestMediaType">application/json</property>
+ <property name="responseMediaType">application/json</property>
+ </config>
+ <config name="rest-connector" swVersion="1.0" extends="rest" enabled="false">
+ <property name="endpoint">telemetry</property>
+ <property name="method">POST</property>
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfigTest.java
index 1a7f752..c56247f 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultGrpcTelemetryConfigTest.java
@@ -15,17 +15,27 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.GrpcTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.GRPC;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.MAX_INBOUND_MSG_SIZE;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.USE_PLAINTEXT;
+import static org.onosproject.openstacktelemetry.config.DefaultGrpcTelemetryConfig.fromTelemetryConfig;
/**
* Unit tests for DefaultGrpcTelemetryConfig class.
@@ -45,9 +55,11 @@
private static final boolean USE_PLAIN_TEXT_2 = true;
private static final Map<String, Object> CONFIG_MAP_1 =
- ImmutableMap.of("key1", "value1");
+ ImmutableMap.of("key1", "value1");
private static final Map<String, Object> CONFIG_MAP_2 =
- ImmutableMap.of("key2", "value2");
+ ImmutableMap.of("key2", "value2");
+
+ private static final String DUMMY = "dummy";
private GrpcTelemetryConfig config1;
private GrpcTelemetryConfig sameAsConfig1;
@@ -122,4 +134,24 @@
assertThat(config.usePlaintext(), is(USE_PLAIN_TEXT_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+ props.put(MAX_INBOUND_MSG_SIZE, String.valueOf(MSG_SIZE_1));
+ props.put(USE_PLAINTEXT, String.valueOf(USE_PLAIN_TEXT_1));
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, GRPC,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+ GrpcTelemetryConfig grpcConfig = fromTelemetryConfig(config);
+ assertThat(grpcConfig.address(), is(IP_ADDRESS_1));
+ assertThat(grpcConfig.port(), is(PORT_1));
+ assertThat(grpcConfig.maxInboundMsgSize(), is(MSG_SIZE_1));
+ assertThat(grpcConfig.usePlaintext(), is(USE_PLAIN_TEXT_1));
+ }
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
index 2357fec..cd4a973 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultInfluxDbTelemetryConfigTest.java
@@ -15,17 +15,30 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.InfluxDbTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.INFLUXDB;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.DATABASE;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.ENABLE_BATCH;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.MEASUREMENT;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.PASSWORD;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.USERNAME;
+import static org.onosproject.openstacktelemetry.config.DefaultInfluxDbTelemetryConfig.fromTelemetryConfig;
/**
* Unit tests for DefaultInfluxDbTelemetryConfig class.
@@ -58,6 +71,8 @@
private static final Map<String, Object> CONFIG_MAP_2 =
ImmutableMap.of("key2", "value2");
+ private static final String DUMMY = "dummy";
+
private InfluxDbTelemetryConfig config1;
private InfluxDbTelemetryConfig sameAsConfig1;
private InfluxDbTelemetryConfig config2;
@@ -143,4 +158,31 @@
assertThat(config.enableBatch(), is(ENABLE_BATCH_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+ props.put(USERNAME, USERNAME_1);
+ props.put(PASSWORD, PASSWORD_1);
+ props.put(DATABASE, DATABASE_1);
+ props.put(ENABLE_BATCH, String.valueOf(ENABLE_BATCH_1));
+ props.put(MEASUREMENT, MEASUREMENT_1);
+
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, INFLUXDB,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+ InfluxDbTelemetryConfig influxDbConfig = fromTelemetryConfig(config);
+ assertThat(influxDbConfig.address(), is(IP_ADDRESS_1));
+ assertThat(influxDbConfig.port(), is(PORT_1));
+ assertThat(influxDbConfig.database(), is(DATABASE_1));
+ assertThat(influxDbConfig.measurement(), is(MEASUREMENT_1));
+ assertThat(influxDbConfig.username(), is(USERNAME_1));
+ assertThat(influxDbConfig.password(), is(PASSWORD_1));
+ assertThat(influxDbConfig.enableBatch(), is(ENABLE_BATCH_1));
+ }
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
index d3958fe..ba9b34b 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultKafkaTelemetryConfigTest.java
@@ -15,17 +15,35 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.KafkaTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.KAFKA;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.BATCH_SIZE;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.CODEC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.KEY_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.LINGER_MS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.MEMORY_BUFFER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.REQUIRED_ACKS;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.RETRIES;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.TOPIC;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.VALUE_SERIALIZER;
+import static org.onosproject.openstacktelemetry.config.DefaultKafkaTelemetryConfig.fromTelemetryConfig;
public final class DefaultKafkaTelemetryConfigTest {
@@ -55,11 +73,22 @@
private static final String VALUE_SERIALIZER_1 = "valueserializer1";
private static final String VALUE_SERIALIZER_2 = "valueserializer2";
+ private static final String KEY_1 = "key1";
+ private static final String KEY_2 = "key2";
+
+ private static final String TOPIC_1 = "topic1";
+ private static final String TOPIC_2 = "topic2";
+
+ private static final String CODEC_1 = "codec1";
+ private static final String CODEC_2 = "codec2";
+
private static final Map<String, Object> CONFIG_MAP_1 =
ImmutableMap.of("key1", "value1");
private static final Map<String, Object> CONFIG_MAP_2 =
ImmutableMap.of("key2", "value2");
+ private static final String DUMMY = "dummy";
+
private KafkaTelemetryConfig config1;
private KafkaTelemetryConfig sameAsConfig1;
private KafkaTelemetryConfig config2;
@@ -87,6 +116,9 @@
.withLingerMs(LINGER_MS_1)
.withKeySerializer(KEY_SERIALIZER_1)
.withValueSerializer(VALUE_SERIALIZER_1)
+ .withKey(KEY_1)
+ .withTopic(TOPIC_1)
+ .withCodec(CODEC_1)
.withConfigMap(CONFIG_MAP_1)
.build();
@@ -100,6 +132,9 @@
.withLingerMs(LINGER_MS_1)
.withKeySerializer(KEY_SERIALIZER_1)
.withValueSerializer(VALUE_SERIALIZER_1)
+ .withKey(KEY_1)
+ .withTopic(TOPIC_1)
+ .withCodec(CODEC_1)
.withConfigMap(CONFIG_MAP_1)
.build();
@@ -113,6 +148,9 @@
.withLingerMs(LINGER_MS_2)
.withKeySerializer(KEY_SERIALIZER_2)
.withValueSerializer(VALUE_SERIALIZER_2)
+ .withKey(KEY_2)
+ .withTopic(TOPIC_2)
+ .withCodec(CODEC_2)
.withConfigMap(CONFIG_MAP_2)
.build();
}
@@ -151,6 +189,46 @@
assertThat(config.lingerMs(), is(LINGER_MS_1));
assertThat(config.keySerializer(), is(KEY_SERIALIZER_1));
assertThat(config.valueSerializer(), is(VALUE_SERIALIZER_1));
+ assertThat(config.key(), is(KEY_1));
+ assertThat(config.topic(), is(TOPIC_1));
+ assertThat(config.codec(), is(CODEC_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+ props.put(RETRIES, String.valueOf(RETRIES_1));
+ props.put(BATCH_SIZE, String.valueOf(BATCH_SIZE_1));
+ props.put(MEMORY_BUFFER, String.valueOf(MEMORY_BUFFER_1));
+ props.put(REQUIRED_ACKS, REQUIRED_ACKS_1);
+ props.put(LINGER_MS, String.valueOf(LINGER_MS_1));
+ props.put(KEY_SERIALIZER, KEY_SERIALIZER_1);
+ props.put(VALUE_SERIALIZER, VALUE_SERIALIZER_1);
+ props.put(KEY, KEY_1);
+ props.put(TOPIC, TOPIC_1);
+ props.put(CODEC, CODEC_1);
+
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, KAFKA,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+ KafkaTelemetryConfig kafkaConfig = fromTelemetryConfig(config);
+ assertThat(kafkaConfig.address(), is(IP_ADDRESS_1));
+ assertThat(kafkaConfig.port(), is(PORT_1));
+ assertThat(kafkaConfig.retries(), is(RETRIES_1));
+ assertThat(kafkaConfig.batchSize(), is(BATCH_SIZE_1));
+ assertThat(kafkaConfig.memoryBuffer(), is(MEMORY_BUFFER_1));
+ assertThat(kafkaConfig.requiredAcks(), is(REQUIRED_ACKS_1));
+ assertThat(kafkaConfig.lingerMs(), is(LINGER_MS_1));
+ assertThat(kafkaConfig.keySerializer(), is(KEY_SERIALIZER_1));
+ assertThat(kafkaConfig.valueSerializer(), is(VALUE_SERIALIZER_1));
+ assertThat(kafkaConfig.key(), is(KEY_1));
+ assertThat(kafkaConfig.topic(), is(TOPIC_1));
+ assertThat(kafkaConfig.codec(), is(CODEC_1));
+ }
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfigTest.java
index 7012ee8..de94f83 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultPrometheusTelemetryConfigTest.java
@@ -15,36 +15,43 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.PrometheusTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
-import static org.onosproject.openstacktelemetry.impl.OsgiPropertyConstants.PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.PROMETHEUS;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultPrometheusTelemetryConfig.fromTelemetryConfig;
/**
* Unit tests for DefaultPrometheusTelemetryConfig class.
*/
public class DefaultPrometheusTelemetryConfigTest {
-
- private static final String IP_ADDRESS_1 = PROP_PROMETHEUS_EXPORTER_ADDRESS_DEFAULT;
+ private static final String IP_ADDRESS_1 = "10.10.1.1";
private static final String IP_ADDRESS_2 = "10.10.1.2";
- private static final int PORT_1 = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT;
- private static final int PORT_2 = PROP_PROMETHEUS_EXPORTER_PORT_DEFAULT + 1;
+ private static final int PORT_1 = 50050;
+ private static final int PORT_2 = 50051;
private static final Map<String, Object> CONFIG_MAP_1 =
ImmutableMap.of("key1", "value1");
private static final Map<String, Object> CONFIG_MAP_2 =
ImmutableMap.of("key2", "value2");
+ private static final String DUMMY = "dummy";
+
private PrometheusTelemetryConfig config1;
private PrometheusTelemetryConfig sameAsConfig1;
private PrometheusTelemetryConfig config2;
@@ -110,4 +117,21 @@
assertThat(config.port(), is(PORT_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, PROMETHEUS,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+
+ PrometheusTelemetryConfig prometheusConfig = fromTelemetryConfig(config);
+ assertThat(prometheusConfig.address(), is(IP_ADDRESS_1));
+ assertThat(prometheusConfig.port(), is(PORT_1));
+ }
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfigTest.java
index 3e8bac06..c78bb63 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfigTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/config/DefaultRestTelemetryConfigTest.java
@@ -15,17 +15,28 @@
*/
package org.onosproject.openstacktelemetry.config;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
import com.google.common.testing.EqualsTester;
import org.junit.Before;
import org.junit.Test;
import org.onosproject.openstacktelemetry.api.config.RestTelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.impl.DefaultTelemetryConfig;
import java.util.Map;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+import static org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType.REST;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.ADDRESS;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.ENDPOINT;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.METHOD;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.PORT;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.REQUEST_MEDIA_TYPE;
+import static org.onosproject.openstacktelemetry.config.DefaultRestTelemetryConfig.RESPONSE_MEDIA_TYPE;
public final class DefaultRestTelemetryConfigTest {
@@ -52,6 +63,8 @@
private static final Map<String, Object> CONFIG_MAP_2 =
ImmutableMap.of("key2", "value2");
+ private static final String DUMMY = "dummy";
+
private RestTelemetryConfig config1;
private RestTelemetryConfig sameAsConfig1;
private RestTelemetryConfig config2;
@@ -133,4 +146,28 @@
assertThat(config.responseMediaType(), is(RESPONSE_MEDIA_TYPE_1));
assertThat(config.configMap(), is(CONFIG_MAP_1));
}
-}
+
+ /**
+ * Tests props extraction.
+ */
+ @Test
+ public void testPropsExtraction() {
+ Map<String, String> props = Maps.newConcurrentMap();
+ props.put(ADDRESS, IP_ADDRESS_1);
+ props.put(PORT, String.valueOf(PORT_1));
+ props.put(ENDPOINT, ENDPOINT_1);
+ props.put(METHOD, METHOD_1);
+ props.put(REQUEST_MEDIA_TYPE, REQUEST_MEDIA_TYPE_1);
+ props.put(RESPONSE_MEDIA_TYPE, RESPONSE_MEDIA_TYPE_1);
+
+ TelemetryConfig config = new DefaultTelemetryConfig(DUMMY, REST,
+ ImmutableList.of(), DUMMY, DUMMY, false, props);
+ RestTelemetryConfig restConfig = DefaultRestTelemetryConfig.fromTelemetryConfig(config);
+ assertThat(restConfig.address(), is(IP_ADDRESS_1));
+ assertThat(restConfig.port(), is(PORT_1));
+ assertThat(restConfig.endpoint(), is(ENDPOINT_1));
+ assertThat(restConfig.method(), is(METHOD_1));
+ assertThat(restConfig.requestMediaType(), is(REQUEST_MEDIA_TYPE_1));
+ assertThat(restConfig.responseMediaType(), is(RESPONSE_MEDIA_TYPE_1));
+ }
+}
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigTest.java
new file mode 100644
index 0000000..637f9fc
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DefaultTelemetryConfigTest.java
@@ -0,0 +1,116 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.Maps;
+import com.google.common.testing.EqualsTester;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.ImmutableClassChecker.assertThatClassIsImmutable;
+
+/**
+ * Unit tests for DefaultTelemetryConfig class.
+ */
+public final class DefaultTelemetryConfigTest {
+
+ private static final String NAME_1 = "grpc";
+ private static final String NAME_2 = "kafka";
+
+ private static final ConfigType TYPE_1 = ConfigType.GRPC;
+ private static final ConfigType TYPE_2 = ConfigType.KAFKA;
+
+ private static final String MANUFACTURER_1 = "grpc.io";
+ private static final String MANUFACTURER_2 = "kafka.apache.org";
+
+ private static final String SW_VERSION_1 = "1.0";
+ private static final String SW_VERSION_2 = "1.0";
+
+ private static final Map<String, String> PROP_1 = Maps.newConcurrentMap();
+ private static final Map<String, String> PROP_2 = Maps.newConcurrentMap();
+
+ private static final String PROP_1_KEY_1 = "key11";
+ private static final String PROP_1_KEY_2 = "key12";
+ private static final String PROP_1_VALUE_1 = "value11";
+ private static final String PROP_1_VALUE_2 = "value12";
+ private static final String PROP_2_KEY_1 = "key21";
+ private static final String PROP_2_KEY_2 = "key22";
+ private static final String PROP_2_VALUE_1 = "value21";
+ private static final String PROP_2_VALUE_2 = "value22";
+
+ private static final boolean ENABLED_1 = true;
+ private static final boolean ENABLED_2 = false;
+
+ private TelemetryConfig config1;
+ private TelemetryConfig sameAsConfig1;
+ private TelemetryConfig config2;
+
+ /**
+ * Initial setup for this unit test.
+ */
+ @Before
+ public void setup() {
+ PROP_1.put(PROP_1_KEY_1, PROP_1_VALUE_1);
+ PROP_1.put(PROP_1_KEY_2, PROP_1_VALUE_2);
+ PROP_2.put(PROP_2_KEY_1, PROP_2_VALUE_1);
+ PROP_2.put(PROP_2_KEY_2, PROP_2_VALUE_2);
+
+ config1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
+ MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
+ sameAsConfig1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
+ MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
+ config2 = new DefaultTelemetryConfig(NAME_2, TYPE_2, null,
+ MANUFACTURER_2, SW_VERSION_2, ENABLED_2, PROP_2);
+ }
+
+ /**
+ * Tests class immutability.
+ */
+ @Test
+ public void testImmutability() {
+ assertThatClassIsImmutable(DefaultTelemetryConfig.class);
+ }
+
+ /**
+ * Tests object equality.
+ */
+ @Test
+ public void testEquality() {
+ new EqualsTester()
+ .addEqualityGroup(config1, sameAsConfig1)
+ .addEqualityGroup(config2).testEquals();
+ }
+
+ /**
+ * Tests object construction.
+ */
+ @Test
+ public void testConstruction() {
+ TelemetryConfig config = config1;
+
+ assertEquals(config.name(), NAME_1);
+ assertEquals(config.type(), TYPE_1);
+ assertEquals(config.manufacturer(), MANUFACTURER_1);
+ assertEquals(config.swVersion(), SW_VERSION_1);
+ assertEquals(config.properties(), PROP_1);
+ assertEquals(config.enabled(), ENABLED_1);
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStoreTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStoreTest.java
new file mode 100644
index 0000000..511fa6d
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/DistributedTelemetryConfigStoreTest.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.TestApplicationId;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig.ConfigType;
+import org.onosproject.store.service.TestStorageService;
+
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Distributed TelemetryConfig store test suite.
+ */
+public class DistributedTelemetryConfigStoreTest {
+
+ private static final String NAME_1 = "grpc";
+ private static final String NAME_2 = "kafka";
+
+ private static final ConfigType TYPE_1 = ConfigType.GRPC;
+ private static final ConfigType TYPE_2 = ConfigType.KAFKA;
+
+ private static final String MANUFACTURER_1 = "grpc.io";
+ private static final String MANUFACTURER_2 = "kafka.apache.org";
+
+ private static final String SW_VERSION_1 = "1.0";
+ private static final String SW_VERSION_2 = "1.0";
+
+ private static final Map<String, String> PROP_1 = Maps.newConcurrentMap();
+ private static final Map<String, String> PROP_2 = Maps.newConcurrentMap();
+
+ private static final String PROP_1_KEY_1 = "key11";
+ private static final String PROP_1_KEY_2 = "key12";
+ private static final String PROP_1_VALUE_1 = "value11";
+ private static final String PROP_1_VALUE_2 = "value12";
+ private static final String PROP_2_KEY_1 = "key21";
+ private static final String PROP_2_KEY_2 = "key22";
+ private static final String PROP_2_VALUE_1 = "value21";
+ private static final String PROP_2_VALUE_2 = "value22";
+
+ private static final boolean ENABLED_1 = true;
+ private static final boolean ENABLED_2 = false;
+
+ private TelemetryConfig config1;
+ private TelemetryConfig config2;
+
+ private DistributedTelemetryConfigStore configStore;
+
+ /**
+ * Sets up the telemetry config store and the storage service test harness.
+ */
+ @Before
+ public void setUp() {
+ configStore = new DistributedTelemetryConfigStore();
+ configStore.storageService = new TestStorageService();
+ configStore.coreService = new TestCoreService();
+ configStore.setDelegate(event -> {
+ });
+ configStore.activate();
+
+ initTelemetryConfigs();
+ }
+
+ /**
+ * Tears down the telemetry config store.
+ */
+ @After
+ public void tearDown() {
+ configStore.deactivate();
+ }
+
+ /**
+ * Tests adding, removing and getting.
+ */
+ @Test
+ public void basics() {
+ configStore.createTelemetryConfig(config1);
+ assertTrue("There should be one telemetry config in the set.",
+ configStore.telemetryConfigs().contains(config1));
+ assertTrue("The same telemetry config should be returned.",
+ configStore.telemetryConfigsByType(ConfigType.GRPC).contains(config1));
+ assertEquals("The telemetry config should be the same.",
+ configStore.telemetryConfig(NAME_1), config1);
+ configStore.removeTelemetryConfig(NAME_1);
+ assertFalse("There should be no telemetry config in the set.",
+ configStore.telemetryConfigs().contains(config1));
+
+ configStore.createTelemetryConfig(config1);
+ configStore.createTelemetryConfig(config2);
+ assertEquals("There should be two configs in the sets.",
+ configStore.telemetryConfigs().size(), 2);
+ }
+
+ /**
+ * Test core service; For generate test application ID.
+ */
+ public class TestCoreService extends CoreServiceAdapter {
+ @Override
+ public ApplicationId registerApplication(String name) {
+ return TestApplicationId.create(name);
+ }
+ }
+
+ private void initTelemetryConfigs() {
+ PROP_1.put(PROP_1_KEY_1, PROP_1_VALUE_1);
+ PROP_1.put(PROP_1_KEY_2, PROP_1_VALUE_2);
+ PROP_2.put(PROP_2_KEY_1, PROP_2_VALUE_1);
+ PROP_2.put(PROP_2_KEY_2, PROP_2_VALUE_2);
+
+ config1 = new DefaultTelemetryConfig(NAME_1, TYPE_1, null,
+ MANUFACTURER_1, SW_VERSION_1, ENABLED_1, PROP_1);
+ config2 = new DefaultTelemetryConfig(NAME_2, TYPE_2, null,
+ MANUFACTURER_2, SW_VERSION_2, ENABLED_2, PROP_2);
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManagerTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManagerTest.java
index 47ef6d9..0362492 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManagerTest.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryManagerTest.java
@@ -18,7 +18,7 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
-import org.onosproject.openstacktelemetry.api.TelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
import static org.junit.Assert.assertEquals;
@@ -27,11 +27,11 @@
*/
public final class OpenstackTelemetryManagerTest {
- private static final TelemetryService GRPC_SERVICE = new GrpcTelemetryManager();
- private static final TelemetryService INFLUXDB_SERVICE = new InfluxDbTelemetryManager();
- private static final TelemetryService KAFKA_SERVICE = new KafkaTelemetryManager();
- private static final TelemetryService PROMETHEUS_SERVICE = new PrometheusTelemetryManager();
- private static final TelemetryService REST_SERVICE = new PrometheusTelemetryManager();
+ private static final TelemetryAdminService GRPC_SERVICE = new GrpcTelemetryManager();
+ private static final TelemetryAdminService INFLUXDB_SERVICE = new InfluxDbTelemetryManager();
+ private static final TelemetryAdminService KAFKA_SERVICE = new KafkaTelemetryManager();
+ private static final TelemetryAdminService PROMETHEUS_SERVICE = new PrometheusTelemetryManager();
+ private static final TelemetryAdminService REST_SERVICE = new PrometheusTelemetryManager();
private OpenstackTelemetryManager manager;
@@ -42,6 +42,8 @@
public void setUp() {
manager = new OpenstackTelemetryManager();
+ manager.telemetryConfigService = new TelemetryConfigManager();
+
manager.activate();
}
@@ -52,7 +54,7 @@
public void testAddTelemetryService() {
addDefaultServices();
- TelemetryService kafkaService = new KafkaTelemetryManager();
+ TelemetryAdminService kafkaService = new KafkaTelemetryManager();
assertEquals(5, manager.telemetryServices().size());
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryServiceAdapter.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryServiceAdapter.java
index 129fec5..dafb059 100644
--- a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryServiceAdapter.java
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/OpenstackTelemetryServiceAdapter.java
@@ -19,7 +19,7 @@
import com.google.common.collect.Sets;
import org.onosproject.openstacktelemetry.api.FlowInfo;
import org.onosproject.openstacktelemetry.api.OpenstackTelemetryService;
-import org.onosproject.openstacktelemetry.api.TelemetryService;
+import org.onosproject.openstacktelemetry.api.TelemetryAdminService;
import java.util.Set;
@@ -28,15 +28,15 @@
*/
public class OpenstackTelemetryServiceAdapter implements OpenstackTelemetryService {
- Set<TelemetryService> services = Sets.newConcurrentHashSet();
+ Set<TelemetryAdminService> services = Sets.newConcurrentHashSet();
@Override
- public void addTelemetryService(TelemetryService telemetryService) {
+ public void addTelemetryService(TelemetryAdminService telemetryService) {
services.add(telemetryService);
}
@Override
- public void removeTelemetryService(TelemetryService telemetryService) {
+ public void removeTelemetryService(TelemetryAdminService telemetryService) {
services.remove(telemetryService);
}
@@ -46,7 +46,12 @@
}
@Override
- public Set<TelemetryService> telemetryServices() {
+ public Set<TelemetryAdminService> telemetryServices() {
return ImmutableSet.copyOf(services);
}
+
+ @Override
+ public TelemetryAdminService telemetryService(String type) {
+ return null;
+ }
}
diff --git a/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoaderTest.java b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoaderTest.java
new file mode 100644
index 0000000..6188cef
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/java/org/onosproject/openstacktelemetry/impl/XmlTelemetryConfigLoaderTest.java
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.onosproject.openstacktelemetry.impl;
+
+import org.junit.Test;
+import org.onosproject.openstacktelemetry.api.TelemetryConfigProvider;
+import org.onosproject.openstacktelemetry.api.config.TelemetryConfig;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests of the XML configuration loader implementation.
+ */
+public class XmlTelemetryConfigLoaderTest {
+
+ @Test
+ public void basics() throws IOException {
+ XmlTelemetryConfigLoader loader = new XmlTelemetryConfigLoader();
+ InputStream stream = getClass().getResourceAsStream("configs.singleInheritance.xml");
+ TelemetryConfigProvider provider = loader.loadTelemetryConfigs(stream);
+
+ assertEquals("incorrect config count", 2, provider.getTelemetryConfigs().size());
+
+ TelemetryConfig config = getConfig(provider, "foo.1");
+
+ assertEquals("incorrect config name", "foo.1", config.name());
+ assertEquals("incorrect config type", "grpc", config.type().name().toLowerCase());
+ assertEquals("incorrect config mfg", "Circus", config.manufacturer());
+ assertEquals("incorrect config sw", "2.2", config.swVersion());
+
+ assertEquals("incorrect config properties", 2, config.properties().size());
+ assertTrue("incorrect config property", config.properties().containsKey("p1"));
+ }
+
+ @Test
+ public void multipleDrivers() throws IOException {
+ XmlTelemetryConfigLoader loader = new XmlTelemetryConfigLoader();
+ InputStream stream = getClass().getResourceAsStream("configs.multipleInheritance.xml");
+ TelemetryConfigProvider provider = loader.loadTelemetryConfigs(stream);
+
+ TelemetryConfig config1 = getConfig(provider, "foo.1");
+
+ assertEquals("incorrect config mfg", "Circus", config1.manufacturer());
+ assertEquals("incorrect config sw", "2.2", config1.swVersion());
+
+ assertEquals("incorrect config type", "grpc", config1.type().name().toLowerCase());
+ assertEquals("incorrect config properties", 3, config1.properties().size());
+ assertTrue("incorrect config property", config1.properties().containsKey("p0"));
+ assertTrue("incorrect config property", config1.properties().containsKey("p1"));
+ assertTrue("incorrect config property", config1.properties().containsKey("p2"));
+
+ TelemetryConfig config2 = getConfig(provider, "foo.2");
+ assertEquals("incorrect config type", "grpc", config2.type().name().toLowerCase());
+ assertEquals("incorrect config mfg", "Big Top OEM", config2.manufacturer());
+ assertEquals("incorrect config sw", "2.2", config2.swVersion());
+
+ assertEquals("incorrect config properties", 4, config2.properties().size());
+ assertTrue("incorrect config property", config2.properties().containsKey("p0"));
+ assertTrue("incorrect config property", config2.properties().containsKey("p1"));
+ assertTrue("incorrect config property", config2.properties().containsKey("p2"));
+ assertTrue("incorrect config property", config2.properties().containsKey("p3"));
+ }
+
+ private TelemetryConfig getConfig(TelemetryConfigProvider provider, String name) {
+ Iterator<TelemetryConfig> iterator = provider.getTelemetryConfigs().iterator();
+ TelemetryConfig config;
+ do {
+ config = iterator.next();
+ } while (!name.equals(config.name()));
+ return config;
+ }
+}
diff --git a/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.multipleInheritance.xml b/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.multipleInheritance.xml
new file mode 100644
index 0000000..0c56b26
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.multipleInheritance.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="foo.0" type="grpc" manufacturer="Circus" swVersion="2.0">
+ <property name="p0">v0</property>
+ </config>
+
+ <config name="foo.1" extends="foo.0" swVersion="2.2">
+ <fingerprint>ding</fingerprint>
+ <fingerprint>bat</fingerprint>
+
+ <property name="p1">v1</property>
+ <property name="p2">v2</property>
+ </config>
+
+ <config name="foo.2" extends="foo.1" manufacturer="Big Top OEM">
+ <property name="p3">v3</property>
+ </config>
+</configs>
\ No newline at end of file
diff --git a/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.singleInheritance.xml b/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.singleInheritance.xml
new file mode 100644
index 0000000..8b6e5be
--- /dev/null
+++ b/apps/openstacktelemetry/app/src/test/resources/org/onosproject/openstacktelemetry/impl/configs.singleInheritance.xml
@@ -0,0 +1,28 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Copyright 2018-present Open Networking Foundation
+ ~
+ ~ Licensed under the Apache License, Version 2.0 (the "License");
+ ~ you may not use this file except in compliance with the License.
+ ~ You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<configs>
+ <config name="foo.0" type="grpc" manufacturer="Circus" swVersion="2.0">
+ </config>
+
+ <config name="foo.1" extends="foo.0" swVersion="2.2">
+ <fingerprint>ding</fingerprint>
+ <fingerprint>bat</fingerprint>
+
+ <property name="p1">v1</property>
+ <property name="p2">v2</property>
+ </config>
+</configs>
\ No newline at end of file