diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp
index a741f1f1bfc..d49fc02084f 100644
--- a/src/Common/CurrentMetrics.cpp
+++ b/src/Common/CurrentMetrics.cpp
@@ -81,6 +81,14 @@
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
+    M(KafkaConsumers, "Number of active Kafka consumers.") \
+    M(KafkaConsumersWithAssignment, "Number of active Kafka consumers which have some partitions assigned.") \
+    M(KafkaProducers, "Number of active Kafka producers created.") \
+    M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads.") \
+    M(KafkaBackgroundReads, "Number of background reads currently in progress (populating materialized views from Kafka).") \
+    M(KafkaDirectReads, "Number of direct selects from Kafka currently executing.") \
+    M(KafkaWrites, "Number of currently running inserts to Kafka.") \
+    M(KafkaAssignedPartitions, "Number of partitions currently assigned to Kafka tables.") \
namespace CurrentMetrics
{
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index 074ec02394b..a963c024ab1 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -295,6 +295,25 @@
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
\
+ M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
+ M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
+ M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \
+ M(KafkaMessagesPolled, "Number of Kafka messages polled from librdkafka to ClickHouse") \
+ M(KafkaMessagesRead, "Number of Kafka messages already processed by ClickHouse") \
+ M(KafkaMessagesFailed, "Number of Kafka messages ClickHouse failed to parse") \
+ M(KafkaRowsRead, "Number of rows parsed from Kafka messages") \
+    M(KafkaRowsRejected, "Number of parsed rows which were later rejected (due to rebalances, errors, or similar reasons). Those rows will be consumed again after the rebalance.") \
+ M(KafkaDirectReads, "Number of direct selects from Kafka tables since server start") \
+ M(KafkaBackgroundReads, "Number of background reads populating materialized views from Kafka since server start") \
+ M(KafkaCommits, "Number of successful commits of consumed offsets to Kafka (normally should be the same as KafkaBackgroundReads)") \
+    M(KafkaCommitFailures, "Number of failed commits of consumed offsets to Kafka (usually a sign of some data duplication)") \
+ M(KafkaConsumerErrors, "Number of errors reported by librdkafka during polls") \
+    M(KafkaWrites, "Number of writes (inserts) to Kafka tables") \
+ M(KafkaRowsWritten, "Number of rows inserted into Kafka tables") \
+ M(KafkaProducerFlushes, "Number of explicit flushes to Kafka producer") \
+ M(KafkaMessagesProduced, "Number of messages produced to Kafka") \
+    M(KafkaProducerErrors, "Number of errors while producing messages to Kafka") \
+ \
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp
index 99130f615f5..60047af8774 100644
--- a/src/Storages/Kafka/KafkaSource.cpp
+++ b/src/Storages/Kafka/KafkaSource.cpp
@@ -6,6 +6,16 @@
#include
#include
+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+ extern const Event KafkaMessagesRead;
+ extern const Event KafkaMessagesFailed;
+ extern const Event KafkaRowsRead;
+ extern const Event KafkaRowsRejected;
+}
+
namespace DB
{
namespace ErrorCodes
@@ -85,6 +95,8 @@ Chunk KafkaSource::generateImpl()
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{
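+        // Count every message ClickHouse failed to parse, regardless of how the error is handled below.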
+ ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
+
if (put_error_to_stream)
{
exception_message = e.message();
@@ -117,7 +129,11 @@ Chunk KafkaSource::generateImpl()
size_t new_rows = 0;
exception_message.reset();
if (buffer->poll())
+ {
+            // poll() provides one message at a time to the input_format
+ ProfileEvents::increment(ProfileEvents::KafkaMessagesRead);
new_rows = executor.execute();
+ }
if (new_rows)
{
@@ -128,6 +144,8 @@ Chunk KafkaSource::generateImpl()
if (buffer->isStalled())
throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR);
+ ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
+
buffer->storeLastReadMessageOffset();
auto topic = buffer->currentTopic();
@@ -212,8 +230,18 @@ Chunk KafkaSource::generateImpl()
}
}
- if (buffer->polledDataUnusable() || total_rows == 0)
+ if (total_rows == 0)
+ {
return {};
+ }
+ else if (buffer->polledDataUnusable())
+ {
+        // These rows were already counted by KafkaRowsRead above,
+        // so count the rows we discard here separately
+        // (they will be consumed again after the rebalance).
+ ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
+ return {};
+ }
/// MATERIALIZED columns can be added here, but I think
// they are not needed here:
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index ebfeaed8346..5ff90164064 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -10,6 +10,26 @@
#include
#include
+#include <Common/CurrentMetrics.h>
+#include <Common/ProfileEvents.h>
+
+namespace CurrentMetrics
+{
+ extern const Metric KafkaAssignedPartitions;
+ extern const Metric KafkaConsumersWithAssignment;
+}
+
+namespace ProfileEvents
+{
+ extern const Event KafkaRebalanceRevocations;
+ extern const Event KafkaRebalanceAssignments;
+ extern const Event KafkaRebalanceErrors;
+ extern const Event KafkaMessagesPolled;
+ extern const Event KafkaCommitFailures;
+ extern const Event KafkaCommits;
+ extern const Event KafkaConsumerErrors;
+}
+
namespace DB
{
@@ -45,6 +65,9 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
// called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
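+        // Assignment is the final stage of a rebalance: grow the partitions gauge here,
+        // it is shrunk again in the revocation callback below.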
+ CurrentMetrics::add(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
+ ProfileEvents::increment(ProfileEvents::KafkaRebalanceAssignments);
+
if (topic_partitions.empty())
{
LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
@@ -52,6 +75,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
else
{
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
+ CurrentMetrics::add(CurrentMetrics::KafkaConsumersWithAssignment, 1);
}
assignment = topic_partitions;
@@ -60,10 +84,18 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
// called (synchronously, during poll) when we leave the consumer group
consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{
+ CurrentMetrics::sub(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
+ ProfileEvents::increment(ProfileEvents::KafkaRebalanceRevocations);
+
// Rebalance is happening now, and now we have a chance to finish the work
// with topics/partitions we were working with before rebalance
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
+ if (!topic_partitions.empty())
+ {
+ CurrentMetrics::sub(CurrentMetrics::KafkaConsumersWithAssignment, 1);
+ }
+
// we cannot flush data to target from that point (it is pulled, not pushed)
// so the best we can do now is to
// 1) repeat last commit in sync mode (async could be still in queue, we need to be sure it is properly committed before rebalance)
@@ -91,6 +123,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
{
LOG_ERROR(log, "Rebalance error: {}", err);
+ ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
});
}
@@ -229,8 +262,14 @@ void ReadBufferFromKafkaConsumer::commit()
if (!committed)
{
// TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits)
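+            // Record the failure before throwing so the event is not lost when the exception propagates.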
+ ProfileEvents::increment(ProfileEvents::KafkaCommitFailures);
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
}
+ else
+ {
+ ProfileEvents::increment(ProfileEvents::KafkaCommits);
+ }
+
}
else
{
@@ -423,6 +462,8 @@ bool ReadBufferFromKafkaConsumer::poll()
return false;
}
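+    // Count the messages this poll() made available for parsing; parsing failures are tracked separately via KafkaMessagesFailed.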
+ ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
+
stalled_status = NOT_STALLED;
allowed = true;
return true;
@@ -436,6 +477,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
{
if (auto error = message.get_error())
{
+ ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error);
return true;
}
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
index 4e9bf2e55c2..f390d1c1330 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -5,6 +5,12 @@
#include
#include
+#include <Common/CurrentMetrics.h>
+
+namespace CurrentMetrics
+{
+ extern const Metric KafkaConsumers;
+}
namespace Poco
{
@@ -67,6 +73,7 @@ public:
private:
using Messages = std::vector<cppkafka::Message>;
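+    // RAII: keeps the KafkaConsumers gauge incremented for the lifetime of this buffer.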
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
enum StalledStatus
{
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 4c7465d587d..722c55e6c93 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -41,6 +41,26 @@
#include
+#include <Common/CurrentMetrics.h>
+#include <Common/ProfileEvents.h>
+
+
+namespace CurrentMetrics
+{
+ extern const Metric KafkaLibrdkafkaThreads;
+ extern const Metric KafkaBackgroundReads;
+ extern const Metric KafkaDirectReads;
+ extern const Metric KafkaWrites;
+}
+
+namespace ProfileEvents
+{
+ extern const Event KafkaDirectReads;
+ extern const Event KafkaBackgroundReads;
+ extern const Event KafkaWrites;
+}
+
+
namespace DB
{
@@ -58,6 +78,7 @@ struct StorageKafkaInterceptors
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
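+        // One more librdkafka internal thread; the matching sub() lives in rdKafkaOnThreadExit.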
+ CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName();
@@ -89,6 +110,7 @@ struct StorageKafkaInterceptors
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
{
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
+ CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
std::lock_guard lock(self->thread_statuses_mutex);
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
@@ -279,6 +301,9 @@ Pipe StorageKafka::read(
if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
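+    // KafkaDirectReads exists both as a current metric (SELECTs in flight) and as a profile event (SELECTs since server start).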
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaDirectReads};
+ ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
+
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes;
pipes.reserve(num_created_consumers);
@@ -304,6 +329,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments);
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaWrites};
+ ProfileEvents::increment(ProfileEvents::KafkaWrites);
+
if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared(*this, metadata_snapshot, modified_context);
@@ -615,6 +643,9 @@ bool StorageKafka::streamToViews()
if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaBackgroundReads};
+ ProfileEvents::increment(ProfileEvents::KafkaBackgroundReads);
+
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
// Create an INSERT query for streaming data
diff --git a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp
index 748ea02ac6d..28877864e16 100644
--- a/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp
+++ b/src/Storages/Kafka/WriteBufferToKafkaProducer.cpp
@@ -3,6 +3,16 @@
#include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h"
+#include <Common/ProfileEvents.h>
+
+namespace ProfileEvents
+{
+ extern const Event KafkaRowsWritten;
+ extern const Event KafkaProducerFlushes;
+ extern const Event KafkaMessagesProduced;
+ extern const Event KafkaProducerErrors;
+}
+
namespace DB
{
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
@@ -53,6 +63,8 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
{
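+    // countRow() handles one row per call, so a plain increment is enough.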
+ ProfileEvents::increment(ProfileEvents::KafkaRowsWritten);
+
if (++rows % max_rows == 0)
{
const std::string & last_chunk = chunks.back();
@@ -103,8 +115,10 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
producer->poll(timeout);
continue;
}
+ ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw;
}
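+            // produce() finally succeeded (possibly after the retries above), count the message.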
+ ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
break;
}
@@ -126,9 +140,12 @@ void WriteBufferToKafkaProducer::flush()
{
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
continue;
+
+ ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw;
}
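+        // flush() returned without timing out, so count a completed explicit flush.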
+ ProfileEvents::increment(ProfileEvents::KafkaProducerFlushes);
break;
}
}
diff --git a/src/Storages/Kafka/WriteBufferToKafkaProducer.h b/src/Storages/Kafka/WriteBufferToKafkaProducer.h
index 15881b7a8e5..64b06571f0a 100644
--- a/src/Storages/Kafka/WriteBufferToKafkaProducer.h
+++ b/src/Storages/Kafka/WriteBufferToKafkaProducer.h
@@ -7,6 +7,14 @@
#include
+#include <Common/CurrentMetrics.h>
+
+namespace CurrentMetrics
+{
+ extern const Metric KafkaProducers;
+}
+
+
namespace DB
{
class Block;
@@ -32,6 +40,7 @@ private:
void nextImpl() override;
void addChunk();
void reinitializeChunks();
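+    // RAII: keeps the KafkaProducers gauge incremented while this write buffer exists.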
+ CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
ProducerPtr producer;
const std::string topic;