diff --git a/docs/en/engines/table-engines/integrations/kafka.md b/docs/en/engines/table-engines/integrations/kafka.md
index b81d5624c1a..e4d3ac762ed 100644
--- a/docs/en/engines/table-engines/integrations/kafka.md
+++ b/docs/en/engines/table-engines/integrations/kafka.md
@@ -173,6 +173,7 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
    <debug>cgrp</debug>
    <auto_offset_reset>smallest</auto_offset_reset>
+    <statistics_interval_ms>600</statistics_interval_ms>
@@ -260,3 +261,4 @@ The number of rows in one Kafka message depends on whether the format is row-bas
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)
- [background_message_broker_schedule_pool_size](../../../operations/server-configuration-parameters/settings.md#background_message_broker_schedule_pool_size)
+- [system.kafka_consumers](../../../operations/system-tables/kafka_consumers.md)
diff --git a/docs/en/operations/system-tables/kafka_consumers.md b/docs/en/operations/system-tables/kafka_consumers.md
new file mode 100644
index 00000000000..7e28a251e26
--- /dev/null
+++ b/docs/en/operations/system-tables/kafka_consumers.md
@@ -0,0 +1,58 @@
+---
+slug: /en/operations/system-tables/kafka_consumers
+---
+# kafka_consumers
+
+Contains information about Kafka consumers.
+Applicable for the [Kafka table engine](../../engines/table-engines/integrations/kafka) (native ClickHouse integration).
+
+Columns:
+
+- `database` (String) - database of the table with the Kafka engine.
+- `table` (String) - name of the table with the Kafka engine.
+- `consumer_id` (String) - Kafka consumer identifier. Note that a table can have many consumers, as specified by the `kafka_num_consumers` parameter.
+- `assignments.topic` (Array(String)) - Kafka topic.
+- `assignments.partition_id` (Array(Int32)) - Kafka partition id. Note that only one consumer can be assigned to a partition.
+- `assignments.current_offset` (Array(Int64)) - current offset.
+- `exceptions.time` (Array(DateTime)) - timestamps of the 10 most recent exceptions.
+- `exceptions.text` (Array(String)) - text of the 10 most recent exceptions.
+- `last_poll_time` (DateTime) - timestamp of the most recent poll.
+- `num_messages_read` (UInt64) - number of messages read by the consumer.
+- `last_commit_time` (DateTime) - timestamp of the most recent commit.
+- `num_commits` (UInt64) - total number of commits for the consumer.
+- `last_rebalance_time` (DateTime) - timestamp of the most recent Kafka rebalance.
+- `num_rebalance_revocations` (UInt64) - number of times the consumer had its partitions revoked.
+- `num_rebalance_assignments` (UInt64) - number of times the consumer was assigned to the Kafka cluster.
+- `is_currently_used` (UInt8) - whether the consumer is currently in use.
+- `rdkafka_stat` (String) - internal statistics of the librdkafka library. See https://github.com/ClickHouse/librdkafka/blob/master/STATISTICS.md. Set `statistics_interval_ms` to 0 to disable it; the default is 3000 (once every three seconds).
+
+Example:
+
+``` sql
+SELECT *
+FROM system.kafka_consumers
+FORMAT Vertical
+```
+
+``` text
+Row 1:
+──────
+database: test
+table: kafka
+consumer_id: ClickHouse-instance-test-kafka-1caddc7f-f917-4bb1-ac55-e28bd103a4a0
+assignments.topic: ['system_kafka_cons']
+assignments.partition_id: [0]
+assignments.current_offset: [18446744073709550615]
+exceptions.time: []
+exceptions.text: []
+last_poll_time: 2006-11-09 18:47:47
+num_messages_read: 4
+last_commit_time: 2006-11-10 04:39:40
+num_commits: 1
+last_rebalance_time: 1970-01-01 00:00:00
+num_rebalance_revocations: 0
+num_rebalance_assignments: 1
+is_currently_used: 1
+rdkafka_stat: {...}
+
+```
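+
+For example, a query along the following lines can be used to find consumers that reported an exception recently (the 10-minute window is an arbitrary example value):
+
+``` sql
+SELECT database, table, consumer_id, exceptions.time, exceptions.text
+FROM system.kafka_consumers
+WHERE arrayExists(t -> t > now() - INTERVAL 10 MINUTE, exceptions.time)
+```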
diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt
index ddb6fcebd23..5dfc4c15be0 100644
--- a/src/CMakeLists.txt
+++ b/src/CMakeLists.txt
@@ -384,7 +384,7 @@ target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::abseil_swiss_tabl
dbms_target_link_libraries(PUBLIC ch_contrib::roaring)
if (TARGET ch_contrib::rdkafka)
- dbms_target_link_libraries(PRIVATE ch_contrib::rdkafka ch_contrib::cppkafka)
+ dbms_target_link_libraries(PUBLIC ch_contrib::rdkafka ch_contrib::cppkafka)
endif()
if (TARGET ch_contrib::nats_io)
diff --git a/src/Storages/Kafka/KafkaConsumer.cpp b/src/Storages/Kafka/KafkaConsumer.cpp
index b2e6129c61c..9e558940012 100644
--- a/src/Storages/Kafka/KafkaConsumer.cpp
+++ b/src/Storages/Kafka/KafkaConsumer.cpp
@@ -61,6 +61,7 @@ KafkaConsumer::KafkaConsumer(
, stopped(stopped_)
, current(messages.begin())
, topics(_topics)
+ , exceptions_buffer(EXCEPTIONS_DEPTH)
{
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
@@ -79,6 +80,7 @@ KafkaConsumer::KafkaConsumer(
}
assignment = topic_partitions;
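+        /// counted for system.kafka_consumers (num_rebalance_assignments)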
+ num_rebalance_assignments++;
});
// called (synchronously, during poll) when we leave the consumer group
@@ -106,6 +108,8 @@ KafkaConsumer::KafkaConsumer(
cleanUnprocessed();
stalled_status = REBALANCE_HAPPENED;
+        last_rebalance_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
+
assignment.reset();
waited_for_assignment = 0;
@@ -118,12 +122,14 @@ KafkaConsumer::KafkaConsumer(
// {
// LOG_WARNING(log, "Commit error: {}", e.what());
// }
+ num_rebalance_revocations++;
});
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
{
LOG_ERROR(log, "Rebalance error: {}", err);
ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
+ setExceptionInfo(err);
});
}
@@ -177,6 +183,7 @@ void KafkaConsumer::drain()
else
{
LOG_ERROR(log, "Error during draining: {}", error);
+ setExceptionInfo(error);
}
}
@@ -251,6 +258,8 @@ void KafkaConsumer::commit()
consumer->commit();
committed = true;
print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
+        last_commit_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
+ num_commits += 1;
}
catch (const cppkafka::HandleException & e)
{
@@ -259,7 +268,10 @@ void KafkaConsumer::commit()
if (e.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET)
committed = true;
else
+ {
LOG_ERROR(log, "Exception during commit attempt: {}", e.what());
+ setExceptionInfo(e.what());
+ }
}
--max_retries;
}
@@ -399,6 +411,8 @@ ReadBufferPtr KafkaConsumer::consume()
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size,
std::chrono::milliseconds(actual_poll_timeout_ms));
+        last_poll_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
+ num_messages_read += new_messages.size();
resetIfStopped();
if (stalled_status == CONSUMER_STOPPED)
@@ -495,6 +509,7 @@ size_t KafkaConsumer::filterMessageErrors()
{
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error);
+ setExceptionInfo(error);
return true;
}
return false;
@@ -527,4 +542,64 @@ void KafkaConsumer::storeLastReadMessageOffset()
}
}
+void KafkaConsumer::setExceptionInfo(const cppkafka::Error & err)
+{
+ setExceptionInfo(err.to_string());
+}
+
+void KafkaConsumer::setExceptionInfo(const String & text)
+{
+ std::lock_guard lock(exception_mutex);
+    exceptions_buffer.push_back({text, static_cast<UInt64>(Poco::Timestamp().epochTime())});
+}
+
+/*
+ * Needed until
+ * https://github.com/mfontanini/cppkafka/pull/309
+ * is merged,
+ * because consumer->get_member_id() contains a leak
+ */
+std::string KafkaConsumer::getMemberId() const
+{
+ char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
+ std::string memberid_string = memberid_ptr;
+ rd_kafka_mem_free(nullptr, memberid_ptr);
+ return memberid_string;
+}
+
+
+KafkaConsumer::Stat KafkaConsumer::getStat() const
+{
+ KafkaConsumer::Stat::Assignments assignments;
+ auto cpp_assignments = consumer->get_assignment();
+ auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+
+ for (size_t num = 0; num < cpp_assignments.size(); ++num)
+ {
+ assignments.push_back({
+ cpp_assignments[num].get_topic(),
+ cpp_assignments[num].get_partition(),
+ cpp_offsets[num].get_offset(),
+ });
+ }
+
+ return {
+ .consumer_id = getMemberId() /* consumer->get_member_id() */ ,
+ .assignments = std::move(assignments),
+ .last_poll_time = last_poll_timestamp_usec.load(),
+ .num_messages_read = num_messages_read.load(),
+
+ .last_commit_timestamp_usec = last_commit_timestamp_usec.load(),
+ .last_rebalance_timestamp_usec = last_rebalance_timestamp_usec.load(),
+ .num_commits = num_commits.load(),
+ .num_rebalance_assignments = num_rebalance_assignments.load(),
+ .num_rebalance_revocations = num_rebalance_revocations.load(),
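+        /// the exception buffer and the rdkafka statistics string are copied under their mutexes by immediately-invoked lambdas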
+ .exceptions_buffer = [&](){std::lock_guard lock(exception_mutex);
+ return exceptions_buffer;}(),
+ .in_use = in_use.load(),
+ .rdkafka_stat = [&](){std::lock_guard lock(rdkafka_stat_mutex);
+ return rdkafka_stat;}(),
+ };
+}
+
}
diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h
index feda51a682e..91bb2ae8d77 100644
--- a/src/Storages/Kafka/KafkaConsumer.h
+++ b/src/Storages/Kafka/KafkaConsumer.h
@@ -1,5 +1,7 @@
#pragma once
+#include <boost/circular_buffer.hpp>
+
#include
#include
#include
@@ -20,10 +22,44 @@ namespace Poco
namespace DB
{
+class StorageSystemKafkaConsumers;
+
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class KafkaConsumer
{
+public:
+ struct ExceptionInfo
+ {
+ String text;
+ UInt64 timestamp_usec;
+ };
+    using ExceptionsBuffer = boost::circular_buffer<ExceptionInfo>;
+
+ struct Stat // system.kafka_consumers data
+ {
+ struct Assignment
+ {
+ String topic_str;
+ Int32 partition_id;
+ Int64 current_offset;
+ };
+        using Assignments = std::vector<Assignment>;
+
+ String consumer_id;
+ Assignments assignments;
+ UInt64 last_poll_time;
+ UInt64 num_messages_read;
+ UInt64 last_commit_timestamp_usec;
+ UInt64 last_rebalance_timestamp_usec;
+ UInt64 num_commits;
+ UInt64 num_rebalance_assignments;
+ UInt64 num_rebalance_revocations;
+ KafkaConsumer::ExceptionsBuffer exceptions_buffer;
+ bool in_use;
+ std::string rdkafka_stat;
+ };
+
public:
KafkaConsumer(
ConsumerPtr consumer_,
@@ -69,6 +105,18 @@ public:
auto currentTimestamp() const { return current[-1].get_timestamp(); }
const auto & currentHeaderList() const { return current[-1].get_header_list(); }
String currentPayload() const { return current[-1].get_payload(); }
+ void setExceptionInfo(const cppkafka::Error & err);
+ void setExceptionInfo(const String & text);
+ void setRDKafkaStat(const std::string & stat_json_string)
+ {
+ std::lock_guard lock(rdkafka_stat_mutex);
+ rdkafka_stat = stat_json_string;
+ }
+ void inUse() { in_use = true; }
+ void notInUse() { in_use = false; }
+
+ // For system.kafka_consumers
+ Stat getStat() const;
private:
    using Messages = std::vector<cppkafka::Message>;
@@ -105,12 +153,33 @@ private:
    std::optional<cppkafka::TopicPartitionList> assignment;
const Names topics;
+ /// system.kafka_consumers data is retrieved asynchronously
+ /// so we have to protect exceptions_buffer
+ mutable std::mutex exception_mutex;
+ const size_t EXCEPTIONS_DEPTH = 10;
+ ExceptionsBuffer exceptions_buffer;
+
+    std::atomic<UInt64> last_exception_timestamp_usec = 0;
+    std::atomic<UInt64> last_poll_timestamp_usec = 0;
+    std::atomic<UInt64> num_messages_read = 0;
+    std::atomic<UInt64> last_commit_timestamp_usec = 0;
+    std::atomic<UInt64> num_commits = 0;
+    std::atomic<UInt64> last_rebalance_timestamp_usec = 0;
+    std::atomic<UInt64> num_rebalance_assignments = 0;
+    std::atomic<UInt64> num_rebalance_revocations = 0;
+    std::atomic<bool> in_use = 0;
+
+ mutable std::mutex rdkafka_stat_mutex;
+ std::string rdkafka_stat;
+
void drain();
void cleanUnprocessed();
void resetIfStopped();
/// Return number of messages with an error.
size_t filterMessageErrors();
ReadBufferPtr getNextMessage();
+
+ std::string getMemberId() const;
};
}
diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp
index ba242417058..cd83a6a1422 100644
--- a/src/Storages/Kafka/KafkaSource.cpp
+++ b/src/Storages/Kafka/KafkaSource.cpp
@@ -133,6 +133,7 @@ Chunk KafkaSource::generateImpl()
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
+ consumer->setExceptionInfo(e.message());
throw std::move(e);
}
};
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index a7315eb51ea..54db0f29cb8 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -416,7 +416,9 @@ void StorageKafka::startup()
{
try
{
- pushConsumer(createConsumer(i));
+ auto consumer = createConsumer(i);
+ pushConsumer(consumer);
+ all_consumers.push_back(consumer);
++num_created_consumers;
}
catch (const cppkafka::Exception &)
@@ -456,6 +458,7 @@ void StorageKafka::shutdown()
void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
{
std::lock_guard lock(mutex);
+ consumer->notInUse();
consumers.push_back(consumer);
semaphore.set();
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
@@ -484,6 +487,7 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
auto consumer = consumers.back();
consumers.pop_back();
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
+ consumer->inUse();
return consumer;
}
@@ -512,7 +516,11 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
size_t default_queued_min_messages = 100000; // we don't want to decrease the default
conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages));
- updateConfiguration(conf);
+    /// The statistics callback needs a reference to the consumer,
+    /// but the consumer does not exist yet when the callback is registered,
+    /// so shared_ptr<weak_ptr<KafkaConsumer>> comes to the rescue.
+    auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
+ updateConfiguration(conf, consumer_weak_ptr_ptr);
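+    /// the weak_ptr is assigned below, once the KafkaConsumer has been constructed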
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
@@ -523,13 +531,20 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
    auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+ KafkaConsumerPtr kafka_consumer_ptr;
+
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
if (thread_per_consumer)
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
-        return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
+        kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
-    return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
+    else
+    {
+        kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
+ }
+ *consumer_weak_ptr_ptr = kafka_consumer_ptr;
+ return kafka_consumer_ptr;
}
size_t StorageKafka::getMaxBlockSize() const
@@ -562,7 +577,8 @@ String StorageKafka::getConfigPrefix() const
return CONFIG_KAFKA_TAG;
}
-void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
+void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
+                                       std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
{
// Update consumer configuration from the configuration. Example:
//
@@ -642,6 +658,26 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
});
+ if (kafka_consumer_weak_ptr_ptr)
+ {
+ if (!config.has(config_prefix + "." + "statistics_interval_ms"))
+ {
+ kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
+ }
+
+ if (kafka_config.get("statistics.interval.ms") != "0")
+ {
+ kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
+ {
+ auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
+ if (kafka_consumer_ptr)
+ {
+ kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
+ }
+ });
+ }
+ }
+
// Configure interceptor to change thread name
//
// TODO: add interceptors support into the cppkafka.
@@ -952,7 +988,7 @@ void registerStorageKafka(StorageFactory & factory)
"of getting data from Kafka, consider using a setting kafka_thread_per_consumer=1, "
"and ensure you have enough threads "
"in MessageBrokerSchedulePool (background_message_broker_schedule_pool_size). "
- "See also https://clickhouse.com/docs/integrations/kafka/kafka-table-engine#tuning-performance", max_consumers);
+ "See also https://clickhouse.com/docs/en/integrations/kafka#tuning-performance", max_consumers);
}
else if (num_consumers < 1)
{
diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index 72875e77b12..77e1370c2b7 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -23,9 +23,12 @@ class Configuration;
namespace DB
{
+class StorageSystemKafkaConsumers;
+
struct StorageKafkaInterceptors;
using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
+using KafkaConsumerWeakPtr = std::weak_ptr<KafkaConsumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
@@ -77,6 +80,15 @@ public:
Names getVirtualColumnNames() const;
HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
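+    /// Used by system.kafka_consumers: keeps the storage alive and holds `mutex` locked while all_consumers is being read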
+ struct SafeConsumers
+ {
+        std::shared_ptr<IStorage> storage_ptr;
+        std::unique_lock<std::mutex> lock;
+        std::vector<KafkaConsumerWeakPtr> & consumers;
+ };
+
+ SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }
+
private:
// Configuration and state
    std::unique_ptr<KafkaSettings> kafka_settings;
@@ -101,6 +113,7 @@ private:
size_t num_created_consumers = 0; /// number of actually created consumers.
    std::vector<KafkaConsumerPtr> consumers; /// available consumers
+    std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belonging to a KafkaSource) and vacant consumers
std::mutex mutex;
@@ -129,7 +142,12 @@ private:
std::atomic shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
- void updateConfiguration(cppkafka::Configuration & kafka_config);
+    void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
+    void updateConfiguration(cppkafka::Configuration & kafka_config)
+    {
+        updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
+ }
+
String getConfigPrefix() const;
void threadFunc(size_t idx);
@@ -142,6 +160,7 @@ private:
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
+
};
}
diff --git a/src/Storages/System/StorageSystemKafkaConsumers.cpp b/src/Storages/System/StorageSystemKafkaConsumers.cpp
new file mode 100644
index 00000000000..eb7d84603c0
--- /dev/null
+++ b/src/Storages/System/StorageSystemKafkaConsumers.cpp
@@ -0,0 +1,175 @@
+#include "config.h"
+
+#if USE_RDKAFKA
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include "base/types.h"
+
+namespace DB
+{
+
+NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
+{
+    NamesAndTypesList names_and_types{
+        {"database", std::make_shared<DataTypeString>()},
+        {"table", std::make_shared<DataTypeString>()},
+        {"consumer_id", std::make_shared<DataTypeString>()}, //(number? or string? - single clickhouse table can have many consumers)
+        {"assignments.topic", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
+        {"assignments.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>())},
+        {"assignments.current_offset", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>())},
+        {"exceptions.time", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
+        {"exceptions.text", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
+        {"last_poll_time", std::make_shared<DataTypeDateTime>()},
+        {"num_messages_read", std::make_shared<DataTypeUInt64>()},
+        {"last_commit_time", std::make_shared<DataTypeDateTime>()},
+        {"num_commits", std::make_shared<DataTypeUInt64>()},
+        {"last_rebalance_time", std::make_shared<DataTypeDateTime>()},
+        {"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()},
+        {"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()},
+        {"is_currently_used", std::make_shared<DataTypeUInt8>()},
+        {"rdkafka_stat", std::make_shared<DataTypeString>()},
+    };
+    return names_and_types;
+}
+
+void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
+{
+ auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped();
+
+ size_t index = 0;
+
+
+    auto & database = assert_cast<ColumnString &>(*res_columns[index++]);
+    auto & table = assert_cast<ColumnString &>(*res_columns[index++]);
+    auto & consumer_id = assert_cast<ColumnString &>(*res_columns[index++]); //(number? or string? - single clickhouse table can have many consumers)
+
+    auto & assigments_topics = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+    auto & assigments_topics_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+
+    auto & assigments_partition_id = assert_cast<ColumnInt32 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+    auto & assigments_partition_id_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+
+    auto & assigments_current_offset = assert_cast<ColumnInt64 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+    auto & assigments_current_offset_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+
+    auto & exceptions_time = assert_cast<ColumnDateTime &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+    auto & exceptions_time_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+    auto & exceptions_text = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
+    auto & exceptions_text_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
+    auto & last_poll_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
+    auto & num_messages_read = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
+    auto & last_commit_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
+    auto & num_commits = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
+    auto & last_rebalance_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
+    auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
+    auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
+    auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]);
+    auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]);
+
+ const auto access = context->getAccess();
+ size_t last_assignment_num = 0;
+ size_t exceptions_num = 0;
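+    /// running totals used as offsets for the Array columns (assignments.* and exceptions.*)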
+
+ auto add_row = [&](const DatabaseTablesIteratorPtr & it, StorageKafka * storage_kafka_ptr)
+ {
+ if (!access->isGranted(AccessType::SHOW_TABLES, it->databaseName(), it->name()))
+ {
+ return;
+ }
+
+ std::string database_str = it->databaseName();
+ std::string table_str = it->name();
+
+ auto safe_consumers = storage_kafka_ptr->getSafeConsumers();
+
+ for (const auto & weak_consumer : safe_consumers.consumers)
+ {
+ if (auto consumer = weak_consumer.lock())
+ {
+ auto consumer_stat = consumer->getStat();
+
+ database.insertData(database_str.data(), database_str.size());
+ table.insertData(table_str.data(), table_str.size());
+
+ consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
+
+ const auto num_assignnemts = consumer_stat.assignments.size();
+
+ for (size_t num = 0; num < num_assignnemts; ++num)
+ {
+ const auto & assign = consumer_stat.assignments[num];
+
+ assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
+
+ assigments_partition_id.insert(assign.partition_id);
+ assigments_current_offset.insert(assign.current_offset);
+ }
+ last_assignment_num += num_assignnemts;
+
+ assigments_topics_offsets.push_back(last_assignment_num);
+ assigments_partition_id_offsets.push_back(last_assignment_num);
+ assigments_current_offset_offsets.push_back(last_assignment_num);
+
+ for (const auto & exc : consumer_stat.exceptions_buffer)
+ {
+ exceptions_text.insertData(exc.text.data(), exc.text.size());
+ exceptions_time.insert(exc.timestamp_usec);
+ }
+ exceptions_num += consumer_stat.exceptions_buffer.size();
+ exceptions_text_offset.push_back(exceptions_num);
+ exceptions_time_offset.push_back(exceptions_num);
+
+
+ last_poll_time.insert(consumer_stat.last_poll_time);
+ num_messages_read.insert(consumer_stat.num_messages_read);
+ last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
+ num_commits.insert(consumer_stat.num_commits);
+ last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
+
+ num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
+ num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
+
+ is_currently_used.insert(consumer_stat.in_use);
+
+ rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
+ }
+ }
+ };
+
+ const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES);
+
+ if (show_tables_granted)
+ {
+ auto databases = DatabaseCatalog::instance().getDatabases();
+ for (const auto & db : databases)
+ {
+ for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
+ {
+ StoragePtr storage = iterator->table();
+                if (auto * kafka_table = dynamic_cast<StorageKafka *>(storage.get()))
+ {
+ add_row(iterator, kafka_table);
+ }
+ }
+ }
+
+ }
+}
+
+}
+
+#endif
diff --git a/src/Storages/System/StorageSystemKafkaConsumers.h b/src/Storages/System/StorageSystemKafkaConsumers.h
new file mode 100644
index 00000000000..eda3a39bc7e
--- /dev/null
+++ b/src/Storages/System/StorageSystemKafkaConsumers.h
@@ -0,0 +1,27 @@
+#pragma once
+
+#include "config.h"
+
+#if USE_RDKAFKA
+
+
+#include
+
+
+namespace DB
+{
+
+class StorageSystemKafkaConsumers final : public IStorageSystemOneBlock<StorageSystemKafkaConsumers>
+{
+public:
+ std::string getName() const override { return "SystemKafkaConsumers"; }
+ static NamesAndTypesList getNamesAndTypes();
+
+protected:
+ using IStorageSystemOneBlock::IStorageSystemOneBlock;
+ void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
+};
+
+}
+
+#endif
diff --git a/src/Storages/System/attachSystemTables.cpp b/src/Storages/System/attachSystemTables.cpp
index f0c67e0f787..b770f784c01 100644
--- a/src/Storages/System/attachSystemTables.cpp
+++ b/src/Storages/System/attachSystemTables.cpp
@@ -84,6 +84,10 @@
#include
#include
+#if USE_RDKAFKA
+#include <Storages/System/StorageSystemKafkaConsumers.h>
+#endif
+
#ifdef OS_LINUX
#include
#endif
@@ -144,6 +148,9 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
    attach<StorageSystemBackups>(context, system_database, "backups");
    attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
    attach<StorageSystemDroppedTables>(context, system_database, "dropped_tables");
+#if USE_RDKAFKA
+    attach<StorageSystemKafkaConsumers>(context, system_database, "kafka_consumers");
+#endif
#ifdef OS_LINUX
    attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif
diff --git a/tests/integration/test_kafka_bad_messages/test.py b/tests/integration/test_kafka_bad_messages/test.py
index da3cf36c853..a634ce36631 100644
--- a/tests/integration/test_kafka_bad_messages/test.py
+++ b/tests/integration/test_kafka_bad_messages/test.py
@@ -90,7 +90,9 @@ def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
-def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
+def kafka_produce(
+ kafka_cluster, topic, messages, timestamp=None, retries=15, partition=None
+):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kafka_port, topic
@@ -100,7 +102,9 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
kafka_cluster.kafka_port, producer_serializer, retries
)
for message in messages:
- producer.send(topic=topic, value=message, timestamp_ms=timestamp)
+ producer.send(
+ topic=topic, value=message, timestamp_ms=timestamp, partition=partition
+ )
producer.flush()
@@ -115,7 +119,7 @@ def kafka_cluster():
cluster.shutdown()
-def test_bad_messages_parsing(kafka_cluster):
+def test_bad_messages_parsing_stream(kafka_cluster):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
@@ -244,7 +248,7 @@ struct Message
f"""
DROP TABLE IF EXISTS view;
DROP TABLE IF EXISTS kafka;
-
+
CREATE TABLE kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
@@ -253,9 +257,9 @@ struct Message
kafka_format = 'CapnProto',
kafka_handle_error_mode='stream',
kafka_schema='schema_test_errors:Message';
-
+
CREATE MATERIALIZED VIEW view Engine=Log AS
- SELECT _error FROM kafka WHERE length(_error) != 0 ;
+ SELECT _error FROM kafka WHERE length(_error) != 0;
"""
)
@@ -279,6 +283,68 @@ struct Message
kafka_delete_topic(admin_client, "CapnProto_err")
+def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
+ admin_client = KafkaAdminClient(
+ bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
+ )
+
+ for format_name in [
+ "Avro",
+ "JSONEachRow",
+ ]:
+ print(format_name)
+
+ kafka_create_topic(admin_client, f"{format_name}_err")
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS view_{format_name};
+ DROP TABLE IF EXISTS kafka_{format_name};
+ DROP TABLE IF EXISTS kafka;
+
+ CREATE TABLE kafka_{format_name} (key UInt64, value UInt64)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{format_name}_err',
+ kafka_group_name = '{format_name}',
+ kafka_format = '{format_name}',
+ kafka_num_consumers = 1;
+
+ CREATE MATERIALIZED VIEW view_{format_name} Engine=Log AS
+ SELECT * FROM kafka_{format_name};
+ """
+ )
+
+ kafka_produce(
+ kafka_cluster, f"{format_name}_err", ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
+ )
+
+ expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro
+Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_JSONEachRow
+"""
+ retries = 0
+ result_system_kafka_consumers = ""
+ while True:
+ result_system_kafka_consumers = instance.query(
+ """
+ SELECT exceptions.text[1], length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
+ """
+ )
+ result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|")
+ if result_system_kafka_consumers == expected_result or retries > max_retries:
+ break
+ retries += 1
+ time.sleep(1)
+
+ assert result_system_kafka_consumers == expected_result
+
+ for format_name in [
+ "Avro",
+ "JSONEachRow",
+ ]:
+ kafka_delete_topic(admin_client, f"{format_name}_err")
+
+
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")
diff --git a/tests/integration/test_storage_kafka/configs/kafka.xml b/tests/integration/test_storage_kafka/configs/kafka.xml
index 062c98a2ac7..c6075aff715 100644
--- a/tests/integration/test_storage_kafka/configs/kafka.xml
+++ b/tests/integration/test_storage_kafka/configs/kafka.xml
@@ -10,6 +10,10 @@
-->
    <debug>cgrp,consumer,topic,protocol</debug>
+
+
+    <statistics_interval_ms>600</statistics_interval_ms>
+
consumer_hang
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 28919cc685e..36815badd53 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -1186,6 +1186,7 @@ def test_kafka_consumer_hang2(kafka_cluster):
instance.query(
"""
DROP TABLE IF EXISTS test.kafka;
+ DROP TABLE IF EXISTS test.kafka2;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
@@ -4545,6 +4546,294 @@ def test_block_based_formats_2(kafka_cluster):
kafka_delete_topic(admin_client, format_name)
+def test_system_kafka_consumers(kafka_cluster):
+ admin_client = KafkaAdminClient(
+ bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
+ )
+
+ topic = "system_kafka_cons"
+ kafka_create_topic(admin_client, topic)
+
+    # Messages are in CSV with '|' as the delimiter (the table below sets format_csv_delimiter='|').
+ kafka_produce(
+ kafka_cluster,
+ topic,
+ ["1|foo", "2|bar", "42|answer", "100|multi\n101|row\n103|message"],
+ )
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.kafka;
+
+ CREATE TABLE test.kafka (a UInt64, b String)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{topic}',
+ kafka_group_name = '{topic}',
+ kafka_commit_on_select = 1,
+ kafka_format = 'CSV',
+ kafka_row_delimiter = '\\n',
+ format_csv_delimiter = '|';
+ """
+ )
+
+ result = instance.query("SELECT * FROM test.kafka ORDER BY a;")
+
+ result_system_kafka_consumers = instance.query(
+ """
+ create or replace function stable_timestamp as
+ (d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
+
+ SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
+ assignments.current_offset,
+ if(length(exceptions.time)>0, exceptions.time[1]::String, 'never') as last_exception_time_,
+ if(length(exceptions.text)>0, exceptions.text[1], 'no exception') as last_exception_,
+ stable_timestamp(last_poll_time) as last_poll_time_, num_messages_read, stable_timestamp(last_commit_time) as last_commit_time_,
+ num_commits, stable_timestamp(last_rebalance_time) as last_rebalance_time_,
+ num_rebalance_revocations, num_rebalance_assignments, is_currently_used
+ FROM system.kafka_consumers WHERE database='test' and table='kafka' format Vertical;
+ """
+ )
+ logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
+ assert (
+ result_system_kafka_consumers
+ == """Row 1:
+──────
+database: test
+table: kafka
+length(consumer_id): 67
+assignments.topic: ['system_kafka_cons']
+assignments.partition_id: [0]
+assignments.current_offset: [4]
+last_exception_time_: never
+last_exception_: no exception
+last_poll_time_: now
+num_messages_read: 4
+last_commit_time_: now
+num_commits: 1
+last_rebalance_time_: never
+num_rebalance_revocations: 0
+num_rebalance_assignments: 1
+is_currently_used: 0
+"""
+ )
+
+ instance.query("DROP TABLE test.kafka")
+ kafka_delete_topic(admin_client, topic)
+
+
+def test_system_kafka_consumers_rebalance(kafka_cluster, max_retries=15):
+ # based on test_kafka_consumer_hang2
+ admin_client = KafkaAdminClient(
+ bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
+ )
+
+ producer = KafkaProducer(
+ bootstrap_servers="localhost:{}".format(cluster.kafka_port),
+ value_serializer=producer_serializer,
+ key_serializer=producer_serializer,
+ )
+
+ topic = "system_kafka_cons2"
+ kafka_create_topic(admin_client, topic, num_partitions=2)
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.kafka;
+ DROP TABLE IF EXISTS test.kafka2;
+
+ CREATE TABLE test.kafka (key UInt64, value UInt64)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{topic}',
+ kafka_group_name = '{topic}',
+ kafka_commit_on_select = 1,
+ kafka_format = 'JSONEachRow';
+
+ CREATE TABLE test.kafka2 (key UInt64, value UInt64)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{topic}',
+ kafka_commit_on_select = 1,
+ kafka_group_name = '{topic}',
+ kafka_format = 'JSONEachRow';
+ """
+ )
+
+ producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
+ producer.send(topic=topic, value=json.dumps({"key": 11, "value": 11}), partition=1)
+ time.sleep(3)
+
+ # first consumer subscribe the topic, try to poll some data, and go to rest
+ instance.query("SELECT * FROM test.kafka")
+
+ # second consumer do the same leading to rebalance in the first
+ # consumer, try to poll some data
+ instance.query("SELECT * FROM test.kafka2")
+
+ producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
+ producer.send(topic=topic, value=json.dumps({"key": 10, "value": 10}), partition=1)
+ time.sleep(3)
+
+ instance.query("SELECT * FROM test.kafka")
+ instance.query("SELECT * FROM test.kafka2")
+ instance.query("SELECT * FROM test.kafka")
+ instance.query("SELECT * FROM test.kafka2")
+
+ result_system_kafka_consumers = instance.query(
+ """
+ create or replace function stable_timestamp as
+ (d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
+ SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
+ assignments.current_offset,
+ if(length(exceptions.time)>0, exceptions.time[1]::String, 'never') as last_exception_time_,
+ if(length(exceptions.text)>0, exceptions.text[1], 'no exception') as last_exception_,
+ stable_timestamp(last_poll_time) as last_poll_time_, num_messages_read, stable_timestamp(last_commit_time) as last_commit_time_,
+ num_commits, stable_timestamp(last_rebalance_time) as last_rebalance_time_,
+ num_rebalance_revocations, num_rebalance_assignments, is_currently_used
+ FROM system.kafka_consumers WHERE database='test' and table IN ('kafka', 'kafka2') format Vertical;
+ """
+ )
+ logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
+ assert (
+ result_system_kafka_consumers
+ == """Row 1:
+──────
+database: test
+table: kafka
+length(consumer_id): 67
+assignments.topic: ['system_kafka_cons2']
+assignments.partition_id: [0]
+assignments.current_offset: [2]
+last_exception_time_: never
+last_exception_: no exception
+last_poll_time_: now
+num_messages_read: 4
+last_commit_time_: now
+num_commits: 2
+last_rebalance_time_: now
+num_rebalance_revocations: 1
+num_rebalance_assignments: 2
+is_currently_used: 0
+
+Row 2:
+──────
+database: test
+table: kafka2
+length(consumer_id): 68
+assignments.topic: ['system_kafka_cons2']
+assignments.partition_id: [1]
+assignments.current_offset: [2]
+last_exception_time_: never
+last_exception_: no exception
+last_poll_time_: now
+num_messages_read: 1
+last_commit_time_: now
+num_commits: 1
+last_rebalance_time_: never
+num_rebalance_revocations: 0
+num_rebalance_assignments: 1
+is_currently_used: 0
+"""
+ )
+
+ instance.query("DROP TABLE test.kafka")
+ instance.query("DROP TABLE test.kafka2")
+
+ kafka_delete_topic(admin_client, topic)
+
+
+def test_system_kafka_consumers_rebalance_mv(kafka_cluster, max_retries=15):
+ admin_client = KafkaAdminClient(
+ bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
+ )
+
+ producer = KafkaProducer(
+ bootstrap_servers="localhost:{}".format(cluster.kafka_port),
+ value_serializer=producer_serializer,
+ key_serializer=producer_serializer,
+ )
+
+ topic = "system_kafka_cons_mv"
+ kafka_create_topic(admin_client, topic, num_partitions=2)
+
+ instance.query(
+ f"""
+ DROP TABLE IF EXISTS test.kafka;
+ DROP TABLE IF EXISTS test.kafka2;
+ DROP TABLE IF EXISTS test.kafka_persistent;
+ DROP TABLE IF EXISTS test.kafka_persistent2;
+
+ CREATE TABLE test.kafka (key UInt64, value UInt64)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{topic}',
+ kafka_group_name = '{topic}',
+ kafka_commit_on_select = 1,
+ kafka_format = 'JSONEachRow';
+
+ CREATE TABLE test.kafka2 (key UInt64, value UInt64)
+ ENGINE = Kafka
+ SETTINGS kafka_broker_list = 'kafka1:19092',
+ kafka_topic_list = '{topic}',
+ kafka_commit_on_select = 1,
+ kafka_group_name = '{topic}',
+ kafka_format = 'JSONEachRow';
+
+ CREATE TABLE test.kafka_persistent (key UInt64, value UInt64)
+ ENGINE = MergeTree()
+ ORDER BY key;
+ CREATE TABLE test.kafka_persistent2 (key UInt64, value UInt64)
+ ENGINE = MergeTree()
+ ORDER BY key;
+
+ CREATE MATERIALIZED VIEW test.persistent_kafka_mv TO test.kafka_persistent AS
+ SELECT key, value
+ FROM test.kafka;
+
+ CREATE MATERIALIZED VIEW test.persistent_kafka_mv2 TO test.kafka_persistent2 AS
+ SELECT key, value
+ FROM test.kafka2;
+ """
+ )
+
+ producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
+ producer.send(topic=topic, value=json.dumps({"key": 11, "value": 11}), partition=1)
+ time.sleep(3)
+
+ retries = 0
+ result_rdkafka_stat = ""
+ while True:
+ result_rdkafka_stat = instance.query(
+ """
+ SELECT table, JSONExtractString(rdkafka_stat, 'type')
+ FROM system.kafka_consumers WHERE database='test' and table = 'kafka' format Vertical;
+ """
+ )
+        if "consumer" in result_rdkafka_stat or retries > max_retries:
+ break
+ retries += 1
+ time.sleep(1)
+
+ assert (
+ result_rdkafka_stat
+ == """Row 1:
+──────
+table: kafka
+JSONExtractString(rdkafka_stat, 'type'): consumer
+"""
+ )
+
+ instance.query("DROP TABLE test.kafka")
+ instance.query("DROP TABLE test.kafka2")
+ instance.query("DROP TABLE test.kafka_persistent")
+ instance.query("DROP TABLE test.kafka_persistent2")
+ instance.query("DROP TABLE test.persistent_kafka_mv")
+ instance.query("DROP TABLE test.persistent_kafka_mv2")
+
+ kafka_delete_topic(admin_client, topic)
+
+
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")