Merge pull request #50999 from ilejn/system_kafka_consumers

system.kafka_consumers table to monitor kafka consumers
This commit is contained in:
Kseniia Sumarokova 2023-08-19 13:31:18 +04:00 committed by GitHub
commit c0b8d7eddf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 842 additions and 14 deletions

View File

@ -173,6 +173,7 @@ Similar to GraphiteMergeTree, the Kafka engine supports extended configuration u
<!-- Global configuration options for all tables of Kafka engine type -->
<debug>cgrp</debug>
<auto_offset_reset>smallest</auto_offset_reset>
<statistics_interval_ms>600</statistics_interval_ms>
<!-- Configuration specific to topics "logs" and "stats" -->
@ -260,3 +261,4 @@ The number of rows in one Kafka message depends on whether the format is row-bas
- [Virtual columns](../../../engines/table-engines/index.md#table_engines-virtual_columns)
- [background_message_broker_schedule_pool_size](../../../operations/server-configuration-parameters/settings.md#background_message_broker_schedule_pool_size)
- [system.kafka_consumers](../../../operations/system-tables/kafka_consumers.md)

View File

@ -0,0 +1,58 @@
---
slug: /en/operations/system-tables/kafka_consumers
---
# kafka_consumers
Contains information about Kafka consumers.
Applicable for [Kafka table engine](../../engines/table-engines/integrations/kafka) (native ClickHouse integration)
Columns:
- `database` (String) - database of the table with Kafka Engine.
- `table` (String) - name of the table with Kafka Engine.
- `consumer_id` (String) - Kafka consumer identifier. Note, that a table can have many consumers. Specified by `kafka_num_consumers` parameter.
- `assignments.topic` (Array(String)) - Kafka topic.
- `assignments.partition_id` (Array(Int32)) - Kafka partition id. Note, that only one consumer can be assigned to a partition.
- `assignments.current_offset` (Array(Int64)) - current offset.
- `exceptions.time`, (Array(DateTime)) - timestamp when the 10 most recent exceptions were generated.
- `exceptions.text`, (Array(String)) - text of 10 most recent exceptions.
- `last_poll_time`, (DateTime) - timestamp of the most recent poll.
- `num_messages_read`, (UInt64) - number of messages read by the consumer.
- `last_commit_time`, (DateTime) - timestamp of the most recent poll.
- `num_commits`, (UInt64) - total number of commits for the consumer.
- `last_rebalance_time`, (DateTime) - timestamp of the most recent Kafka rebalance
- `num_rebalance_revocations`, (UInt64) - number of times the consumer was revoked its partitions
- `num_rebalance_assignments`, (UInt64) - number of times the consumer was assigned to Kafka cluster
- `is_currently_used`, (UInt8) - consumer is in use
- `rdkafka_stat` (String) - library internal statistic. See https://github.com/ClickHouse/librdkafka/blob/master/STATISTICS.md . Set `statistics_interval_ms` to 0 disable, default is 3000 (once in three seconds).
Example:
``` sql
SELECT *
FROM system.kafka_consumers
FORMAT Vertical
```
``` text
Row 1:
──────
database: test
table: kafka
consumer_id: ClickHouse-instance-test-kafka-1caddc7f-f917-4bb1-ac55-e28bd103a4a0
assignments.topic: ['system_kafka_cons']
assignments.partition_id: [0]
assignments.current_offset: [18446744073709550615]
exceptions.time: []
exceptions.text: []
last_poll_time: 2006-11-09 18:47:47
num_messages_read: 4
last_commit_time: 2006-11-10 04:39:40
num_commits: 1
last_rebalance_time: 1970-01-01 00:00:00
num_rebalance_revocations: 0
num_rebalance_assignments: 1
is_currently_used: 1
rdkafka_stat: {...}
```

View File

@ -384,7 +384,7 @@ target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::abseil_swiss_tabl
dbms_target_link_libraries(PUBLIC ch_contrib::roaring)
if (TARGET ch_contrib::rdkafka)
dbms_target_link_libraries(PRIVATE ch_contrib::rdkafka ch_contrib::cppkafka)
dbms_target_link_libraries(PUBLIC ch_contrib::rdkafka ch_contrib::cppkafka)
endif()
if (TARGET ch_contrib::nats_io)

View File

@ -61,6 +61,7 @@ KafkaConsumer::KafkaConsumer(
, stopped(stopped_)
, current(messages.begin())
, topics(_topics)
, exceptions_buffer(EXCEPTIONS_DEPTH)
{
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
@ -79,6 +80,7 @@ KafkaConsumer::KafkaConsumer(
}
assignment = topic_partitions;
num_rebalance_assignments++;
});
// called (synchronously, during poll) when we leave the consumer group
@ -106,6 +108,8 @@ KafkaConsumer::KafkaConsumer(
cleanUnprocessed();
stalled_status = REBALANCE_HAPPENED;
last_rebalance_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
assignment.reset();
waited_for_assignment = 0;
@ -118,12 +122,14 @@ KafkaConsumer::KafkaConsumer(
// {
// LOG_WARNING(log, "Commit error: {}", e.what());
// }
num_rebalance_revocations++;
});
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
{
LOG_ERROR(log, "Rebalance error: {}", err);
ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
setExceptionInfo(err);
});
}
@ -177,6 +183,7 @@ void KafkaConsumer::drain()
else
{
LOG_ERROR(log, "Error during draining: {}", error);
setExceptionInfo(error);
}
}
@ -251,6 +258,8 @@ void KafkaConsumer::commit()
consumer->commit();
committed = true;
print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
last_commit_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
num_commits += 1;
}
catch (const cppkafka::HandleException & e)
{
@ -259,7 +268,10 @@ void KafkaConsumer::commit()
if (e.get_error() == RD_KAFKA_RESP_ERR__NO_OFFSET)
committed = true;
else
{
LOG_ERROR(log, "Exception during commit attempt: {}", e.what());
setExceptionInfo(e.what());
}
}
--max_retries;
}
@ -399,6 +411,8 @@ ReadBufferPtr KafkaConsumer::consume()
/// Don't drop old messages immediately, since we may need them for virtual columns.
auto new_messages = consumer->poll_batch(batch_size,
std::chrono::milliseconds(actual_poll_timeout_ms));
last_poll_timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochTime());
num_messages_read += new_messages.size();
resetIfStopped();
if (stalled_status == CONSUMER_STOPPED)
@ -495,6 +509,7 @@ size_t KafkaConsumer::filterMessageErrors()
{
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error);
setExceptionInfo(error);
return true;
}
return false;
@ -527,4 +542,64 @@ void KafkaConsumer::storeLastReadMessageOffset()
}
}
void KafkaConsumer::setExceptionInfo(const cppkafka::Error & err)
{
setExceptionInfo(err.to_string());
}
void KafkaConsumer::setExceptionInfo(const String & text)
{
std::lock_guard<std::mutex> lock(exception_mutex);
exceptions_buffer.push_back({text, static_cast<UInt64>(Poco::Timestamp().epochTime())});
}
/*
* Needed until
* https://github.com/mfontanini/cppkafka/pull/309
* is merged,
* because consumer->get_member_id() contains a leak
*/
std::string KafkaConsumer::getMemberId() const
{
char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
std::string memberid_string = memberid_ptr;
rd_kafka_mem_free(nullptr, memberid_ptr);
return memberid_string;
}
KafkaConsumer::Stat KafkaConsumer::getStat() const
{
KafkaConsumer::Stat::Assignments assignments;
auto cpp_assignments = consumer->get_assignment();
auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
for (size_t num = 0; num < cpp_assignments.size(); ++num)
{
assignments.push_back({
cpp_assignments[num].get_topic(),
cpp_assignments[num].get_partition(),
cpp_offsets[num].get_offset(),
});
}
return {
.consumer_id = getMemberId() /* consumer->get_member_id() */ ,
.assignments = std::move(assignments),
.last_poll_time = last_poll_timestamp_usec.load(),
.num_messages_read = num_messages_read.load(),
.last_commit_timestamp_usec = last_commit_timestamp_usec.load(),
.last_rebalance_timestamp_usec = last_rebalance_timestamp_usec.load(),
.num_commits = num_commits.load(),
.num_rebalance_assignments = num_rebalance_assignments.load(),
.num_rebalance_revocations = num_rebalance_revocations.load(),
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
return exceptions_buffer;}(),
.in_use = in_use.load(),
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
return rdkafka_stat;}(),
};
}
}

