Apache Kafka vs. MySQL

Im Moment arbeite ich an der Anbindung des Sniffers, welchen ich früher kurz beschrieben habe, an eine MySQL-Datenbank um die aufgezeichneten Nachrichten eine Weile lang (hier für immer) speichern zu können. Ziel dessen soll es sein die Nachrichten im Anschluss zu verarbeiten, wenn der / die Sniffer eine Weile lang gelaufen sind.

Zunächst war die Idee einen zentralen Nachrichtenbroker namens Kafka zu verwenden (siehe [1]). An diesen Server, der im Netzwerk gespiegelt oder mehrfach auftreten kann, werden Nachrichten geschickt, die später wieder ausgelesen werden können. Kafka vergisst nach einer gewissen Zeit alte Nachrichten (was nicht immer schlecht sein muss, vor allem dann, wenn es eigentlich darum geht, auf halbwegs aktuellen Informationen weiterzuarbeiten). Für mich störend sind allerdings einige Sachen am Kafka: Zum einen stört mich, dass Kafka und das offenbar darunter liegende Zookeeper auf der Java-VM laufen. Ich kann einfach nicht glauben, dass das schnell genug arbeiten können soll um viele Clienten abhandeln zu können. Außerdem ist dezent störend, dass Kafka keine Anfragensprache wie SQL kann. Für meine Auswertung wird es notwendig im Kontext einer gefundenen Nachricht bspw. die Antwort auf die Nachricht zu finden, was intuitiv mit MySQL durch die SQL-Abfragensprache einfacher zu realisieren ist. Allerdings fällt mir beim Schreiben des Textes auf, dass Kafka dazu sehr wohl in der Lage wäre, und zwar durch den Offset - so kann man Kafka anweisen an einer bestimmten "Stelle" die Nachricht zu liefern. Diese Stelle ist für den Konsumenten beliebig, und kann im Programm verändert werden. So könnte man solange lesen, bis schließlich ein Filterkriterium nicht mehr stimmt (bspw. Timestamp).

Die eigentliche Idee, warum MySQL also zum Einsatz kommen soll, ist, dass es eine Abfragensprache (SQL) unterstützt, mit deren Hilfe ich mir einfache Abfragen erhoffe und damit ein einfaches Gewinnen an Informationen ohne selbst unnötig viele Daten wegwerfen zu müssen bzw. selbst aussortieren zu müssen, was, wie ich denke, Einfluss auf den Verarbeitungsprozess der Daten haben könnte.

Für die Anbindung von MySQL verwende ich den MySQL++-Wrapper (siehe [2]). Dieser objektorientierte Wrapper hat die Eigenschaft etwa so einfach zu bedienen zu sein, als hätte ich das entwickelt - ich neige dazu Funktionalität zu wrappen und hinter recht einfachen Schnittstellen zu verstecken. Das einzige, was ihr tun müsst, ist eine Verbindung herstellen und schon könnt ihr eine Abfrage an den MySQL Server stellen, was dieser auch sofort mit den Tupeln, die auf die Abfrage zutreffen, beantwortet: aus [3]

#include "cmdline.h"
#include "printdata.h"

#include <mysql++.h>

#include <iostream>
#include <iomanip>

using namespace std;

int
main(int argc, char *argv[])
{
    // Get database access parameters from command line
    mysqlpp::examples::CommandLine cmdline(argc, argv);
    if (!cmdline) {
        return 1;
    }

    // Connect to the sample database.
    mysqlpp::Connection conn(false);
    if (conn.connect(mysqlpp::examples::db_name, cmdline.server(),
        cmdline.user(), cmdline.pass())) {
        // Retrieve a subset of the sample stock table set up by resetdb
        // and display it.
        mysqlpp::Query query = conn.query("select item from stock");
        if (mysqlpp::StoreQueryResult res = query.store()) {
            cout << "We have:" << endl;
            mysqlpp::StoreQueryResult::const_iterator it;
            for (it = res.begin(); it != res.end(); ++it) {
                mysqlpp::Row row = *it;
                cout << '\t' << row[0] << endl;
            }
        }
        else {
            cerr << "Failed to get item list: " << query.error() << endl;
            return 1;
        }

        return 0;
    }
    else {
        cerr << "DB connection failed: " << conn.error() << endl;
        return 1;
    }
}

Das Programm wird gebaut mit:

g++ main.cpp -lmysql++

Leider hat mysql++ noch einige Macken, vor allem, dass die Bibliothek beim Bauen die installierte MySQL-Client-Bibliothek nicht finden konnte, weshalb ich mehrere Symlinks im /usr/-Ordner anlegen musste, bis das Bauen und Linken den Bibliothek endlich funktioniert hat. Außerdem gibt es einen Fehler, dass irgendwo in er mysql++.h eine "mysql_version.h" inkludiert wird, die aber (jedenfalls bei mir) in "mysql/mysql_version.h" liegt. Vielleicht ist das ein Fehler, exklusiv zu LinuxMint, aber nervig ist das allemal.

Kafka auf der anderen Seite ist mit librdkafka++ [4] etwas schwieriger in der Handhabung. Einerseits verlangt die Bibliothek, dass ihr deutlich mehr macht als Verbindung aufbauen, es gibt Offsets und Partitionen zu setzen, ihr müsst Server kennen und Ports, dazu noch das Thema, auf das ihr posten wollt. Das ist schon etwas komplexer, erlaubt dafür aber auch die Daten semantisch zu ordnen, ähnlich wie in MySQL die Datenbanken. Vorteil von Kafka ist, dass ihr die Daten zur Not auf Byteweise reinlegen könnt, wohingegen ihr bei MySQL darauf angewiesen seid, dass die Datentypen von MySQL die gleichen wie in eurem C-Programm sind.

