From 61c5df5662aa49340d7f3f1fc00faa4bc40043e2 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Sun, 3 May 2020 16:04:04 +0300 Subject: [PATCH] Drop claimed from the KafkaBlockInputStream --- src/Storages/Kafka/KafkaBlockInputStream.cpp | 3 +-- src/Storages/Kafka/KafkaBlockInputStream.h | 7 +++++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index 18f7e696419..55ff8610941 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -33,7 +33,7 @@ KafkaBlockInputStream::KafkaBlockInputStream( KafkaBlockInputStream::~KafkaBlockInputStream() { - if (!claimed) + if (!buffer) return; if (broken) @@ -51,7 +51,6 @@ void KafkaBlockInputStream::readPrefixImpl() { auto timeout = std::chrono::milliseconds(context.getSettingsRef().kafka_max_wait_ms.totalMilliseconds()); buffer = storage.popReadBuffer(timeout); - claimed = !!buffer; if (!buffer) return; diff --git a/src/Storages/Kafka/KafkaBlockInputStream.h b/src/Storages/Kafka/KafkaBlockInputStream.h index 1c853a4d486..1f94ee332d3 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.h +++ b/src/Storages/Kafka/KafkaBlockInputStream.h @@ -33,9 +33,12 @@ private: UInt64 max_block_size; ConsumerBufferPtr buffer; - bool broken = true, finished = false, claimed = false, commit_in_suffix; + bool broken = true; + bool finished = false; + bool commit_in_suffix; - const Block non_virtual_header, virtual_header; + const Block non_virtual_header; + const Block virtual_header; }; }