View File

@ -1,5 +1,7 @@
#pragma once
#include <boost/circular_buffer.hpp>
#include <Core/Names.h>
#include <base/types.h>
#include <IO/ReadBuffer.h>
@ -20,10 +22,44 @@ namespace Poco
namespace DB
{
class StorageSystemKafkaConsumers;
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
class KafkaConsumer
{
public:
struct ExceptionInfo
{
String text;
UInt64 timestamp_usec;
};
using ExceptionsBuffer = boost::circular_buffer<ExceptionInfo>;
struct Stat // system.kafka_consumers data
{
struct Assignment
{
String topic_str;
Int32 partition_id;
Int64 current_offset;
};
using Assignments = std::vector<Assignment>;
String consumer_id;
Assignments assignments;
UInt64 last_poll_time;
UInt64 num_messages_read;
UInt64 last_commit_timestamp_usec;
UInt64 last_rebalance_timestamp_usec;
UInt64 num_commits;
UInt64 num_rebalance_assignments;
UInt64 num_rebalance_revocations;
KafkaConsumer::ExceptionsBuffer exceptions_buffer;
bool in_use;
std::string rdkafka_stat;
};
public:
KafkaConsumer(
ConsumerPtr consumer_,
@ -69,6 +105,18 @@ public:
auto currentTimestamp() const { return current[-1].get_timestamp(); }
const auto & currentHeaderList() const { return current[-1].get_header_list(); }
String currentPayload() const { return current[-1].get_payload(); }
void setExceptionInfo(const cppkafka::Error & err);
void setExceptionInfo(const String & text);
void setRDKafkaStat(const std::string & stat_json_string)
{
std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
rdkafka_stat = stat_json_string;
}
void inUse() { in_use = true; }
void notInUse() { in_use = false; }
// For system.kafka_consumers
Stat getStat() const;
private:
using Messages = std::vector<cppkafka::Message>;
@ -105,12 +153,33 @@ private:
std::optional<cppkafka::TopicPartitionList> assignment;
const Names topics;
/// system.kafka_consumers data is retrieved asynchronously
/// so we have to protect exceptions_buffer
mutable std::mutex exception_mutex;
const size_t EXCEPTIONS_DEPTH = 10;
ExceptionsBuffer exceptions_buffer;
std::atomic<UInt64> last_exception_timestamp_usec = 0;
std::atomic<UInt64> last_poll_timestamp_usec = 0;
std::atomic<UInt64> num_messages_read = 0;
std::atomic<UInt64> last_commit_timestamp_usec = 0;
std::atomic<UInt64> num_commits = 0;
std::atomic<UInt64> last_rebalance_timestamp_usec = 0;
std::atomic<UInt64> num_rebalance_assignments = 0;
std::atomic<UInt64> num_rebalance_revocations = 0;
std::atomic<bool> in_use = 0;
mutable std::mutex rdkafka_stat_mutex;
std::string rdkafka_stat;
void drain();
void cleanUnprocessed();
void resetIfStopped();
/// Return number of messages with an error.
size_t filterMessageErrors();
ReadBufferPtr getNextMessage();
std::string getMemberId() const;
};
}

View File

@ -133,6 +133,7 @@ Chunk KafkaSource::generateImpl()
{
e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'",
consumer->currentTopic(), consumer->currentPartition(), consumer->currentOffset());
consumer->setExceptionInfo(e.message());
throw std::move(e);
}
};

