Skip to main content

Плагіни Фільтрів

Плагіни фільтрів надають механізм для зміни потоку даних під час його проходження через екземпляр fledge, фільтри можуть бути застосовані на півдні або півночі мікросервісів і можуть утворювати трубопровід з декількох елементів обробки через які проходять потоки даних. Фільтри, застосовані у південному сервісі, будуть лише оброблятимуть лише ті дані, які отримує південний сервіс, в той час як фільтри, розміщені на півночі, будуть обробляти всі дані, які виходять з північного інтерфейсу.

Фільтри можуть;

  • доповнювати дані, додаючи до них статичні метадані або обчислені значення
  • видаляти дані з потоку
  • додавати дані до потоку
  • змінювати дані в потоці

Слід зазначити, що існують деякі альтернативи створенню фільтра якщо ви хочете внести прості зміни до потоку даних. Існує ряд існуючих фільтрів, які забезпечують певну ступінь програмованості. До них відносяться , який дозволяє застосувати довільну математичну формулу до даних, або , який дозволяє невеликий вбудований скрипт на Python, що можна застосувати до даних.

Плагіни фільтрів можуть бути написані на C++ або Python і мають дуже простий інтерфейс. Механізм плагіна і підмножина API є спільним для всіх типів плагінів, включаючи фільтри.

Конфігурація

Фільтри використовують той самий механізм конфігурації, що і решта Fledge, використовуючи JSON-документ для опису параметрів конфігурації. Як і у випадку з будь-яким іншим плагіном, структура визначається плагіном і отримується через точку входу plugin_info. Потім вона зіставляється з базою даних для передачі налаштованих значень до точки входу plugin_init.

API Плагіну Фільтру на C++

API фільтра складається з невеликої кількості точок входу до функцій C, вони викликаються у строгому порядку і базуються на однаковому наборі спільних API для всіх плагінів Fledge.

Информація про плагін

Точка входу plugin_info є першою точкою входу, яка викликається у плагіні фільтра і повертає інформаційну структуру плагіна. Це той самий виклик, який має підтримувати кожен плагін Fledge, і який використовується для визначення типу плагіна і категорії конфігурації за замовчуванням для плагіна.

Типова реалізація plugin_info просто повертає вказівник на статичну структуру PLUGIN_INFORMATION.

PLUGIN_INFORMATION *plugin_info()
{
return &info;
}

Ініціалізація плагіна

Точка входу plugin_init викликається після виклику plugin_info і до того, як будь-які дані будуть передані фільтру. Вона викликається на етапі, коли сервіс налаштовує трубопровід фільтру і надає фільтру категорію конфігурації, яка тепер містить введені користувачем значення і місце призначення, куди фільтр буде надсилати вихідні дані.

PLUGIN_HANDLE plugin_init(ConfigCategory* config,
OUTPUT_HANDLE *outHandle,
OUTPUT_STREAM output)
{
}

Параметр config - це категорія конфігурації з наданими користувачем значеннями, outHandle - дескриптор для наступного фільтра в ланцюжку, і output - вказівник на функцію, яку слід викликати для надсилання даних до наступного фільтра у ланцюжку. Аргументи outHandle та output слід зберігати для подальшого використання у plugin_inest, коли дані будуть пересилатимуться у трубопроводі.

Функція plugin_init повертає дескриптор, який буде передано усім наступним викликам плагіна. Цей дескриптор можна використовувати для зберігання стану, який потрібно передати між викликами. Зазвичай виклик plugin_init створює клас C++, який реалізує фільтр, і повертає точку на екземпляр як дескриптор. Потім екземпляр можна використовувати для зберігання стану фільтра, включно з дескриптором виводу та функцією зворотного виклику, яку буде потрібно використати.

Класи фільтрів також можна використовувати для буферизації даних між викликами plugin_ingest, що дозволяє фільтру відкласти обробку даних доти, доки він не матиме достатньої кількості буферизованих даних.

Інтегрування плагіна

Точка входу plugin_inest є робочою конячкою фільтра, вона викликається з наборами показань для обробки, а потім передає новий набір даних до наступного фільтру у трубопроводі. Процес передачі даних наступному фільтру даних до наступного фільтра відбувається через вказівник функції OUTPUT_STREAM. Фільтр не зобов'язаний виводити дані кожного разу, коли він отримує дані, він може не виводити жодних даних або виводити більше чи менше даних, ніж було отримано.

