Add retry logic for Kafka commits

This commit is contained in:
Mikhail Filimonov 2020-03-26 12:52:16 +01:00
parent adca27cb45
commit 4fbe0e230e
2 changed files with 34 additions and 2 deletions

View File

@ -489,6 +489,7 @@ namespace ErrorCodes
extern const int INCORRECT_ACCESS_ENTITY_DEFINITION = 515;
extern const int AUTHENTICATION_FAILED = 516;
extern const int CANNOT_ASSIGN_ALTER = 517;
// Used by the Kafka consumer when every attempt to commit offsets has failed.
extern const int CANNOT_COMMIT_OFFSET = 518;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -8,6 +8,11 @@
namespace DB
{
// Declaration only: the numeric value of the code is defined elsewhere.
namespace ErrorCodes
{
// Exception code thrown by ReadBufferFromKafkaConsumer::commit()
// when all commit attempts have failed.
extern const int CANNOT_COMMIT_OFFSET;
}
using namespace std::chrono_literals;
const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
@ -140,14 +145,40 @@ void ReadBufferFromKafkaConsumer::commit()
// we may need to repeat commit in sync mode in revocation callback,
// but it seems like existing API doesn't allow us to do that
// in a controlled manner (i.e. we don't know the offsets to commit then)
consumer->commit();
size_t max_retries = 5;
bool commited = false;
while (!commited && max_retries>0)
{
try
{
// See https://github.com/edenhill/librdkafka/issues/1470
// The broker may reject a commit if, during offsets.commit.timeout.ms (5000 by default),
// there were not enough replicas available for the __consumer_offsets topic.
// Other temporary issues, such as client-server connectivity problems, are also possible.
consumer->commit();
commited = true;
print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
}
catch (const cppkafka::HandleException & e)
{
LOG_ERROR(log, "Exception during commit attempt: " << e.what());
}
max_retries--;
}
if (!commited)
{
// TODO: insert atomicity / transactions is needed here (possibility to rollback, or 2 phase commits)
throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not commited to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
}
}
else
{
LOG_TRACE(log,"Nothing to commit.");
}
print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
offsets_stored = 0;
stalled = false;