mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 07:31:57 +00:00
Preserve KafkaConsumer objects
This will make system.kafka_consumers more useful: prior to this patch the
consumer object was removed once its TTL expired, whereas with this patch
all of its information is preserved.
Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
(cherry picked from commit 2ff0bfb0a1
)
This commit is contained in:
parent
71fdde76c2
commit
bea1610219
@ -46,15 +46,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;
|
||||
|
||||
|
||||
KafkaConsumer::KafkaConsumer(
|
||||
ConsumerPtr consumer_,
|
||||
Poco::Logger * log_,
|
||||
size_t max_batch_size,
|
||||
size_t poll_timeout_,
|
||||
bool intermediate_commit_,
|
||||
const std::atomic<bool> & stopped_,
|
||||
const Names & _topics)
|
||||
: consumer(consumer_)
|
||||
, log(log_)
|
||||
: log(log_)
|
||||
, batch_size(max_batch_size)
|
||||
, poll_timeout(poll_timeout_)
|
||||
, intermediate_commit(intermediate_commit_)
|
||||
@ -63,6 +61,12 @@ KafkaConsumer::KafkaConsumer(
|
||||
, topics(_topics)
|
||||
, exceptions_buffer(EXCEPTIONS_DEPTH)
|
||||
{
|
||||
}
|
||||
|
||||
void KafkaConsumer::setConsumer(const ConsumerPtr & consumer_)
|
||||
{
|
||||
consumer = consumer_;
|
||||
|
||||
// called (synchronously, during poll) when we enter the consumer group
|
||||
consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
|
||||
{
|
||||
@ -135,6 +139,9 @@ KafkaConsumer::KafkaConsumer(
|
||||
|
||||
KafkaConsumer::~KafkaConsumer()
|
||||
{
|
||||
if (!consumer)
|
||||
return;
|
||||
|
||||
try
|
||||
{
|
||||
if (!consumer->get_subscription().empty())
|
||||
@ -568,6 +575,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr
|
||||
*/
|
||||
std::string KafkaConsumer::getMemberId() const
|
||||
{
|
||||
if (!consumer)
|
||||
return "";
|
||||
|
||||
char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
|
||||
std::string memberid_string = memberid_ptr;
|
||||
rd_kafka_mem_free(nullptr, memberid_ptr);
|
||||
@ -578,8 +588,14 @@ std::string KafkaConsumer::getMemberId() const
|
||||
KafkaConsumer::Stat KafkaConsumer::getStat() const
|
||||
{
|
||||
KafkaConsumer::Stat::Assignments assignments;
|
||||
auto cpp_assignments = consumer->get_assignment();
|
||||
auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
|
||||
cppkafka::TopicPartitionList cpp_assignments;
|
||||
cppkafka::TopicPartitionList cpp_offsets;
|
||||
|
||||
if (consumer)
|
||||
{
|
||||
cpp_assignments = consumer->get_assignment();
|
||||
cpp_offsets = consumer->get_offsets_position(cpp_assignments);
|
||||
}
|
||||
|
||||
for (size_t num = 0; num < cpp_assignments.size(); ++num)
|
||||
{
|
||||
@ -591,7 +607,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
|
||||
}
|
||||
|
||||
return {
|
||||
.consumer_id = getMemberId() /* consumer->get_member_id() */ ,
|
||||
.consumer_id = getMemberId(),
|
||||
.assignments = std::move(assignments),
|
||||
.last_poll_time = last_poll_timestamp_usec.load(),
|
||||
.num_messages_read = num_messages_read.load(),
|
||||
@ -601,12 +617,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
|
||||
.num_commits = num_commits.load(),
|
||||
.num_rebalance_assignments = num_rebalance_assignments.load(),
|
||||
.num_rebalance_revocations = num_rebalance_revocations.load(),
|
||||
.exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
|
||||
return exceptions_buffer;}(),
|
||||
.exceptions_buffer = [&]()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(exception_mutex);
|
||||
return exceptions_buffer;
|
||||
}(),
|
||||
.in_use = in_use.load(),
|
||||
.last_used_usec = last_used_usec.load(),
|
||||
.rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
|
||||
return rdkafka_stat;}(),
|
||||
.rdkafka_stat = [&]()
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
|
||||
return rdkafka_stat;
|
||||
}(),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -61,9 +61,7 @@ public:
|
||||
std::string rdkafka_stat;
|
||||
};
|
||||
|
||||
public:
|
||||
KafkaConsumer(
|
||||
ConsumerPtr consumer_,
|
||||
Poco::Logger * log_,
|
||||
size_t max_batch_size,
|
||||
size_t poll_timeout_,
|
||||
@ -73,6 +71,11 @@ public:
|
||||
);
|
||||
|
||||
~KafkaConsumer();
|
||||
|
||||
void setConsumer(const ConsumerPtr & consumer);
|
||||
/// True while a real consumer object is attached, i.e. the internal pointer is non-null.
bool hasConsumer() const { return consumer != nullptr; }
|
||||
/// Hands the internal consumer pointer out as an rvalue reference so that the caller can take it over.
/// The member itself is emptied only when the caller actually moves from the returned reference.
ConsumerPtr && moveConsumer()
{
    return std::move(consumer);
}
|
||||
|
||||
void commit(); // Commit all processed messages.
|
||||
void subscribe(); // Subscribe internal consumer to topics.
|
||||
void unsubscribe(); // Unsubscribe internal consumer in case of failure.
|
||||
|
@ -32,6 +32,7 @@
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
#include <boost/algorithm/string/split.hpp>
|
||||
#include <boost/algorithm/string/trim.hpp>
|
||||
#include <cppkafka/configuration.h>
|
||||
#include <librdkafka/rdkafka.h>
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/Exception.h>
|
||||
@ -413,7 +414,9 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
|
||||
|
||||
void StorageKafka::startup()
|
||||
{
|
||||
all_consumers.resize(num_consumers);
|
||||
consumers.resize(num_consumers);
|
||||
for (size_t i = 0; i < num_consumers; ++i)
|
||||
consumers[i] = createKafkaConsumer(i);
|
||||
|
||||
// Start the reader thread
|
||||
for (auto & task : tasks)
|
||||
@ -462,7 +465,6 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
consumer->notInUse();
|
||||
consumers.push_back(consumer);
|
||||
cv.notify_one();
|
||||
CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
|
||||
}
|
||||
@ -478,50 +480,106 @@ KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
|
||||
{
|
||||
std::unique_lock lock(mutex);
|
||||
|
||||
KafkaConsumerPtr consumer_ptr;
|
||||
|
||||
/// 1. There is consumer available. Return one of them.
|
||||
if (!consumers.empty())
|
||||
KafkaConsumerPtr ret_consumer_ptr;
|
||||
std::optional<size_t> closed_consumer_index;
|
||||
for (size_t i = 0; i < consumers.size(); ++i)
|
||||
{
|
||||
consumer_ptr = consumers.back();
|
||||
consumers.pop_back();
|
||||
auto & consumer_ptr = consumers[i];
|
||||
|
||||
if (consumer_ptr->isInUse())
|
||||
continue;
|
||||
|
||||
if (consumer_ptr->hasConsumer())
|
||||
{
|
||||
ret_consumer_ptr = consumer_ptr;
|
||||
break;
|
||||
}
|
||||
|
||||
if (!closed_consumer_index.has_value() && !consumer_ptr->hasConsumer())
|
||||
{
|
||||
closed_consumer_index = i;
|
||||
}
|
||||
}
|
||||
|
||||
/// 1. There is consumer available - return it.
|
||||
if (ret_consumer_ptr)
|
||||
{
|
||||
/// Noop
|
||||
}
|
||||
/// 2. There is no consumer, but we can create a new one.
|
||||
else if (!ret_consumer_ptr && closed_consumer_index.has_value())
|
||||
{
|
||||
ret_consumer_ptr = consumers[*closed_consumer_index];
|
||||
/// It should be OK to create consumer under lock, since it should be fast (without subscribing).
|
||||
ret_consumer_ptr->setConsumer(createConsumer(*ret_consumer_ptr, *closed_consumer_index));
|
||||
}
|
||||
/// 3. There is no free consumer and num_consumers already created, waiting @timeout.
|
||||
else
|
||||
{
|
||||
auto expired_consumer = std::find_if(all_consumers.begin(), all_consumers.end(), [](const auto & consumer_weak_ptr)
|
||||
cv.wait_for(lock, timeout, [&]()
|
||||
{
|
||||
return consumer_weak_ptr.expired();
|
||||
});
|
||||
|
||||
/// 2. There is no consumer, but we can create a new one.
|
||||
if (expired_consumer != all_consumers.end())
|
||||
{
|
||||
size_t consumer_number = std::distance(all_consumers.begin(), expired_consumer);
|
||||
/// It should be OK to create consumer under lock, since it should be fast (without subscribing).
|
||||
consumer_ptr = createConsumer(consumer_number);
|
||||
*expired_consumer = consumer_ptr;
|
||||
}
|
||||
/// 3. There is no consumer and num_consumers already created, waiting @timeout.
|
||||
else
|
||||
{
|
||||
if (cv.wait_for(lock, timeout, [&]() { return !consumers.empty(); }))
|
||||
/// Note we are waiting only opened, free, consumers, since consumer cannot be closed right now
|
||||
auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr)
|
||||
{
|
||||
consumer_ptr = consumers.back();
|
||||
consumers.pop_back();
|
||||
return !ptr->isInUse() && ptr->hasConsumer();
|
||||
});
|
||||
if (it != consumers.end())
|
||||
{
|
||||
ret_consumer_ptr = *it;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
return false;
|
||||
});
|
||||
}
|
||||
|
||||
if (consumer_ptr)
|
||||
if (ret_consumer_ptr)
|
||||
{
|
||||
CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
|
||||
consumer_ptr->inUse();
|
||||
ret_consumer_ptr->inUse();
|
||||
}
|
||||
return consumer_ptr;
|
||||
return ret_consumer_ptr;
|
||||
}
|
||||
|
||||
|
||||
KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
|
||||
/// Builds the KafkaConsumer wrapper for slot `consumer_number`.
/// No cppkafka::Consumer is passed to it here.
KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
{
    /// With thread_per_consumer the cancellation flag of the task with the same index is used,
    /// otherwise the flag of the last task.
    const auto & owning_task = thread_per_consumer ? tasks[consumer_number] : tasks.back();

    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
    return std::make_shared<KafkaConsumer>(
        log,
        getPollMaxBatchSize(),
        getPollTimeoutMillisecond(),
        intermediate_commit,
        owning_task->stream_cancelled,
        topics);
}
|
||||
|
||||
/// Creates the real cppkafka::Consumer for slot `consumer_number`.
/// If "statistics.interval.ms" is not "0", every statistics JSON is forwarded to `kafka_consumer`.
ConsumerPtr StorageKafka::createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number)
{
    cppkafka::Configuration conf = getConsumerConfiguration(consumer_number);

    const bool statistics_enabled = conf.get("statistics.interval.ms") != "0";
    if (statistics_enabled)
    {
        /// Using KafkaConsumer by reference should be safe, since
        /// cppkafka::Consumer can poll messages (including statistics, which
        /// trigger this callback) only via KafkaConsumer.
        auto forward_stat = [&kafka_consumer](cppkafka::KafkaHandleBase &, const std::string & stat_json)
        {
            kafka_consumer.setRDKafkaStat(stat_json);
        };
        conf.set_stats_callback(forward_stat);
    }

    ConsumerPtr created_consumer = std::make_shared<cppkafka::Consumer>(conf);
    created_consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);

    LOG_TRACE(log, "Created #{} consumer", consumer_number);

    return created_consumer;
}
|
||||
|
||||
cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number)
|
||||
{
|
||||
cppkafka::Configuration conf;
|
||||
|
||||
@ -546,37 +604,14 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
|
||||
size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
|
||||
conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
|
||||
|
||||
/// a reference to the consumer is needed in statistic callback
|
||||
/// although the consumer does not exist when callback is being registered
|
||||
/// shared_ptr<weak_ptr<KafkaConsumer>> comes to the rescue
|
||||
auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
|
||||
updateConfiguration(conf, consumer_weak_ptr_ptr);
|
||||
updateConfiguration(conf);
|
||||
|
||||
// those settings should not be changed by users.
|
||||
conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
|
||||
conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
|
||||
conf.set("enable.partition.eof", "false"); // Ignore EOF messages
|
||||
|
||||
// Create a consumer and subscribe to topics
|
||||
auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
|
||||
consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
|
||||
|
||||
KafkaConsumerPtr kafka_consumer_ptr;
|
||||
|
||||
/// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
|
||||
if (thread_per_consumer)
|
||||
{
|
||||
auto& stream_cancelled = tasks[consumer_number]->stream_cancelled;
|
||||
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
|
||||
}
|
||||
else
|
||||
{
|
||||
kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
|
||||
}
|
||||
LOG_TRACE(log, "Created #{} consumer", consumer_number);
|
||||
|
||||
*consumer_weak_ptr_ptr = kafka_consumer_ptr;
|
||||
return kafka_consumer_ptr;
|
||||
return conf;
|
||||
}
|
||||
|
||||
void StorageKafka::cleanConsumers()
|
||||
@ -585,34 +620,28 @@ void StorageKafka::cleanConsumers()
|
||||
UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
|
||||
|
||||
/// Copy consumers for closing to a new vector to close them without a lock
|
||||
std::vector<KafkaConsumerPtr> consumers_to_close;
|
||||
std::vector<ConsumerPtr> consumers_to_close;
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
for (auto it = consumers.begin(); it != consumers.end();)
|
||||
for (size_t i = 0; i < consumers.size(); ++i)
|
||||
{
|
||||
auto & consumer_ptr = *it;
|
||||
auto & consumer_ptr = consumers[i];
|
||||
|
||||
UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
|
||||
chassert(consumer_last_used_usec <= now_usec);
|
||||
|
||||
if (!consumer_ptr->hasConsumer())
|
||||
continue;
|
||||
if (consumer_ptr->isInUse())
|
||||
continue;
|
||||
|
||||
if (now_usec - consumer_last_used_usec > CONSUMER_TTL_USEC)
|
||||
{
|
||||
/// We need this only to get the consumer number.
|
||||
auto weak_it = std::find_if(all_consumers.begin(), all_consumers.end(), [&](const auto & consume_weak_ptr)
|
||||
{
|
||||
return consumer_ptr == consume_weak_ptr.lock();
|
||||
});
|
||||
chassert(weak_it != all_consumers.end());
|
||||
size_t consumer_number = std::distance(all_consumers.begin(), weak_it);
|
||||
|
||||
LOG_TRACE(log, "Closing #{} consumer (id: {})", consumer_number, consumer_ptr->getMemberId());
|
||||
|
||||
consumers_to_close.push_back(std::move(consumer_ptr));
|
||||
it = consumers.erase(it);
|
||||
LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId());
|
||||
consumers_to_close.push_back(consumer_ptr->moveConsumer());
|
||||
}
|
||||
else
|
||||
++it;
|
||||
}
|
||||
}
|
||||
|
||||
@ -656,8 +685,7 @@ String StorageKafka::getConfigPrefix() const
|
||||
return CONFIG_KAFKA_TAG;
|
||||
}
|
||||
|
||||
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
|
||||
std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
|
||||
void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
|
||||
{
|
||||
// Update consumer configuration from the configuration. Example:
|
||||
// <kafka>
|
||||
@ -737,31 +765,16 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
|
||||
LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
|
||||
});
|
||||
|
||||
if (kafka_consumer_weak_ptr_ptr)
|
||||
/// NOTE: statistics should be consumed, otherwise it creates too much
|
||||
/// entries in the queue, that leads to memory leak and slow shutdown.
|
||||
///
|
||||
/// This is the case when you have kafka table but no SELECT from it or
|
||||
/// materialized view attached.
|
||||
///
|
||||
/// So for now it is disabled by default, until properly fixed.
|
||||
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
|
||||
{
|
||||
/// NOTE: statistics should be consumed, otherwise it creates too much
|
||||
/// entries in the queue, that leads to memory leak and slow shutdown.
|
||||
///
|
||||
/// This is the case when you have kafka table but no SELECT from it or
|
||||
/// materialized view attached.
|
||||
///
|
||||
/// So for now it is disabled by default, until properly fixed.
|
||||
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
|
||||
{
|
||||
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
|
||||
}
|
||||
|
||||
if (kafka_config.get("statistics.interval.ms") != "0")
|
||||
{
|
||||
kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
|
||||
{
|
||||
auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
|
||||
if (kafka_consumer_ptr)
|
||||
{
|
||||
kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
|
||||
}
|
||||
});
|
||||
}
|
||||
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
|
||||
}
|
||||
|
||||
// Configure interceptor to change thread name
|
||||
@ -832,7 +845,7 @@ void StorageKafka::threadFunc(size_t idx)
|
||||
mv_attached.store(true);
|
||||
|
||||
// Keep streaming as long as there are attached views and streaming is not cancelled
|
||||
while (!task->stream_cancelled && !all_consumers.empty())
|
||||
while (!task->stream_cancelled)
|
||||
{
|
||||
if (!checkDependencies(table_id))
|
||||
break;
|
||||
@ -869,13 +882,10 @@ void StorageKafka::threadFunc(size_t idx)
|
||||
LOG_ERROR(log, "{} {}", __PRETTY_FUNCTION__, exception_str);
|
||||
|
||||
auto safe_consumers = getSafeConsumers();
|
||||
for (auto const & consumer_ptr_weak : safe_consumers.consumers)
|
||||
for (auto const & consumer_ptr : safe_consumers.consumers)
|
||||
{
|
||||
/// propagate materialized view exception to all consumers
|
||||
if (auto consumer_ptr = consumer_ptr_weak.lock())
|
||||
{
|
||||
consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
|
||||
}
|
||||
consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
|
||||
}
|
||||
}
|
||||
|
||||
@ -922,7 +932,7 @@ bool StorageKafka::streamToViews()
|
||||
std::vector<std::shared_ptr<KafkaSource>> sources;
|
||||
Pipes pipes;
|
||||
|
||||
auto stream_count = thread_per_consumer ? 1 : all_consumers.size();
|
||||
auto stream_count = thread_per_consumer ? 1 : num_consumers;
|
||||
sources.reserve(stream_count);
|
||||
pipes.reserve(stream_count);
|
||||
for (size_t i = 0; i < stream_count; ++i)
|
||||
|
@ -13,13 +13,7 @@
|
||||
#include <mutex>
|
||||
#include <list>
|
||||
#include <atomic>
|
||||
|
||||
namespace cppkafka
|
||||
{
|
||||
|
||||
class Configuration;
|
||||
|
||||
}
|
||||
#include <cppkafka/cppkafka.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -29,7 +23,7 @@ class StorageSystemKafkaConsumers;
|
||||
struct StorageKafkaInterceptors;
|
||||
|
||||
using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
|
||||
using KafkaConsumerWeakPtr = std::weak_ptr<KafkaConsumer>;
|
||||
using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
|
||||
|
||||
/** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
|
||||
* or as a basic building block for creating pipelines with a continuous insertion / ETL.
|
||||
@ -85,10 +79,10 @@ public:
|
||||
{
|
||||
std::shared_ptr<IStorage> storage_ptr;
|
||||
std::unique_lock<std::mutex> lock;
|
||||
std::vector<KafkaConsumerWeakPtr> & consumers;
|
||||
std::vector<KafkaConsumerPtr> & consumers;
|
||||
};
|
||||
|
||||
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }
|
||||
SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; }
|
||||
|
||||
private:
|
||||
// Configuration and state
|
||||
@ -108,8 +102,7 @@ private:
|
||||
|
||||
std::atomic<bool> mv_attached = false;
|
||||
|
||||
std::vector<KafkaConsumerPtr> consumers; /// available consumers
|
||||
std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers
|
||||
std::vector<KafkaConsumerPtr> consumers;
|
||||
|
||||
std::mutex mutex;
|
||||
std::condition_variable cv;
|
||||
@ -131,7 +124,12 @@ private:
|
||||
std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
|
||||
|
||||
SettingsChanges createSettingsAdjustments();
|
||||
KafkaConsumerPtr createConsumer(size_t consumer_number);
|
||||
/// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
|
||||
KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
|
||||
/// Creates real cppkafka::Consumer object
|
||||
ConsumerPtr createConsumer(KafkaConsumer & kafka_consumer, size_t consumer_number);
|
||||
/// Returns consumer configuration with all changes that had been overwritten in config
|
||||
cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
|
||||
|
||||
/// If named_collection is specified.
|
||||
String collection_name;
|
||||
@ -139,11 +137,7 @@ private:
|
||||
std::atomic<bool> shutdown_called = false;
|
||||
|
||||
// Update Kafka configuration with values from CH user configuration.
|
||||
void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
|
||||
void updateConfiguration(cppkafka::Configuration & kafka_config)
|
||||
{
|
||||
updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
|
||||
}
|
||||
void updateConfiguration(cppkafka::Configuration & kafka_config);
|
||||
|
||||
String getConfigPrefix() const;
|
||||
void threadFunc(size_t idx);
|
||||
|
@ -98,58 +98,55 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
|
||||
|
||||
auto safe_consumers = storage_kafka_ptr->getSafeConsumers();
|
||||
|
||||
for (const auto & weak_consumer : safe_consumers.consumers)
|
||||
for (const auto & consumer : safe_consumers.consumers)
|
||||
{
|
||||
if (auto consumer = weak_consumer.lock())
|
||||
auto consumer_stat = consumer->getStat();
|
||||
|
||||
database.insertData(database_str.data(), database_str.size());
|
||||
table.insertData(table_str.data(), table_str.size());
|
||||
|
||||
consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
|
||||
|
||||
const auto num_assignnemts = consumer_stat.assignments.size();
|
||||
|
||||
for (size_t num = 0; num < num_assignnemts; ++num)
|
||||
{
|
||||
auto consumer_stat = consumer->getStat();
|
||||
const auto & assign = consumer_stat.assignments[num];
|
||||
|
||||
database.insertData(database_str.data(), database_str.size());
|
||||
table.insertData(table_str.data(), table_str.size());
|
||||
assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
|
||||
|
||||
consumer_id.insertData(consumer_stat.consumer_id.data(), consumer_stat.consumer_id.size());
|
||||
|
||||
const auto num_assignnemts = consumer_stat.assignments.size();
|
||||
|
||||
for (size_t num = 0; num < num_assignnemts; ++num)
|
||||
{
|
||||
const auto & assign = consumer_stat.assignments[num];
|
||||
|
||||
assigments_topics.insertData(assign.topic_str.data(), assign.topic_str.size());
|
||||
|
||||
assigments_partition_id.insert(assign.partition_id);
|
||||
assigments_current_offset.insert(assign.current_offset);
|
||||
}
|
||||
last_assignment_num += num_assignnemts;
|
||||
|
||||
assigments_topics_offsets.push_back(last_assignment_num);
|
||||
assigments_partition_id_offsets.push_back(last_assignment_num);
|
||||
assigments_current_offset_offsets.push_back(last_assignment_num);
|
||||
|
||||
for (const auto & exc : consumer_stat.exceptions_buffer)
|
||||
{
|
||||
exceptions_text.insertData(exc.text.data(), exc.text.size());
|
||||
exceptions_time.insert(exc.timestamp_usec);
|
||||
}
|
||||
exceptions_num += consumer_stat.exceptions_buffer.size();
|
||||
exceptions_text_offset.push_back(exceptions_num);
|
||||
exceptions_time_offset.push_back(exceptions_num);
|
||||
|
||||
|
||||
last_poll_time.insert(consumer_stat.last_poll_time);
|
||||
num_messages_read.insert(consumer_stat.num_messages_read);
|
||||
last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
|
||||
num_commits.insert(consumer_stat.num_commits);
|
||||
last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
|
||||
|
||||
num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
|
||||
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
|
||||
|
||||
is_currently_used.insert(consumer_stat.in_use);
|
||||
last_used.insert(consumer_stat.last_used_usec);
|
||||
|
||||
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
|
||||
assigments_partition_id.insert(assign.partition_id);
|
||||
assigments_current_offset.insert(assign.current_offset);
|
||||
}
|
||||
last_assignment_num += num_assignnemts;
|
||||
|
||||
assigments_topics_offsets.push_back(last_assignment_num);
|
||||
assigments_partition_id_offsets.push_back(last_assignment_num);
|
||||
assigments_current_offset_offsets.push_back(last_assignment_num);
|
||||
|
||||
for (const auto & exc : consumer_stat.exceptions_buffer)
|
||||
{
|
||||
exceptions_text.insertData(exc.text.data(), exc.text.size());
|
||||
exceptions_time.insert(exc.timestamp_usec);
|
||||
}
|
||||
exceptions_num += consumer_stat.exceptions_buffer.size();
|
||||
exceptions_text_offset.push_back(exceptions_num);
|
||||
exceptions_time_offset.push_back(exceptions_num);
|
||||
|
||||
|
||||
last_poll_time.insert(consumer_stat.last_poll_time);
|
||||
num_messages_read.insert(consumer_stat.num_messages_read);
|
||||
last_commit_time.insert(consumer_stat.last_commit_timestamp_usec);
|
||||
num_commits.insert(consumer_stat.num_commits);
|
||||
last_rebalance_time.insert(consumer_stat.last_rebalance_timestamp_usec);
|
||||
|
||||
num_rebalance_revocations.insert(consumer_stat.num_rebalance_revocations);
|
||||
num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
|
||||
|
||||
is_currently_used.insert(consumer_stat.in_use);
|
||||
last_used.insert(consumer_stat.last_used_usec);
|
||||
|
||||
rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
|
||||
}
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user