void plugin_ingest(PLUGIN_HANDLE *handle,
READINGSET *readingSet)
{
}

Кількість зчитувань, з якими викликається фільтр, буде залежати від середовища, у якому він працює, і від того, що було отримано попередніми фільтрами у трубопроводі. Фільтр, який вимагає певного розміру вибірки для обробки результату, слід підготувати для буферизації даних під час декількох викликів plugin_ingest. Кілька прикладів фільтрів доступні для ознайомлення.

Виклик plugin_ingest може надсилати дані далі у трубопроводі фільтрів використовуючи збережені параметри output і outHandle, передані до plugin_init.

(*output)(outHandle, readings);

Переналаштування плагіна

Як і для інших типів плагінів, фільтр може бути переналаштований під час його роботи. Коли виконується операція реконфігурації, буде викликано метод plugin_reconfigure із новою конфігурацією для фільтра.

void plugin_reconfigure(PLUGIN_HANDLE *handle, const std::string& newConfig)
{
}

Закриття плагіна

Як і в інших плагінах, існує виклик завершення роботи, який може використовуватися плагіном для виконання будь-якого очищення, необхідного під час вимкнення фільтра.

void plugin_shutdown(PLUGIN_HANDLE *handle)
{
}

C++ Допоміжний Класс

Очікується, що фільтри будуть написані як класи C++, а дескриптор плагіна буде використовуватися як механізм для зберігання і передачі вказівника на екземпляр класу фільтру. Для того, щоб полегшити написання фільтрів, було надано базовий клас FledgeFilter, рекомендується створювати ваш конкретний клас фільтра з цього базового класу, щоб спростити реалізацію

class FledgeFilter {
public:
FledgeFilter(const std::string& filterName,
ConfigCategory& filterConfig,
OUTPUT_HANDLE *outHandle,
OUTPUT_STREAM output);
~FledgeFilter() {};
const std::string&
getName() const { return m_name; };
bool isEnabled() const { return m_enabled; };
ConfigCategory& getConfig() { return m_config; };
void disableFilter() { m_enabled = false; };
void setConfig(const std::string& newConfig);
public:
OUTPUT_HANDLE* m_data;
OUTPUT_STREAM m_func;
protected:
std::string m_name;
ConfigCategory m_config;
bool m_enabled;
};

Приклад Фільтра C++

Наступний приклад є простим прикладом обробки даних. Він застосовує функцію log() до числових даних у потоці даних

Интерфейс Плагіна

Більшість плагінів, написаних на C++, мають вихідний файл, який інкапсулює C API до плагіна, він традиційно називається plugin.cpp. Приклад плагіна слідує цій моделі зі змістом plugin.cpp, показаним нижче.

Перша секція включає клас filter, який є фактичною реалізацією логіки фільтрації і визначає категорію конфігурації JSON. Тут використовується макрос QUOTE для того, щоб зробити визначення JSON більш читабельним.

/*
* Fledge "log" filter plugin.
*
* Copyright (c) 2020 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Mark Riddoch
*/

#include <logFilter.h>
#include <version.h>

#define FILTER_NAME "log"
const static char *default_config = QUOTE({
"plugin" : {
"description" : "Log filter plugin",
"type" : "string",
"default" : FILTER_NAME,
"readonly": "true"
},
"enable": {
"description": "A switch that can be used to enable or disable execution of the log filter.",
"type": "boolean",
"displayName": "Enabled",
"default": "false"
},
"match" : {
"description" : "An optional regular expression to match in the asset name.",
"type": "string",
"default": "",
"order": "1",
"displayName": "Asset filter"}
});

using namespace std;

Потім ми визначаємо вміст інформації плагіна, який повертатиметься викликом plugin_info.

