Add InfluxDB output (#1192)

This commit is contained in:
Daniel Krüger
2019-11-03 11:09:09 +01:00
committed by Christian W. Zuckschwerdt
parent b2ae9af9ba
commit 81fe66005c
6 changed files with 505 additions and 2 deletions

21
include/output_influx.h Normal file
View File

@@ -0,0 +1,21 @@
/** @file
InfluxDB output for rtl_433 events
Copyright (C) 2019 Daniel Krueger
based on output_mqtt.c
Copyright (C) 2019 Christian Zuckschwerdt
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
*/
#ifndef INCLUDE_OUTPUT_INFLUX_H_
#define INCLUDE_OUTPUT_INFLUX_H_
#include "data.h"
struct data_output *data_output_influx_create(char *opts);
#endif /* INCLUDE_OUTPUT_INFLUX_H_ */

View File

@@ -78,6 +78,8 @@ void add_kv_output(struct r_cfg *cfg, char *param);
void add_mqtt_output(struct r_cfg *cfg, char *param);
void add_influx_output(struct r_cfg *cfg, char *param);
void add_syslog_output(struct r_cfg *cfg, char *param);
void add_null_output(struct r_cfg *cfg, char *param);

View File

@@ -15,6 +15,7 @@ add_executable(rtl_433
list.c
mongoose.c
optparse.c
output_influx.c
output_mqtt.c
pulse_demod.c
pulse_detect.c

466
src/output_influx.c Normal file
View File

@@ -0,0 +1,466 @@
/** @file
InfluxDB output for rtl_433 events
Copyright (C) 2019 Daniel Krueger
based on output_mqtt.c
Copyright (C) 2019 Christian Zuckschwerdt
This program is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
*/
// note: our unit header includes unistd.h for gethostname() via data.h
#include "output_influx.h"
#include "optparse.h"
#include "util.h"
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mongoose.h"
/* InfluxDB client abstraction / printer */
typedef struct {
struct data_output output;
struct mg_mgr *mgr;
int prev_status;
int prev_resp_code;
char hostname[64];
char url[400];
char extra_headers[150];
int databufidxfill;
struct mbuf databufs[2];
bool transfer_running;
} influx_client_t;
static void influx_client_send(influx_client_t *ctx);
static void influx_client_event(struct mg_connection *nc, int ev, void *ev_data)
{
// note that while shutting down the ctx is NULL
influx_client_t *ctx = (influx_client_t *)nc->mgr->user_data;
struct http_message *hm = (struct http_message *) ev_data;
switch (ev) {
case MG_EV_CONNECT: {
int connect_status = *(int *)ev_data;
if (connect_status != 0) {
// Error, print only once
if (ctx) {
if (ctx->prev_status != connect_status)
fprintf(stderr, "InfluxDB connect error: %s\n", strerror(connect_status));
ctx->transfer_running = false;
}
}
if (ctx)
ctx->prev_status = connect_status;
break;
}
case MG_EV_HTTP_CHUNK: // response is normally empty (so mongoose thinks we received a chunk only)
case MG_EV_HTTP_REPLY:
nc->flags |= MG_F_CLOSE_IMMEDIATELY;
if (hm->resp_code == 204) {
// mark influx data as sent
}
else {
if (ctx && ctx->prev_resp_code != hm->resp_code)
fprintf(stderr, "InfluxDB replied HTTP code: %d with message:\n%s\n", hm->resp_code, hm->body.p);
}
if (ctx) {
ctx->prev_resp_code = hm->resp_code;
}
break;
case MG_EV_CLOSE:
if (ctx) {
ctx->transfer_running = false;
influx_client_send(ctx);
}
break;
}
}
static struct mg_mgr *influx_client_init(influx_client_t *ctx, char const *url, char const *token)
{
struct mg_mgr *mgr = calloc(1, sizeof(*mgr));
if (!mgr) {
fprintf(stderr, "calloc() failed in %s() %s:%d\n", __func__, __FILE__, __LINE__);
exit(1);
}
strncpy(ctx->url, url, sizeof(ctx->url) - 1);
snprintf(ctx->extra_headers, sizeof (ctx->extra_headers), "Authorization: Token %s\r\n", token);
mg_mgr_init(mgr, ctx);
return mgr;
}
static int influx_client_poll(struct mg_mgr *mgr)
{
return mg_mgr_poll(mgr, 0);
}
static void influx_client_send(influx_client_t *ctx)
{
struct mbuf *buf = &ctx->databufs[ctx->databufidxfill];
/*fprintf(stderr, "Influx %p msg: \"%s\" with %lu/%lu %s\n",
(void*)ctx, buf->buf, buf->len, buf->size,
ctx->transfer_running ? "buffering" : "to be sent");*/
if (ctx->transfer_running || !buf->len)
return;
if (mg_connect_http(ctx->mgr, influx_client_event, ctx->url, ctx->extra_headers, buf->buf) == NULL) {
fprintf(stderr, "Connect to InfluxDB (%s) failed\n", ctx->url);
}
else {
ctx->databufidxfill ^= 1;
ctx->transfer_running = true;
buf->len = 0;
*buf->buf = '\0';
}
}
/* Helper */
/// clean the tag/identifier inplace to [-.A-Za-z0-9], esp. not whitespace, =, comma and replace any leading _ by x
static char *influx_sanitize_tag(char *tag, char *end)
{
for (char *p = tag; *p && p!=end; ++p)
if (*p != '-' && *p != '.' && (*p < 'A' || *p > 'Z') && (*p < 'a' || *p > 'z') && (*p < '0' || *p > '9'))
*p = '_';
for (char *p = tag; *p && p!=end; ++p)
if (*p == '_')
*p = 'x';
else
break;
return tag;
}
/// reserve additional space of size len at end of mbuf a; returns actual free space
static size_t mbuf_reserve(struct mbuf *a, size_t len)
{
// insert undefined values at end of current buffer (it will increase the buffer if necessary)
len = mbuf_insert(a, a->len, NULL, len);
// reduce the buffer length again by actual inserted number of bytes
a->len -= len;
len = a->size - a->len;
if (len)
a->buf[a->len] = '\0';
return len;
}
static char* mbuf_snprintf(struct mbuf *a, const char* format, ...)
{
char *str = &a->buf[a->len];
int size;
va_list ap;
va_start(ap, format);
size = vsnprintf(str, a->size - a->len, format, ap);
va_end(ap);
if (size > 0) {
// vsnprintf might return size larger than actually filled
size = strlen(str);
a->len += size;
}
return str;
}
static void mbuf_remove_part(struct mbuf *a, char* pos, size_t len)
{
if (pos >= a->buf && pos < &a->buf[a->len] && &pos[len] <= &a->buf[a->len]) {
memmove(pos, &pos[len], a->len - (pos - a->buf) - len);
a->len -= len;
}
}
static void print_influx_array(data_output_t *output, data_array_t *array, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "\"array\""); // TODO
}
static void print_influx_data_escaped(data_output_t *output, data_t *data, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
char str[1000];
size_t size = data_print_jsons(data, str, sizeof (str));
output->print_string(output, str, format);
}
static void print_influx_string_escaped(data_output_t *output, char const *str, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *databuf = &influx->databufs[influx->databufidxfill];
size_t size = databuf->size - databuf->len;
char* buf = &databuf->buf[databuf->len];
if (size < strlen(str) + 3) {
return;
}
*buf++ = '"';
size--;
for (; *str && size >= 3; ++str) {
if (*str == '"' || *str == '\\') {
*buf++ = '\\';
size--;
}
*buf++ = *str;
size--;
}
if (size >= 2) {
*buf++ = '"';
size--;
}
*buf = '\0';
databuf->len = databuf->size - size;
}
static void print_influx_string(data_output_t *output, char const *str, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%s", str);
}
// Generate InfluxDB line protocol
static void print_influx_data(data_output_t *output, data_t *data, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
char *str;
char *end;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
bool comma = false;
data_t *data_org = data;
data_t *data_model = NULL;
data_t *data_time = NULL;
for (data_t *d = data; d; d = d->next) {
if (!strcmp(d->key, "model"))
data_model = d;
if (!strcmp(d->key, "time"))
data_time = d;
}
if (!data_model) {
// data isn't from device (maybe report for example)
// use hostname for measurement
mbuf_reserve(buf, 20000);
mbuf_snprintf(buf, "rtl_433_%s", influx->hostname);
}
else {
// use model for measurement
mbuf_reserve(buf, 1000);
str = &buf->buf[buf->len];
print_value(output, data_model->type, data_model->value, data_model->format);
influx_sanitize_tag(str, NULL);
}
// write tags
while (data) {
if (!strcmp(data->key, "model")
|| !strcmp(data->key, "time")) {
// skip
}
else if (!strcmp(data->key, "brand")
|| !strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "channel")) {
str = mbuf_snprintf(buf, ",%s=", data->key);
str++;
end = &buf->buf[buf->len - 1];
influx_sanitize_tag(str, end);
str = end + 1;
print_value(output, data->type, data->value, data->format);
influx_sanitize_tag(str, NULL);
}
data = data->next;
}
mbuf_snprintf(buf, " ");
// activate escaped output functions
influx->output.print_data = print_influx_data_escaped;
influx->output.print_string = print_influx_string_escaped;
// write fields
data = data_org;
while (data) {
if (!strcmp(data->key, "model")
|| !strcmp(data->key, "time")) {
// skip
}
else if (!strcmp(data->key, "brand")
|| !strcmp(data->key, "type")
|| !strcmp(data->key, "subtype")
|| !strcmp(data->key, "id")
|| !strcmp(data->key, "channel")) {
// skip
}
else {
str = mbuf_snprintf(buf, comma ? ",%s=" : "%s=", data->key);
if (comma)
str++;
end = &buf->buf[buf->len - 1];
influx_sanitize_tag(str, end);
str = end + 1;
print_value(output, data->type, data->value, data->format);
comma = true;
}
data = data->next;
}
// restore original output functions
influx->output.print_data = print_influx_data;
influx->output.print_string = print_influx_string;
// write time if available
if (data_time) {
str = mbuf_snprintf(buf, " ");
print_value(output, data_time->type, data_time->value, data_time->format);
if (str[1] == '@' // relative time format configured
|| str[11] == ' ' // date time format configured
|| str[11] == 'T') { // ISO date time format configured
// -> bad, because InfluxDB doesn't under stand those formats -> remove timestamp
buf->len = str - buf->buf;
}
else if ((str = strchr(str, '.'))) {
// unix usec timestamp format configured
mbuf_remove_part(buf, str, 1);
mbuf_snprintf(buf, "000");
}
else {
// unix timestamp with seconds resolution configured
mbuf_snprintf(buf, "000000000");
}
}
mbuf_snprintf(buf, "\n");
influx_client_send(influx);
}
static void print_influx_double(data_output_t *output, double data, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%f", data);
}
static void print_influx_int(data_output_t *output, int data, char *format)
{
influx_client_t *influx = (influx_client_t *)output;
struct mbuf *buf = &influx->databufs[influx->databufidxfill];
mbuf_snprintf(buf, "%d", data);
}
static void data_output_influx_poll(data_output_t *output)
{
influx_client_t *influx = (influx_client_t *)output;
if (!influx)
return;
influx_client_poll(influx->mgr);
}
static void data_output_influx_free(data_output_t *output)
{
influx_client_t *influx = (influx_client_t *)output;
if (!influx)
return;
influx->mgr->user_data = NULL;
mg_mgr_free(influx->mgr);
free(influx);
}
struct data_output *data_output_influx_create(char *opts)
{
influx_client_t *influx = calloc(1, sizeof(influx_client_t));
if (!influx) {
fprintf(stderr, "calloc() failed in %s() %s:%d\n", __func__, __FILE__, __LINE__);
exit(1);
}
gethostname(influx->hostname, sizeof(influx->hostname) - 1);
influx->hostname[sizeof(influx->hostname) - 1] = '\0';
// only use hostname, not domain part
char *dot = strchr(influx->hostname, '.');
if (dot)
*dot = '\0';
influx_sanitize_tag(influx->hostname, NULL);
char *token = NULL;
// param/opts starts with URL
char *url = opts;
opts = strchr(opts, ',');
if (opts) {
*opts = '\0';
opts++;
}
if (strncmp(url, "influx", 6) == 0) {
url += 2;
memcpy(url, "http", 4);
}
// check if valid URL has been provided
struct mg_str host, path, query;
if (mg_parse_uri(mg_mk_str(url), NULL, NULL, &host, NULL, &path,
&query, NULL) != 0
|| !host.len || !path.len || !query.len) {
fprintf(stderr, "Invalid URL to InfluxDB specified.%s%s%s\n"
"Something like \"influx://<host>/write?org=<org>&bucket=<bucket>\" required at least.\n",
!host.len ? " No host specified." : "",
!path.len ? " No path component specified." : "",
!query.len ? " No query parameters specified." : "");
exit(1);
}
// parse auth and format options
char *key, *val;
while (getkwargs(&opts, &key, &val)) {
key = remove_ws(key);
val = trim_ws(val);
if (!key || !*key)
continue;
else if (!strcasecmp(key, "t") || !strcasecmp(key, "token"))
token = val;
else {
fprintf(stderr, "Invalid key \"%s\" option.\n", key);
exit(1);
}
}
influx->output.print_data = print_influx_data;
influx->output.print_array = print_influx_array;
influx->output.print_string = print_influx_string;
influx->output.print_double = print_influx_double;
influx->output.print_int = print_influx_int;
influx->output.output_poll = data_output_influx_poll;
influx->output.output_free = data_output_influx_free;
fprintf(stderr, "Publishing data to InfluxDB (%s)\n", url);
influx->mgr = influx_client_init(influx, url, token);
return &influx->output;
}

