diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 918bc301754..4a664124b94 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -547,6 +547,7 @@ M(577, INVALID_SHARD_ID) \ M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \ M(579, INCORRECT_PART_TYPE) \ + M(580, UNKNOWN_HANDLE_KAFKA_ERROR_MODE) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 0b47cf1f2f7..140f34becc0 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -454,6 +454,7 @@ class IColumn; M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \ M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \ M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \ + M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \ // End of COMMON_SETTINGS // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below. diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 64ba51d1c68..e91be3690da 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -13,6 +13,7 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL; extern const int UNKNOWN_UNION; + extern const int UNKNOWN_HANDLE_KAFKA_ERROR_MODE; } @@ -108,4 +109,7 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS, {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT}, {"never_throw", DistributedDDLOutputMode::NEVER_THROW}}) +IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::UNKNOWN_HANDLE_KAFKA_ERROR_MODE, + {{"default", HandleKafkaErrorMode::DEFAULT}, + {"stream", HandleKafkaErrorMode::STREAM}}) } diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 7615b185a61..b44202b9446 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -138,7 +138,6 @@ enum class UnionMode DECLARE_SETTING_ENUM(UnionMode) - enum class DistributedDDLOutputMode { NONE, @@ -149,4 +148,13 @@ enum class DistributedDDLOutputMode DECLARE_SETTING_ENUM(DistributedDDLOutputMode) +enum class HandleKafkaErrorMode +{ + DEFAULT = 0, // Ignore errors whith threshold. + STREAM, // Put errors to stream in the virtual column named ``_error. + /*FIXED_SYSTEM_TABLE, Put errors to in a fixed system table likey system.kafka_errors. This is not implemented now. */ + /*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */ +}; + +DECLARE_SETTING_ENUM(HandleKafkaErrorMode) } diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index f7f32cf9b6f..eecf97aa848 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -269,7 +269,8 @@ InputFormatPtr FormatFactory::getInputFormat( const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & _format_settings) const + const std::optional & _format_settings, + const bool sync_after_error) const { const auto & input_getter = getCreators(name).input_processor_creator; if (!input_getter) @@ -289,7 +290,7 @@ InputFormatPtr FormatFactory::getInputFormat( params.allow_errors_ratio = format_settings.input_allow_errors_ratio; params.max_execution_time = settings.max_execution_time; params.timeout_overflow_mode = settings.timeout_overflow_mode; - + params.sync_after_error = sync_after_error; auto format = input_getter(buf, sample, params, format_settings); /// It's a kludge. Because I cannot remove context from values format. diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 4fa7e9a0c01..349120aaad4 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -133,7 +133,8 @@ public: const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & format_settings = std::nullopt) const; + const std::optional & format_settings = std::nullopt, + const bool sync_after_error = false) const; /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done. OutputFormatPtr getOutputFormatParallelIfPossible( diff --git a/src/Processors/Formats/IRowInputFormat.h b/src/Processors/Formats/IRowInputFormat.h index c802bd3066b..7061db75765 100644 --- a/src/Processors/Formats/IRowInputFormat.h +++ b/src/Processors/Formats/IRowInputFormat.h @@ -29,6 +29,7 @@ struct RowInputFormatParams Poco::Timespan max_execution_time = 0; OverflowMode timeout_overflow_mode = OverflowMode::THROW; + bool sync_after_error = false; }; bool isParseError(int code); diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index bf985902b4d..3d6de219034 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -35,8 +35,8 @@ KafkaBlockInputStream::KafkaBlockInputStream( , max_block_size(max_block_size_) , commit_in_suffix(commit_in_suffix_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) - , virtual_header(metadata_snapshot->getSampleBlockForColumns( - {"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}, storage.getVirtuals(), storage.getStorageID())) + , virtual_header(metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) + , handle_error_mode(context_->getSettings().handle_kafka_error_mode) { } @@ -78,20 +78,23 @@ Block KafkaBlockInputStream::readImpl() // now it's one-time usage InputStream // one block of the needed size (or with desired flush timeout) is formed in one internal iteration // otherwise external iteration will reuse that and logic will became even more fuzzy - MutableColumns result_columns = non_virtual_header.cloneEmptyColumns(); MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); + auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM; + auto input_format = FormatFactory::instance().getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size); + storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size, {}, put_error_to_stream); InputPort port(input_format->getPort().getHeader(), input_format.get()); connect(input_format->getPort(), port); port.setNeeded(); + std::string exception_message; auto read_kafka_message = [&] { size_t new_rows = 0; + exception_message.clear(); while (true) { @@ -100,7 +103,22 @@ Block KafkaBlockInputStream::readImpl() switch (status) { case IProcessor::Status::Ready: - input_format->work(); + try + { + input_format->work(); + } + catch (const Exception & e) + { + if (put_error_to_stream) + { + exception_message = e.message(); + new_rows++; + } + else + { + throw e; + } + } break; case IProcessor::Status::Finished: @@ -138,6 +156,11 @@ Block KafkaBlockInputStream::readImpl() { auto new_rows = buffer->poll() ? read_kafka_message() : 0; + if (!exception_message.empty()) + { + for (size_t i = 0; i < result_columns.size(); ++i) + result_columns[i]->insertDefault(); + } if (new_rows) { // In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl() @@ -189,6 +212,15 @@ Block KafkaBlockInputStream::readImpl() } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); + if (put_error_to_stream) + { + auto payload = buffer->currentPayload(); + virtual_columns[8]->insert(payload); + if (exception_message.empty()) + virtual_columns[9]->insertDefault(); + else + virtual_columns[9]->insert(exception_message); + } } total_rows = total_rows + new_rows; diff --git a/src/Storages/Kafka/KafkaBlockInputStream.h b/src/Storages/Kafka/KafkaBlockInputStream.h index 517df6ecaf7..c77f17803e7 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.h +++ b/src/Storages/Kafka/KafkaBlockInputStream.h @@ -51,6 +51,7 @@ private: const Block non_virtual_header; const Block virtual_header; + const HandleKafkaErrorMode handle_error_mode; }; } diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index 1d889655941..49d3df0e180 100644 --- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -63,6 +63,7 @@ public: auto currentPartition() const { return current[-1].get_partition(); } auto currentTimestamp() const { return current[-1].get_timestamp(); } const auto & currentHeaderList() const { return current[-1].get_header_list(); } + String currentPayload() const { return current[-1].get_payload(); } private: using Messages = std::vector; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 45e4ec538a1..027331a8330 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -760,7 +760,7 @@ void registerStorageKafka(StorageFactory & factory) NamesAndTypesList StorageKafka::getVirtuals() const { - return NamesAndTypesList{ + auto result = NamesAndTypesList{ {"_topic", std::make_shared()}, {"_key", std::make_shared()}, {"_offset", std::make_shared()}, @@ -770,6 +770,32 @@ NamesAndTypesList StorageKafka::getVirtuals() const {"_headers.name", std::make_shared(std::make_shared())}, {"_headers.value", std::make_shared(std::make_shared())} }; + if (handle_error_mode == HandleKafkaErrorMode::STREAM) + { + result.push_back({"_raw_message", std::make_shared()}); + result.push_back({"_error", std::make_shared()}); + } + return result; +} + +Names StorageKafka::getVirtualColumnNames() const +{ + auto result = Names { + "_topic", + "_key", + "_offset", + "_partition", + "_timestamp", + "_timestamp_ms", + "_headers.name", + "_headers.value", + }; + if (handle_error_mode == HandleKafkaErrorMode::STREAM) + { + result.push_back({"_raw_message"}); + result.push_back({"_error"}); + } + return result; } } diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 53871990810..72690418c18 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -64,6 +64,7 @@ public: const auto & getFormatName() const { return format_name; } NamesAndTypesList getVirtuals() const override; + Names getVirtualColumnNames() const; protected: StorageKafka( const StorageID & table_id_, @@ -112,6 +113,9 @@ private: std::mutex thread_statuses_mutex; std::list> thread_statuses; + /// Handle error mode + HandleKafkaErrorMode handle_error_mode; + SettingsChanges createSettingsAdjustments(); ConsumerBufferPtr createReadBuffer(const size_t consumer_number);