/**
* The Filter plugin interface
*/
extern "C" {

/**
* The plugin information structure
*/
static PLUGIN_INFORMATION info = {
FILTER_NAME, // Name
VERSION, // Version
0, // Flags
PLUGIN_TYPE_FILTER, // Type
"1.0.0", // Interface version
default_config // Default plugin configuration
};

Останній розділ цього файлу складається з самих точок входу та реалізації. Більшість із них складається із викликів класу LogFilter, який у цьому випадку реалізує логіку фільтра.

/**
* Return the information about this plugin
*/
PLUGIN_INFORMATION *plugin_info()
{
return &info;
}

/**
* Initialise the plugin, called to get the plugin handle.
* We merely create an instance of our LogFilter class
*
* @param config The configuration category for the filter
* @param outHandle A handle that will be passed to the output stream
* @param output The output stream (function pointer) to which data is passed
* @return An opaque handle that is used in all subsequent calls to the plugin
*/
PLUGIN_HANDLE plugin_init(ConfigCategory* config,
OUTPUT_HANDLE *outHandle,
OUTPUT_STREAM output)
{
LogFilter *log = new LogFilter(FILTER_NAME,
*config,
outHandle,
output);

return (PLUGIN_HANDLE)log;
}

/**
* Ingest a set of readings into the plugin for processing
*
* @param handle The plugin handle returned from plugin_init
* @param readingSet The readings to process
*/
void plugin_ingest(PLUGIN_HANDLE *handle,
READINGSET *readingSet)
{
LogFilter *log = (LogFilter *) handle;
log->ingest(readingSet);
}

/**
* Plugin reconfiguration method
*
* @param handle The plugin handle
* @param newConfig The updated configuration
*/
void plugin_reconfigure(PLUGIN_HANDLE *handle, const std::string& newConfig)
{
LogFilter *log = (LogFilter *)handle;
log->reconfigure(newConfig);
}

/**
* Call the shutdown method in the plugin
*/
void plugin_shutdown(PLUGIN_HANDLE *handle)
{
LogFilter *log = (LogFilter *) handle;
delete log;
}

// End of extern "C"
};

Класс Фільтру

Незважаючи на те, що це не є обов’язковим, хороша практика інкапсулювати ім’я для входу в фільтр у класі, ці класи є похідними від класу FledgeFilter.

#ifndef _LOG_FILTER_H
#define _LOG_FILTER_H
/*
* Fledge "Log" filter plugin.
*
* Copyright (c) 2020 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Mark Riddoch
*/
#include <filter.h>
#include <reading_set.h>
#include <config_category.h>
#include <string>
#include <logger.h>
#include <mutex>
#include <regex>
#include <math.h>


/**
* Convert the incoming data to use a logarithmic scale
*/
class LogFilter : public FledgeFilter {
public:
LogFilter(const std::string& filterName,
ConfigCategory& filterConfig,
OUTPUT_HANDLE *outHandle,
OUTPUT_STREAM output);
~LogFilter();
void ingest(READINGSET *readingSet);
void reconfigure(const std::string& newConfig);
private:
void handleConfig(ConfigCategory& config);
std::string m_match;
std::regex *m_regex;
std::mutex m_configMutex;
};


#endif

Реалізація класу фільтра

Нижче наведено код, який реалізує логіку фільтра

/*
* Fledge "Log" filter plugin.
*
* Copyright (c) 2020 Dianomic Systems
*
* Released under the Apache 2.0 Licence
*
* Author: Mark Riddoch
*/
#include <logFilter.h>

using namespace std;

/**
* Constructor for the LogFilter.
*
* We call the constructor of the base class and handle the initial
* configuration of the filter.
*
* @param filterName The name of the filter
* @param filterConfig The configuration category for this filter
* @param outHandle The handle of the next filter in the chain
* @param output A function pointer to call to output data to the next filter
*/
LogFilter::LogFilter(const std::string& filterName,
ConfigCategory& filterConfig,
OUTPUT_HANDLE *outHandle,
OUTPUT_STREAM output) : m_regex(NULL),
FledgeFilter(filterName, filterConfig, outHandle, output)
{
handleConfig(filterConfig);
}

/**
* Destructor for this filter class
*/
LogFilter::~LogFilter()
{
if (m_regex)
delete m_regex;
}

