Merge pull request #9884 from filimonov/kafka_commit_retry

Kafka retry commits on failure
alexey-milovidov 2020-03-26 22:48:49 +03:00 committed by GitHub
commit 6f87f7ba68
4 changed files with 106 additions and 9 deletions
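
Below is a minimal, self-contained sketch of the bounded-retry pattern this change wraps around the Kafka offset commit. It assumes cppkafka is available; the helper name commit_with_retries, the std::cerr logging, and the default retry count are illustrative only and are not part of the patch itself.

// Sketch only: retry a synchronous offset commit a bounded number of times,
// mirroring the pattern introduced by this PR (not the actual ClickHouse code).
#include <cppkafka/cppkafka.h>
#include <cstddef>
#include <iostream>

// Hypothetical helper: returns true if the commit eventually succeeded.
bool commit_with_retries(cppkafka::Consumer & consumer, size_t max_retries = 5)
{
    bool committed = false;
    while (!committed && max_retries > 0)
    {
        try
        {
            // The broker may transiently reject a commit, e.g. when there are not
            // enough replicas for __consumer_offsets within offsets.commit.timeout.ms,
            // or when connectivity to the broker is briefly lost.
            // See https://github.com/edenhill/librdkafka/issues/1470
            consumer.commit();
            committed = true;
        }
        catch (const cppkafka::HandleException & e)
        {
            std::cerr << "Commit attempt failed: " << e.what() << std::endl;
        }
        --max_retries;
    }
    return committed;
}

In the patch below, a final failure after all attempts raises the new CANNOT_COMMIT_OFFSET error, because at that point the block has already been written to the target table(s) but its offsets were never committed to Kafka, so duplicates are possible on the next read.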

View File

@@ -489,6 +489,7 @@ namespace ErrorCodes
     extern const int INCORRECT_ACCESS_ENTITY_DEFINITION = 515;
     extern const int AUTHENTICATION_FAILED = 516;
     extern const int CANNOT_ASSIGN_ALTER = 517;
+    extern const int CANNOT_COMMIT_OFFSET = 518;
 
     extern const int KEEPER_EXCEPTION = 999;
     extern const int POCO_EXCEPTION = 1000;

View File

@@ -8,6 +8,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int CANNOT_COMMIT_OFFSET;
+}
+
 using namespace std::chrono_literals;
 const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
@@ -31,14 +36,14 @@ ReadBufferFromKafkaConsumer::ReadBufferFromKafkaConsumer(
     , topics(_topics)
 {
     // called (synchronously, during poll) when we enter the consumer group
-    consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList& topic_partitions)
+    consumer->set_assignment_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
         LOG_TRACE(log, "Topics/partitions assigned: " << topic_partitions);
         assignment = topic_partitions;
     });
 
     // called (synchronously, during poll) when we leave the consumer group
-    consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList& topic_partitions)
+    consumer->set_revocation_callback([this](const cppkafka::TopicPartitionList & topic_partitions)
     {
         // Rebalance is happening now, and now we have a chance to finish the work
         // with topics/partitions we were working with before rebalance
@@ -140,16 +145,41 @@ void ReadBufferFromKafkaConsumer::commit()
         // we may need to repeat commit in sync mode in revocation callback,
         // but it seems like the existing API doesn't allow us to do that
         // in a controlled manner (i.e. we don't know the offsets to commit then)
-        consumer->commit();
+
+        size_t max_retries = 5;
+        bool committed = false;
+        while (!committed && max_retries > 0)
+        {
+            try
+            {
+                // See https://github.com/edenhill/librdkafka/issues/1470
+                // the broker may reject a commit if, during offsets.commit.timeout.ms (5000 by default),
+                // there were not enough replicas available for the __consumer_offsets topic.
+                // also some other temporary issues like client-server connectivity problems are possible
+                consumer->commit();
+                committed = true;
+                print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
+            }
+            catch (const cppkafka::HandleException & e)
+            {
+                LOG_ERROR(log, "Exception during commit attempt: " << e.what());
+            }
+            --max_retries;
+        }
+
+        if (!committed)
+        {
+            // TODO: insert atomicity / transactions are needed here (possibility to rollback, or 2 phase commits)
+            throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
+        }
     }
     else
     {
-        LOG_TRACE(log,"Nothing to commit.");
+        LOG_TRACE(log, "Nothing to commit.");
     }
 
-    print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
-
     offsets_stored = 0;
     stalled = false;
 }
@@ -222,8 +252,7 @@ void ReadBufferFromKafkaConsumer::resetToLastCommitted(const char * msg)
     }
     auto committed_offset = consumer->get_offsets_committed(consumer->get_assignment());
     consumer->assign(committed_offset);
-    LOG_TRACE(log, msg << "Returned to committed position: " << committed_offset);
+    LOG_TRACE(log, msg << " Returned to committed position: " << committed_offset);
 }
 
 /// Do commit messages implicitly after we processed the previous batch.

View File

@@ -474,7 +474,7 @@ class ClickHouseCluster:
                 instance.client = Client(instance.ip_address, command=self.client_bin_path)
 
             self.is_up = True
 
         except BaseException, e:
             print "Failed to start cluster: "
             print str(e)
@@ -506,6 +506,13 @@ class ClickHouseCluster:
         if sanitizer_assert_instance is not None:
             raise Exception("Sanitizer assert found in {} for instance {}".format(self.docker_logs_path, sanitizer_assert_instance))
 
+    def pause_container(self, instance_name):
+        subprocess_check_call(self.base_cmd + ['pause', instance_name])
+        # subprocess_check_call(self.base_cmd + ['kill', '-s SIGSTOP', instance_name])
+
+    def unpause_container(self, instance_name):
+        subprocess_check_call(self.base_cmd + ['unpause', instance_name])
+        # subprocess_check_call(self.base_cmd + ['kill', '-s SIGCONT', instance_name])
+
     def open_bash_shell(self, instance_name):
         os.system(' '.join(self.base_cmd + ['exec', instance_name, '/bin/bash']))

View File

@@ -1194,6 +1194,66 @@ def test_exception_from_destructor(kafka_cluster):
     assert TSV(instance.query('SELECT 1')) == TSV('1')
 
 
+@pytest.mark.timeout(1200)
+def test_kafka_duplicates_when_commit_failed(kafka_cluster):
+    messages = [json.dumps({'key': j+1, 'value': 'x' * 300}) for j in range(22)]
+    kafka_produce('duplicates_when_commit_failed', messages)
+
+    instance.query('''
+        DROP TABLE IF EXISTS test.view;
+        DROP TABLE IF EXISTS test.consumer;
+
+        CREATE TABLE test.kafka (key UInt64, value String)
+            ENGINE = Kafka
+            SETTINGS kafka_broker_list = 'kafka1:19092',
+                     kafka_topic_list = 'duplicates_when_commit_failed',
+                     kafka_group_name = 'duplicates_when_commit_failed',
+                     kafka_format = 'JSONEachRow',
+                     kafka_max_block_size = 20;
+
+        CREATE TABLE test.view (key UInt64, value String)
+            ENGINE = MergeTree()
+            ORDER BY key;
+
+        CREATE MATERIALIZED VIEW test.consumer TO test.view AS
+            SELECT * FROM test.kafka
+            WHERE NOT sleepEachRow(0.5);
+    ''')
+
+    # print time.strftime("%m/%d/%Y %H:%M:%S")
+    time.sleep(12)  # 5-6 sec to connect to kafka, do subscription, and fetch 20 rows; another 10 sec for the MV, after that the commit should happen
+
+    # print time.strftime("%m/%d/%Y %H:%M:%S")
+    kafka_cluster.pause_container('kafka1')
+
+    # that timeout is VERY important, and was picked after a lot of experiments
+    # when too low (< 30 sec) librdkafka will not report any timeout (the alternative is to decrease the default session timeouts for librdkafka)
+    # when too high (> 50 sec) the broker will decide to remove us from the consumer group, and will start answering "Broker: Unknown member"
+    time.sleep(40)
+
+    # print time.strftime("%m/%d/%Y %H:%M:%S")
+    kafka_cluster.unpause_container('kafka1')
+
+    # kafka_cluster.open_bash_shell('instance')
+
+    # connection is restored, and it will take a while until the next block is flushed
+    # it takes years on CI :\
+    time.sleep(30)
+
+    # as it's a bit tricky to hit the proper moment - let's check in the logs if we did it correctly
+    assert instance.contains_in_log("Local: Waiting for coordinator")
+
+    result = instance.query('SELECT count(), uniqExact(key), max(key) FROM test.view')
+    print(result)
+
+    instance.query('''
+        DROP TABLE test.consumer;
+        DROP TABLE test.view;
+    ''')
+
+    assert TSV(result) == TSV('22\t22\t22')
+
+
 if __name__ == '__main__':
     cluster.start()
     raw_input("Cluster created, press any key to destroy...")