Change KafkaDirectReads to KafkaConsumersInUse

This commit is contained in:
Mikhail Filimonov 2022-04-05 20:25:05 +02:00
parent 53c7376e37
commit 3412be9d4d
No known key found for this signature in database
GPG Key ID: 6E49C2E9AF1220BE
2 changed files with 4 additions and 3 deletions

View File

@ -86,7 +86,7 @@
M(KafkaProducers, "Number of active Kafka producer created") \ M(KafkaProducers, "Number of active Kafka producer created") \
M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads") \ M(KafkaLibrdkafkaThreads, "Number of active librdkafka threads") \
M(KafkaBackgroundReads, "Number of background reads currently working (populating materialized views from Kafka)") \ M(KafkaBackgroundReads, "Number of background reads currently working (populating materialized views from Kafka)") \
M(KafkaDirectReads, "Number of direct selects from Kafka currently executing") \ M(KafkaConsumersInUse, "Number of consumers which are currently used by direct or background reads") \
M(KafkaWrites, "Number of currently running inserts to Kafka") \ M(KafkaWrites, "Number of currently running inserts to Kafka") \
M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \ M(KafkaAssignedPartitions, "Number of partitions Kafka tables currently assigned to") \

View File

@ -49,7 +49,7 @@ namespace CurrentMetrics
{ {
extern const Metric KafkaLibrdkafkaThreads; extern const Metric KafkaLibrdkafkaThreads;
extern const Metric KafkaBackgroundReads; extern const Metric KafkaBackgroundReads;
extern const Metric KafkaDirectReads; extern const Metric KafkaConsumersInUse;
extern const Metric KafkaWrites; extern const Metric KafkaWrites;
} }
@ -301,7 +301,6 @@ Pipe StorageKafka::read(
if (mv_attached) if (mv_attached)
throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views"); throw Exception(ErrorCodes::QUERY_NOT_ALLOWED, "Cannot read from StorageKafka with attached materialized views");
CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaDirectReads};
ProfileEvents::increment(ProfileEvents::KafkaDirectReads); ProfileEvents::increment(ProfileEvents::KafkaDirectReads);
/// Always use all consumers at once, otherwise SELECT may not read messages from all partitions. /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
@ -386,6 +385,7 @@ void StorageKafka::pushReadBuffer(ConsumerBufferPtr buffer)
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
buffers.push_back(buffer); buffers.push_back(buffer);
semaphore.set(); semaphore.set();
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
} }
@ -410,6 +410,7 @@ ConsumerBufferPtr StorageKafka::popReadBuffer(std::chrono::milliseconds timeout)
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
auto buffer = buffers.back(); auto buffer = buffers.back();
buffers.pop_back(); buffers.pop_back();
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
return buffer; return buffer;
} }