Dafür ist das Bauen von librdkafka++ recht einfach, allerdings fordert das Bauen von euren Anwendungen mehrere Bibliotheken. So wird ein einfaches Programm gebaut [5] (aus den Beispielen der Bibliothek):

#include <iostream>
#include <string>
#include <cstdlib>
#include <cstdio>
#include <csignal>
#include <cstring>

#include <librdkafka/rdkafkacpp.h>

static bool run = true;

static void sigterm (int sig) {
  run = false;
}


class ExampleDeliveryReportCb : public RdKafka::DeliveryReportCb {
 public:
  void dr_cb (RdKafka::Message &message) {
    std::cout << "Message delivery for (" << message.len() << " bytes): " <<
        message.errstr() << std::endl;
  }
};


class ExampleEventCb : public RdKafka::EventCb {
 public:
  void event_cb (RdKafka::Event &event) {
    switch (event.type())
    {
      case RdKafka::Event::EVENT_ERROR:
        std::cerr << "ERROR (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        if (event.err() == RdKafka::ERR__ALL_BROKERS_DOWN)
          run = false;
        break;

      case RdKafka::Event::EVENT_STATS:
        std::cerr << "\"STATS\": " << event.str() << std::endl;
        break;

      case RdKafka::Event::EVENT_LOG:
        fprintf(stderr, "LOG-%i-%s: %s\n",
                event.severity(), event.fac().c_str(), event.str().c_str());
        break;

      default:
        std::cerr << "EVENT " << event.type() <<
            " (" << RdKafka::err2str(event.err()) << "): " <<
            event.str() << std::endl;
        break;
    }
  }
};


int main () 
{
    std::string errstr;
    std::string topic_str = "test3";
    std::string host = "localhost:9092";

    int32_t partition = RdKafka::Topic::PARTITION_UA;
    int64_t start_offset = RdKafka::Topic::OFFSET_BEGINNING;

    // create configurations (global and topic)
    RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
    RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);

    // list of brokers
    conf->set("metadata.broker.list", host, errstr);

    // event callback
    ExampleEventCb ex_event_cb;
    conf->set("event_cb", &ex_event_cb, errstr);

    signal(SIGINT, sigterm);
    signal(SIGTERM, sigterm);

    // delivery callback
    ExampleDeliveryReportCb ex_dr_cb;
    conf->set("dr_cb", &ex_dr_cb, errstr);

    // create producer
    RdKafka::Producer *producer = RdKafka::Producer::create(conf, errstr);
    if (!producer) {
      std::cerr << "Failed to create producer: " << errstr << std::endl;
      exit(1);
    }
    std::cout << "% Created producer " << producer->name() << std::endl;

    // create topic-handle
    RdKafka::Topic *topic = RdKafka::Topic::create(producer, topic_str, tconf, errstr);
    if (!topic) {
      std::cerr << "Failed to create topic: " << errstr << std::endl;
      exit(1);
    }

    for (std::string line; run and std::getline(std::cin, line);) 
    {
        if (line.empty()) {
            producer->poll(0);
            continue;
        }

      /*
       * Produce message
       */
        RdKafka::ErrorCode resp = producer->produce(topic, partition, RdKafka::Producer::RK_MSG_COPY /* Copy payload */,
                                                    const_cast<char *>(line.c_str()), line.size(), NULL, NULL);
        if (resp != RdKafka::ERR_NO_ERROR)
          std::cerr << "% Produce failed: " << RdKafka::err2str(resp) << std::endl;
        else
          std::cerr << "% Produced message (" << line.size() << " bytes)" << std::endl;
        producer->poll(0);
    }
    run = true;

    while (run and producer->outq_len() > 0) 
    {
        std::cerr << "Waiting for " << producer->outq_len() << std::endl;
        producer->poll(1000);
    }

    delete topic;
    delete producer;
    RdKafka::wait_destroyed(5000);
}

Gebaut wird mit:

g++ main.cpp -g -lrdkafka++ -lrdkafka -lpthread -lz -lrt

So. Abschließen bleibt die Frage offen, was genau ich nun verwenden will. Einerseits gefällt mir die einfachen Abfragen via MySQL, andererseits sehe ich auch die Vorteile von Kafka und halte den Nachrichtenserver für besser skalierend (vor allem, da sich die verschiedenen Nachrichtenbroker die Partitionen unter sich aufteilen). Für die eigentliche Verarbeitung der Daten macht das kaum einen Unterschied. Bei MySQL muss ich hoffen, dass die Datentypen stimmen, die ich eingestellt habe, bei Kafka muss ich hoffen, dass er mir die Daten Byte-Weise so zurückliefert, wie er sie bekommt, auch wenn es sich nicht um druckbare Zeichen handelt.

[1] http://www.kafka.apache.org
[2] http://www.tangentsoft.net/mysql++/
[3] http://www.tangentsoft.net/mysql++/doc/html/userman/tutorial.html#simple
[4] https://github.com/edenhill/librdkafka
[5] https://www.proggen.org/doku.php?id=user:naums:rdkafka_producer

Letzte Bearbeitung: 21.05.2015 23:30