Add some metrics to engine Kafka

This commit is contained in:
Mikhail Filimonov 2022-04-04 16:05:31 +02:00
parent 44293e6f94
commit 53c7376e37
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
8 changed files with 162 additions and 1 deletions

View File

@ -81,6 +81,14 @@
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \ M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \ M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \ M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
M(KafkaConsumers, "Number of active Kafka consumers") \
M(KafkaConsumersWithAssignment, "Number of active Kafka consumers which have some partitions assigned.") \
M(KafkaProducers, "Number of active Kafka producer created") \
M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads") \
M(KafkaBackgroundReads, "Number of background reads currently working (populating materialized views from Kafka)") \
M(KafkaDirectReads, "Number of direct selects from Kafka currently executing") \
M(KafkaWrites, "Number of currently running inserts to Kafka") \
M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \
namespace CurrentMetrics namespace CurrentMetrics
{ {

View File

@ -295,6 +295,25 @@
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \ M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \ M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
\ \
M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \
M(KafkaMessagesPolled, "Number of Kafka messages polled from librdkafka to ClickHouse") \
M(KafkaMessagesRead, "Number of Kafka messages already processed by ClickHouse") \
M(KafkaMessagesFailed, "Number of Kafka messages ClickHouse failed to parse") \
M(KafkaRowsRead, "Number of rows parsed from Kafka messages") \
M(KafkaRowsRejected, "Number of parsed rows which were later rejected (due to rebalances / errors or similar reasons). Those rows will be consumed again after the rebalance.") \
M(KafkaDirectReads, "Number of direct selects from Kafka tables since server start") \
M(KafkaBackgroundReads, "Number of background reads populating materialized views from Kafka since server start") \
M(KafkaCommits, "Number of successful commits of consumed offsets to Kafka (normally should be the same as KafkaBackgroundReads)") \
M(KafkaCommitFailures, "Number of failed commits of consumed offsets to Kafka (usually is a sign of some data duplication)") \
M(KafkaConsumerErrors, "Number of errors reported by librdkafka during polls") \
M(KafkaWrites, "Number of writes (inserts) to Kafka tables ") \
M(KafkaRowsWritten, "Number of rows inserted into Kafka tables") \
M(KafkaProducerFlushes, "Number of explicit flushes to Kafka producer") \
M(KafkaMessagesProduced, "Number of messages produced to Kafka") \
M(KafkaProducerErrors, "Number of errors during producing the messages to Kafka") \
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \ M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \ M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")

View File

@ -6,6 +6,16 @@
#include <base/logger_useful.h> #include <base/logger_useful.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event KafkaMessagesRead;
extern const Event KafkaMessagesFailed;
extern const Event KafkaRowsRead;
extern const Event KafkaRowsRejected;
}
namespace DB namespace DB
{ {
namespace ErrorCodes namespace ErrorCodes
@ -85,6 +95,8 @@ Chunk KafkaSource::generateImpl()
auto on_error = [&](const MutableColumns & result_columns, Exception & e) auto on_error = [&](const MutableColumns & result_columns, Exception & e)
{ {
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
if (put_error_to_stream) if (put_error_to_stream)
{ {
exception_message = e.message(); exception_message = e.message();
@ -117,7 +129,11 @@ Chunk KafkaSource::generateImpl()
size_t new_rows = 0; size_t new_rows = 0;
exception_message.reset(); exception_message.reset();
if (buffer->poll()) if (buffer->poll())
{
// poll provide one message at a time to the input_format
ProfileEvents::increment(ProfileEvents::KafkaMessagesRead);
new_rows = executor.execute(); new_rows = executor.execute();
}
if (new_rows) if (new_rows)
{ {
@ -128,6 +144,8 @@ Chunk KafkaSource::generateImpl()
if (buffer->isStalled()) if (buffer->isStalled())
throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR); throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR);
ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
buffer->storeLastReadMessageOffset(); buffer->storeLastReadMessageOffset();
auto topic = buffer->currentTopic(); auto topic = buffer->currentTopic();
@ -212,8 +230,18 @@ Chunk KafkaSource::generateImpl()
} }
} }
if (buffer->polledDataUnusable() || total_rows == 0) if (total_rows == 0)
{
return {}; return {};
}
else if (buffer->polledDataUnusable())
{
// the rows were counted already before by KafkaRowsRead,
// so let's count the rows we ignore separately
// (they will be retried after the rebalance)
ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
return {};
}
/// MATERIALIZED columns can be added here, but I think /// MATERIALIZED columns can be added here, but I think
// they are not needed here: // they are not needed here:

View File

@ -10,6 +10,26 @@
#include <boost/algorithm/string/join.hpp> #include <boost/algorithm/string/join.hpp>
#include <algorithm> #include <algorithm>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
namespace CurrentMetrics
{
extern const Metric KafkaAssignedPartitions;
extern const Metric KafkaConsumersWithAssignment;
}
namespace ProfileEvents
{
extern const Event KafkaRebalanceRevocations;
extern const Event KafkaRebalanceAssignments;
extern const Event KafkaRebalanceErrors;
extern const Event KafkaMessagesPolled;
extern const Event KafkaCommitFailures;
extern const Event KafkaCommits;
extern const Event KafkaConsumerErrors;
}
namespace DB namespace DB
{ {
@ -45,6 +65,9 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
// called (synchronously, during poll) when we enter the consumer group // called (synchronously, during poll) when we enter the consumer group
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions) consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{ {
CurrentMetrics::add(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
ProfileEvents::increment(ProfileEvents::KafkaRebalanceAssignments);
if (topic_partitions.empty()) if (topic_partitions.empty())
{ {
LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?"); LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
@ -52,6 +75,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
else else
{ {
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions); LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
CurrentMetrics::add(CurrentMetrics::KafkaConsumersWithAssignment, 1);
} }
assignment = topic_partitions; assignment = topic_partitions;
@ -60,10 +84,18 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
// called (synchronously, during poll) when we leave the consumer group // called (synchronously, during poll) when we leave the consumer group
consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions) consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
{ {
CurrentMetrics::sub(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
ProfileEvents::increment(ProfileEvents::KafkaRebalanceRevocations);
// Rebalance is happening now, and now we have a chance to finish the work // Rebalance is happening now, and now we have a chance to finish the work
// with topics/partitions we were working with before rebalance // with topics/partitions we were working with before rebalance
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions); LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
if (!topic_partitions.empty())
{
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersWithAssignment, 1);
}
// we can not flush data to target from that point (it is pulled, not pushed) // we can not flush data to target from that point (it is pulled, not pushed)
// so the best we can now it to // so the best we can now it to
// 1) repeat last commit in sync mode (async could be still in queue, we need to be sure is is properly committed before rebalance) // 1) repeat last commit in sync mode (async could be still in queue, we need to be sure is is properly committed before rebalance)
@ -91,6 +123,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
consumer->set_rebalance_error_callback([this](cppkafka::Error err) consumer->set_rebalance_error_callback([this](cppkafka::Error err)
{ {
LOG_ERROR(log, "Rebalance error: {}", err); LOG_ERROR(log, "Rebalance error: {}", err);
ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
}); });
} }
@ -229,8 +262,14 @@ void ReadBufferFromKafkaConsumer::commit()
if (!committed) if (!committed)
{ {
// TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits) // TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits)
ProfileEvents::increment(ProfileEvents::KafkaCommitFailures);
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET); throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
} }
else
{
ProfileEvents::increment(ProfileEvents::KafkaCommits);
}
} }
else else
{ {
@ -423,6 +462,8 @@ bool ReadBufferFromKafkaConsumer::poll()
return false; return false;
} }
ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
stalled_status = NOT_STALLED; stalled_status = NOT_STALLED;
allowed = true; allowed = true;
return true; return true;
@ -436,6 +477,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
{ {
if (auto error = message.get_error()) if (auto error = message.get_error())
{ {
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
LOG_ERROR(log, "Consumer error: {}", error); LOG_ERROR(log, "Consumer error: {}", error);
return true; return true;
} }

View File

@ -5,6 +5,12 @@
#include <IO/ReadBuffer.h> #include <IO/ReadBuffer.h>
#include <cppkafka/cppkafka.h> #include <cppkafka/cppkafka.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric KafkaConsumers;
}
namespace Poco namespace Poco
{ {
@ -67,6 +73,7 @@ public:
private: private:
using Messages = std::vector<cppkafka::Message>; using Messages = std::vector<cppkafka::Message>;
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
enum StalledStatus enum StalledStatus
{ {

View File

@ -41,6 +41,26 @@
#include <Common/typeid_cast.h> #include <Common/typeid_cast.h>
#include <Common/CurrentMetrics.h>
#include <Common/ProfileEvents.h>
namespace CurrentMetrics
{
extern const Metric KafkaLibrdkafkaThreads;
extern const Metric KafkaBackgroundReads;
extern const Metric KafkaDirectReads;
extern const Metric KafkaWrites;
}
namespace ProfileEvents
{
extern const Event KafkaDirectReads;
extern const Event KafkaBackgroundReads;
extern const Event KafkaWrites;
}
namespace DB namespace DB
{ {
@ -58,6 +78,7 @@ struct StorageKafkaInterceptors
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx) static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
{ {
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx); StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
const auto & storage_id = self->getStorageID(); const auto & storage_id = self->getStorageID();
const auto & table = storage_id.getTableName(); const auto & table = storage_id.getTableName();
@ -89,6 +110,7 @@ struct StorageKafkaInterceptors
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx) static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
{ {
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx); StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
std::lock_guard lock(self->thread_statuses_mutex); std::lock_guard lock(self->thread_statuses_mutex);
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr) const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
@ -279,6 +301,9 @@ Pipe StorageKafka::read(
if (mv_attached) if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views"); throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaDirectReads};
ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
Pipes pipes; Pipes pipes;
pipes.reserve(num_created_consumers); pipes.reserve(num_created_consumers);
@ -304,6 +329,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
auto modified_context = Context::createCopy(local_context); auto modified_context = Context::createCopy(local_context);
modified_context->applySettingsChanges(settings_adjustments); modified_context->applySettingsChanges(settings_adjustments);
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaWrites};
ProfileEvents::increment(ProfileEvents::KafkaWrites);
if (topics.size() > 1) if (topics.size() > 1)
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED); throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context); return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
@ -615,6 +643,9 @@ bool StorageKafka::streamToViews()
if (!table) if (!table)
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR); throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaBackgroundReads};
ProfileEvents::increment(ProfileEvents::KafkaBackgroundReads);
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr()); auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
// Create an INSERT query for streaming data // Create an INSERT query for streaming data

View File

@ -3,6 +3,16 @@
#include "Columns/ColumnString.h" #include "Columns/ColumnString.h"
#include "Columns/ColumnsNumber.h" #include "Columns/ColumnsNumber.h"
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
extern const Event KafkaRowsWritten;
extern const Event KafkaProducerFlushes;
extern const Event KafkaMessagesProduced;
extern const Event KafkaProducerErrors;
}
namespace DB namespace DB
{ {
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer( WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
@ -53,6 +63,8 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row) void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
{ {
ProfileEvents::increment(ProfileEvents::KafkaRowsWritten);
if (++rows % max_rows == 0) if (++rows % max_rows == 0)
{ {
const std::string & last_chunk = chunks.back(); const std::string & last_chunk = chunks.back();
@ -103,8 +115,10 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
producer->poll(timeout); producer->poll(timeout);
continue; continue;
} }
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw; throw;
} }
ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
break; break;
} }
@ -126,9 +140,12 @@ void WriteBufferToKafkaProducer::flush()
{ {
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT) if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
continue; continue;
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
throw; throw;
} }
ProfileEvents::increment(ProfileEvents::KafkaProducerFlushes);
break; break;
} }
} }

View File

@ -7,6 +7,14 @@
#include <list> #include <list>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric KafkaProducers;
}
namespace DB namespace DB
{ {
class Block; class Block;
@ -32,6 +40,7 @@ private:
void nextImpl() override; void nextImpl() override;
void addChunk(); void addChunk();
void reinitializeChunks(); void reinitializeChunks();
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
ProducerPtr producer; ProducerPtr producer;
const std::string topic; const std::string topic;