View File

@ -416,7 +416,9 @@ void StorageKafka::startup()
{
try
{
pushConsumer(createConsumer(i));
auto consumer = createConsumer(i);
pushConsumer(consumer);
all_consumers.push_back(consumer);
++num_created_consumers;
}
catch (const cppkafka::Exception &)
@ -456,6 +458,7 @@ void StorageKafka::shutdown()
void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
{
std::lock_guard lock(mutex);
consumer->notInUse();
consumers.push_back(consumer);
semaphore.set();
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
@ -484,6 +487,7 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
auto consumer = consumers.back();
consumers.pop_back();
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
consumer->inUse();
return consumer;
}
@ -512,7 +516,11 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
size_t default_queued_min_messages = 100000; // we don't want to decrease the default
conf.set("queued.min.messages", std::max(getMaxBlockSize(),default_queued_min_messages));
updateConfiguration(conf);
/// a reference to the consumer is needed in statistic callback
/// although the consumer does not exist when callback is being registered
/// shared_ptr<weak_ptr<KafkaConsumer>> comes to the rescue
auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
updateConfiguration(conf, consumer_weak_ptr_ptr);
// those settings should not be changed by users.
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
@ -523,13 +531,20 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
KafkaConsumerPtr kafka_consumer_ptr;
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
if (thread_per_consumer)
{
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
}
return std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
else
{
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
}
*consumer_weak_ptr_ptr = kafka_consumer_ptr;
return kafka_consumer_ptr;
}
size_t StorageKafka::getMaxBlockSize() const
@ -562,7 +577,8 @@ String StorageKafka::getConfigPrefix() const
return CONFIG_KAFKA_TAG;
}
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
{
// Update consumer configuration from the configuration. Example:
// <kafka>
@ -642,6 +658,26 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
});
if (kafka_consumer_weak_ptr_ptr)
{
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
}
if (kafka_config.get("statistics.interval.ms") != "0")
{
kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
{
auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
if (kafka_consumer_ptr)
{
kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
}
});
}
}
// Configure interceptor to change thread name
//
// TODO: add interceptors support into the cppkafka.
@ -952,7 +988,7 @@ void registerStorageKafka(StorageFactory & factory)
"of getting data from Kafka, consider using a setting kafka_thread_per_consumer=1, "
"and ensure you have enough threads "
"in MessageBrokerSchedulePool (background_message_broker_schedule_pool_size). "
"See also https://clickhouse.com/docs/integrations/kafka/kafka-table-engine#tuning-performance", max_consumers);
"See also https://clickhouse.com/docs/en/integrations/kafka#tuning-performance", max_consumers);
}
else if (num_consumers < 1)
{

View File

@ -23,9 +23,12 @@ class Configuration;
namespace DB
{
class StorageSystemKafkaConsumers;
struct StorageKafkaInterceptors;
using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
using KafkaConsumerWeakPtr = std::weak_ptr<KafkaConsumer>;
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
@ -77,6 +80,15 @@ public:
Names getVirtualColumnNames() const;
HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
struct SafeConsumers
{
std::shared_ptr<IStorage> storage_ptr;
std::unique_lock<std::mutex> lock;
std::vector<KafkaConsumerWeakPtr> & consumers;
};
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }
private:
// Configuration and state
std::unique_ptr<KafkaSettings> kafka_settings;
@ -101,6 +113,7 @@ private:
size_t num_created_consumers = 0; /// number of actually created consumers.
std::vector<KafkaConsumerPtr> consumers; /// available consumers
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers
std::mutex mutex;
@ -129,7 +142,12 @@ private:
std::atomic<bool> shutdown_called = false;
// Update Kafka configuration with values from CH user configuration.
void updateConfiguration(cppkafka::Configuration & kafka_config);
void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
void updateConfiguration(cppkafka::Configuration & kafka_config)
{
updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
}
String getConfigPrefix() const;
void threadFunc(size_t idx);
@ -142,6 +160,7 @@ private:
bool streamToViews();
bool checkDependencies(const StorageID & table_id);
};
}

