diff --git a/dbms/src/Common/ErrorCodes.cpp b/dbms/src/Common/ErrorCodes.cpp
index 72e25db1b80..f606151cd94 100644
--- a/dbms/src/Common/ErrorCodes.cpp
+++ b/dbms/src/Common/ErrorCodes.cpp
@@ -489,6 +489,7 @@ namespace ErrorCodes
     extern const int INCORRECT_ACCESS_ENTITY_DEFINITION = 515;
     extern const int AUTHENTICATION_FAILED = 516;
     extern const int CANNOT_ASSIGN_ALTER = 517;
+    extern const int CANNOT_COMMIT_OFFSET = 518;
 
     extern const int KEEPER_EXCEPTION = 999;
     extern const int POCO_EXCEPTION = 1000;
diff --git a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
index 9455390c00b..f137cb0c559 100644
--- a/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
+++ b/dbms/src/Storages/Kafka/ReadBufferFromKafkaConsumer.cpp
@@ -8,6 +8,11 @@
 namespace DB
 {
 
+namespace ErrorCodes
+{
+    extern const int CANNOT_COMMIT_OFFSET;
+}
+
 using namespace std::chrono_literals;
 
 const auto MAX_TIME_TO_WAIT_FOR_ASSIGNMENT_MS = 15000;
@@ -140,14 +145,40 @@ void ReadBufferFromKafkaConsumer::commit()
         // we may need to repeat commit in sync mode in revocation callback,
         // but it seems like existing API doesn't allow us to to that
         // in a controlled manner (i.e. we don't know the offsets to commit then)
-        consumer->commit();
+
+        size_t max_retries = 5;
+        bool committed = false;
+
+        while (!committed && max_retries > 0)
+        {
+            try
+            {
+                // See https://github.com/edenhill/librdkafka/issues/1470
+                // The broker may reject the commit if, during offsets.commit.timeout.ms (5000 by default),
+                // there were not enough replicas available for the __consumer_offsets topic.
+                // Some other temporary issues, like client-server connectivity problems, are also possible.
+                consumer->commit();
+                committed = true;
+                print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
+            }
+            catch (const cppkafka::HandleException & e)
+            {
+                LOG_ERROR(log, "Exception during commit attempt: " << e.what());
+            }
+            max_retries--;
+        }
+
+        if (!committed)
+        {
+            // TODO: insert atomicity / transactions are needed here (possibility to rollback, or 2-phase commits)
+            throw Exception("All commit attempts failed. Last block was already written to target table(s), but was not committed to Kafka.", ErrorCodes::CANNOT_COMMIT_OFFSET);
+        }
     }
     else
     {
         LOG_TRACE(log,"Nothing to commit.");
     }
 
-    print_offsets("Committed offset", consumer->get_offsets_committed(consumer->get_assignment()));
 
     offsets_stored = 0;
     stalled = false;