mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-09-19 16:20:50 +00:00

kafka_dead_letter_queue: debug logging removed

This commit is contained in:
parent 7fc7293214
commit e411d7f50f
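
All of the removed lines below follow the same pattern: a temporary LOG_DEBUG trace emitted through a named Poco logger. As a rough, self-contained stand-in (assuming only the Poco headers; this is not ClickHouse's actual LOG_DEBUG macro expansion), each call boils down to something like:

    #include <Poco/ConsoleChannel.h>
    #include <Poco/Logger.h>
    #include <Poco/Message.h>

    int main()
    {
        // Fetch (or create) the named logger, as &Poco::Logger::get("KafkaSource") does in the diff,
        // and emit a debug-level message to the console channel.
        Poco::Logger & log = Poco::Logger::get("KafkaSource");
        log.setChannel(new Poco::ConsoleChannel);
        log.setLevel(Poco::Message::PRIO_DEBUG);
        log.debug("generateImpl: top of while");
        return 0;
    }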
@@ -49,7 +49,6 @@ KafkaSource::KafkaSource(
     , virtual_header(storage.getVirtualsHeader())
     , handle_error_mode(storage.getHandleErrorMode())
 {
-    LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "ctor");
 }
 
 KafkaSource::~KafkaSource()
@@ -78,7 +77,6 @@ bool KafkaSource::checkTimeLimit() const
 
 Chunk KafkaSource::generateImpl()
 {
-    LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "actual top");
     if (!consumer)
     {
         auto timeout = std::chrono::milliseconds(context->getSettingsRef().kafka_max_wait_ms.totalMilliseconds());
@@ -95,7 +93,6 @@ Chunk KafkaSource::generateImpl()
     if (is_finished)
         return {};
 
-    LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top");
     is_finished = true;
     // now it's one-time usage InputStream
     // one block of the needed size (or with desired flush timeout) is formed in one internal iteration
@@ -149,7 +146,6 @@ Chunk KafkaSource::generateImpl()
 
     while (true)
     {
-        LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of while");
         size_t new_rows = 0;
         exception_message.reset();
         if (auto buf = consumer->consume())
@@ -169,7 +165,6 @@ Chunk KafkaSource::generateImpl()
 
             ProfileEvents::increment(ProfileEvents::KafkaRowsRead, new_rows);
 
-            LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: new_rows");
             consumer->storeLastReadMessageOffset();
 
             auto topic = consumer->currentTopic();
@@ -195,7 +190,6 @@ Chunk KafkaSource::generateImpl()
 
             for (size_t i = 0; i < new_rows; ++i)
             {
-                LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: top of for");
                 virtual_columns[0]->insert(topic);
                 virtual_columns[1]->insert(key);
                 virtual_columns[2]->insert(offset);
@@ -229,7 +223,6 @@ Chunk KafkaSource::generateImpl()
                 }
                 else if (handle_error_mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
                 {
-                    LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE");
                     if (exception_message)
                     {
 
@@ -237,7 +230,6 @@ Chunk KafkaSource::generateImpl()
                         auto storage_id = storage.getStorageID();
 
                         auto dead_letter_queue = context->getDeadLetterQueue();
-                        LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add");
                         dead_letter_queue->add(
                             DeadLetterQueueElement{
                                 .stream_type = DeadLetterQueueElement::StreamType::Kafka,
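
The hunks above (and the pollConsumer hunks that follow) keep the actual error handling and only drop the tracing: when a Kafka message fails to parse, the code branches on the configured handle-error mode, and in DEAD_LETTER_QUEUE mode an element describing the failure is pushed onto the dead letter queue obtained from the context. A minimal, self-contained sketch of that per-row dispatch; the type and member names mirror identifiers visible in the diff, but the field layout and surrounding code are simplifications, not ClickHouse's real definitions:

    #include <iostream>
    #include <optional>
    #include <string>
    #include <vector>

    // Toy stand-ins for the types named in the hunks; the real ClickHouse definitions differ.
    enum class ExtStreamingHandleErrorMode { DEFAULT, STREAM, DEAD_LETTER_QUEUE };

    struct DeadLetterQueueElement
    {
        enum class StreamType { Kafka } stream_type;
        std::string topic;        // illustrative metadata, not the real field layout
        std::string raw_message;
        std::string error;
    };

    struct DeadLetterQueue
    {
        std::vector<DeadLetterQueueElement> elements;
        void add(DeadLetterQueueElement e) { elements.push_back(std::move(e)); }
    };

    // Per-row branch: a parse failure either surfaces as extra error columns (STREAM)
    // or becomes an entry in the dead letter queue (DEAD_LETTER_QUEUE).
    void handleRowError(ExtStreamingHandleErrorMode mode,
                        const std::optional<std::string> & exception_message,
                        const std::string & topic,
                        const std::string & raw_message,
                        DeadLetterQueue & dead_letter_queue)
    {
        if (!exception_message)
            return;
        if (mode == ExtStreamingHandleErrorMode::STREAM)
            std::cout << "error routed to virtual columns: " << *exception_message << '\n';
        else if (mode == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
            dead_letter_queue.add(DeadLetterQueueElement{
                DeadLetterQueueElement::StreamType::Kafka, topic, raw_message, *exception_message});
    }

    int main()
    {
        DeadLetterQueue dlq;
        handleRowError(ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE,
                       std::string{"Cannot parse input"}, "events", "{broken", dlq);
        std::cout << "dead-lettered rows: " << dlq.elements.size() << '\n';
    }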
|
@@ -927,8 +927,6 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
             virtual_columns[6]->insert(headers_names);
             virtual_columns[7]->insert(headers_values);
 
-            LOG_DEBUG(&Poco::Logger::get("StorageKafka2"), "pollConsumer");
-
             if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::STREAM)
             {
                 if (exception_message)
@@ -944,14 +942,12 @@ StorageKafka2::PolledBatchInfo StorageKafka2::pollConsumer(
             }
             else if (getHandleKafkaErrorMode() == ExtStreamingHandleErrorMode::DEAD_LETTER_QUEUE)
             {
-                LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: DEAD_LETTER_QUEUE");
                 if (exception_message)
                 {
 
                     const auto time_now = std::chrono::system_clock::now();
 
                     auto dead_letter_queue = getContext()->getDeadLetterQueue();
-                    LOG_DEBUG(&Poco::Logger::get("KafkaSource"), "generateImpl: calling dead_letter_queue->add");
                     dead_letter_queue->add(
                         DeadLetterQueueElement{
                             .stream_type = DeadLetterQueueElement::StreamType::Kafka,
|
@@ -59,8 +59,6 @@ namespace ErrorCodes
 
 void registerStorageKafka(StorageFactory & factory)
 {
-    LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "Top of registerStorageKafka");
-
     auto creator_fn = [](const StorageFactory::Arguments & args) -> std::shared_ptr<IStorage>
     {
         ASTs & engine_args = args.engine_args;
@@ -74,9 +72,6 @@ void registerStorageKafka(StorageFactory & factory)
             for (const auto & setting : kafka_settings->all())
             {
                 const auto & setting_name = setting.getName();
-                LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka (named collection): processing {}", setting_name);
-
-
                 if (named_collection->has(setting_name))
                     kafka_settings->set(setting_name, named_collection->get<String>(setting_name));
             }
@@ -85,9 +80,7 @@ void registerStorageKafka(StorageFactory & factory)
 
         if (has_settings)
         {
-            LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before loadFromQuery");
             kafka_settings->loadFromQuery(*args.storage_def);
-            LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after loadFromQuery");
         }
 
         // Check arguments and settings
@@ -161,9 +154,7 @@ void registerStorageKafka(StorageFactory & factory)
         CHECK_KAFKA_STORAGE_ARGUMENT(12, kafka_poll_timeout_ms, 0)
         CHECK_KAFKA_STORAGE_ARGUMENT(13, kafka_flush_interval_ms, 0)
         CHECK_KAFKA_STORAGE_ARGUMENT(14, kafka_thread_per_consumer, 0)
-        LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: before kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT");
         CHECK_KAFKA_STORAGE_ARGUMENT(15, kafka_handle_error_mode, 0)
-        LOG_DEBUG(&Poco::Logger::get("registerStorageKafka"), "registerStorageKafka: after kafka_handle_error_mode CHECK_KAFKA_STORAGE_ARGUMENT");
         CHECK_KAFKA_STORAGE_ARGUMENT(16, kafka_commit_on_select, 0)
         CHECK_KAFKA_STORAGE_ARGUMENT(17, kafka_max_rows_per_message, 0)
     }
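
The numbered CHECK_KAFKA_STORAGE_ARGUMENT lines above appear to map positional ENGINE arguments (by 1-based index) onto the correspondingly named kafka_* settings; the macro itself is not shown in this diff. A hypothetical sketch of that kind of positional-to-named mapping, where the function name and the both-given check are assumptions rather than ClickHouse's actual macro:

    #include <map>
    #include <stdexcept>
    #include <string>
    #include <vector>

    // Hypothetical illustration, in the spirit of CHECK_KAFKA_STORAGE_ARGUMENT(arg_num, name, ...):
    // copy a positional engine argument into the named setting, unless the setting was already
    // given explicitly, in which case reject the ambiguity.
    void applyPositionalArg(
        std::map<std::string, std::string> & settings,   // settings already set via SETTINGS
        const std::vector<std::string> & engine_args,    // positional ENGINE arguments
        size_t arg_num,                                  // 1-based position (first macro argument)
        const std::string & name)                        // setting name, e.g. kafka_handle_error_mode
    {
        if (engine_args.size() < arg_num)
            return;                                      // argument not given positionally
        if (settings.count(name))
            throw std::invalid_argument(name + " given both as an engine argument and as a setting");
        settings[name] = engine_args[arg_num - 1];
    }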
@@ -291,8 +282,6 @@ void registerStorageKafka(StorageFactory & factory)
             args.table_id, args.getContext(), args.columns, args.comment, std::move(kafka_settings), collection_name);
     };
 
-
-
     factory.registerStorage(
         "Kafka",
         creator_fn,