View File

@ -0,0 +1,175 @@
#include "config.h"
#if USE_RDKAFKA
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeUUID.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsDateTime.h>
#include <Access/ContextAccess.h>
#include <Storages/System/StorageSystemKafkaConsumers.h>
#include <Storages/Kafka/StorageKafka.h>
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include "base/types.h"
namespace DB
{
NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
{
NamesAndTypesList names_and_types{
{"database", std::make_shared<DataTypeString>()},
{"table", std::make_shared<DataTypeString>()},
{"consumer_id", std::make_shared<DataTypeString>()}, //(number? or string? - single clickhouse table can have many consumers)
{"assignments.topic", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"assignments.partition_id", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt32>())},
{"assignments.current_offset", std::make_shared<DataTypeArray>(std::make_shared<DataTypeInt64>())},
{"exceptions.time", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"exceptions.text", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"last_poll_time", std::make_shared<DataTypeDateTime>()},
{"num_messages_read", std::make_shared<DataTypeUInt64>()},
{"last_commit_time", std::make_shared<DataTypeDateTime>()},
{"num_commits", std::make_shared<DataTypeUInt64>()},
{"last_rebalance_time", std::make_shared<DataTypeDateTime>()},
{"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()},
{"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()},
{"is_currently_used", std::make_shared<DataTypeUInt8>()},
{"rdkafka_stat", std::make_shared<DataTypeString>()},
};
return names_and_types;
}
void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const
{
auto tables_mark_dropped = DatabaseCatalog::instance().getTablesMarkedDropped();
size_t index = 0;
auto & database = assert_cast<ColumnString &>(*res_columns[index++]);
auto & table = assert_cast<ColumnString &>(*res_columns[index++]);
auto & consumer_id = assert_cast<ColumnString &>(*res_columns[index++]); //(number? or string? - single clickhouse table can have many consumers)
auto & assigments_topics = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
auto & assigments_topics_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
auto & assigments_partition_id = assert_cast<ColumnInt32 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
auto & assigments_partition_id_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
auto & assigments_current_offset = assert_cast<ColumnInt64 &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
auto & assigments_current_offset_offsets = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
auto & exceptions_time = assert_cast<ColumnDateTime &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
auto & exceptions_time_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
auto & exceptions_text = assert_cast<ColumnString &>(assert_cast<ColumnArray &>(*res_columns[index]).getData());
auto & exceptions_text_offset = assert_cast<ColumnArray &>(*res_columns[index++]).getOffsets();
auto & last_poll_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
auto & num_messages_read = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & last_commit_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
auto & num_commits = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & last_rebalance_time = assert_cast<ColumnDateTime &>(*res_columns[index++]);
auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]);
auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]);
const auto access = context->getAccess();
size_t last_assignment_num = 0;
size_t exceptions_num = 0;
auto add_row = [&](const DatabaseTablesIteratorPtr & it, StorageKafka * storage_kafka_ptr)
{
if (!access->isGranted(AccessType::SHOW_TABLES, it->databaseName(), it->name()))
{
return;
}
std::string database_str = it->databaseName();
std::string table_str = it->name();
auto safe_consumers = storage_kafka_ptr->getSafeConsumers();
for (const auto & weak_consumer : safe_consumers.consumers)
{
if (auto consumer = weak_consumer.lock())
{
auto consumer_stat = consumer->getStat();
database.insertData(database_str.data(), database_str.size());
table.insertData(table_str.data(), table_str.size());
consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
const auto num_assignnemts = consumer_stat.assignments.size();
for (size_t num = 0; num < num_assignnemts; ++num)
{
const auto & assign = consumer_stat.assignments[num];
assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
assigments_partition_id.insert(assign.partition_id);
assigments_current_offset.insert(assign.current_offset);
}
last_assignment_num += num_assignnemts;
assigments_topics_offsets.push_back(last_assignment_num);
assigments_partition_id_offsets.push_back(last_assignment_num);
assigments_current_offset_offsets.push_back(last_assignment_num);
for (const auto & exc : consumer_stat.exceptions_buffer)
{
exceptions_text.insertData(exc.text.data(), exc.text.size());
exceptions_time.insert(exc.timestamp_usec);
}
exceptions_num += consumer_stat.exceptions_buffer.size();
exceptions_text_offset.push_back(exceptions_num);
exceptions_time_offset.push_back(exceptions_num);
last_poll_time.insert(consumer_stat.last_poll_time);
num_messages_read.insert(consumer_stat.num_messages_read);
last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
num_commits.insert(consumer_stat.num_commits);
last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
is_currently_used.insert(consumer_stat.in_use);
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
}
}
};
const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES);
if (show_tables_granted)
{
auto databases = DatabaseCatalog::instance().getDatabases();
for (const auto & db : databases)
{
for (auto iterator = db.second->getTablesIterator(context); iterator->isValid(); iterator->next())
{
StoragePtr storage = iterator->table();
if (auto * kafka_table = dynamic_cast<StorageKafka *>(storage.get()))
{
add_row(iterator, kafka_table);
}
}
}
}
}
}
#endif

