Merge pull request #11048 from filimonov/kafka_missed_data_during_drop

Fixes potential loss of already-read data during termination of a Kafka engine table; the idea behind the fix is sketched below, before the per-file diffs.
alexey-milovidov 2020-05-22 14:11:10 +03:00 committed by GitHub
commit b82c633716
5 changed files with 121 additions and 6 deletions
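
Background for the fix: before this change, DROP TABLE interrupted the stream in the middle of copyData() through the stream_cancelled flag, so messages that had already been polled from Kafka could be discarded before reaching the materialized view while their offsets could still be committed afterwards. Now the stop request is handled only inside the consumer read buffer, and only while it is fetching: a block that has already been formed is still copied and committed, and anything polled after the stop (or after a rebalance) is reported as unusable via polledDataUnusable(), so its offsets are never stored. A minimal sketch of that idea, using only what the diffs below show (ConsumerBufferSketch and next() are illustrative names, not the real ClickHouse classes):

    // Illustrative, simplified stand-in for ReadBufferFromKafkaConsumer.
    #include <atomic>
    #include <cstddef>

    struct ConsumerBufferSketch
    {
        std::atomic<bool> stopped{false};   // set by the storage when the table is dropped
        bool was_stopped = false;
        bool rebalance_happened = false;
        size_t offsets_stored = 0;

        // Data polled after a stop or a rebalance must never be committed.
        bool polledDataUnusable() const { return was_stopped || rebalance_happened; }

        bool next()
        {
            // React to the stop request only while fetching; once a block has been
            // formed, copying it to the materialized view and committing it proceed.
            if (stopped)
            {
                was_stopped = true;
                offsets_stored = 0;   // forget the offsets of the half-read batch
                return false;
            }
            // ... poll Kafka and refill the buffer as before ...
            return false;
        }

        void storeLastReadMessageOffset()
        {
            if (!was_stopped && !rebalance_happened)
                ++offsets_stored;     // the real code also calls consumer->store_offset(...)
        }
    };

KafkaBlockInputStream::readImpl() then returns an empty block whenever polledDataUnusable() is true, so nothing half-read is ever committed.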

src/Storages/Kafka/KafkaBlockInputStream.cpp

@@ -172,7 +172,7 @@ Block KafkaBlockInputStream::readImpl()
         }
     }
 
-    if (buffer->rebalanceHappened() || total_rows == 0)
+    if (buffer->polledDataUnusable() || total_rows == 0)
         return Block();
 
     /// MATERIALIZED columns can be added here, but I think

src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp

@@ -250,11 +250,23 @@ void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
 /// Do commit messages implicitly after we processed the previous batch.
 bool ReadBufferFromKafkaConsumer::nextImpl()
 {
     /// NOTE: ReadBuffer was implemented with an immutable underlying contents in mind.
     /// If we failed to poll any message once - don't try again.
     /// Otherwise, the |poll_timeout| expectations get flawn.
-    if (stalled || stopped || !allowed || rebalance_happened)
+
+    // we can react on stop only during fetching data
+    // after block is formed (i.e. during copying data to MV / commiting) we ignore stop attempts
+    if (stopped)
+    {
+        was_stopped = true;
+        offsets_stored = 0;
+        return false;
+    }
+
+    if (stalled || was_stopped || !allowed || rebalance_happened)
         return false;
 
     if (current == messages.end())
     {
@@ -267,7 +279,13 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
 
         /// Don't drop old messages immediately, since we may need them for virtual columns.
         auto new_messages = consumer->poll_batch(batch_size, std::chrono::milliseconds(poll_timeout));
-        if (rebalance_happened)
+        if (stopped)
+        {
+            was_stopped = true;
+            offsets_stored = 0;
+            return false;
+        }
+        else if (rebalance_happened)
         {
             if (!new_messages.empty())
             {
@@ -338,7 +356,7 @@ bool ReadBufferFromKafkaConsumer::nextImpl()
 
 void ReadBufferFromKafkaConsumer::storeLastReadMessageOffset()
 {
-    if (!stalled && !rebalance_happened)
+    if (!stalled && !was_stopped && !rebalance_happened)
     {
         consumer->store_offset(*(current - 1));
         ++offsets_stored;

src/Storages/Kafka/ReadBufferFromKafkaConsumer.h

@@ -37,7 +37,7 @@ public:
     auto pollTimeout() const { return poll_timeout; }
 
     bool hasMorePolledMessages() const;
-    auto rebalanceHappened() const { return rebalance_happened; }
+    bool polledDataUnusable() const { return (was_stopped || rebalance_happened); }
 
     void storeLastReadMessageOffset();
     void resetToLastCommitted(const char * msg);
@@ -69,6 +69,8 @@ private:
     bool rebalance_happened = false;
+    bool was_stopped = false;
+
     // order is important, need to be destructed before consumer
     cppkafka::TopicPartitionList assignment;
     const Names topics;

src/Storages/Kafka/StorageKafka.cpp

@@ -454,7 +454,10 @@ bool StorageKafka::streamToViews()
     else
         in = streams[0];
 
-    copyData(*in, *block_io.out, &stream_cancelled);
+    // We can't cancel during copyData, as it's not aware of commits and other kafka-related stuff.
+    // It will be cancelled on underlying layer (kafka buffer)
+    std::atomic<bool> stub = {false};
+    copyData(*in, *block_io.out, &stub);
 
     for (auto & stream : streams)
         stream->as<KafkaBlockInputStream>()->commit();
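
Note on the streamToViews() hunk above: copyData() now receives a local flag that is never set instead of stream_cancelled, so the copy between poll and commit can no longer be interrupted from outside; cancellation takes effect only when the Kafka read buffer itself stops producing rows. A rough sketch of why the stub works, assuming only that a copyData-style loop checks its cancellation flag between blocks (copy_data_sketch and its stream parameters are illustrative, not the real ClickHouse API):

    #include <atomic>

    // Illustrative copy loop: with a flag that is never set, the loop can end only when
    // the source itself stops producing blocks (here: the Kafka buffer after a stop).
    template <typename Input, typename Output>
    void copy_data_sketch(Input & in, Output & out, std::atomic<bool> * is_cancelled)
    {
        while (auto block = in.read())              // an empty block means the source is done
        {
            if (is_cancelled && is_cancelled->load())
                return;                             // external cancellation would drop this block
            out.write(block);
        }
    }

Every block that was actually formed therefore reaches the destination before commit() is called on the streams.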

tests/integration/test_storage_kafka/test.py

@@ -1241,6 +1241,98 @@ def test_exception_from_destructor(kafka_cluster):
     assert TSV(instance.query('SELECT 1')) == TSV('1')
 
 
+@pytest.mark.timeout(120)
+def test_commits_of_unprocessed_messages_on_drop(kafka_cluster):
+    messages = [json.dumps({'key': j+1, 'value': j+1}) for j in range(1)]
+    kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.destination;
+        CREATE TABLE test.destination (
+            key UInt64,
+            value UInt64,
+            _topic String,
+            _key String,
+            _offset UInt64,
+            _partition UInt64,
+            _timestamp Nullable(DateTime),
+            _consumed_by LowCardinality(String)
+        )
+        ENGINE = MergeTree()
+        ORDER BY key;
+
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
+                     kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 1000;
+
+        CREATE MATERIALIZED VIEW test.kafka_consumer TO test.destination AS
+            SELECT
+                key,
+                value,
+                _topic,
+                _key,
+                _offset,
+                _partition,
+                _timestamp
+            FROM test.kafka;
+    ''')
+
+    while int(instance.query("SELECT count() FROM test.destination")) == 0:
+        print("Waiting for test.kafka_consumer to start consume")
+        time.sleep(1)
+
+    cancel = threading.Event()
+
+    i = [2]
+    def produce():
+        while not cancel.is_set():
+            messages = []
+            for _ in range(113):
+                messages.append(json.dumps({'key': i[0], 'value': i[0]}))
+                i[0] += 1
+            kafka_produce('commits_of_unprocessed_messages_on_drop', messages)
+            time.sleep(1)
+
+    kafka_thread = threading.Thread(target=produce)
+    kafka_thread.start()
+
+    time.sleep(12)
+
+    instance.query('''
+        DROP TABLE test.kafka;
+    ''')
+
+    instance.query('''
+        CREATE TABLE test.kafka (key UInt64, value UInt64)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'commits_of_unprocessed_messages_on_drop',
+                     kafka_group_name = 'commits_of_unprocessed_messages_on_drop_test_group',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 10000;
+    ''')
+
+    cancel.set()
+    time.sleep(15)
+
+    #kafka_cluster.open_bash_shell('instance')
+
+    # SELECT key, _timestamp, _offset FROM test.destination where runningDifference(key) <> 1 ORDER BY key;
+
+    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.destination')
+    print(result)
+
+    instance.query('''
+        DROP TABLE test.kafka_consumer;
+        DROP TABLE test.destination;
+    ''')
+
+    kafka_thread.join()
+
+    assert TSV(result) == TSV('{0}\t{0}\t{0}'.format(i[0]-1)), 'Missing data!'
+
+
 @pytest.mark.timeout(1200)
 def test_kafka_duplicates_when_commit_failed(kafka_cluster):
     messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]