View File

@@ -27,6 +27,7 @@
#include "list.h"
#include "optparse.h"
#include "output_mqtt.h"
#include "output_influx.h"
#include "compat_time.h"
#include "fatal.h"
@@ -849,6 +850,11 @@ void add_mqtt_output(r_cfg_t *cfg, char *param)
list_push(&cfg->output_handler, data_output_mqtt_create(host, port, opts, cfg->dev_query));
}
void add_influx_output(r_cfg_t *cfg, char *param)
{
list_push(&cfg->output_handler, data_output_influx_create(param));
}
void add_syslog_output(r_cfg_t *cfg, char *param)
{
char *host = "localhost";

View File

@@ -115,7 +115,7 @@ static void usage(int exit_code)
" [-w <filename> | help] Save data stream to output file (a '-' dumps samples to stdout)\n"
" [-W <filename> | help] Save data stream to output file, overwrite existing file\n"
"\t\t= Data output options =\n"
" [-F kv | json | csv | mqtt | syslog | null | help] Produce decoded output in given format.\n"
" [-F kv | json | csv | mqtt | influx | syslog | null | help] Produce decoded output in given format.\n"
" Append output to file with :<filename> (e.g. -F csv:log.csv), defaults to stdout.\n"
" Specify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n"
" [-M time[:<options>] | protocol | level | stats | bits | help] Add various meta data to each output.\n"
@@ -186,7 +186,7 @@ static void help_output(void)
{
term_help_printf(
"\t\t= Output format option =\n"
" [-F kv|json|csv|mqtt|syslog|null] Produce decoded output in given format.\n"
" [-F kv|json|csv|mqtt|influx|syslog|null] Produce decoded output in given format.\n"
"\tWithout this option the default is KV output. Use \"-F null\" to remove the default.\n"
"\tAppend output to file with :<filename> (e.g. -F csv:log.csv), defaults to stdout.\n"
"\tSpecify MQTT server with e.g. -F mqtt://localhost:1883\n"
@@ -198,6 +198,9 @@ static void help_output(void)
"\t devices: posts device and sensor info in nested topics\n"
"\tThe topic string will expand keys like [/model]\n"
"\tE.g. -F \"mqtt://localhost:1883,user=USERNAME,pass=PASSWORD,retain=0,devices=rtl_433[/id]\"\n"
"\tSpecify InfluxDB 2.0 server with e.g. -F \"influx://localhost:9999/api/v2/write?org=<org>&bucket=<bucket>,token=<authtoken>\"\n"
"\tSpecify InfluxDB 1.x server with e.g. -F \"influx://localhost:8086/write?db=<db>&p=<password>&u=<user>\"\n"
"\t Additional parameter -M time:unix:usec:utc for correct timestamps in InfluxDB recommended\n"
"\tSpecify host/port for syslog with e.g. -F syslog:127.0.0.1:1514\n");
exit(0);
}
@@ -973,6 +976,10 @@ static void parse_conf_option(r_cfg_t *cfg, int opt, char *arg)
else if (strncmp(arg, "mqtt", 4) == 0) {
add_mqtt_output(cfg, arg_param(arg));
}
else if (strncmp(arg, "http", 4) == 0
|| strncmp(arg, "influx", 6) == 0) {
add_influx_output(cfg, arg);
}
else if (strncmp(arg, "syslog", 6) == 0) {
add_syslog_output(cfg, arg_param(arg));
}