mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-27 01:51:59 +00:00
Merge pull request #35916 from filimonov/kafka_metrics
Add some metrics to engine Kafka
This commit is contained in:
commit
a383d9ee6a
@ -81,6 +81,14 @@
|
|||||||
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
|
M(ActiveSyncDrainedConnections, "Number of active connections drained synchronously.") \
|
||||||
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
|
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \
|
||||||
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
|
M(PendingAsyncInsert, "Number of asynchronous inserts that are waiting for flush.") \
|
||||||
|
M(KafkaConsumers, "Number of active Kafka consumers") \
|
||||||
|
M(KafkaConsumersWithAssignment, "Number of active Kafka consumers which have some partitions assigned.") \
|
||||||
|
M(KafkaProducers, "Number of active Kafka producer created") \
|
||||||
|
M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads") \
|
||||||
|
M(KafkaBackgroundReads, "Number of background reads currently working (populating materialized views from Kafka)") \
|
||||||
|
M(KafkaConsumersInUse, "Number of consumers which are currently used by direct or background reads") \
|
||||||
|
M(KafkaWrites, "Number of currently running inserts to Kafka") \
|
||||||
|
M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \
|
||||||
|
|
||||||
namespace CurrentMetrics
|
namespace CurrentMetrics
|
||||||
{
|
{
|
||||||
|
@ -297,6 +297,25 @@
|
|||||||
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
|
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
|
||||||
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
|
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
|
||||||
\
|
\
|
||||||
|
M(KafkaRebalanceRevocations, "Number of partition revocations (the first stage of consumer group rebalance)") \
|
||||||
|
M(KafkaRebalanceAssignments, "Number of partition assignments (the final stage of consumer group rebalance)") \
|
||||||
|
M(KafkaRebalanceErrors, "Number of failed consumer group rebalances") \
|
||||||
|
M(KafkaMessagesPolled, "Number of Kafka messages polled from librdkafka to ClickHouse") \
|
||||||
|
M(KafkaMessagesRead, "Number of Kafka messages already processed by ClickHouse") \
|
||||||
|
M(KafkaMessagesFailed, "Number of Kafka messages ClickHouse failed to parse") \
|
||||||
|
M(KafkaRowsRead, "Number of rows parsed from Kafka messages") \
|
||||||
|
M(KafkaRowsRejected, "Number of parsed rows which were later rejected (due to rebalances / errors or similar reasons). Those rows will be consumed again after the rebalance.") \
|
||||||
|
M(KafkaDirectReads, "Number of direct selects from Kafka tables since server start") \
|
||||||
|
M(KafkaBackgroundReads, "Number of background reads populating materialized views from Kafka since server start") \
|
||||||
|
M(KafkaCommits, "Number of successful commits of consumed offsets to Kafka (normally should be the same as KafkaBackgroundReads)") \
|
||||||
|
M(KafkaCommitFailures, "Number of failed commits of consumed offsets to Kafka (usually is a sign of some data duplication)") \
|
||||||
|
M(KafkaConsumerErrors, "Number of errors reported by librdkafka during polls") \
|
||||||
|
M(KafkaWrites, "Number of writes (inserts) to Kafka tables ") \
|
||||||
|
M(KafkaRowsWritten, "Number of rows inserted into Kafka tables") \
|
||||||
|
M(KafkaProducerFlushes, "Number of explicit flushes to Kafka producer") \
|
||||||
|
M(KafkaMessagesProduced, "Number of messages produced to Kafka") \
|
||||||
|
M(KafkaProducerErrors, "Number of errors during producing the messages to Kafka") \
|
||||||
|
\
|
||||||
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
|
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
|
||||||
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
|
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
|
||||||
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
|
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
|
||||||
|
@ -6,6 +6,16 @@
|
|||||||
#include <base/logger_useful.h>
|
#include <base/logger_useful.h>
|
||||||
#include <Interpreters/Context.h>
|
#include <Interpreters/Context.h>
|
||||||
|
|
||||||
|
#include <Common/ProfileEvents.h>
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event KafkaMessagesRead;
|
||||||
|
extern const Event KafkaMessagesFailed;
|
||||||
|
extern const Event KafkaRowsRead;
|
||||||
|
extern const Event KafkaRowsRejected;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
namespace ErrorCodes
|
namespace ErrorCodes
|
||||||
@ -85,6 +95,8 @@ Chunk KafkaSource::generateImpl()
|
|||||||
|
|
||||||
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
auto on_error = [&](const MutableColumns & result_columns, Exception & e)
|
||||||
{
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesFailed);
|
||||||
|
|
||||||
if (put_error_to_stream)
|
if (put_error_to_stream)
|
||||||
{
|
{
|
||||||
exception_message = e.message();
|
exception_message = e.message();
|
||||||
@ -117,7 +129,11 @@ Chunk KafkaSource::generateImpl()
|
|||||||
size_t new_rows = 0;
|
size_t new_rows = 0;
|
||||||
exception_message.reset();
|
exception_message.reset();
|
||||||
if (buffer->poll())
|
if (buffer->poll())
|
||||||
|
{
|
||||||
|
// poll provide one message at a time to the input_format
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesRead);
|
||||||
new_rows = executor.execute();
|
new_rows = executor.execute();
|
||||||
|
}
|
||||||
|
|
||||||
if (new_rows)
|
if (new_rows)
|
||||||
{
|
{
|
||||||
@ -128,6 +144,8 @@ Chunk KafkaSource::generateImpl()
|
|||||||
if (buffer->isStalled())
|
if (buffer->isStalled())
|
||||||
throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Polled messages became unusable", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
|
||||||
|
|
||||||
buffer->storeLastReadMessageOffset();
|
buffer->storeLastReadMessageOffset();
|
||||||
|
|
||||||
auto topic = buffer->currentTopic();
|
auto topic = buffer->currentTopic();
|
||||||
@ -212,8 +230,18 @@ Chunk KafkaSource::generateImpl()
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (buffer->polledDataUnusable() || total_rows == 0)
|
if (total_rows == 0)
|
||||||
|
{
|
||||||
return {};
|
return {};
|
||||||
|
}
|
||||||
|
else if (buffer->polledDataUnusable())
|
||||||
|
{
|
||||||
|
// the rows were counted already before by KafkaRowsRead,
|
||||||
|
// so let's count the rows we ignore separately
|
||||||
|
// (they will be retried after the rebalance)
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRowsRejected, total_rows);
|
||||||
|
return {};
|
||||||
|
}
|
||||||
|
|
||||||
/// MATERIALIZED columns can be added here, but I think
|
/// MATERIALIZED columns can be added here, but I think
|
||||||
// they are not needed here:
|
// they are not needed here:
|
||||||
|
@ -10,6 +10,26 @@
|
|||||||
#include <boost/algorithm/string/join.hpp>
|
#include <boost/algorithm/string/join.hpp>
|
||||||
#include <algorithm>
|
#include <algorithm>
|
||||||
|
|
||||||
|
#include <Common/CurrentMetrics.h>
|
||||||
|
#include <Common/ProfileEvents.h>
|
||||||
|
|
||||||
|
namespace CurrentMetrics
|
||||||
|
{
|
||||||
|
extern const Metric KafkaAssignedPartitions;
|
||||||
|
extern const Metric KafkaConsumersWithAssignment;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event KafkaRebalanceRevocations;
|
||||||
|
extern const Event KafkaRebalanceAssignments;
|
||||||
|
extern const Event KafkaRebalanceErrors;
|
||||||
|
extern const Event KafkaMessagesPolled;
|
||||||
|
extern const Event KafkaCommitFailures;
|
||||||
|
extern const Event KafkaCommits;
|
||||||
|
extern const Event KafkaConsumerErrors;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -45,6 +65,9 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
|
|||||||
// called (synchronously, during poll) when we enter the consumer group
|
// called (synchronously, during poll) when we enter the consumer group
|
||||||
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
|
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
|
||||||
{
|
{
|
||||||
|
CurrentMetrics::add(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRebalanceAssignments);
|
||||||
|
|
||||||
if (topic_partitions.empty())
|
if (topic_partitions.empty())
|
||||||
{
|
{
|
||||||
LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
|
LOG_INFO(log, "Got empty assignment: Not enough partitions in the topic for all consumers?");
|
||||||
@ -52,6 +75,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
|
|||||||
else
|
else
|
||||||
{
|
{
|
||||||
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
|
LOG_TRACE(log, "Topics/partitions assigned: {}", topic_partitions);
|
||||||
|
CurrentMetrics::add(CurrentMetrics::KafkaConsumersWithAssignment, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
assignment = topic_partitions;
|
assignment = topic_partitions;
|
||||||
@ -60,10 +84,18 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
|
|||||||
// called (synchronously, during poll) when we leave the consumer group
|
// called (synchronously, during poll) when we leave the consumer group
|
||||||
consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
|
consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
|
||||||
{
|
{
|
||||||
|
CurrentMetrics::sub(CurrentMetrics::KafkaAssignedPartitions, topic_partitions.size());
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRebalanceRevocations);
|
||||||
|
|
||||||
// Rebalance is happening now, and now we have a chance to finish the work
|
// Rebalance is happening now, and now we have a chance to finish the work
|
||||||
// with topics/partitions we were working with before rebalance
|
// with topics/partitions we were working with before rebalance
|
||||||
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
|
LOG_TRACE(log, "Rebalance initiated. Revoking partitions: {}", topic_partitions);
|
||||||
|
|
||||||
|
if (!topic_partitions.empty())
|
||||||
|
{
|
||||||
|
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersWithAssignment, 1);
|
||||||
|
}
|
||||||
|
|
||||||
// we can not flush data to target from that point (it is pulled, not pushed)
|
// we can not flush data to target from that point (it is pulled, not pushed)
|
||||||
// so the best we can now it to
|
// so the best we can now it to
|
||||||
// 1) repeat last commit in sync mode (async could be still in queue, we need to be sure is is properly committed before rebalance)
|
// 1) repeat last commit in sync mode (async could be still in queue, we need to be sure is is properly committed before rebalance)
|
||||||
@ -91,6 +123,7 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
|
|||||||
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
|
consumer->set_rebalance_error_callback([this](cppkafka::Error err)
|
||||||
{
|
{
|
||||||
LOG_ERROR(log, "Rebalance error: {}", err);
|
LOG_ERROR(log, "Rebalance error: {}", err);
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRebalanceErrors);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -229,8 +262,14 @@ void ReadBufferFromKafkaConsumer::commit()
|
|||||||
if (!committed)
|
if (!committed)
|
||||||
{
|
{
|
||||||
// TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits)
|
// TODO: insert atomicity / transactions is needed here (possibility to rollback, on 2 phase commits)
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaCommitFailures);
|
||||||
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
|
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
|
||||||
}
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaCommits);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
@ -423,6 +462,8 @@ bool ReadBufferFromKafkaConsumer::poll()
|
|||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesPolled, messages.size());
|
||||||
|
|
||||||
stalled_status = NOT_STALLED;
|
stalled_status = NOT_STALLED;
|
||||||
allowed = true;
|
allowed = true;
|
||||||
return true;
|
return true;
|
||||||
@ -436,6 +477,7 @@ size_t ReadBufferFromKafkaConsumer::filterMessageErrors()
|
|||||||
{
|
{
|
||||||
if (auto error = message.get_error())
|
if (auto error = message.get_error())
|
||||||
{
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaConsumerErrors);
|
||||||
LOG_ERROR(log, "Consumer error: {}", error);
|
LOG_ERROR(log, "Consumer error: {}", error);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -5,6 +5,12 @@
|
|||||||
#include <IO/ReadBuffer.h>
|
#include <IO/ReadBuffer.h>
|
||||||
|
|
||||||
#include <cppkafka/cppkafka.h>
|
#include <cppkafka/cppkafka.h>
|
||||||
|
#include <Common/CurrentMetrics.h>
|
||||||
|
|
||||||
|
namespace CurrentMetrics
|
||||||
|
{
|
||||||
|
extern const Metric KafkaConsumers;
|
||||||
|
}
|
||||||
|
|
||||||
namespace Poco
|
namespace Poco
|
||||||
{
|
{
|
||||||
@ -67,6 +73,7 @@ public:
|
|||||||
|
|
||||||
private:
|
private:
|
||||||
using Messages = std::vector<cppkafka::Message>;
|
using Messages = std::vector<cppkafka::Message>;
|
||||||
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
|
||||||
|
|
||||||
enum StalledStatus
|
enum StalledStatus
|
||||||
{
|
{
|
||||||
|
@ -41,6 +41,26 @@
|
|||||||
#include <Common/typeid_cast.h>
|
#include <Common/typeid_cast.h>
|
||||||
|
|
||||||
|
|
||||||
|
#include <Common/CurrentMetrics.h>
|
||||||
|
#include <Common/ProfileEvents.h>
|
||||||
|
|
||||||
|
|
||||||
|
namespace CurrentMetrics
|
||||||
|
{
|
||||||
|
extern const Metric KafkaLibrdkafkaThreads;
|
||||||
|
extern const Metric KafkaBackgroundReads;
|
||||||
|
extern const Metric KafkaConsumersInUse;
|
||||||
|
extern const Metric KafkaWrites;
|
||||||
|
}
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event KafkaDirectReads;
|
||||||
|
extern const Event KafkaBackgroundReads;
|
||||||
|
extern const Event KafkaWrites;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -58,6 +78,7 @@ struct StorageKafkaInterceptors
|
|||||||
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
|
static rd_kafka_resp_err_t rdKafkaOnThreadStart(rd_kafka_t *, rd_kafka_thread_type_t thread_type, const char *, void * ctx)
|
||||||
{
|
{
|
||||||
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
|
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
|
||||||
|
CurrentMetrics::add(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
|
||||||
|
|
||||||
const auto & storage_id = self->getStorageID();
|
const auto & storage_id = self->getStorageID();
|
||||||
const auto & table = storage_id.getTableName();
|
const auto & table = storage_id.getTableName();
|
||||||
@ -89,6 +110,7 @@ struct StorageKafkaInterceptors
|
|||||||
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
|
static rd_kafka_resp_err_t rdKafkaOnThreadExit(rd_kafka_t *, rd_kafka_thread_type_t, const char *, void * ctx)
|
||||||
{
|
{
|
||||||
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
|
StorageKafka * self = reinterpret_cast<StorageKafka *>(ctx);
|
||||||
|
CurrentMetrics::sub(CurrentMetrics::KafkaLibrdkafkaThreads, 1);
|
||||||
|
|
||||||
std::lock_guard lock(self->thread_statuses_mutex);
|
std::lock_guard lock(self->thread_statuses_mutex);
|
||||||
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
|
const auto it = std::find_if(self->thread_statuses.begin(), self->thread_statuses.end(), [](const auto & thread_status_ptr)
|
||||||
@ -279,6 +301,8 @@ Pipe StorageKafka::read(
|
|||||||
if (mv_attached)
|
if (mv_attached)
|
||||||
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
|
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
|
||||||
|
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
|
||||||
|
|
||||||
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
|
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
|
||||||
Pipes pipes;
|
Pipes pipes;
|
||||||
pipes.reserve(num_created_consumers);
|
pipes.reserve(num_created_consumers);
|
||||||
@ -304,6 +328,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
|
|||||||
auto modified_context = Context::createCopy(local_context);
|
auto modified_context = Context::createCopy(local_context);
|
||||||
modified_context->applySettingsChanges(settings_adjustments);
|
modified_context->applySettingsChanges(settings_adjustments);
|
||||||
|
|
||||||
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaWrites};
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaWrites);
|
||||||
|
|
||||||
if (topics.size() > 1)
|
if (topics.size() > 1)
|
||||||
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
|
throw Exception("Can't write to Kafka table with multiple topics!", ErrorCodes::NOT_IMPLEMENTED);
|
||||||
return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
|
return std::make_shared<KafkaSink>(*this, metadata_snapshot, modified_context);
|
||||||
@ -358,6 +385,7 @@ void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
buffers.push_back(buffer);
|
buffers.push_back(buffer);
|
||||||
semaphore.set();
|
semaphore.set();
|
||||||
|
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -382,6 +410,7 @@ ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
|
|||||||
std::lock_guard lock(mutex);
|
std::lock_guard lock(mutex);
|
||||||
auto buffer = buffers.back();
|
auto buffer = buffers.back();
|
||||||
buffers.pop_back();
|
buffers.pop_back();
|
||||||
|
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
|
||||||
return buffer;
|
return buffer;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -615,6 +644,9 @@ bool StorageKafka::streamToViews()
|
|||||||
if (!table)
|
if (!table)
|
||||||
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
|
throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR);
|
||||||
|
|
||||||
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaBackgroundReads};
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaBackgroundReads);
|
||||||
|
|
||||||
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
|
auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr());
|
||||||
|
|
||||||
// Create an INSERT query for streaming data
|
// Create an INSERT query for streaming data
|
||||||
|
@ -3,6 +3,16 @@
|
|||||||
#include "Columns/ColumnString.h"
|
#include "Columns/ColumnString.h"
|
||||||
#include "Columns/ColumnsNumber.h"
|
#include "Columns/ColumnsNumber.h"
|
||||||
|
|
||||||
|
#include <Common/ProfileEvents.h>
|
||||||
|
|
||||||
|
namespace ProfileEvents
|
||||||
|
{
|
||||||
|
extern const Event KafkaRowsWritten;
|
||||||
|
extern const Event KafkaProducerFlushes;
|
||||||
|
extern const Event KafkaMessagesProduced;
|
||||||
|
extern const Event KafkaProducerErrors;
|
||||||
|
}
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
|
WriteBufferToKafkaProducer::WriteBufferToKafkaProducer(
|
||||||
@ -53,6 +63,8 @@ WriteBufferToKafkaProducer::~WriteBufferToKafkaProducer()
|
|||||||
|
|
||||||
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
|
void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t current_row)
|
||||||
{
|
{
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaRowsWritten);
|
||||||
|
|
||||||
if (++rows % max_rows == 0)
|
if (++rows % max_rows == 0)
|
||||||
{
|
{
|
||||||
const std::string & last_chunk = chunks.back();
|
const std::string & last_chunk = chunks.back();
|
||||||
@ -103,8 +115,10 @@ void WriteBufferToKafkaProducer::countRow(const Columns & columns, size_t curren
|
|||||||
producer->poll(timeout);
|
producer->poll(timeout);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaMessagesProduced);
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@ -126,9 +140,12 @@ void WriteBufferToKafkaProducer::flush()
|
|||||||
{
|
{
|
||||||
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
|
if (e.get_error() == RD_KAFKA_RESP_ERR__TIMED_OUT)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaProducerErrors);
|
||||||
throw;
|
throw;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ProfileEvents::increment(ProfileEvents::KafkaProducerFlushes);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -7,6 +7,14 @@
|
|||||||
|
|
||||||
#include <list>
|
#include <list>
|
||||||
|
|
||||||
|
#include <Common/CurrentMetrics.h>
|
||||||
|
|
||||||
|
namespace CurrentMetrics
|
||||||
|
{
|
||||||
|
extern const Metric KafkaProducers;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
class Block;
|
class Block;
|
||||||
@ -32,6 +40,7 @@ private:
|
|||||||
void nextImpl() override;
|
void nextImpl() override;
|
||||||
void addChunk();
|
void addChunk();
|
||||||
void reinitializeChunks();
|
void reinitializeChunks();
|
||||||
|
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaProducers};
|
||||||
|
|
||||||
ProducerPtr producer;
|
ProducerPtr producer;
|
||||||
const std::string topic;
|
const std::string topic;
|
||||||
|
Loading…
Reference in New Issue
Block a user