/**
* The actual filtering code
*
* @param readingSet The reading data to filter
*/
void
LogFilter::ingest(READINGSET *readingSet)
{
lock_guard<mutex> guard(m_configMutex);

if (isEnabled()) // Filter enable, process the readings
{
const vector<Reading *>& readings = ((ReadingSet *)readingSet)->getAllReadings();
for (vector<Reading *>::const_iterator elem = readings.begin();
elem != readings.end(); ++elem)
{
// If we set a matching regex then compare to the name of this asset
if (!m_match.empty())
{
string asset = (*elem)->getAssetName();
if (!regex_match(asset, *m_regex))
{
continue;
}
}

// We are modifying this asset so put an entry in the asset tracker
AssetTracker::getAssetTracker()->addAssetTrackingTuple(getName(), (*elem)->getAssetName(), string("Filter"));

// Get a reading DataPoints
const vector<Datapoint *>& dataPoints = (*elem)->getReadingData();

// Iterate over the datapoints
for (vector<Datapoint *>::const_iterator it = dataPoints.begin(); it != dataPoints.end(); ++it)
{
// Get the reference to a DataPointValue
DatapointValue& value = (*it)->getData();

/*
* Deal with the T_INTEGER and T_FLOAT types.
* Try to preserve the type if possible but
* if a floating point log function is applied
* then T_INTEGER values will turn into T_FLOAT.
* If the value is zero we do not apply the log function
*/
if (value.getType() == DatapointValue::T_INTEGER)
{
long ival = value.toInt();
if (ival != 0)
{
double newValue = log((double)ival);
value.setValue(newValue);
}
}
else if (value.getType() == DatapointValue::T_FLOAT)
{
double dval = value.toDouble();
if (dval != 0.0)
{
value.setValue(log(dval));
}
}
else
{
// do nothing for other types
}
}
}
}

// Pass on all readings in this case
(*m_func)(m_data, readingSet);
}

/**
* Reconfiguration entry point to the filter.
*
* This method runs holding the configMutex to prevent
* ingest using the regex class that may be destroyed by this
* call.
*
* Pass the configuration to the base FilterPlugin class and
* then call the private method to handle the filter specific
* configuration.
*
* @param newConfig The JSON of the new configuration
*/
void
LogFilter::reconfigure(const std::string& newConfig)
{
lock_guard<mutex> guard(m_configMutex);
setConfig(newConfig); // Pass the configuration to the base class
handleConfig(m_config);
}

/**
* Handle the filter specific configuration. In this case
* it is just the single item "match" that is a regex
* expression
*
* @param config The configuration category
*/
void
LogFilter::handleConfig(ConfigCategory& config)
{
if (config.itemExists("match"))
{
m_match = config.getValue("match");
if (m_regex)
delete m_regex;
m_regex = new regex(m_match);
}
}

API Фільтра Python

Фільтри також можуть бути написані на Python, API дуже схожий на фільтр C++ і складається з того самого набору точок входу.

Інформація про плагін

Як і у випадку з фільтрами C++, це перша точка входу, яка викликається, вона повертає словник Python, який описує фільтр.

def plugin_info():
""" Returns information about the plugin
Args:
Returns:
dict: plugin information
Raises:
"""

Ініціалізація плагіна

Виклик plugin_init використовується для передачі розв’язаної конфігурації до модуля, а також для передачі дескриптора наступного фільтра в трубопроводі та зворотного виклику, який слід викликати з вихідними даними фільтра.

def plugin_init(config, ingest_ref, callback):
""" Initialise the plugin
Args:
config: JSON configuration document for the Filter plugin configuration category
ingest_ref: filter ingest reference
callback: filter callback
Returns:
data: JSON object to be used in future calls to the plugin
Raises:
"""

Завантаження плагіна

Метод plugin_ingest використовується для передачі даних у плагін, потім плагін обробить ці дані та викличе зворотний виклик, який було передано в точку входу plugin_init з дескриптором ingest_ref і даними для надсилання по трубопроводу фільтра.

def plugin_ingest(handle, data):
""" Modify readings data and pass it onward

Args:
handle: handle returned by the plugin initialisation call
data: readings data
"""

data впорядковано як масив словників Python, кожен з яких є Reading. Зазвичай дані можуть бути оброблені шляхом обходу масиву

for elem in data:
process(elem)

Переналаштування плагіна

Точка входу plugin_reconfigure викликається щоразу, коли відбувається зміна конфігурації для категорії конфігурації фільтрів.

def plugin_reconfigure(handle, new_config):
""" Reconfigures the plugin

Args:
handle: handle returned by the plugin initialisation call
new_config: JSON object representing the new configuration category for the category
Returns:
new_handle: new handle to be used in the future calls
"""

Закриття Плагіну

Викликається, коли плагін потрібно вимкнути, щоб дозволити йому виконувати будь-які операції очищення.

def plugin_shutdown(handle):
""" Shutdowns the plugin doing required cleanup.

Args:
handle: handle returned by the plugin initialisation call
Returns:
plugin shutdown
"""

