blob: 20d6599c963d437e5f23ad5adf3341111d071ed3 [file] [log] [blame]
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -08001/**
2* Copyright 2011, Big Switch Networks, Inc.
3* Originally created by David Erickson, Stanford University
4*
5* Licensed under the Apache License, Version 2.0 (the "License"); you may
6* not use this file except in compliance with the License. You may obtain
7* a copy of the License at
8*
9* http://www.apache.org/licenses/LICENSE-2.0
10*
11* Unless required by applicable law or agreed to in writing, software
12* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14* License for the specific language governing permissions and limitations
15* under the License.
16**/
17
18package net.floodlightcontroller.storage;
19
20import java.util.ArrayList;
21import java.util.Collection;
22import java.util.HashMap;
23import java.util.List;
24import java.util.Map;
25import java.util.Set;
26import java.util.concurrent.Callable;
27import java.util.concurrent.ConcurrentHashMap;
28import java.util.concurrent.CopyOnWriteArraySet;
29import java.util.concurrent.ExecutorService;
30import java.util.concurrent.Executors;
31import java.util.concurrent.Future;
32
33import net.floodlightcontroller.core.annotations.LogMessageCategory;
34import net.floodlightcontroller.core.annotations.LogMessageDoc;
35import net.floodlightcontroller.core.module.FloodlightModuleContext;
36import net.floodlightcontroller.core.module.FloodlightModuleException;
37import net.floodlightcontroller.core.module.IFloodlightModule;
38import net.floodlightcontroller.core.module.IFloodlightService;
39import net.floodlightcontroller.counter.ICounter;
40import net.floodlightcontroller.counter.CounterStore;
41import net.floodlightcontroller.counter.ICounterStoreService;
42import net.floodlightcontroller.counter.CounterValue.CounterType;
43import net.floodlightcontroller.restserver.IRestApiService;
44import net.floodlightcontroller.storage.web.StorageWebRoutable;
45
46import org.slf4j.Logger;
47import org.slf4j.LoggerFactory;
48
49@LogMessageCategory("System Database")
50public abstract class AbstractStorageSource
51 implements IStorageSourceService, IFloodlightModule {
Yuta HIGUCHI6ac8d182013-10-22 15:24:56 -070052 protected final static Logger logger = LoggerFactory.getLogger(AbstractStorageSource.class);
Umesh Krishnaswamy345ee992012-12-13 20:29:48 -080053
54 // Shared instance of the executor to use to execute the storage tasks.
55 // We make this a single threaded executor, because if we used a thread pool
56 // then storage operations could be executed out of order which would cause
57 // problems in some cases (e.g. delete and update of a row getting reordered).
58 // If we wanted to make this more multi-threaded we could have multiple
59 // worker threads/executors with affinity of operations on a given table
60 // to a single worker thread. But for now, we'll keep it simple and just have
61 // a single thread for all operations.
62 protected static ExecutorService defaultExecutorService = Executors.newSingleThreadExecutor();
63
64 protected final static String STORAGE_QUERY_COUNTER_NAME = "StorageQuery";
65 protected final static String STORAGE_UPDATE_COUNTER_NAME = "StorageUpdate";
66 protected final static String STORAGE_DELETE_COUNTER_NAME = "StorageDelete";
67
68 protected Set<String> allTableNames = new CopyOnWriteArraySet<String>();
69 protected ICounterStoreService counterStore;
70 protected ExecutorService executorService = defaultExecutorService;
71 protected IStorageExceptionHandler exceptionHandler;
72
73 private Map<String, Set<IStorageSourceListener>> listeners =
74 new ConcurrentHashMap<String, Set<IStorageSourceListener>>();
75
76 // Our dependencies
77 protected IRestApiService restApi = null;
78
79 protected static final String DB_ERROR_EXPLANATION =
80 "An unknown error occurred while executing asynchronous " +
81 "database operation";
82
83 @LogMessageDoc(level="ERROR",
84 message="Failure in asynchronous call to executeQuery",
85 explanation=DB_ERROR_EXPLANATION,
86 recommendation=LogMessageDoc.GENERIC_ACTION)
87 abstract class StorageCallable<V> implements Callable<V> {
88 public V call() {
89 try {
90 return doStorageOperation();
91 }
92 catch (StorageException e) {
93 logger.error("Failure in asynchronous call to executeQuery", e);
94 if (exceptionHandler != null)
95 exceptionHandler.handleException(e);
96 throw e;
97 }
98 }
99 abstract protected V doStorageOperation();
100 }
101
102 @LogMessageDoc(level="ERROR",
103 message="Failure in asynchronous call to updateRows",
104 explanation=DB_ERROR_EXPLANATION,
105 recommendation=LogMessageDoc.GENERIC_ACTION)
106 abstract class StorageRunnable implements Runnable {
107 public void run() {
108 try {
109 doStorageOperation();
110 }
111 catch (StorageException e) {
112 logger.error("Failure in asynchronous call to updateRows", e);
113 if (exceptionHandler != null)
114 exceptionHandler.handleException(e);
115 throw e;
116 }
117 }
118 abstract void doStorageOperation();
119 }
120
121 public AbstractStorageSource() {
122 this.executorService = defaultExecutorService;
123 }
124
125 public void setExecutorService(ExecutorService executorService) {
126 this.executorService = (executorService != null) ?
127 executorService : defaultExecutorService;
128 }
129
130 @Override
131 public void setExceptionHandler(IStorageExceptionHandler exceptionHandler) {
132 this.exceptionHandler = exceptionHandler;
133 }
134
135 @Override
136 public abstract void setTablePrimaryKeyName(String tableName, String primaryKeyName);
137
138 @Override
139 public void createTable(String tableName, Set<String> indexedColumns) {
140 allTableNames.add(tableName);
141 }
142
143 @Override
144 public Set<String> getAllTableNames() {
145 return allTableNames;
146 }
147
148 public void setCounterStore(CounterStore counterStore) {
149 this.counterStore = counterStore;
150 }
151
152 protected void updateCounters(String baseName, String tableName) {
153 if (counterStore != null) {
154 String counterName;
155 if (tableName != null) {
156 updateCounters(baseName, null);
157 counterName = baseName + CounterStore.TitleDelimitor + tableName;
158 } else {
159 counterName = baseName;
160 }
161 ICounter counter = counterStore.getCounter(counterName);
162 if (counter == null) {
163 counter = counterStore.createCounter(counterName, CounterType.LONG);
164 }
165 counter.increment();
166 }
167 }
168
169 @Override
170 public abstract IQuery createQuery(String tableName, String[] columnNames,
171 IPredicate predicate, RowOrdering ordering);
172
173 @Override
174 public IResultSet executeQuery(IQuery query) {
175 updateCounters(STORAGE_QUERY_COUNTER_NAME, query.getTableName());
176 return executeQueryImpl(query);
177 }
178
179 protected abstract IResultSet executeQueryImpl(IQuery query);
180
181 @Override
182 public IResultSet executeQuery(String tableName, String[] columnNames,
183 IPredicate predicate, RowOrdering ordering) {
184 IQuery query = createQuery(tableName, columnNames, predicate, ordering);
185 IResultSet resultSet = executeQuery(query);
186 return resultSet;
187 }
188
189 @Override
190 public Object[] executeQuery(String tableName, String[] columnNames,
191 IPredicate predicate, RowOrdering ordering, IRowMapper rowMapper) {
192 List<Object> objectList = new ArrayList<Object>();
193 IResultSet resultSet = executeQuery(tableName, columnNames, predicate, ordering);
194 while (resultSet.next()) {
195 Object object = rowMapper.mapRow(resultSet);
196 objectList.add(object);
197 }
198 return objectList.toArray();
199 }
200
201 @Override
202 public Future<IResultSet> executeQueryAsync(final IQuery query) {
203 Future<IResultSet> future = executorService.submit(
204 new StorageCallable<IResultSet>() {
205 public IResultSet doStorageOperation() {
206 return executeQuery(query);
207 }
208 });
209 return future;
210 }
211
212 @Override
213 public Future<IResultSet> executeQueryAsync(final String tableName,
214 final String[] columnNames, final IPredicate predicate,
215 final RowOrdering ordering) {
216 Future<IResultSet> future = executorService.submit(
217 new StorageCallable<IResultSet>() {
218 public IResultSet doStorageOperation() {
219 return executeQuery(tableName, columnNames,
220 predicate, ordering);
221 }
222 });
223 return future;
224 }
225
226 @Override
227 public Future<Object[]> executeQueryAsync(final String tableName,
228 final String[] columnNames, final IPredicate predicate,
229 final RowOrdering ordering, final IRowMapper rowMapper) {
230 Future<Object[]> future = executorService.submit(
231 new StorageCallable<Object[]>() {
232 public Object[] doStorageOperation() {
233 return executeQuery(tableName, columnNames, predicate,
234 ordering, rowMapper);
235 }
236 });
237 return future;
238 }
239
240 @Override
241 public Future<?> insertRowAsync(final String tableName,
242 final Map<String,Object> values) {
243 Future<?> future = executorService.submit(
244 new StorageRunnable() {
245 public void doStorageOperation() {
246 insertRow(tableName, values);
247 }
248 }, null);
249 return future;
250 }
251
252 @Override
253 public Future<?> updateRowsAsync(final String tableName, final List<Map<String,Object>> rows) {
254 Future<?> future = executorService.submit(
255 new StorageRunnable() {
256 public void doStorageOperation() {
257 updateRows(tableName, rows);
258 }
259 }, null);
260 return future;
261 }
262
263 @Override
264 public Future<?> updateMatchingRowsAsync(final String tableName,
265 final IPredicate predicate, final Map<String,Object> values) {
266 Future<?> future = executorService.submit(
267 new StorageRunnable() {
268 public void doStorageOperation() {
269 updateMatchingRows(tableName, predicate, values);
270 }
271 }, null);
272 return future;
273 }
274
275 @Override
276 public Future<?> updateRowAsync(final String tableName,
277 final Object rowKey, final Map<String,Object> values) {
278 Future<?> future = executorService.submit(
279 new StorageRunnable() {
280 public void doStorageOperation() {
281 updateRow(tableName, rowKey, values);
282 }
283 }, null);
284 return future;
285 }
286
287 @Override
288 public Future<?> updateRowAsync(final String tableName,
289 final Map<String,Object> values) {
290 Future<?> future = executorService.submit(
291 new StorageRunnable() {
292 public void doStorageOperation() {
293 updateRow(tableName, values);
294 }
295 }, null);
296 return future;
297 }
298
299 @Override
300 public Future<?> deleteRowAsync(final String tableName, final Object rowKey) {
301 Future<?> future = executorService.submit(
302 new StorageRunnable() {
303 public void doStorageOperation() {
304 deleteRow(tableName, rowKey);
305 }
306 }, null);
307 return future;
308 }
309
310 @Override
311 public Future<?> deleteRowsAsync(final String tableName, final Set<Object> rowKeys) {
312 Future<?> future = executorService.submit(
313 new StorageRunnable() {
314 public void doStorageOperation() {
315 deleteRows(tableName, rowKeys);
316 }
317 }, null);
318 return future;
319 }
320
321 @Override
322 public Future<?> deleteMatchingRowsAsync(final String tableName, final IPredicate predicate) {
323 Future<?> future = executorService.submit(
324 new StorageRunnable() {
325 public void doStorageOperation() {
326 deleteMatchingRows(tableName, predicate);
327 }
328 }, null);
329 return future;
330 }
331
332 @Override
333 public Future<?> getRowAsync(final String tableName, final Object rowKey) {
334 Future<?> future = executorService.submit(
335 new StorageRunnable() {
336 public void doStorageOperation() {
337 getRow(tableName, rowKey);
338 }
339 }, null);
340 return future;
341 }
342
343 @Override
344 public Future<?> saveAsync(final IResultSet resultSet) {
345 Future<?> future = executorService.submit(
346 new StorageRunnable() {
347 public void doStorageOperation() {
348 resultSet.save();
349 }
350 }, null);
351 return future;
352 }
353
354 @Override
355 public void insertRow(String tableName, Map<String, Object> values) {
356 updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
357 insertRowImpl(tableName, values);
358 }
359
360 protected abstract void insertRowImpl(String tableName, Map<String, Object> values);
361
362
363 @Override
364 public void updateRows(String tableName, List<Map<String,Object>> rows) {
365 updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
366 updateRowsImpl(tableName, rows);
367 }
368
369 protected abstract void updateRowsImpl(String tableName, List<Map<String,Object>> rows);
370
371 @Override
372 public void updateMatchingRows(String tableName, IPredicate predicate,
373 Map<String, Object> values) {
374 updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
375 updateMatchingRowsImpl(tableName, predicate, values);
376 }
377
378 protected abstract void updateMatchingRowsImpl(String tableName, IPredicate predicate,
379 Map<String, Object> values);
380
381 @Override
382 public void updateRow(String tableName, Object rowKey,
383 Map<String, Object> values) {
384 updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
385 updateRowImpl(tableName, rowKey, values);
386 }
387
388 protected abstract void updateRowImpl(String tableName, Object rowKey,
389 Map<String, Object> values);
390
391 @Override
392 public void updateRow(String tableName, Map<String, Object> values) {
393 updateCounters(STORAGE_UPDATE_COUNTER_NAME, tableName);
394 updateRowImpl(tableName, values);
395 }
396
397 protected abstract void updateRowImpl(String tableName, Map<String, Object> values);
398
399 @Override
400 public void deleteRow(String tableName, Object rowKey) {
401 updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
402 deleteRowImpl(tableName, rowKey);
403 }
404
405 protected abstract void deleteRowImpl(String tableName, Object rowKey);
406
407 @Override
408 public void deleteRows(String tableName, Set<Object> rowKeys) {
409 updateCounters(STORAGE_DELETE_COUNTER_NAME, tableName);
410 deleteRowsImpl(tableName, rowKeys);
411 }
412
413 protected abstract void deleteRowsImpl(String tableName, Set<Object> rowKeys);
414
415 @Override
416 public void deleteMatchingRows(String tableName, IPredicate predicate) {
417 IResultSet resultSet = null;
418 try {
419 resultSet = executeQuery(tableName, null, predicate, null);
420 while (resultSet.next()) {
421 resultSet.deleteRow();
422 }
423 resultSet.save();
424 }
425 finally {
426 if (resultSet != null)
427 resultSet.close();
428 }
429 }
430
431 @Override
432 public IResultSet getRow(String tableName, Object rowKey) {
433 updateCounters(STORAGE_QUERY_COUNTER_NAME, tableName);
434 return getRowImpl(tableName, rowKey);
435 }
436
437 protected abstract IResultSet getRowImpl(String tableName, Object rowKey);
438
439 @Override
440 public synchronized void addListener(String tableName, IStorageSourceListener listener) {
441 Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
442 if (tableListeners == null) {
443 tableListeners = new CopyOnWriteArraySet<IStorageSourceListener>();
444 listeners.put(tableName, tableListeners);
445 }
446 tableListeners.add(listener);
447 }
448
449 @Override
450 public synchronized void removeListener(String tableName, IStorageSourceListener listener) {
451 Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
452 if (tableListeners != null) {
453 tableListeners.remove(listener);
454 }
455 }
456
457 @LogMessageDoc(level="ERROR",
458 message="Exception caught handling storage notification",
459 explanation="An unknown error occured while trying to notify" +
460 " storage listeners",
461 recommendation=LogMessageDoc.GENERIC_ACTION)
462 protected synchronized void notifyListeners(StorageSourceNotification notification) {
463 String tableName = notification.getTableName();
464 Set<Object> keys = notification.getKeys();
465 Set<IStorageSourceListener> tableListeners = listeners.get(tableName);
466 if (tableListeners != null) {
467 for (IStorageSourceListener listener : tableListeners) {
468 try {
469 switch (notification.getAction()) {
470 case MODIFY:
471 listener.rowsModified(tableName, keys);
472 break;
473 case DELETE:
474 listener.rowsDeleted(tableName, keys);
475 break;
476 }
477 }
478 catch (Exception e) {
479 logger.error("Exception caught handling storage notification", e);
480 }
481 }
482 }
483 }
484
485 @Override
486 public void notifyListeners(List<StorageSourceNotification> notifications) {
487 for (StorageSourceNotification notification : notifications)
488 notifyListeners(notification);
489 }
490
491 // IFloodlightModule
492
493 @Override
494 public Collection<Class<? extends IFloodlightService>> getModuleServices() {
495 Collection<Class<? extends IFloodlightService>> l =
496 new ArrayList<Class<? extends IFloodlightService>>();
497 l.add(IStorageSourceService.class);
498 return l;
499 }
500
501 @Override
502 public Map<Class<? extends IFloodlightService>,
503 IFloodlightService> getServiceImpls() {
504 Map<Class<? extends IFloodlightService>,
505 IFloodlightService> m =
506 new HashMap<Class<? extends IFloodlightService>,
507 IFloodlightService>();
508 m.put(IStorageSourceService.class, this);
509 return m;
510 }
511
512 @Override
513 public Collection<Class<? extends IFloodlightService>> getModuleDependencies() {
514 Collection<Class<? extends IFloodlightService>> l =
515 new ArrayList<Class<? extends IFloodlightService>>();
516 l.add(IRestApiService.class);
517 l.add(ICounterStoreService.class);
518 return l;
519 }
520
521 @Override
522 public void init(FloodlightModuleContext context)
523 throws FloodlightModuleException {
524 restApi =
525 context.getServiceImpl(IRestApiService.class);
526 counterStore =
527 context.getServiceImpl(ICounterStoreService.class);
528 }
529
530 @Override
531 public void startUp(FloodlightModuleContext context) {
532 restApi.addRestletRoutable(new StorageWebRoutable());
533 }
534}