Disable system.kafka_consumers by default (due to possible live memory leak)

It is not safe to use statistics because of how KafkaEngine works: it
pre-creates consumers, which leads to a situation where statistics
entries (RD_KAFKA_OP_STATS) are generated but never consumed.

This creates a live memory leak on a server that has Kafka tables
without a materialized view attached (and with no SELECT from them).

Another problem is that this makes shutdown very slow because of how
librdkafka handles pending queue entries: it uses TAILQ_INSERT_SORTED,
a sorted insert into a linked list, which is incredibly slow (you will
likely give up waiting and kill the server).
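
To illustrate why a sorted insert into a linked list scales so badly, here
is a minimal sketch. It is a simplified model and not librdkafka's code:
the function names are hypothetical, and it assumes the insert walks the
list from the head until it finds the insertion point.

```cpp
#include <cstddef>
#include <list>

// Simplified model of a sorted insert into a linked list (an assumption,
// not librdkafka's implementation): walk from the head until an element
// greater than `value` is found, then insert before it.
// Returns the number of elements visited during the walk.
std::size_t sortedInsert(std::list<long> & queue, long value)
{
    std::size_t visited = 0;
    auto it = queue.begin();
    while (it != queue.end() && *it <= value)
    {
        ++it;
        ++visited;
    }
    queue.insert(it, value);
    return visited;
}

// Inserts `n` entries with increasing keys one by one and returns the
// total number of elements visited across all inserts.
std::size_t totalVisitsForAscendingInserts(std::size_t n)
{
    std::list<long> queue;
    std::size_t total = 0;
    for (std::size_t i = 0; i < n; ++i)
        total += sortedInsert(queue, static_cast<long>(i));
    return total;
}
```

Under this model, inserting n entries in increasing order visits
n*(n-1)/2 elements in total, so the work grows quadratically with the
number of pending entries.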

For instance, in my production setup the server had been running for ~67
days with such a table, and it got 1'942'233 `TAILQ_INSERT_SORTED`
entries (which closely matches the expected count, by the way:
`67*86400/3` = 1'929'600). It moved only 289'806 entries in a few hours,
though I'm not sure how much of that time the process was actually
running, since most of the time it had a debugger attached.
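
The estimate above can be checked with a small sketch. The helper name is
hypothetical; the 3-second interval is the default
`statistics.interval.ms` of 3000 that this patch disables.

```cpp
#include <cstdint>

// Expected number of statistics entries generated over `uptime_days`
// when one entry is emitted every `interval_seconds`.
constexpr std::int64_t expectedStatsEntries(std::int64_t uptime_days, std::int64_t interval_seconds)
{
    constexpr std::int64_t seconds_per_day = 86400;
    return uptime_days * seconds_per_day / interval_seconds;
}

// 67 days at one entry every 3 seconds.
static_assert(expectedStatsEntries(67, 3) == 1'929'600, "matches the estimate in the message");
```

The observed 1'942'233 entries exceed this by 12'633, which at one entry
per 3 seconds corresponds to roughly 10.5 extra hours of uptime, consistent
with "~67 days".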

So for now let's disable it, to keep this patch easy to backport, and
I will think about a long-term fix: do not pre-create consumers in the
Kafka engine.

Signed-off-by: Azat Khuzhin <a.khuzhin@semrush.com>
Azat Khuzhin 2023-12-13 17:59:53 +01:00
parent 230e1fe6c7
commit 055c231438

@@ -661,10 +661,19 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
if (kafka_consumer_weak_ptr_ptr)
{
/// NOTE: statistics should be consumed, otherwise it creates too much
/// entries in the queue, that leads to memory leak and slow shutdown.
///
/// This is the case when you have kafka table but no SELECT from it or
/// materialized view attached.
///
/// So for now it is disabled by default, until properly fixed.
#if 0
if (!config.has(config_prefix + "." + "statistics_interval_ms"))
{
kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
}
#endif
if (kafka_config.get("statistics.interval.ms") != "0")
{