Приклад Фільтру Python

Нижче наведено приклад фільтра Python, який обчислює експоненціальне ковзне середнє.

# -*- coding: utf-8 -*-

# Fledge_BEGIN
# See: http://fledge-iot.readthedocs.io/
# Fledge_END

""" Module for EMA filter plugin

Generate Exponential Moving Average
The rate value (x) allows to include x% of current value
and (100-x)% of history
A datapoint called 'ema' is added to each reading being filtered
"""

import time
import copy
import logging

from fledge.common import logger
import filter_ingest

__author__ = "Massimiliano Pinto"
__copyright__ = "Copyright (c) 2022 Dianomic Systems Inc."
__license__ = "Apache 2.0"
__version__ = "${VERSION}"

_LOGGER = logger.setup(__name__, level = logging.INFO)

PLUGIN_NAME = 'ema'

_DEFAULT_CONFIG = {
'plugin': {
'description': 'Exponential Moving Average filter plugin',
'type': 'string',
'default': PLUGIN_NAME,
'readonly': 'true'
},
'enable': {
'description': 'Enable ema plugin',
'type': 'boolean',
'default': 'false',
'displayName': 'Enabled',
'order': "3"
},
'rate': {
'description': 'Rate value: include % of current value',
'type': 'float',
'default': '0.07',
'displayName': 'Rate',
'order': "2"
},
'datapoint': {
'description': 'Datapoint name for calculated ema value',
'type': 'string',
'default': PLUGIN_NAME,
'displayName': 'EMA datapoint',
'order': "1"
}
}


def compute_ema(handle, reading):
""" Compute EMA

Args:
A reading data
"""
rate = float(handle['rate']['value'])
for attribute in list(reading):
if not handle['latest']:
handle['latest'] = reading[attribute]
handle['latest'] = reading[attribute] * rate + handle['latest'] * (1 - rate)
reading[handle['datapoint']['value']] = handle['latest']


def plugin_info():
""" Returns information about the plugin
Args:
Returns:
dict: plugin information
Raises:
"""
return {
'name': PLUGIN_NAME,
'version': '1.9.2',
'mode': 'none',
'type': 'filter',
'interface': '1.0',
'config': _DEFAULT_CONFIG
}


def plugin_init(config, ingest_ref, callback):
""" Initialise the plugin
Args:
config: JSON configuration document for the Filter plugin configuration category
ingest_ref: filter ingest reference
callback: filter callback
Returns:
data: JSON object to be used in future calls to the plugin
Raises:
"""
_config = copy.deepcopy(config)
_config['ingestRef'] = ingest_ref
_config['callback'] = callback
_config['latest'] = None
_config['shutdownInProgress'] = False
return _config


def plugin_reconfigure(handle, new_config):
""" Reconfigures the plugin

Args:
handle: handle returned by the plugin initialisation call
new_config: JSON object representing the new configuration category for the category
Returns:
new_handle: new handle to be used in the future calls
"""
_LOGGER.info("Old config for ema plugin {} \n new config {}".format(handle, new_config))

new_handle = copy.deepcopy(new_config)
new_handle['shutdownInProgress'] = False
new_handle['latest'] = None
new_handle['ingestRef'] = handle['ingestRef']
new_handle['callback'] = handle['callback']
return new_handle


def plugin_shutdown(handle):
""" Shutdowns the plugin doing required cleanup.

Args:
handle: handle returned by the plugin initialisation call
Returns:
plugin shutdown
"""
handle['shutdownInProgress'] = True
time.sleep(1)
handle['callback'] = None
handle['ingestRef'] = None
handle['latest'] = None

_LOGGER.info('{} filter plugin shutdown.'.format(PLUGIN_NAME))


def plugin_ingest(handle, data):
""" Modify readings data and pass it onward

Args:
handle: handle returned by the plugin initialisation call
data: readings data
"""
if handle['shutdownInProgress']:
return

if handle['enable']['value'] == 'false':
# Filter not enabled, just pass data onwards
filter_ingest.filter_ingest_callback(handle['callback'], handle['ingestRef'], data)
return

# Filter is enabled: compute EMA for each reading
for elem in data:
compute_ema(handle, elem['readings'])

# Pass data onwards
filter_ingest.filter_ingest_callback(handle['callback'], handle['ingestRef'], data)

_LOGGER.debug("{} filter_ingest done.".format(PLUGIN_NAME))