View File

@ -0,0 +1,27 @@
#pragma once
#include "config.h"
#if USE_RDKAFKA
#include <Storages/System/IStorageSystemOneBlock.h>
namespace DB
{
class StorageSystemKafkaConsumers final : public IStorageSystemOneBlock<StorageSystemKafkaConsumers>
{
public:
std::string getName() const override { return "SystemKafkaConsumers"; }
static NamesAndTypesList getNamesAndTypes();
protected:
using IStorageSystemOneBlock::IStorageSystemOneBlock;
void fillData(MutableColumns & res_columns, ContextPtr context, const SelectQueryInfo &) const override;
};
}
#endif

View File

@ -84,6 +84,10 @@
#include <Storages/System/StorageSystemZooKeeperConnection.h>
#include <Storages/System/StorageSystemJemalloc.h>
#if USE_RDKAFKA
#include <Storages/System/StorageSystemKafkaConsumers.h>
#endif
#ifdef OS_LINUX
#include <Storages/System/StorageSystemStackTrace.h>
#endif
@ -144,6 +148,9 @@ void attachSystemTablesLocal(ContextPtr context, IDatabase & system_database)
attach<StorageSystemBackups>(context, system_database, "backups");
attach<StorageSystemSchemaInferenceCache>(context, system_database, "schema_inference_cache");
attach<StorageSystemDroppedTables>(context, system_database, "dropped_tables");
#if USE_RDKAFKA
attach<StorageSystemKafkaConsumers>(context, system_database, "kafka_consumers");
#endif
#ifdef OS_LINUX
attach<StorageSystemStackTrace>(context, system_database, "stack_trace");
#endif

