Merge pull request #57829 from azat/kafka-fix-stat-leak

Create consumers for Kafka tables on the fly (but keep them for some period since last used)

Commit: 1a93fd7f7d
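The change replaces eagerly created, permanently held Kafka consumers with a fixed set of pool slots whose underlying cppkafka::Consumer is created on first use and closed by a background cleaner once it has been idle longer than the new kafka_consumers_pool_ttl_ms setting (default 60000 ms). A minimal, self-contained sketch of that pattern, with illustrative names only (this is not ClickHouse code):

#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

struct Slot
{
    std::shared_ptr<int> resource;                      // stands in for cppkafka::Consumer
    std::chrono::steady_clock::time_point last_used{};  // stamped on release
    bool in_use = false;
};

class Pool
{
public:
    Pool(std::size_t n, std::chrono::milliseconds ttl_) : slots(n), ttl(ttl_) {}

    Slot * acquire()
    {
        std::lock_guard<std::mutex> lock(mutex);
        for (auto & slot : slots)
        {
            if (slot.in_use)
                continue;
            if (!slot.resource)
                slot.resource = std::make_shared<int>(42); // lazy creation on first use
            slot.in_use = true;
            return &slot;
        }
        return nullptr; // the real code waits on a condition variable instead
    }

    void release(Slot * slot)
    {
        std::lock_guard<std::mutex> lock(mutex);
        slot->in_use = false;
        slot->last_used = std::chrono::steady_clock::now();
    }

    void cleanIdle() // called periodically by a background cleaner thread
    {
        std::lock_guard<std::mutex> lock(mutex);
        const auto now = std::chrono::steady_clock::now();
        for (auto & slot : slots)
            if (!slot.in_use && slot.resource && now - slot.last_used > ttl)
                slot.resource.reset(); // close the idle resource; the slot stays reusable
    }

private:
    std::vector<Slot> slots;
    std::chrono::milliseconds ttl;
    std::mutex mutex;
};

int main()
{
    Pool pool(2, std::chrono::milliseconds(60'000));
    Slot * s = pool.acquire(); // creates the resource lazily
    pool.release(s);           // stamps last_used
    pool.cleanIdle();          // would close it once idle past the TTL
}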
--- a/src/Common/setThreadName.cpp
+++ b/src/Common/setThreadName.cpp
@@ -28,25 +28,31 @@ namespace ErrorCodes
 static thread_local char thread_name[THREAD_NAME_SIZE]{};
 
-void setThreadName(const char * name)
+void setThreadName(const char * name, bool truncate)
 {
-    if (strlen(name) > THREAD_NAME_SIZE - 1)
+    size_t name_len = strlen(name);
+    if (!truncate && name_len > THREAD_NAME_SIZE - 1)
         throw DB::Exception(DB::ErrorCodes::PTHREAD_ERROR, "Thread name cannot be longer than 15 bytes");
 
+    size_t name_capped_len = std::min<size_t>(1 + name_len, THREAD_NAME_SIZE - 1);
+    char name_capped[THREAD_NAME_SIZE];
+    memcpy(name_capped, name, name_capped_len);
+    name_capped[name_capped_len] = '\0';
+
 #if defined(OS_FREEBSD)
-    pthread_set_name_np(pthread_self(), name);
+    pthread_set_name_np(pthread_self(), name_capped);
     if ((false))
 #elif defined(OS_DARWIN)
-    if (0 != pthread_setname_np(name))
+    if (0 != pthread_setname_np(name_capped))
 #elif defined(OS_SUNOS)
-    if (0 != pthread_setname_np(pthread_self(), name))
+    if (0 != pthread_setname_np(pthread_self(), name_capped))
 #else
-    if (0 != prctl(PR_SET_NAME, name, 0, 0, 0))
+    if (0 != prctl(PR_SET_NAME, name_capped, 0, 0, 0))
 #endif
         if (errno != ENOSYS && errno != EPERM)  /// It's ok if the syscall is unsupported or not allowed in some environments.
             throw DB::ErrnoException(DB::ErrorCodes::PTHREAD_ERROR, "Cannot set thread name with prctl(PR_SET_NAME, ...)");
 
-    memcpy(thread_name, name, std::min<size_t>(1 + strlen(name), THREAD_NAME_SIZE - 1));
+    memcpy(thread_name, name_capped, name_capped_len);
 }
 
 const char * getThreadName()
--- a/src/Common/setThreadName.h
+++ b/src/Common/setThreadName.h
@@ -4,7 +4,9 @@
 /** Sets the thread name (maximum length is 15 bytes),
   * which will be visible in ps, gdb, /proc,
   * for convenience of observation and debugging.
+  *
+  * @param truncate - if true, will truncate to 15 automatically, otherwise throw
   */
-void setThreadName(const char * name);
+void setThreadName(const char * name, bool truncate = false);
 
 const char * getThreadName();
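Hypothetical call sites illustrating the new parameter (the names here are invented for the example): the default keeps the old throwing behavior, while truncate=true silently caps the name at the 15-byte limit the kernel enforces. StorageKafka needs this for its "KfkCln:<table>" cleanup thread, whose name depends on the table name and can exceed the limit.

setThreadName("q-worker");                                           // <= 15 bytes: unchanged behavior
setThreadName("KfkCln:table_with_a_long_name", /*truncate=*/ true);  // capped to 15 bytes
// setThreadName("KfkCln:table_with_a_long_name");                   // would throw PTHREAD_ERROR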
--- a/src/Storages/Kafka/KafkaConsumer.cpp
+++ b/src/Storages/Kafka/KafkaConsumer.cpp
@@ -13,6 +13,7 @@
 
 #include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
+#include <base/defines.h>
 
 namespace CurrentMetrics
 {
@@ -46,15 +47,13 @@ const auto DRAIN_TIMEOUT_MS = 5000ms;
 
 KafkaConsumer::KafkaConsumer(
-    ConsumerPtr consumer_,
     Poco::Logger * log_,
     size_t max_batch_size,
     size_t poll_timeout_,
     bool intermediate_commit_,
     const std::atomic<bool> & stopped_,
     const Names & _topics)
-    : consumer(consumer_)
-    , log(log_)
+    : log(log_)
     , batch_size(max_batch_size)
     , poll_timeout(poll_timeout_)
     , intermediate_commit(intermediate_commit_)
@@ -63,6 +62,25 @@ KafkaConsumer::KafkaConsumer(
     , topics(_topics)
     , exceptions_buffer(EXCEPTIONS_DEPTH)
 {
+}
+
+void KafkaConsumer::createConsumer(cppkafka::Configuration consumer_config)
+{
+    chassert(!consumer.get());
+
+    /// Using this should be safe, since cppkafka::Consumer can poll messages
+    /// (including statistics, which will trigger the callback below) only via
+    /// KafkaConsumer.
+    if (consumer_config.get("statistics.interval.ms") != "0")
+    {
+        consumer_config.set_stats_callback([this](cppkafka::KafkaHandleBase &, const std::string & stat_json)
+        {
+            setRDKafkaStat(stat_json);
+        });
+    }
+    consumer = std::make_shared<cppkafka::Consumer>(consumer_config);
+    consumer->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
+
     // called (synchronously, during poll) when we enter the consumer group
     consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
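Registering the statistics callback here, rather than in updateConfiguration as before, is what lets the lambda capture `this` directly: librdkafka delivers statistics only from inside a poll, every poll goes through the owning KafkaConsumer, and the KafkaConsumer owns the cppkafka::Consumer, so the callback can never outlive its target. A condensed sketch of that ownership argument (illustrative types, not the real classes):

#include <functional>
#include <memory>
#include <string>

struct Inner // stands in for cppkafka::Consumer
{
    std::function<void(const std::string &)> stats_callback;
    void poll() { if (stats_callback) stats_callback("{}"); } // callbacks fire only inside poll()
};

struct Owner // stands in for KafkaConsumer
{
    std::unique_ptr<Inner> inner;

    void create()
    {
        inner = std::make_unique<Inner>();
        // Safe: poll() is only ever called through Owner, so `this` outlives
        // every invocation of the callback; no weak_ptr indirection is needed.
        inner->stats_callback = [this](const std::string & stat) { onStat(stat); };
    }
    void poll() { inner->poll(); }
    void onStat(const std::string &) { /* store the JSON, as setRDKafkaStat does */ }
};

int main()
{
    Owner owner;
    owner.create();
    owner.poll(); // the callback runs inside poll(), with owner guaranteed alive
}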
@@ -135,6 +153,9 @@ KafkaConsumer::KafkaConsumer(
 
 KafkaConsumer::~KafkaConsumer()
 {
+    if (!consumer)
+        return;
+
     try
     {
         if (!consumer->get_subscription().empty())
@@ -568,6 +589,9 @@ void KafkaConsumer::setExceptionInfo(const std::string & text, bool with_stacktr
 */
 std::string KafkaConsumer::getMemberId() const
 {
+    if (!consumer)
+        return "";
+
     char * memberid_ptr = rd_kafka_memberid(consumer->get_handle());
     std::string memberid_string = memberid_ptr;
     rd_kafka_mem_free(nullptr, memberid_ptr);
@@ -578,8 +602,14 @@ std::string KafkaConsumer::getMemberId() const
 KafkaConsumer::Stat KafkaConsumer::getStat() const
 {
     KafkaConsumer::Stat::Assignments assignments;
-    auto cpp_assignments = consumer->get_assignment();
-    auto cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    cppkafka::TopicPartitionList cpp_assignments;
+    cppkafka::TopicPartitionList cpp_offsets;
+
+    if (consumer)
+    {
+        cpp_assignments = consumer->get_assignment();
+        cpp_offsets = consumer->get_offsets_position(cpp_assignments);
+    }
 
     for (size_t num = 0; num < cpp_assignments.size(); ++num)
     {
@@ -591,7 +621,7 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
     }
 
     return {
-        .consumer_id = getMemberId() /* consumer->get_member_id() */ ,
+        .consumer_id = getMemberId(),
         .assignments = std::move(assignments),
         .last_poll_time = last_poll_timestamp_usec.load(),
         .num_messages_read = num_messages_read.load(),
@@ -601,11 +631,18 @@ KafkaConsumer::Stat KafkaConsumer::getStat() const
         .num_commits = num_commits.load(),
         .num_rebalance_assignments = num_rebalance_assignments.load(),
         .num_rebalance_revocations = num_rebalance_revocations.load(),
-        .exceptions_buffer = [&](){std::lock_guard<std::mutex> lock(exception_mutex);
-                                   return exceptions_buffer;}(),
+        .exceptions_buffer = [&]()
+        {
+            std::lock_guard<std::mutex> lock(exception_mutex);
+            return exceptions_buffer;
+        }(),
         .in_use = in_use.load(),
-        .rdkafka_stat = [&](){std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
-                              return rdkafka_stat;}(),
+        .last_used_usec = last_used_usec.load(),
+        .rdkafka_stat = [&]()
+        {
+            std::lock_guard<std::mutex> lock(rdkafka_stat_mutex);
+            return rdkafka_stat;
+        }(),
     };
 }
 
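The designated initializers above use an immediately-invoked lambda so that each field needing a mutex is copied out under its own lock, which is released before initialization of the next field continues; the hunk only spreads the existing idiom over several lines. A generic sketch of the pattern (illustrative names):

#include <mutex>
#include <string>

struct Snapshot { std::string data; };

std::mutex m;
std::string shared_state;

Snapshot takeSnapshot()
{
    return Snapshot{
        .data = [&]()
        {
            std::lock_guard<std::mutex> lock(m); // lock held only for the copy
            return shared_state;
        }(), // the trailing () invokes the lambda immediately
    };
}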
--- a/src/Storages/Kafka/KafkaConsumer.h
+++ b/src/Storages/Kafka/KafkaConsumer.h
@@ -57,12 +57,11 @@ public:
         UInt64 num_rebalance_revocations;
         KafkaConsumer::ExceptionsBuffer exceptions_buffer;
         bool in_use;
+        UInt64 last_used_usec;
         std::string rdkafka_stat;
     };
 
-public:
     KafkaConsumer(
-        ConsumerPtr consumer_,
         Poco::Logger * log_,
         size_t max_batch_size,
         size_t poll_timeout_,
@@ -72,6 +71,11 @@ public:
     );
 
     ~KafkaConsumer();
 
+    void createConsumer(cppkafka::Configuration consumer_config);
+    bool hasConsumer() const { return consumer.get() != nullptr; }
+    ConsumerPtr && moveConsumer() { return std::move(consumer); }
+
     void commit(); // Commit all processed messages.
     void subscribe(); // Subscribe internal consumer to topics.
     void unsubscribe(); // Unsubscribe internal consumer in case of failure.
@@ -113,11 +117,20 @@ public:
         rdkafka_stat = stat_json_string;
     }
     void inUse() { in_use = true; }
-    void notInUse() { in_use = false; }
+    void notInUse()
+    {
+        in_use = false;
+        last_used_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+    }
 
     // For system.kafka_consumers
     Stat getStat() const;
 
+    bool isInUse() const { return in_use; }
+    UInt64 getLastUsedUsec() const { return last_used_usec; }
+
+    std::string getMemberId() const;
+
 private:
     using Messages = std::vector<cppkafka::Message>;
     CurrentMetrics::Increment metric_increment{CurrentMetrics::KafkaConsumers};
@@ -168,6 +181,8 @@ private:
     std::atomic<UInt64> num_rebalance_assignments = 0;
     std::atomic<UInt64> num_rebalance_revocations = 0;
     std::atomic<bool> in_use = 0;
+    /// Last used time (for TTL)
+    std::atomic<UInt64> last_used_usec = 0;
 
     mutable std::mutex rdkafka_stat_mutex;
     std::string rdkafka_stat;
@@ -178,8 +193,6 @@ private:
     /// Return number of messages with an error.
     size_t filterMessageErrors();
     ReadBufferPtr getNextMessage();
-
-    std::string getMemberId() const;
 };
 
 }
--- a/src/Storages/Kafka/KafkaSettings.cpp
+++ b/src/Storages/Kafka/KafkaSettings.cpp
@@ -11,6 +11,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_SETTING;
+    extern const int BAD_ARGUMENTS;
 }
 
 IMPLEMENT_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
@@ -38,4 +39,15 @@ void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
     }
 }
 
+void KafkaSettings::sanityCheck() const
+{
+    if (kafka_consumers_pool_ttl_ms < KAFKA_RESCHEDULE_MS)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be less then rescheduled interval ({})",
+            kafka_consumers_pool_ttl_ms, KAFKA_RESCHEDULE_MS);
+
+    if (kafka_consumers_pool_ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
+        throw Exception(ErrorCodes::BAD_ARGUMENTS, "The value of 'kafka_consumers_pool_ttl_ms' ({}) cannot be too big (greater then {}), since this may cause live memory leaks",
+            kafka_consumers_pool_ttl_ms, KAFKA_CONSUMERS_POOL_TTL_MS_MAX);
+}
+
 }
--- a/src/Storages/Kafka/KafkaSettings.h
+++ b/src/Storages/Kafka/KafkaSettings.h
@@ -8,6 +8,12 @@ namespace DB
 {
 class ASTStorage;
 
+const auto KAFKA_RESCHEDULE_MS = 500;
+const auto KAFKA_CLEANUP_TIMEOUT_MS = 3000;
+// once per minute leave do reschedule (we can't lock threads in pool forever)
+const auto KAFKA_MAX_THREAD_WORK_DURATION_MS = 60000;
+// 10min
+const auto KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;
+
 #define KAFKA_RELATED_SETTINGS(M, ALIAS) \
     M(String, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.", 0) \
@@ -25,6 +31,7 @@ class ASTStorage;
     /* default is stream_poll_timeout_ms */ \
     M(Milliseconds, kafka_poll_timeout_ms, 0, "Timeout for single poll from Kafka.", 0) \
     M(UInt64, kafka_poll_max_batch_size, 0, "Maximum amount of messages to be polled in a single Kafka poll.", 0) \
+    M(UInt64, kafka_consumers_pool_ttl_ms, 60'000, "TTL for Kafka consumers (in milliseconds)", 0) \
     /* default is stream_flush_interval_ms */ \
     M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \
     M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
@@ -53,6 +60,8 @@ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS)
 struct KafkaSettings : public BaseSettings<KafkaSettingsTraits>
 {
     void loadFromQuery(ASTStorage & storage_def);
+
+    void sanityCheck() const;
 };
 
 }
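Taken together, the constants and sanityCheck() bound the new setting to 500 ms <= kafka_consumers_pool_ttl_ms <= 600000 ms, with a 60000 ms default. A standalone demonstration of the accepted range (plain C++ mirroring the check, not ClickHouse code):

#include <cstdint>
#include <stdexcept>

constexpr uint64_t KAFKA_RESCHEDULE_MS = 500;
constexpr uint64_t KAFKA_CONSUMERS_POOL_TTL_MS_MAX = 600'000;

void checkPoolTtl(uint64_t ttl_ms)
{
    if (ttl_ms < KAFKA_RESCHEDULE_MS || ttl_ms > KAFKA_CONSUMERS_POOL_TTL_MS_MAX)
        throw std::invalid_argument("kafka_consumers_pool_ttl_ms out of range");
}

int main()
{
    checkPoolTtl(60'000);  // default: OK
    checkPoolTtl(600'000); // max: OK
    // checkPoolTtl(100);  // would throw: below the 500 ms reschedule interval
}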
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -27,10 +27,12 @@
 #include <Storages/StorageMaterializedView.h>
 #include <Storages/NamedCollectionsHelpers.h>
 #include <base/getFQDNOrHostName.h>
+#include <Common/Stopwatch.h>
 #include <Common/logger_useful.h>
 #include <boost/algorithm/string/replace.hpp>
 #include <boost/algorithm/string/split.hpp>
 #include <boost/algorithm/string/trim.hpp>
+#include <cppkafka/configuration.h>
 #include <librdkafka/rdkafka.h>
 #include <Poco/Util/AbstractConfiguration.h>
 #include <Common/Exception.h>
@@ -45,6 +47,7 @@
 #include <Common/config_version.h>
 #include <Common/CurrentMetrics.h>
 #include <Common/ProfileEvents.h>
+#include <base/sleep.h>
 
 #if USE_KRB5
 #include <Access/KerberosInit.h>
@@ -76,6 +79,7 @@ namespace ErrorCodes
     extern const int BAD_ARGUMENTS;
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
     extern const int QUERY_NOT_ALLOWED;
+    extern const int ABORTED;
 }
 
 struct StorageKafkaInterceptors
@@ -172,10 +176,6 @@ struct StorageKafkaInterceptors
 
 namespace
 {
-    const auto RESCHEDULE_MS = 500;
-    const auto CLEANUP_TIMEOUT_MS = 3000;
-    const auto MAX_THREAD_WORK_DURATION_MS = 60000;  // once per minute leave do reschedule (we can't lock threads in pool forever)
-
     const String CONFIG_KAFKA_TAG = "kafka";
     const String CONFIG_KAFKA_TOPIC_TAG = "kafka_topic";
     const String CONFIG_NAME_TAG = "name";
@@ -262,17 +262,19 @@ StorageKafka::StorageKafka(
     , schema_name(getContext()->getMacros()->expand(kafka_settings->kafka_schema.value, macros_info))
     , num_consumers(kafka_settings->kafka_num_consumers.value)
     , log(&Poco::Logger::get("StorageKafka (" + table_id_.table_name + ")"))
-    , semaphore(0, static_cast<int>(num_consumers))
     , intermediate_commit(kafka_settings->kafka_commit_every_batch.value)
     , settings_adjustments(createSettingsAdjustments())
     , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value)
     , collection_name(collection_name_)
 {
+    kafka_settings->sanityCheck();
+
     if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM)
     {
         kafka_settings->input_format_allow_errors_num = 0;
         kafka_settings->input_format_allow_errors_ratio = 0;
     }
 
     StorageInMemoryMetadata storage_metadata;
     storage_metadata.setColumns(columns_);
     setInMemoryMetadata(storage_metadata);
@@ -283,6 +285,18 @@ StorageKafka::StorageKafka(
         task->deactivate();
         tasks.emplace_back(std::make_shared<TaskContext>(std::move(task)));
     }
+
+    consumers.resize(num_consumers);
+    for (size_t i = 0; i < num_consumers; ++i)
+        consumers[i] = createKafkaConsumer(i);
+
+    cleanup_thread = std::make_unique<ThreadFromGlobalPool>([this]()
+    {
+        const auto & table = getStorageID().getTableName();
+        const auto & thread_name = std::string("KfkCln:") + table;
+        setThreadName(thread_name.c_str(), /*truncate=*/ true);
+        cleanConsumers();
+    });
 }
 
 SettingsChanges StorageKafka::createSettingsAdjustments()
@@ -343,8 +357,8 @@ Pipe StorageKafka::read(
     size_t /* max_block_size */,
     size_t /* num_streams */)
 {
-    if (num_created_consumers == 0)
-        return {};
+    if (shutdown_called)
+        throw Exception(ErrorCodes::ABORTED, "Table is detached");
 
     if (!local_context->getSettingsRef().stream_like_engine_allow_direct_select)
         throw Exception(ErrorCodes::QUERY_NOT_ALLOWED,
@@ -357,12 +371,12 @@ Pipe StorageKafka::read(
 
     /// Always use all consumers at once, otherwise SELECT may not read messages from all partitions.
     Pipes pipes;
-    pipes.reserve(num_created_consumers);
+    pipes.reserve(num_consumers);
     auto modified_context = Context::createCopy(local_context);
     modified_context->applySettingsChanges(settings_adjustments);
 
     // Claim as many consumers as requested, but don't block
-    for (size_t i = 0; i < num_created_consumers; ++i)
+    for (size_t i = 0; i < num_consumers; ++i)
     {
         /// Use block size of 1, otherwise LIMIT won't work properly as it will buffer excess messages in the last block
         /// TODO: probably that leads to awful performance.
@@ -412,21 +426,6 @@ SinkToStoragePtr StorageKafka::write(const ASTPtr &, const StorageMetadataPtr &
 
 void StorageKafka::startup()
 {
-    for (size_t i = 0; i < num_consumers; ++i)
-    {
-        try
-        {
-            auto consumer = createConsumer(i);
-            pushConsumer(consumer);
-            all_consumers.push_back(consumer);
-            ++num_created_consumers;
-        }
-        catch (const cppkafka::Exception &)
-        {
-            tryLogCurrentException(log);
-        }
-    }
-
     // Start the reader thread
     for (auto & task : tasks)
     {
@@ -437,21 +436,48 @@ void StorageKafka::startup()
 
 void StorageKafka::shutdown(bool)
 {
+    shutdown_called = true;
+    cleanup_cv.notify_one();
+
+    {
+        LOG_TRACE(log, "Waiting for consumers cleanup thread");
+        Stopwatch watch;
+        if (cleanup_thread)
+        {
+            cleanup_thread->join();
+            cleanup_thread.reset();
+        }
+        LOG_TRACE(log, "Consumers cleanup thread finished in {} ms.", watch.elapsedMilliseconds());
+    }
+
+    {
+        LOG_TRACE(log, "Waiting for streaming jobs");
+        Stopwatch watch;
         for (auto & task : tasks)
         {
             // Interrupt streaming thread
             task->stream_cancelled = true;
 
-        LOG_TRACE(log, "Waiting for cleanup");
+            LOG_TEST(log, "Waiting for cleanup of a task");
             task->holder->deactivate();
         }
+        LOG_TRACE(log, "Streaming jobs finished in {} ms.", watch.elapsedMilliseconds());
+    }
 
-    LOG_TRACE(log, "Closing consumers");
-    for (size_t i = 0; i < num_created_consumers; ++i)
-        auto consumer = popConsumer();
-    LOG_TRACE(log, "Consumers closed");
+    {
+        std::lock_guard lock(mutex);
+        LOG_TRACE(log, "Closing {} consumers", consumers.size());
+        Stopwatch watch;
+        consumers.clear();
+        LOG_TRACE(log, "Consumers closed. Took {} ms.", watch.elapsedMilliseconds());
+    }
 
-    rd_kafka_wait_destroyed(CLEANUP_TIMEOUT_MS);
+    {
+        LOG_TRACE(log, "Waiting for final cleanup");
+        Stopwatch watch;
+        rd_kafka_wait_destroyed(KAFKA_CLEANUP_TIMEOUT_MS);
+        LOG_TRACE(log, "Final cleanup finished in {} ms (timeout {} ms).", watch.elapsedMilliseconds(), KAFKA_CLEANUP_TIMEOUT_MS);
+    }
 }
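shutdown() follows a strict order: set shutdown_called, wake the cleaner through cleanup_cv, join the cleanup thread, and only then stop the streaming tasks and destroy the consumers. Because the cleaner (see cleanConsumers below) sleeps in wait_for slices, the notify turns a pending 500 ms sleep into an immediate wakeup. A minimal model of this flag-notify-join handshake (standard C++ only; the real code keeps shutdown_called atomic):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex m;
std::condition_variable cv;
bool stop = false; // the real code uses std::atomic<bool> shutdown_called

void cleaner()
{
    std::unique_lock<std::mutex> lock(m);
    // Wakes every 500 ms to do periodic work, or immediately when stop is set.
    while (!cv.wait_for(lock, std::chrono::milliseconds(500), [] { return stop; }))
    {
        // periodic cleanup work would go here
    }
}

int main()
{
    std::thread t(cleaner);
    {
        std::lock_guard<std::mutex> lock(m);
        stop = true; // 1. set the flag
    }
    cv.notify_one(); // 2. wake the sleeper
    t.join();        // 3. then join it
}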
@@ -459,8 +485,7 @@ void StorageKafka::pushConsumer(KafkaConsumerPtr consumer)
 {
     std::lock_guard lock(mutex);
     consumer->notInUse();
-    consumers.push_back(consumer);
-    semaphore.set();
+    cv.notify_one();
     CurrentMetrics::sub(CurrentMetrics::KafkaConsumersInUse, 1);
 }
@@ -473,26 +498,88 @@ KafkaConsumerPtr StorageKafka::popConsumer()
 
 KafkaConsumerPtr StorageKafka::popConsumer(std::chrono::milliseconds timeout)
 {
-    // Wait for the first free buffer
-    if (timeout == std::chrono::milliseconds::zero())
-        semaphore.wait();
-    else
-    {
-        if (!semaphore.tryWait(timeout.count()))
-            return nullptr;
-    }
-
-    // Take the first available buffer from the list
-    std::lock_guard lock(mutex);
-    auto consumer = consumers.back();
-    consumers.pop_back();
+    std::unique_lock lock(mutex);
+
+    KafkaConsumerPtr ret_consumer_ptr;
+    std::optional<size_t> closed_consumer_index;
+    for (size_t i = 0; i < consumers.size(); ++i)
+    {
+        auto & consumer_ptr = consumers[i];
+
+        if (consumer_ptr->isInUse())
+            continue;
+
+        if (consumer_ptr->hasConsumer())
+        {
+            ret_consumer_ptr = consumer_ptr;
+            break;
+        }
+
+        if (!closed_consumer_index.has_value() && !consumer_ptr->hasConsumer())
+        {
+            closed_consumer_index = i;
+        }
+    }
+
+    /// 1. There is consumer available - return it.
+    if (ret_consumer_ptr)
+    {
+        /// Noop
+    }
+    /// 2. There is no consumer, but we can create a new one.
+    else if (!ret_consumer_ptr && closed_consumer_index.has_value())
+    {
+        ret_consumer_ptr = consumers[*closed_consumer_index];
+
+        cppkafka::Configuration consumer_config = getConsumerConfiguration(*closed_consumer_index);
+        /// It should be OK to create consumer under lock, since it should be fast (without subscribing).
+        ret_consumer_ptr->createConsumer(consumer_config);
+        LOG_TRACE(log, "Created #{} consumer", *closed_consumer_index);
+    }
+    /// 3. There is no free consumer and num_consumers already created, waiting @timeout.
+    else
+    {
+        cv.wait_for(lock, timeout, [&]()
+        {
+            /// Note we are waiting only opened, free, consumers, since consumer cannot be closed right now
+            auto it = std::find_if(consumers.begin(), consumers.end(), [](const auto & ptr)
+            {
+                return !ptr->isInUse() && ptr->hasConsumer();
+            });
+            if (it != consumers.end())
+            {
+                ret_consumer_ptr = *it;
+                return true;
+            }
+            return false;
+        });
+    }
+
+    if (ret_consumer_ptr)
+    {
         CurrentMetrics::add(CurrentMetrics::KafkaConsumersInUse, 1);
-    consumer->inUse();
-    return consumer;
+        ret_consumer_ptr->inUse();
+    }
+    return ret_consumer_ptr;
 }
 
-KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
+KafkaConsumerPtr StorageKafka::createKafkaConsumer(size_t consumer_number)
+{
+    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
+    auto & stream_cancelled = thread_per_consumer ? tasks[consumer_number]->stream_cancelled : tasks.back()->stream_cancelled;
+
+    KafkaConsumerPtr kafka_consumer_ptr = std::make_shared<KafkaConsumer>(
+        log,
+        getPollMaxBatchSize(),
+        getPollTimeoutMillisecond(),
+        intermediate_commit,
+        stream_cancelled,
+        topics);
+    return kafka_consumer_ptr;
+}
+
+cppkafka::Configuration StorageKafka::getConsumerConfiguration(size_t consumer_number)
 {
     cppkafka::Configuration conf;
 
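One detail of the waiting branch worth spelling out: the predicate given to condition_variable::wait_for runs with the lock held, so it can both test for a free consumer and record which one it picked; that is how ret_consumer_ptr gets assigned from inside the wait. A generic sketch of the pattern (illustrative types):

#include <chrono>
#include <condition_variable>
#include <mutex>
#include <optional>
#include <vector>

std::mutex mutex;
std::condition_variable cv;
std::vector<int> free_items; // stands in for "open, not in use" consumers

std::optional<int> popWithTimeout(std::chrono::milliseconds timeout)
{
    std::unique_lock<std::mutex> lock(mutex);
    std::optional<int> picked;
    cv.wait_for(lock, timeout, [&]
    {
        if (free_items.empty())
            return false;           // keep waiting until notified or timed out
        picked = free_items.back(); // select while still holding the lock
        free_items.pop_back();
        return true;
    });
    return picked; // empty if the timeout expired without a free item
}

int main()
{
    free_items.push_back(1);
    auto item = popWithTimeout(std::chrono::milliseconds(100)); // returns 1
    return item ? 0 : 1;
}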
@@ -517,35 +604,66 @@ KafkaConsumerPtr StorageKafka::createConsumer(size_t consumer_number)
     size_t max_allowed_queued_min_messages = 10000000; // must be less than or equal to max allowed value
     conf.set("queued.min.messages", std::min(std::max(getMaxBlockSize(), default_queued_min_messages), max_allowed_queued_min_messages));
 
-    /// a reference to the consumer is needed in statistic callback
-    /// although the consumer does not exist when callback is being registered
-    /// shared_ptr<weak_ptr<KafkaConsumer>> comes to the rescue
-    auto consumer_weak_ptr_ptr = std::make_shared<KafkaConsumerWeakPtr>();
-    updateConfiguration(conf, consumer_weak_ptr_ptr);
+    updateConfiguration(conf);
 
     // those settings should not be changed by users.
     conf.set("enable.auto.commit", "false"); // We manually commit offsets after a stream successfully finished
     conf.set("enable.auto.offset.store", "false"); // Update offset automatically - to commit them all at once.
     conf.set("enable.partition.eof", "false"); // Ignore EOF messages
 
-    // Create a consumer and subscribe to topics
-    auto consumer_impl = std::make_shared<cppkafka::Consumer>(conf);
-    consumer_impl->set_destroy_flags(RD_KAFKA_DESTROY_F_NO_CONSUMER_CLOSE);
-
-    KafkaConsumerPtr kafka_consumer_ptr;
-
-    /// NOTE: we pass |stream_cancelled| by reference here, so the buffers should not outlive the storage.
-    if (thread_per_consumer)
-    {
-        auto & stream_cancelled = tasks[consumer_number]->stream_cancelled;
-        kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, stream_cancelled, topics);
-    }
-    else
-    {
-        kafka_consumer_ptr = std::make_shared<KafkaConsumer>(consumer_impl, log, getPollMaxBatchSize(), getPollTimeoutMillisecond(), intermediate_commit, tasks.back()->stream_cancelled, topics);
-    }
-    *consumer_weak_ptr_ptr = kafka_consumer_ptr;
-    return kafka_consumer_ptr;
+    return conf;
+}
+
+void StorageKafka::cleanConsumers()
+{
+    UInt64 ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
+
+    std::unique_lock lock(mutex);
+    std::chrono::milliseconds timeout(KAFKA_RESCHEDULE_MS);
+    while (!cleanup_cv.wait_for(lock, timeout, [this]() { return shutdown_called == true; }))
+    {
+        /// Copy consumers for closing to a new vector to close them without a lock
+        std::vector<ConsumerPtr> consumers_to_close;
+
+        UInt64 now_usec = std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
+        {
+            for (size_t i = 0; i < consumers.size(); ++i)
+            {
+                auto & consumer_ptr = consumers[i];
+
+                UInt64 consumer_last_used_usec = consumer_ptr->getLastUsedUsec();
+                chassert(consumer_last_used_usec <= now_usec);
+
+                if (!consumer_ptr->hasConsumer())
+                    continue;
+                if (consumer_ptr->isInUse())
+                    continue;
+
+                if (now_usec - consumer_last_used_usec > ttl_usec)
+                {
+                    LOG_TRACE(log, "Closing #{} consumer (id: {})", i, consumer_ptr->getMemberId());
+                    consumers_to_close.push_back(consumer_ptr->moveConsumer());
+                }
+            }
+        }
+
+        if (!consumers_to_close.empty())
+        {
+            lock.unlock();
+
+            Stopwatch watch;
+            size_t closed = consumers_to_close.size();
+            consumers_to_close.clear();
+            LOG_TRACE(log, "{} consumers had been closed (due to {} usec timeout). Took {} ms.",
+                closed, ttl_usec, watch.elapsedMilliseconds());
+
+            lock.lock();
+        }
+
+        ttl_usec = kafka_settings->kafka_consumers_pool_ttl_ms * 1'000;
+    }
+
+    LOG_TRACE(log, "Consumers cleanup thread finished");
+}
 
 size_t StorageKafka::getMaxBlockSize() const
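cleanConsumers moves expired consumers into a local vector and drops the mutex before clearing it, so a slow librdkafka destroy cannot block popConsumer or pushConsumer. The same move-out-then-destroy shape in miniature (illustrative types):

#include <cstddef>
#include <memory>
#include <mutex>
#include <vector>

std::mutex mutex;
std::vector<std::shared_ptr<int>> pool; // stands in for consumer handles

void closeExpired(const std::vector<std::size_t> & expired_indexes)
{
    std::vector<std::shared_ptr<int>> to_close;
    {
        std::lock_guard<std::mutex> lock(mutex);
        for (std::size_t i : expired_indexes)
            to_close.push_back(std::move(pool[i])); // slot left empty, reusable later
    }
    to_close.clear(); // the slow destructors run with the mutex released
}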
@@ -578,8 +696,7 @@ String StorageKafka::getConfigPrefix() const
     return CONFIG_KAFKA_TAG;
 }
 
-void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
-    std::shared_ptr<KafkaConsumerWeakPtr> kafka_consumer_weak_ptr_ptr)
+void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config)
 {
     // Update consumer configuration from the configuration. Example:
     //     <kafka>
@@ -659,33 +776,12 @@ void StorageKafka::updateConfiguration(cppkafka::Configuration & kafka_config,
         LOG_IMPL(log, client_logs_level, poco_level, "[rdk:{}] {}", facility, message);
     });
 
-    if (kafka_consumer_weak_ptr_ptr)
-    {
     /// NOTE: statistics should be consumed, otherwise it creates too much
     /// entries in the queue, that leads to memory leak and slow shutdown.
-        ///
-        /// This is the case when you have kafka table but no SELECT from it or
-        /// materialized view attached.
-        ///
-        /// So for now it is disabled by default, until properly fixed.
-#if 0
     if (!config.has(config_prefix + "." + "statistics_interval_ms"))
     {
-            kafka_config.set("statistics.interval.ms", "3000"); // every 3 seconds by default. set to 0 to disable.
+        // every 3 seconds by default. set to 0 to disable.
+        kafka_config.set("statistics.interval.ms", "3000");
     }
-#endif
-
-        if (kafka_config.get("statistics.interval.ms") != "0")
-        {
-            kafka_config.set_stats_callback([kafka_consumer_weak_ptr_ptr](cppkafka::KafkaHandleBase &, const std::string & stat_json_string)
-            {
-                auto kafka_consumer_ptr = kafka_consumer_weak_ptr_ptr->lock();
-                if (kafka_consumer_ptr)
-                {
-                    kafka_consumer_ptr->setRDKafkaStat(stat_json_string);
-                }
-            });
-        }
-    }
 
     // Configure interceptor to change thread name
@@ -756,7 +852,7 @@ void StorageKafka::threadFunc(size_t idx)
             mv_attached.store(true);
 
             // Keep streaming as long as there are attached views and streaming is not cancelled
-            while (!task->stream_cancelled && num_created_consumers > 0)
+            while (!task->stream_cancelled)
             {
                 if (!checkDependencies(table_id))
                     break;
@@ -773,7 +869,7 @@ void StorageKafka::threadFunc(size_t idx)
 
                 auto ts = std::chrono::steady_clock::now();
                 auto duration = std::chrono::duration_cast<std::chrono::milliseconds>(ts-start_time);
-                if (duration.count() > MAX_THREAD_WORK_DURATION_MS)
+                if (duration.count() > KAFKA_MAX_THREAD_WORK_DURATION_MS)
                 {
                     LOG_TRACE(log, "Thread work duration limit exceeded. Reschedule.");
                     break;
@@ -793,21 +889,18 @@ void StorageKafka::threadFunc(size_t idx)
         LOG_ERROR(log, "{} {}", __PRETTY_FUNCTION__, exception_str);
 
         auto safe_consumers = getSafeConsumers();
-        for (auto const & consumer_ptr_weak : safe_consumers.consumers)
+        for (auto const & consumer_ptr : safe_consumers.consumers)
         {
             /// propagate materialized view exception to all consumers
-            if (auto consumer_ptr = consumer_ptr_weak.lock())
-            {
-                consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
-            }
+            consumer_ptr->setExceptionInfo(exception_str, false /* no stacktrace, reuse passed one */);
         }
-    }
 
     mv_attached.store(false);
 
     // Wait for attached views
     if (!task->stream_cancelled)
-        task->holder->scheduleAfter(RESCHEDULE_MS);
+        task->holder->scheduleAfter(KAFKA_RESCHEDULE_MS);
 }
@@ -844,7 +937,7 @@ bool StorageKafka::streamToViews()
     std::vector<std::shared_ptr<KafkaSource>> sources;
     Pipes pipes;
 
-    auto stream_count = thread_per_consumer ? 1 : num_created_consumers;
+    auto stream_count = thread_per_consumer ? 1 : num_consumers;
     sources.reserve(stream_count);
     pipes.reserve(stream_count);
     for (size_t i = 0; i < stream_count; ++i)
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -1,5 +1,6 @@
 #pragma once
 
+#include <Common/ThreadPool_fwd.h>
 #include <Common/Macros.h>
 #include <Core/BackgroundSchedulePool.h>
 #include <Storages/IStorage.h>
@@ -9,16 +10,11 @@
 
 #include <Poco/Semaphore.h>
 
+#include <condition_variable>
 #include <mutex>
 #include <list>
 #include <atomic>
+#include <cppkafka/cppkafka.h>
 
-namespace cppkafka
-{
-
-class Configuration;
-
-}
-
 namespace DB
 {
@@ -28,7 +24,7 @@ class StorageSystemKafkaConsumers;
 struct StorageKafkaInterceptors;
 
 using KafkaConsumerPtr = std::shared_ptr<KafkaConsumer>;
-using KafkaConsumerWeakPtr = std::weak_ptr<KafkaConsumer>;
+using ConsumerPtr = std::shared_ptr<cppkafka::Consumer>;
 
 /** Implements a Kafka queue table engine that can be used as a persistent queue / buffer,
   * or as a basic building block for creating pipelines with a continuous insertion / ETL.
@@ -84,10 +80,10 @@ public:
     {
         std::shared_ptr<IStorage> storage_ptr;
         std::unique_lock<std::mutex> lock;
-        std::vector<KafkaConsumerWeakPtr> & consumers;
+        std::vector<KafkaConsumerPtr> & consumers;
     };
 
-    SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), all_consumers}; }
+    SafeConsumers getSafeConsumers() { return {shared_from_this(), std::unique_lock(mutex), consumers}; }
 
 private:
     // Configuration and state
@@ -102,20 +98,16 @@ private:
     const String schema_name;
     const size_t num_consumers; /// total number of consumers
     Poco::Logger * log;
-    Poco::Semaphore semaphore;
     const bool intermediate_commit;
     const SettingsChanges settings_adjustments;
 
     std::atomic<bool> mv_attached = false;
 
-    /// Can differ from num_consumers in case of exception in startup() (or if startup() hasn't been called).
-    /// In this case we still need to be able to shutdown() properly.
-    size_t num_created_consumers = 0; /// number of actually created consumers.
-
-    std::vector<KafkaConsumerPtr> consumers; /// available consumers
-    std::vector<KafkaConsumerWeakPtr> all_consumers; /// busy (belong to a KafkaSource) and vacant consumers
+    std::vector<KafkaConsumerPtr> consumers;
 
     std::mutex mutex;
+    std::condition_variable cv;
+    std::condition_variable cleanup_cv;
 
     // Stream thread
     struct TaskContext
@@ -129,12 +121,17 @@ private:
     std::vector<std::shared_ptr<TaskContext>> tasks;
     bool thread_per_consumer = false;
 
+    std::unique_ptr<ThreadFromGlobalPool> cleanup_thread;
+
     /// For memory accounting in the librdkafka threads.
     std::mutex thread_statuses_mutex;
     std::list<std::shared_ptr<ThreadStatus>> thread_statuses;
 
     SettingsChanges createSettingsAdjustments();
-    KafkaConsumerPtr createConsumer(size_t consumer_number);
+    /// Creates KafkaConsumer object without real consumer (cppkafka::Consumer)
+    KafkaConsumerPtr createKafkaConsumer(size_t consumer_number);
+    /// Returns consumer configuration with all changes that had been overwritten in config
+    cppkafka::Configuration getConsumerConfiguration(size_t consumer_number);
 
     /// If named_collection is specified.
     String collection_name;
@@ -142,11 +139,7 @@ private:
     std::atomic<bool> shutdown_called = false;
 
     // Update Kafka configuration with values from CH user configuration.
-    void updateConfiguration(cppkafka::Configuration & kafka_config, std::shared_ptr<KafkaConsumerWeakPtr>);
-    void updateConfiguration(cppkafka::Configuration & kafka_config)
-    {
-        updateConfiguration(kafka_config, std::make_shared<KafkaConsumerWeakPtr>());
-    }
+    void updateConfiguration(cppkafka::Configuration & kafka_config);
 
     String getConfigPrefix() const;
     void threadFunc(size_t idx);
@@ -161,6 +154,7 @@ private:
     bool streamToViews();
     bool checkDependencies(const StorageID & table_id);
 
+    void cleanConsumers();
 };
 
 }
--- a/src/Storages/System/StorageSystemKafkaConsumers.cpp
+++ b/src/Storages/System/StorageSystemKafkaConsumers.cpp
@@ -41,6 +41,7 @@ NamesAndTypesList StorageSystemKafkaConsumers::getNamesAndTypes()
         {"num_rebalance_revocations", std::make_shared<DataTypeUInt64>()},
         {"num_rebalance_assignments", std::make_shared<DataTypeUInt64>()},
         {"is_currently_used", std::make_shared<DataTypeUInt8>()},
+        {"last_used", std::make_shared<DataTypeDateTime64>(6)},
         {"rdkafka_stat", std::make_shared<DataTypeString>()},
     };
     return names_and_types;
@@ -78,6 +79,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
     auto & num_rebalance_revocations = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
     auto & num_rebalance_assigments = assert_cast<ColumnUInt64 &>(*res_columns[index++]);
     auto & is_currently_used = assert_cast<ColumnUInt8 &>(*res_columns[index++]);
+    auto & last_used = assert_cast<ColumnDateTime64 &>(*res_columns[index++]);
     auto & rdkafka_stat = assert_cast<ColumnString &>(*res_columns[index++]);
 
     const auto access = context->getAccess();
@@ -96,9 +98,7 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
 
     auto safe_consumers = storage_kafka_ptr->getSafeConsumers();
 
-    for (const auto & weak_consumer : safe_consumers.consumers)
-    {
-        if (auto consumer = weak_consumer.lock())
+    for (const auto & consumer : safe_consumers.consumers)
         {
             auto consumer_stat = consumer->getStat();
 
@@ -144,10 +144,10 @@ void StorageSystemKafkaConsumers::fillData(MutableColumns & res_columns, Context
             num_rebalance_assigments.insert(consumer_stat.num_rebalance_assignments);
 
             is_currently_used.insert(consumer_stat.in_use);
+            last_used.insert(consumer_stat.last_used_usec);
 
             rdkafka_stat.insertData(consumer_stat.rdkafka_stat.data(), consumer_stat.rdkafka_stat.size());
         }
-    }
     };
 
     const bool show_tables_granted = access->isGranted(AccessType::SHOW_TABLES);
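The new last_used column is declared DateTime64(6), i.e. microsecond precision, so the raw microsecond counter kept in last_used_usec can be inserted without conversion. The equivalence in plain C++ (not ClickHouse code):

#include <chrono>
#include <cstdint>

// A DateTime64(6) value is a count of 10^-6 second ticks since the Unix epoch,
// which is exactly what notInUse() stores into last_used_usec:
uint64_t nowUsec()
{
    using namespace std::chrono;
    return duration_cast<microseconds>(system_clock::now().time_since_epoch()).count();
}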