From b50480cbde937e0e67c6c95b7cf956ff5342d99b Mon Sep 17 00:00:00 2001
From: Mikhail Filimonov
Date: Tue, 19 May 2020 18:55:37 +0200
Subject: [PATCH 1/3] Fixes the potential missed data during termination of Kafka engine table

---
 src/Storages/Kafka/KafkaBlockInputStream.cpp    |  8 +-
 .../Kafka/ReadBufferFromKafkaConsumer.cpp       |  9 +-
 .../Kafka/ReadBufferFromKafkaConsumer.h         |  3 +-
 src/Storages/Kafka/StorageKafka.cpp             |  5 +-
 tests/integration/test_storage_kafka/test.py    | 92 +++++++++++++++++++
 5 files changed, 110 insertions(+), 7 deletions(-)

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 55ff8610941..9f19bd464ff 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -134,7 +134,11 @@ Block KafkaBlockInputStream::readImpl()
 
         auto new_rows = read_kafka_message();
 
-        buffer->storeLastReadMessageOffset();
+        // we can't store the offset after rebalance, when consumer is stalled, or if it's terminating
+        if (!buffer->storeLastReadMessageOffset()) {
+            total_rows = 0;
+            break;
+        }
 
         auto topic = buffer->currentTopic();
         auto key = buffer->currentKey();
@@ -172,7 +176,7 @@ Block KafkaBlockInputStream::readImpl()
         }
     }
 
-    if (buffer->rebalanceHappened() || total_rows == 0)
+    if (total_rows == 0)
         return Block();
 
     /// MATERIALIZED columns can be added here, but I think
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index eff4161ffb6..aeda60e90a7 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -336,12 +336,17 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     return true;
 }
 
-void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
+bool ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
 {
-    if (!stalled && !rebalance_happened)
+    if (!stalled && !rebalance_happened && !stopped)
     {
         consumer->store_offset(*(current - 1));
         ++offsets_stored;
+        return true;
+    }
+    else
+    {
+        return false;
     }
 }
 
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
index c5b72ed6d7c..435b5f2a7c8 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -37,9 +37,8 @@ public:
     auto pollTimeout() const { return poll_timeout; }
 
     bool hasMorePolledMessages() const;
-    auto rebalanceHappened() const { return rebalance_happened; }
 
-    void storeLastReadMessageOffset();
+    bool storeLastReadMessageOffset();
     void resetToLastCommitted(const char * msg);
 
     // Return values for the message that's being read.
diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 5c4657403b7..793a9a29676 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -454,7 +454,10 @@ bool StorageKafka::streamToViews()
     else
         in = streams[0];
 
-    copyData(*in, *block_io.out, &stream_cancelled);
+    // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
+    // It will be cancelled on the underlying layer (kafka buffer)
+    std::atomic<bool> stub = {false};
+    copyData(*in, *block_io.out, &stub);
 
     for (auto & stream : streams)
         stream->as<KafkaBlockInputStream>()->commit();
diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index cbe96df3c29..9154ad67c05 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -1241,6 +1241,98 @@ def test_exception_from_destructor(kafka_cluster):
     assert TSV(instance.query('SELECT 1')) == TSV('1')
 
 
+@pytest.mark.timeout(120)
+def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
+    messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(1)]
+    kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.destination;
+        CREATE TABLE test.destination (
+            key UInt64,
+            value UInt64,
+            _topic String,
+            _key String,
+            _offset UInt64,
+            _partition UInt64,
+            _timestamp Nullable(DateTime),
+            _consumed_by LowCardinality(String)
+        )
+        ENGINE = MergeTree()
+        ORDER BY key;
+
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
+                     kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1000;
+
+        CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
+            SELECT
+                key,
+                value,
+                _topic,
+                _key,
+                _offset,
+                _partition,
+                _timestamp
+            FROM test.kafka;
+    ''')
+
+    while int(instance.query("SELECT count() FROM test.destination")) == 0:
+        print("Waiting for test.kafka_consumer to start consuming")
+        time.sleep(1)
+
+    cancel = threading.Event()
+
+    i = [2]
+    def produce():
+        while not cancel.is_set():
+            messages = []
+            for _ in range(113):
+                messages.append(json.dumps({'key': i[0], 'value': i[0]}))
+                i[0] += 1
+            kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
+            time.sleep(1)
+
+    kafka_thread = threading.Thread(target=produce)
+    kafka_thread.start()
+    time.sleep(12)
+
+    instance.query('''
+        DROP TABLE test.kafka;
+    ''')
+
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
+                     kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 10000;
+    ''')
+
+    cancel.set()
+    time.sleep(15)
+
+    #kafka_cluster.open_bash_shell('instance')
+    # SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;
+
+    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.destination')
+    print(result)
+
+    instance.query('''
+        DROP TABLE test.kafka_consumer;
+        DROP TABLE test.destination;
+    ''')
+
+    kafka_thread.join()
+    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0]-1)), 'Missing data!'
+
+
 @pytest.mark.timeout(1200)
 def test_kafka_duplicates_when_commit_failed(kafka_cluster):
     messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]

From 34198336eddb7bd07ae7831652fb1fe9d5e02318 Mon Sep 17 00:00:00 2001
From: Mikhail Filimonov
Date: Tue, 19 May 2020 19:02:37 +0200
Subject: [PATCH 2/3] Style

---
 src/Storages/Kafka/KafkaBlockInputStream.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 9f19bd464ff..4da5de69085 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -135,7 +135,8 @@ Block KafkaBlockInputStream::readImpl()
         auto new_rows = read_kafka_message();
 
         // we can't store the offset after rebalance, when consumer is stalled, or if it's terminating
-        if (!buffer->storeLastReadMessageOffset()) {
+        if (!buffer->storeLastReadMessageOffset())
+        {
             total_rows = 0;
             break;
         }

From e8a13842afc9e40c3a693d983d1edb1830ac2dcc Mon Sep 17 00:00:00 2001
From: Mikhail Filimonov
Date: Wed, 20 May 2020 11:02:02 +0200
Subject: [PATCH 3/3] Better fix, previous fix was wrong (was leaving the loop by the EOF condition)

---
 src/Storages/Kafka/KafkaBlockInputStream.cpp |  9 ++----
 .../Kafka/ReadBufferFromKafkaConsumer.cpp    | 31 +++++++++++++------
 .../Kafka/ReadBufferFromKafkaConsumer.h      |  5 ++-
 3 files changed, 28 insertions(+), 17 deletions(-)

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 4da5de69085..a2403e66c50 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -134,12 +134,7 @@ Block KafkaBlockInputStream::readImpl()
 
         auto new_rows = read_kafka_message();
 
-        // we can't store the offset after rebalance, when consumer is stalled, or if it's terminating
-        if (!buffer->storeLastReadMessageOffset())
-        {
-            total_rows = 0;
-            break;
-        }
+        buffer->storeLastReadMessageOffset();
 
         auto topic = buffer->currentTopic();
         auto key = buffer->currentKey();
@@ -177,7 +172,7 @@ Block KafkaBlockInputStream::readImpl()
         }
     }
 
-    if (total_rows == 0)
+    if (buffer->polledDataUnusable() || total_rows == 0)
         return Block();
 
     /// MATERIALIZED columns can be added here, but I think
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index aeda60e90a7..ad9d660a989 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -250,11 +250,23 @@ void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
 
 /// Do commit messages implicitly after we processed the previous batch.
 bool ReadBufferFromKafkaConsumer::nextImpl()
 {
+
     /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
     /// If we failed to poll any message once - don't try again.
     /// Otherwise, the |poll_timeout| expectations get flawn.
-    if (stalled || stopped || !allowed || rebalance_happened)
+
+    // we can react on stop only during fetching data
+    // after block is formed (i.e. during copying data to MV / committing) we ignore stop attempts
+    if (stopped)
+    {
+        was_stopped = true;
+        offsets_stored = 0;
         return false;
+    }
+
+    if (stalled || was_stopped || !allowed || rebalance_happened)
+        return false;
+
     if (current == messages.end())
     {
@@ -267,7 +279,13 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
         /// Don't drop old messages immediately, since we may need them for virtual columns.
         auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
 
-        if (rebalance_happened)
+        if (stopped)
+        {
+            was_stopped = true;
+            offsets_stored = 0;
+            return false;
+        }
+        else if (rebalance_happened)
         {
             if (!new_messages.empty())
             {
@@ -336,17 +354,12 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
     return true;
 }
 
-bool ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
+void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
 {
-    if (!stalled && !rebalance_happened && !stopped)
+    if (!stalled && !was_stopped && !rebalance_happened)
     {
         consumer->store_offset(*(current - 1));
         ++offsets_stored;
-        return true;
-    }
-    else
-    {
-        return false;
     }
 }
 
diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
index 435b5f2a7c8..46dace827d0 100644
--- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
+++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h
@@ -37,8 +37,9 @@ public:
     auto pollTimeout() const { return poll_timeout; }
 
     bool hasMorePolledMessages() const;
+    bool polledDataUnusable() const { return (was_stopped || rebalance_happened); }
 
-    bool storeLastReadMessageOffset();
+    void storeLastReadMessageOffset();
     void resetToLastCommitted(const char * msg);
 
     // Return values for the message that's being read.
@@ -68,6 +69,8 @@ private:
 
     bool rebalance_happened = false;
 
+    bool was_stopped = false;
+
     // order is important, need to be destructed before consumer
     cppkafka::TopicPartitionList assignment;
     const Names topics;
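
Taken together, the three patches change the shutdown path as follows: a stop request is honoured only while the consumer is fetching data; once a block has been formed and is being copied to the materialized views and committed, the stop is ignored; and a stop observed during fetching marks the polled data as unusable, so the half-read block is discarded instead of being committed. The following is a minimal standalone sketch of that pattern, under the assumption of a single reader thread and a stop flag set from another thread. The names (SketchConsumer, readBlock) are hypothetical and are not ClickHouse's actual classes.

// Minimal sketch of the stop-handling pattern from these patches (hypothetical names).
#include <atomic>
#include <cstddef>
#include <optional>
#include <vector>

struct Message { long offset = 0; };

class SketchConsumer
{
public:
    std::atomic<bool> stopped{false};   // set by the "drop table" / shutdown path

    // Stands in for the poll: the stop flag is only reacted to here, while fetching.
    std::optional<Message> next()
    {
        if (stopped)
        {
            was_stopped = true;         // this batch must not be committed
            stored_offsets.clear();     // mirrors `offsets_stored = 0` in the patch
            return std::nullopt;
        }
        if (rebalance_happened)
            return std::nullopt;
        return Message{++last_offset};
    }

    bool polledDataUnusable() const { return was_stopped || rebalance_happened; }

    // Only remember the offset when the data will actually reach the destination table.
    void storeLastReadMessageOffset()
    {
        if (!polledDataUnusable())
            stored_offsets.push_back(last_offset);
    }

    std::vector<long> stored_offsets;

private:
    bool was_stopped = false;
    bool rebalance_happened = false;
    long last_offset = 0;
};

// Reader loop in the spirit of KafkaBlockInputStream::readImpl(): if the polled data
// became unusable, return an empty block so nothing is written and nothing is committed.
std::vector<Message> readBlock(SketchConsumer & consumer, size_t max_rows)
{
    std::vector<Message> block;
    for (size_t i = 0; i < max_rows; ++i)
    {
        auto msg = consumer.next();
        if (!msg)
            break;
        consumer.storeLastReadMessageOffset();
        block.push_back(*msg);
    }
    if (consumer.polledDataUnusable())
        return {};                      // discard the partial block
    return block;
}

The design choice mirrors the final patch: instead of cancelling copyData() mid-stream, the stop is observed at the buffer level and the whole partially read block is thrown away, so offsets are only ever committed for data that reached the destination table.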