View File

@ -90,7 +90,9 @@ def producer_serializer(x):
return x.encode() if isinstance(x, str) else x
def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
def kafka_produce(
kafka_cluster, topic, messages, timestamp=None, retries=15, partition=None
):
logging.debug(
"kafka_produce server:{}:{} topic:{}".format(
"localhost", kafka_cluster.kafka_port, topic
@ -100,7 +102,9 @@ def kafka_produce(kafka_cluster, topic, messages, timestamp=None, retries=15):
kafka_cluster.kafka_port, producer_serializer, retries
)
for message in messages:
producer.send(topic=topic, value=message, timestamp_ms=timestamp)
producer.send(
topic=topic, value=message, timestamp_ms=timestamp, partition=partition
)
producer.flush()
@ -115,7 +119,7 @@ def kafka_cluster():
cluster.shutdown()
def test_bad_messages_parsing(kafka_cluster):
def test_bad_messages_parsing_stream(kafka_cluster):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
@ -255,7 +259,7 @@ struct Message
kafka_schema='schema_test_errors:Message';
CREATE MATERIALIZED VIEW view Engine=Log AS
SELECT _error FROM kafka WHERE length(_error) != 0 ;
SELECT _error FROM kafka WHERE length(_error) != 0;
"""
)
@ -279,6 +283,68 @@ struct Message
kafka_delete_topic(admin_client, "CapnProto_err")
def test_bad_messages_parsing_exception(kafka_cluster, max_retries=20):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
for format_name in [
"Avro",
"JSONEachRow",
]:
print(format_name)
kafka_create_topic(admin_client, f"{format_name}_err")
instance.query(
f"""
DROP TABLE IF EXISTS view_{format_name};
DROP TABLE IF EXISTS kafka_{format_name};
DROP TABLE IF EXISTS kafka;
CREATE TABLE kafka_{format_name} (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{format_name}_err',
kafka_group_name = '{format_name}',
kafka_format = '{format_name}',
kafka_num_consumers = 1;
CREATE MATERIALIZED VIEW view_{format_name} Engine=Log AS
SELECT * FROM kafka_{format_name};
"""
)
kafka_produce(
kafka_cluster, f"{format_name}_err", ["qwertyuiop", "asdfghjkl", "zxcvbnm"]
)
expected_result = """avro::Exception: Invalid data file. Magic does not match: : while parsing Kafka message (topic: Avro_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_Avro
Cannot parse input: expected \\'{\\' before: \\'qwertyuiop\\': while parsing Kafka message (topic: JSONEachRow_err, partition: 0, offset: 0)\\'|1|1|1|default|kafka_JSONEachRow
"""
retries = 0
result_system_kafka_consumers = ""
while True:
result_system_kafka_consumers = instance.query(
"""
SELECT exceptions.text[1], length(exceptions.text) > 1 AND length(exceptions.text) < 15, length(exceptions.time) > 1 AND length(exceptions.time) < 15, abs(dateDiff('second', exceptions.time[1], now())) < 40, database, table FROM system.kafka_consumers ORDER BY table, assignments.partition_id[1]
"""
)
result_system_kafka_consumers = result_system_kafka_consumers.replace("\t", "|")
if result_system_kafka_consumers == expected_result or retries > max_retries:
break
retries += 1
time.sleep(1)
assert result_system_kafka_consumers == expected_result
for format_name in [
"Avro",
"JSONEachRow",
]:
kafka_delete_topic(admin_client, f"{format_name}_err")
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")

View File

@ -10,6 +10,10 @@
-->
<debug>cgrp,consumer,topic,protocol</debug>
<!-- librdkafka stat in system.kafka_consumers -->
<!-- default 3000 (every three second) seems too long for test -->
<statistics_interval_ms>600</statistics_interval_ms>
<kafka_topic>
<name>consumer_hang</name>
<!-- default: 3000 -->

View File

@ -1186,6 +1186,7 @@ def test_kafka_consumer_hang2(kafka_cluster):
instance.query(
"""
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka2;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
@ -4545,6 +4546,294 @@ def test_block_based_formats_2(kafka_cluster):
kafka_delete_topic(admin_client, format_name)
def test_system_kafka_consumers(kafka_cluster):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
topic = "system_kafka_cons"
kafka_create_topic(admin_client, topic)
# Check that format_csv_delimiter parameter works now - as part of all available format settings.
kafka_produce(
kafka_cluster,
topic,
["1|foo", "2|bar", "42|answer", "100|multi\n101|row\n103|message"],
)
instance.query(
f"""
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (a UInt64, b String)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_group_name = '{topic}',
kafka_commit_on_select = 1,
kafka_format = 'CSV',
kafka_row_delimiter = '\\n',
format_csv_delimiter = '|';
"""
)
result = instance.query("SELECT * FROM test.kafka ORDER BY a;")
result_system_kafka_consumers = instance.query(
"""
create or replace function stable_timestamp as
(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
assignments.current_offset,
if(length(exceptions.time)>0, exceptions.time[1]::String, 'never') as last_exception_time_,
if(length(exceptions.text)>0, exceptions.text[1], 'no exception') as last_exception_,
stable_timestamp(last_poll_time) as last_poll_time_, num_messages_read, stable_timestamp(last_commit_time) as last_commit_time_,
num_commits, stable_timestamp(last_rebalance_time) as last_rebalance_time_,
num_rebalance_revocations, num_rebalance_assignments, is_currently_used
FROM system.kafka_consumers WHERE database='test' and table='kafka' format Vertical;
"""
)
logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
assert (
result_system_kafka_consumers
== """Row 1:
database: test
table: kafka
length(consumer_id): 67
assignments.topic: ['system_kafka_cons']
assignments.partition_id: [0]
assignments.current_offset: [4]
last_exception_time_: never
last_exception_: no exception
last_poll_time_: now
num_messages_read: 4
last_commit_time_: now
num_commits: 1
last_rebalance_time_: never
num_rebalance_revocations: 0
num_rebalance_assignments: 1
is_currently_used: 0
"""
)
instance.query("DROP TABLE test.kafka")
kafka_delete_topic(admin_client, topic)
def test_system_kafka_consumers_rebalance(kafka_cluster, max_retries=15):
# based on test_kafka_consumer_hang2
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(cluster.kafka_port),
value_serializer=producer_serializer,
key_serializer=producer_serializer,
)
topic = "system_kafka_cons2"
kafka_create_topic(admin_client, topic, num_partitions=2)
instance.query(
f"""
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka2;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_group_name = '{topic}',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka2 (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_commit_on_select = 1,
kafka_group_name = '{topic}',
kafka_format = 'JSONEachRow';
"""
)
producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
producer.send(topic=topic, value=json.dumps({"key": 11, "value": 11}), partition=1)
time.sleep(3)
# first consumer subscribe the topic, try to poll some data, and go to rest
instance.query("SELECT * FROM test.kafka")
# second consumer do the same leading to rebalance in the first
# consumer, try to poll some data
instance.query("SELECT * FROM test.kafka2")
producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
producer.send(topic=topic, value=json.dumps({"key": 10, "value": 10}), partition=1)
time.sleep(3)
instance.query("SELECT * FROM test.kafka")
instance.query("SELECT * FROM test.kafka2")
instance.query("SELECT * FROM test.kafka")
instance.query("SELECT * FROM test.kafka2")
result_system_kafka_consumers = instance.query(
"""
create or replace function stable_timestamp as
(d)->multiIf(d==toDateTime('1970-01-01 00:00:00'), 'never', abs(dateDiff('second', d, now())) < 30, 'now', toString(d));
SELECT database, table, length(consumer_id), assignments.topic, assignments.partition_id,
assignments.current_offset,
if(length(exceptions.time)>0, exceptions.time[1]::String, 'never') as last_exception_time_,
if(length(exceptions.text)>0, exceptions.text[1], 'no exception') as last_exception_,
stable_timestamp(last_poll_time) as last_poll_time_, num_messages_read, stable_timestamp(last_commit_time) as last_commit_time_,
num_commits, stable_timestamp(last_rebalance_time) as last_rebalance_time_,
num_rebalance_revocations, num_rebalance_assignments, is_currently_used
FROM system.kafka_consumers WHERE database='test' and table IN ('kafka', 'kafka2') format Vertical;
"""
)
logging.debug(f"result_system_kafka_consumers: {result_system_kafka_consumers}")
assert (
result_system_kafka_consumers
== """Row 1:
database: test
table: kafka
length(consumer_id): 67
assignments.topic: ['system_kafka_cons2']
assignments.partition_id: [0]
assignments.current_offset: [2]
last_exception_time_: never
last_exception_: no exception
last_poll_time_: now
num_messages_read: 4
last_commit_time_: now
num_commits: 2
last_rebalance_time_: now
num_rebalance_revocations: 1
num_rebalance_assignments: 2
is_currently_used: 0
Row 2:
database: test
table: kafka2
length(consumer_id): 68
assignments.topic: ['system_kafka_cons2']
assignments.partition_id: [1]
assignments.current_offset: [2]
last_exception_time_: never
last_exception_: no exception
last_poll_time_: now
num_messages_read: 1
last_commit_time_: now
num_commits: 1
last_rebalance_time_: never
num_rebalance_revocations: 0
num_rebalance_assignments: 1
is_currently_used: 0
"""
)
instance.query("DROP TABLE test.kafka")
instance.query("DROP TABLE test.kafka2")
kafka_delete_topic(admin_client, topic)
def test_system_kafka_consumers_rebalance_mv(kafka_cluster, max_retries=15):
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:{}".format(kafka_cluster.kafka_port)
)
producer = KafkaProducer(
bootstrap_servers="localhost:{}".format(cluster.kafka_port),
value_serializer=producer_serializer,
key_serializer=producer_serializer,
)
topic = "system_kafka_cons_mv"
kafka_create_topic(admin_client, topic, num_partitions=2)
instance.query(
f"""
DROP TABLE IF EXISTS test.kafka;
DROP TABLE IF EXISTS test.kafka2;
DROP TABLE IF EXISTS test.kafka_persistent;
DROP TABLE IF EXISTS test.kafka_persistent2;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_group_name = '{topic}',
kafka_commit_on_select = 1,
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka2 (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'kafka1:19092',
kafka_topic_list = '{topic}',
kafka_commit_on_select = 1,
kafka_group_name = '{topic}',
kafka_format = 'JSONEachRow';
CREATE TABLE test.kafka_persistent (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE TABLE test.kafka_persistent2 (key UInt64, value UInt64)
ENGINE = MergeTree()
ORDER BY key;
CREATE MATERIALIZED VIEW test.persistent_kafka_mv TO test.kafka_persistent AS
SELECT key, value
FROM test.kafka;
CREATE MATERIALIZED VIEW test.persistent_kafka_mv2 TO test.kafka_persistent2 AS
SELECT key, value
FROM test.kafka2;
"""
)
producer.send(topic=topic, value=json.dumps({"key": 1, "value": 1}), partition=0)
producer.send(topic=topic, value=json.dumps({"key": 11, "value": 11}), partition=1)
time.sleep(3)
retries = 0
result_rdkafka_stat = ""
while True:
result_rdkafka_stat = instance.query(
"""
SELECT table, JSONExtractString(rdkafka_stat, 'type')
FROM system.kafka_consumers WHERE database='test' and table = 'kafka' format Vertical;
"""
)
if result_rdkafka_stat.find("consumer") or retries > max_retries:
break
retries += 1
time.sleep(1)
assert (
result_rdkafka_stat
== """Row 1:
table: kafka
JSONExtractString(rdkafka_stat, 'type'): consumer
"""
)
instance.query("DROP TABLE test.kafka")
instance.query("DROP TABLE test.kafka2")
instance.query("DROP TABLE test.kafka_persistent")
instance.query("DROP TABLE test.kafka_persistent2")
instance.query("DROP TABLE test.persistent_kafka_mv")
instance.query("DROP TABLE test.persistent_kafka_mv2")
kafka_delete_topic(admin_client, topic)
if __name__ == "__main__":
cluster.start()
input("Cluster created, press any key to destroy...")