Merge pull request #1331 from vavrusa/upstream-add-kafka

Add Kafka as a table engine
alexey-milovidov 2017-10-13 23:53:03 +03:00 committed by GitHub
commit 7fe700c960
20 changed files with 630 additions and 7 deletions

.gitmodules vendored

@@ -1,3 +1,6 @@
[submodule "contrib/librdkafka"]
path = contrib/librdkafka
url = https://github.com/edenhill/librdkafka.git
[submodule "contrib/zookeeper"]
path = contrib/zookeeper
url = https://github.com/ClickHouse-Extras/zookeeper.git


@@ -229,6 +229,7 @@ include (cmake/find_rt.cmake)
include (cmake/find_readline_edit.cmake)
include (cmake/find_zookeeper.cmake)
include (cmake/find_re2.cmake)
include (cmake/find_rdkafka.cmake)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)


@@ -12,6 +12,7 @@ endif ()
if (LZ4_LIBRARY AND LZ4_INCLUDE_DIR)
else ()
set (LZ4_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/lz4/lib)
set (USE_INTERNAL_LZ4_LIBRARY 1)
set (LZ4_LIBRARY lz4)
endif ()

cmake/find_rdkafka.cmake Normal file

@@ -0,0 +1,17 @@
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
if (NOT USE_INTERNAL_RDKAFKA_LIBRARY)
find_library (RDKAFKA_LIBRARY rdkafka)
find_path (RDKAFKA_INCLUDE_DIR NAMES librdkafka/rdkafka.h PATHS ${RDKAFKA_INCLUDE_PATHS})
endif ()
if (RDKAFKA_LIBRARY AND RDKAFKA_INCLUDE_DIR)
include_directories (${RDKAFKA_INCLUDE_DIR})
else ()
set (USE_INTERNAL_RDKAFKA_LIBRARY 1)
set (RDKAFKA_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/librdkafka/src")
set (RDKAFKA_LIBRARY rdkafka)
set (USE_RDKAFKA 1)
endif ()
message (STATUS "Using librdkafka: ${RDKAFKA_INCLUDE_DIR} : ${RDKAFKA_LIBRARY}")


@@ -61,6 +61,13 @@ if (NOT ARCH_ARM)
add_subdirectory (libcpuid)
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY)
set(RDKAFKA_BUILD_EXAMPLES OFF)
set(RDKAFKA_BUILD_TESTS OFF)
mark_as_advanced(ZLIB_INCLUDE_DIR)
add_subdirectory (librdkafka)
endif ()
if (USE_INTERNAL_POCO_LIBRARY)
set (_save ${ENABLE_TESTS})
set (ENABLE_TESTS 0)

contrib/librdkafka vendored Submodule

@@ -0,0 +1 @@
Subproject commit c44f40be96556ab8145f0f5bcaa500d16d4e0fc4


@@ -181,6 +181,7 @@ target_link_libraries (dbms
${Boost_SYSTEM_LIBRARY}
${Poco_Data_LIBRARY}
${BTRIE_LIBRARIES}
${RDKAFKA_LIBRARY}
)
if (Poco_DataODBC_FOUND)
@@ -209,14 +210,17 @@ target_link_libraries (dbms
target_include_directories (dbms BEFORE PRIVATE ${DIVIDE_INCLUDE_DIR})
target_include_directories (dbms BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_ZSTD_LIBRARY)
target_include_directories (dbms BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
target_include_directories (dbms PUBLIC ${RDKAFKA_INCLUDE_DIR})
# only for copy_headers.sh:
target_include_directories (dbms PRIVATE ${COMMON_INCLUDE_DIR})


@@ -17,7 +17,7 @@ namespace DB
class PushingToViewsBlockOutputStream : public IBlockOutputStream
{
public:
PushingToViewsBlockOutputStream(String database, String table, const Context & context_, const ASTPtr & query_ptr_)
PushingToViewsBlockOutputStream(String database, String table, const Context & context_, const ASTPtr & query_ptr_, bool no_destination = false)
: context(context_), query_ptr(query_ptr_)
{
storage = context.getTable(database, table);
@@ -34,7 +34,9 @@ public:
dynamic_cast<const StorageMaterializedView &>(*context.getTable(database_table.first, database_table.second)).getInnerQuery(),
std::make_shared<PushingToViewsBlockOutputStream>(database_table.first, database_table.second, context, ASTPtr()));
output = storage->write(query_ptr, context.getSettingsRef());
/* Do not push to destination table if the flag is set */
if (!no_destination)
output = storage->write(query_ptr, context.getSettingsRef());
}
void write(const Block & block) override


@@ -52,13 +52,13 @@ size_t find_centroid(Float64 x, std::vector<Float64>& centroids)
{
// Centroids array has to have at least one element, and if it has only one element,
// it is also the result of this Function.
Float64 distance = abs(centroids[0] - x);
Float64 distance = std::abs(centroids[0] - x);
size_t index = 0;
// Check if we have more clusters and if we have, whether some is closer to src[i]
for (size_t j = 1; j < centroids.size(); ++j)
{
Float64 next_distance = abs(centroids[j] - x);
Float64 next_distance = std::abs(centroids[j] - x);
if (next_distance < distance)
{


@@ -588,6 +588,11 @@ void Context::addDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
checkDatabaseAccessRights(from.first);
checkDatabaseAccessRights(where.first);
shared->view_dependencies[from].insert(where);
// Notify table of dependencies change
auto table = tryGetTable(from.first, from.second);
if (table != nullptr)
table->updateDependencies();
}
void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
@@ -596,6 +601,11 @@ void Context::removeDependency(const DatabaseAndTableName & from, const DatabaseAndTableName & where)
checkDatabaseAccessRights(from.first);
checkDatabaseAccessRights(where.first);
shared->view_dependencies[from].erase(where);
// Notify table of dependencies change
auto table = tryGetTable(from.first, from.second);
if (table != nullptr)
table->updateDependencies();
}
Dependencies Context::getDependencies(const String & database_name, const String & table_name) const


@@ -93,7 +93,7 @@ BlockIO InterpreterInsertQuery::execute()
/// We create a pipeline of several streams, into which we will write data.
BlockOutputStreamPtr out;
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, context, query_ptr);
out = std::make_shared<PushingToViewsBlockOutputStream>(query.database, query.table, context, query_ptr, query.no_destination);
out = std::make_shared<MaterializingBlockOutputStream>(out);


@@ -304,7 +304,11 @@ struct Settings
* It is for testing purposes, the syntax will likely change soon and the server will not be able \
* to load the tables created this way. You have been warned. \
*/ \
M(SettingBool, experimental_merge_tree_allow_custom_partitions, false)
M(SettingBool, experimental_merge_tree_allow_custom_partitions, false) \
/* Timeout for flushing data from streaming storages. */ \
M(SettingMilliseconds, stream_flush_interval_ms, DEFAULT_QUERY_LOG_FLUSH_INTERVAL_MILLISECONDS) \
/* Schema identifier (used by schema-based formats) */ \
M(SettingString, schema, "")
/// Possible limits for query execution.


@@ -17,6 +17,10 @@ public:
ASTPtr columns;
String format;
ASTPtr select;
// Set to true if the data should only be inserted into attached views
bool no_destination = false;
/// Data to insert
const char * data = nullptr;
const char * end = nullptr;


@@ -390,7 +390,7 @@ struct Stats
avg_speed_first = avg_speed_value;
}
if (abs(avg_speed_value - avg_speed_first) >= precision)
if (std::abs(avg_speed_value - avg_speed_first) >= precision)
{
avg_speed_first = avg_speed_value;
avg_speed_watch.restart();


@@ -304,6 +304,10 @@ public:
/// Otherwise - throws an exception with detailed information or returns false
virtual bool checkTableCanBeDropped() const { return true; }
/** Notify engine about updated dependencies for this storage. */
virtual void updateDependencies() {}
protected:
using ITableDeclaration::ITableDeclaration;
using std::enable_shared_from_this<IStorage>::shared_from_this;


@@ -5,6 +5,8 @@
#include <Common/StringUtils.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeTuple.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
@@ -33,6 +35,7 @@
#include <Storages/StorageJoin.h>
#include <Storages/StorageFile.h>
#include <Storages/StorageDictionary.h>
#include <Storages/StorageKafka.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@@ -623,6 +626,65 @@ StoragePtr StorageFactory::get(
StorageTrivialBuffer::Thresholds{max_time, max_rows, max_bytes},
destination_database, destination_table);
}
else if (name == "Kafka")
{
/** Engine arguments are as follows:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constant expression with a string result)
* - Message format (string)
* - Schema (optional, if the format supports it)
*/
ASTs & args_func = typeid_cast<ASTFunction &>(*typeid_cast<ASTCreateQuery &>(*query).storage).children;
const auto params_error_message = "Storage Kafka requires 4 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format";
if (args_func.size() != 1)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = typeid_cast<ASTExpressionList &>(*args_func.at(0)).children;
if (args.size() != 4 && args.size() != 5)
throw Exception(params_error_message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
String brokers;
auto ast = typeid_cast<ASTLiteral *>(&*args[0]);
if (ast && ast->value.getType() == Field::Types::String)
brokers = safeGet<String>(ast->value);
else
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
args[1] = evaluateConstantExpressionAsLiteral(args[1], local_context);
args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(args[2], local_context);
args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(args[3], local_context);
// Additionally parse schema if supported
String schema;
if (args.size() == 5)
{
args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(args[4], local_context);
schema = static_cast<const ASTLiteral &>(*args[4]).value.safeGet<String>();
}
// Parse topic list and consumer group
Names topics;
topics.push_back(static_cast<const ASTLiteral &>(*args[1]).value.safeGet<String>());
String group = static_cast<const ASTLiteral &>(*args[2]).value.safeGet<String>();
// Parse format from string
String format;
ast = typeid_cast<ASTLiteral *>(&*args[3]);
if (ast && ast->value.getType() == Field::Types::String)
format = safeGet<String>(ast->value);
else
throw Exception(String("Format must be a string"), ErrorCodes::BAD_ARGUMENTS);
return StorageKafka::create(
table_name, database_name, context, columns,
materialized_columns, alias_columns, column_defaults,
brokers, group, topics, format, schema);
}
else if (endsWith(name, "MergeTree"))
{
/** [Replicated][|Summing|Collapsing|Aggregating|Unsorted|Replacing|Graphite]MergeTree (2 * 7 combinations) engines


@@ -0,0 +1,358 @@
#include <Common/Exception.h>
#include <Common/setThreadName.h>
#include <DataStreams/FormatFactory.h>
#include <DataStreams/IProfilingBlockInputStream.h>
#include <DataStreams/LimitBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Storages/StorageKafka.h>
#include <common/logger_useful.h>
#include <rdkafka.h>
#include <thread>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int UNKNOWN_EXCEPTION;
extern const int CANNOT_READ_FROM_ISTREAM;
}
/// How long to wait for a single message (applies to each individual message)
static const auto READ_POLL_MS = 1 * 1000;
static const auto CLEANUP_TIMEOUT_MS = 2 * 1000;
/// How many messages to pull out of internal queue at once
static const auto BATCH_SIZE_MAX = 16UL;
class ReadBufferFromKafkaConsumer : public ReadBuffer
{
using Messages = std::vector<rd_kafka_message_t *>;
rd_kafka_t * consumer;
Messages messages;
Messages::iterator current;
Messages::iterator end;
Poco::Logger * log;
bool nextImpl() override
{
if (current == end)
{
// Fetch next batch of messages
bool res = fetchMessages();
if (!res)
{
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(rd_kafka_last_error()));
return false;
}
// No error, but no messages read
if (current == end)
return false;
}
// Process next buffered message
rd_kafka_message_t * msg = *(current++);
if (msg->err)
{
if (msg->err != RD_KAFKA_RESP_ERR__PARTITION_EOF)
LOG_ERROR(log, "Consumer error: " << rd_kafka_err2str(msg->err) << " " << rd_kafka_message_errstr(msg));
return false;
}
// Consume message and mark the topic/partition offset
BufferBase::set(reinterpret_cast<char *>(msg->payload), msg->len, 0);
auto err = rd_kafka_offset_store(msg->rkt, msg->partition, msg->offset);
if (err)
LOG_ERROR(log, "Failed to store offsets: " << rd_kafka_err2str(err));
return true;
}
void reset()
{
for (auto it = messages.begin(); it < end; ++it)
rd_kafka_message_destroy(*it);
current = end = messages.begin();
}
bool fetchMessages()
{
rd_kafka_queue_t* queue = rd_kafka_queue_get_consumer(consumer);
if (queue == nullptr)
return false;
reset();
auto result = rd_kafka_consume_batch_queue(queue, READ_POLL_MS, messages.data(), messages.size());
if (result < 0)
return false;
current = messages.begin();
end = current + result;
return true;
}
public:
ReadBufferFromKafkaConsumer(rd_kafka_t * consumer_, size_t batch_size, Poco::Logger * log_)
: ReadBuffer(nullptr, 0), consumer(consumer_), messages(batch_size), current(messages.begin()), end(messages.begin()), log(log_) {}
~ReadBufferFromKafkaConsumer() { reset(); }
};
class KafkaBlockInputStream : public IProfilingBlockInputStream
{
public:
KafkaBlockInputStream(StorageKafka & storage_, const Context & context_, const String & schema, size_t max_block_size)
: storage(storage_)
{
// Always skip unknown fields regardless of the context (JSON or TSKV)
Context context = context_;
context.setSetting("input_format_skip_unknown_fields", 1UL);
if (schema.size() > 0)
context.setSetting("schema", schema);
// Create a formatted reader on Kafka messages
LOG_TRACE(storage.log, "Creating formatted reader");
read_buf = std::make_unique<ReadBufferFromKafkaConsumer>(storage.consumer, max_block_size, storage.log);
reader = FormatFactory().getInput(storage.format_name, *read_buf, storage.getSampleBlock(), context, max_block_size);
}
~KafkaBlockInputStream() override
{
}
String getName() const override
{
return storage.getName();
}
String getID() const override
{
std::stringstream res_stream;
res_stream << "Kafka(" << storage.topics.size() << ", " << storage.format_name << ")";
return res_stream.str();
}
Block readImpl() override
{
if (isCancelled())
return {};
return reader->read();
}
void readPrefixImpl() override
{
reader->readPrefix();
}
void readSuffixImpl() override
{
reader->readSuffix();
}
private:
StorageKafka & storage;
rd_kafka_t * consumer;
Block sample_block;
std::unique_ptr<ReadBufferFromKafkaConsumer> read_buf;
BlockInputStreamPtr reader;
};
StorageKafka::StorageKafka(
const std::string & table_name_,
const std::string & database_name_,
Context & context_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_)
: IStorage{materialized_columns_, alias_columns_, column_defaults_},
table_name(table_name_), database_name(database_name_), context(context_),
columns(columns_), topics(topics_), format_name(format_name_), schema_name(schema_name_),
conf(rd_kafka_conf_new()), log(&Logger::get("StorageKafka (" + table_name_ + ")"))
{
char errstr[512];
LOG_TRACE(log, "Setting brokers: " << brokers_);
if (rd_kafka_conf_set(conf, "metadata.broker.list", brokers_.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr), ErrorCodes::INCORRECT_DATA);
LOG_TRACE(log, "Setting Group ID: " << group_ << " Client ID: clickhouse");
if (rd_kafka_conf_set(conf, "group.id", group_.c_str(), errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr), ErrorCodes::INCORRECT_DATA);
if (rd_kafka_conf_set(conf, "client.id", "clickhouse", errstr, sizeof(errstr)) != RD_KAFKA_CONF_OK)
throw Exception(String(errstr), ErrorCodes::INCORRECT_DATA);
// Don't store offsets of messages before they're processed
rd_kafka_conf_set(conf, "enable.auto.offset.store", "false", nullptr, 0);
// Try to fetch preferred number of bytes before timeout
const Settings & settings = context.getSettingsRef();
auto min_bytes = settings.preferred_block_size_bytes.toString();
rd_kafka_conf_set(conf, "fetch.min.bytes", min_bytes.c_str(), nullptr, 0);
}
BlockInputStreams StorageKafka::read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams)
{
check(column_names);
processed_stage = QueryProcessingStage::FetchColumns;
if (!conf)
return BlockInputStreams();
BlockInputStreams streams;
streams.reserve(num_streams);
// Note: The block size is set to 1, otherwise it'd have to be able to return excess buffered messages
for (size_t i = 0; i < num_streams; ++i)
streams.push_back(std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, 1));
LOG_DEBUG(log, "Starting reading " << num_streams << " streams, " << max_block_size << " block size");
return streams;
}
void StorageKafka::startup()
{
// Create a consumer from saved configuration
char errstr[512];
consumer = rd_kafka_new(RD_KAFKA_CONSUMER, conf, errstr, sizeof(errstr));
if (consumer == nullptr)
throw Exception("Failed to create consumer handle: " + String(errstr), ErrorCodes::UNKNOWN_EXCEPTION);
rd_kafka_poll_set_consumer(consumer);
// Create a list of partitions
auto * topicList = rd_kafka_topic_partition_list_new(topics.size());
for (const auto & t : topics)
{
LOG_TRACE(log, "Subscribing to topic: " + t);
rd_kafka_topic_partition_list_add(topicList, t.c_str(), RD_KAFKA_PARTITION_UA);
}
// Subscribe to requested topics
auto err = rd_kafka_subscribe(consumer, topicList);
if (err)
throw Exception("Failed to subscribe: " + String(rd_kafka_err2str(err)), ErrorCodes::UNKNOWN_EXCEPTION);
rd_kafka_topic_partition_list_destroy(topicList);
// Start the reader thread
stream_thread = std::thread(&StorageKafka::streamThread, this);
}
void StorageKafka::shutdown()
{
is_cancelled = true;
cancel_event.set();
LOG_TRACE(log, "Unsubscribing from assignments");
rd_kafka_unsubscribe(consumer);
auto err = rd_kafka_consumer_close(consumer);
if (err)
{
LOG_ERROR(log, "Failed to close: " + String(rd_kafka_err2str(err)));
}
LOG_TRACE(log, "Destroying consumer");
rd_kafka_destroy(consumer);
if (stream_thread.joinable())
stream_thread.join();
rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
}
void StorageKafka::updateDependencies()
{
cancel_event.set();
}
void StorageKafka::streamThread()
{
setThreadName("KafkaStreamThread");
while (!is_cancelled)
{
try
{
auto dependencies = context.getDependencies(database_name, table_name);
if (dependencies.size() > 0)
{
LOG_DEBUG(log, "Started streaming to " << dependencies.size() << " attached views");
streamToViews();
LOG_DEBUG(log, "Stopped streaming to views");
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
cancel_event.tryWait(READ_POLL_MS);
}
LOG_DEBUG(log, "Stream thread finished");
}
void StorageKafka::streamToViews()
{
auto table = context.getTable(database_name, table_name);
if (!table)
{
LOG_WARNING(log, "Destination table " << database_name << "." << table_name << " doesn't exist.");
return;
}
// Create an INSERT query for streaming data
auto insert = std::make_shared<ASTInsertQuery>();
insert->database = database_name;
insert->table = table_name;
insert->no_destination = true; // Only insert into dependent views
// Limit the number of batched messages to allow early cancellations
const Settings & settings = context.getSettingsRef();
const size_t block_size = std::min(settings.max_block_size.value, BATCH_SIZE_MAX);
BlockInputStreamPtr in = std::make_shared<KafkaBlockInputStream>(*this, context, schema_name, block_size);
// Limit read batch to maximum block size to allow DDL
IProfilingBlockInputStream::LocalLimits limits;
limits.max_execution_time = settings.stream_flush_interval_ms;
limits.timeout_overflow_mode = OverflowMode::BREAK;
if (IProfilingBlockInputStream * p_stream = dynamic_cast<IProfilingBlockInputStream *>(in.get()))
p_stream->setLimits(limits);
// Execute the query
InterpreterInsertQuery interpreter{insert, context};
auto block_io = interpreter.execute();
copyData(*in, *block_io.out);
}
}


@@ -0,0 +1,87 @@
#pragma once
#include <mutex>
#include <ext/shared_ptr_helper.h>
#include <Core/NamesAndTypes.h>
#include <Storages/IStorage.h>
#include <DataStreams/IBlockOutputStream.h>
#include <Poco/Event.h>
struct rd_kafka_s;
struct rd_kafka_conf_s;
namespace DB
{
class StorageKafka;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with continuous insertion / ETL.
*/
class StorageKafka : public ext::shared_ptr_helper<StorageKafka>, public IStorage
{
friend class ext::shared_ptr_helper<StorageKafka>;
friend class KafkaBlockInputStream;
friend class KafkaBlockOutputStream;
public:
std::string getName() const override { return "Kafka"; }
std::string getTableName() const override { return table_name; }
std::string getDatabaseName() const { return database_name; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
void startup() override;
void shutdown() override;
BlockInputStreams read(
const Names & column_names,
const SelectQueryInfo & query_info,
const Context & context,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned num_streams) override;
void rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name) override
{
table_name = new_table_name;
database_name = new_database_name;
}
void updateDependencies() override;
private:
String table_name;
String database_name;
Context & context;
NamesAndTypesListPtr columns;
Names topics;
const String format_name;
const String schema_name;
struct rd_kafka_conf_s * conf;
struct rd_kafka_s * consumer;
std::mutex mutex;
Poco::Logger * log;
Poco::Event cancel_event;
std::atomic<bool> is_cancelled{false};
std::thread stream_thread;
StorageKafka(
const std::string & table_name_,
const std::string & database_name_,
Context & context_,
NamesAndTypesListPtr columns_,
const NamesAndTypesList & materialized_columns_,
const NamesAndTypesList & alias_columns_,
const ColumnDefaults & column_defaults_,
const String & brokers_, const String & group_, const Names & topics_,
const String & format_name_, const String & schema_name_);
void streamThread();
void streamToViews();
};
}


@@ -209,3 +209,17 @@ output_format_json_quote_64bit_integers
If the parameter is true (default value), UInt64 and Int64 numbers are printed as quoted strings in all JSON output formats.
Such behavior is compatible with most JavaScript interpreters, which store all numbers as double-precision floating point numbers.
Otherwise, they are printed as regular numbers.
stream_flush_interval_ms
------------------------
This setting applies only when the server forms blocks from streaming table engines: a block is flushed either when this timeout expires or when the stream has produced ``max_insert_block_size`` rows.
The default value is 7500 (milliseconds).
A lower value causes the stream to flush to the table more often, so data appears in the destination table sooner. Setting the value too low, however, may result in excessively frequent inserts and lower ingestion efficiency.
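For example, a minimal per-session override (a sketch; the value is illustrative and should be tuned to your ingestion rate):

.. code-block:: sql

    SET stream_flush_interval_ms = 1000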
schema
------
This parameter only applies when used in conjunction with formats requiring a schema definition, for example Cap'n Proto. The parameter value is specific to the format.
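As a sketch of how this is used with the Kafka table engine added in this pull request, the schema can be supplied as the engine's optional fifth argument, which sets this parameter for the consumer (the table definition and the ``schema.capnp:Message`` value are illustrative placeholders):

.. code-block:: sql

    CREATE TABLE proto_queue (timestamp UInt64, message String)
        ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'CapnProto', 'schema.capnp:Message')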


@@ -0,0 +1,44 @@
Kafka
-----
A table engine backed by Apache Kafka, a streaming platform with three key capabilities:
1. It lets you publish and subscribe to streams of records. In this respect it is similar to a message queue or enterprise messaging system.
2. It lets you store streams of records in a fault-tolerant way.
3. It lets you process streams of records as they occur.
.. code-block:: text
Kafka(broker_list, topic_list, group_name, format[, schema])
Engine parameters:
broker_list - A comma-separated list of brokers (``localhost:9092``).
topic_list - List of Kafka topics to consume (``my_topic``).
group_name - Kafka consumer group name (``group1``). Read offsets are tracked per consumer group; to consume each message exactly once across the cluster, use the same group name everywhere.
format - Name of the format used to deserialize messages. It accepts the same values as the SQL ``FORMAT`` clause, for example ``JSONEachRow``.
schema - Optional schema value for formats that require a schema to interpret consumed messages. For example, the Cap'n Proto format requires a path to the schema file and the name of the root object - ``schema.capnp:Message``. Self-describing formats such as JSON don't require any schema.
Example:
.. code-block:: sql
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5
The consumed messages are tracked automatically in the background, so each message is read exactly once within a single consumer group. If you want to consume the same set of messages twice, create a copy of the table with a different ``group_name``. The consumer group is elastic and synchronised across the cluster: for example, if you have 10 topic partitions and 5 instances of the table across the cluster, each instance is automatically assigned 2 partitions. If you detach a table or add new instances, the topic/partition allocation is rebalanced automatically. See http://kafka.apache.org/intro for more information about how this works.
Reading messages directly, however, is not very useful; the table engine is typically used to build real-time ingestion pipelines with a MATERIALIZED VIEW. When a MATERIALIZED VIEW is attached to a Kafka table, the engine starts consuming messages in the background and pushes them into the attached views. This lets you continuously ingest messages from Kafka and transform them into the appropriate format with a SELECT statement.
Example:
.. code-block:: sql
CREATE TABLE queue (timestamp UInt64, level String, message String) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
CREATE MATERIALIZED VIEW daily ENGINE = SummingMergeTree(day, (day, level), 8192) AS
SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;
SELECT level, sum(total) FROM daily GROUP BY level;
Messages are streamed into the attached views immediately, in the same way a continuous stream of INSERT statements would be. To improve performance, consumed messages are squashed into batches of up to ``max_insert_block_size`` rows. If a batch cannot be completed within the ``stream_flush_interval_ms`` period (7500 ms by default), it is flushed anyway to keep insertion latency bounded.
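To illustrate the note above about re-consuming a stream, a second table with a different ``group_name`` keeps its own offsets and consumes the same topic independently of the first (names are placeholders):

.. code-block:: sql

    CREATE TABLE queue_replay (timestamp UInt64, level String, message String)
        ENGINE = Kafka('localhost:9092', 'topic', 'group2', 'JSONEachRow');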