From 1afd655bbc0d47f1fde625c3cc06df246498bc7f Mon Sep 17 00:00:00 2001 From: avogar Date: Tue, 10 Oct 2023 16:21:33 +0000 Subject: [PATCH] Allow to save unparsed records and errors in RabbitMQ, NATS and FileLog engines --- src/Core/Settings.h | 2 +- src/Core/SettingsEnums.cpp | 6 +- src/Core/SettingsEnums.h | 4 +- .../DataLakes/IcebergMetadataParser.cpp | 1 - src/Storages/FileLog/Buffer_fwd.h | 4 +- src/Storages/FileLog/FileLogConsumer.cpp | 165 ++++++++++++++++++ src/Storages/FileLog/FileLogConsumer.h | 82 +++++++++ src/Storages/FileLog/FileLogSettings.h | 3 +- src/Storages/FileLog/FileLogSource.cpp | 71 ++++++-- src/Storages/FileLog/FileLogSource.h | 10 +- .../FileLog/ReadBufferFromFileLog.cpp | 1 + src/Storages/FileLog/ReadBufferFromFileLog.h | 2 + src/Storages/FileLog/StorageFileLog.cpp | 21 ++- src/Storages/FileLog/StorageFileLog.h | 2 - src/Storages/Kafka/KafkaConsumer.h | 2 +- src/Storages/Kafka/KafkaSettings.h | 13 +- src/Storages/Kafka/KafkaSource.cpp | 10 +- src/Storages/Kafka/KafkaSource.h | 2 +- src/Storages/Kafka/StorageKafka.cpp | 6 +- src/Storages/Kafka/StorageKafka.h | 2 +- src/Storages/NATS/NATSConsumer.h | 1 + src/Storages/NATS/NATSSettings.h | 13 +- src/Storages/NATS/NATSSource.cpp | 59 ++++++- src/Storages/NATS/NATSSource.h | 7 +- src/Storages/NATS/StorageNATS.cpp | 21 ++- src/Storages/RabbitMQ/RabbitMQSettings.h | 11 +- src/Storages/RabbitMQ/RabbitMQSource.cpp | 54 +++++- src/Storages/RabbitMQ/RabbitMQSource.h | 3 + src/Storages/RabbitMQ/StorageRabbitMQ.cpp | 21 ++- .../integration/test_storage_rabbitmq/test.py | 97 ++++++++++ .../02889_file_log_save_errors.reference | 20 +++ .../0_stateless/02889_file_log_save_errors.sh | 46 +++++ 32 files changed, 680 insertions(+), 82 deletions(-) create mode 100644 src/Storages/FileLog/FileLogConsumer.cpp create mode 100644 src/Storages/FileLog/FileLogConsumer.h create mode 100644 tests/queries/0_stateless/02889_file_log_save_errors.reference create mode 100755 tests/queries/0_stateless/02889_file_log_save_errors.sh diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 10134a3a545..9eaed07760e 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -828,7 +828,7 @@ class IColumn; MAKE_OBSOLETE(M, Bool, allow_experimental_geo_types, true) \ \ MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \ - MAKE_OBSOLETE(M, HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT) \ + MAKE_OBSOLETE(M, StreamingHandleErrorMode, handle_kafka_error_mode, StreamingHandleErrorMode::DEFAULT) \ MAKE_OBSOLETE(M, Bool, database_replicated_ddl_output, true) \ MAKE_OBSOLETE(M, UInt64, replication_alter_columns_timeout, 60) \ MAKE_OBSOLETE(M, UInt64, odbc_max_field_size, 0) \ diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index a30d8040f47..b4301434200 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -110,9 +110,9 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS, {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT}, {"never_throw", DistributedDDLOutputMode::NEVER_THROW}}) -IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::BAD_ARGUMENTS, - {{"default", HandleKafkaErrorMode::DEFAULT}, - {"stream", HandleKafkaErrorMode::STREAM}}) +IMPLEMENT_SETTING_ENUM(StreamingHandleErrorMode, ErrorCodes::BAD_ARGUMENTS, + {{"default", StreamingHandleErrorMode::DEFAULT}, + {"stream", StreamingHandleErrorMode::STREAM}}) IMPLEMENT_SETTING_ENUM(ShortCircuitFunctionEvaluation, ErrorCodes::BAD_ARGUMENTS, {{"enable", ShortCircuitFunctionEvaluation::ENABLE}, diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h index 034e4c8c887..0cd53da868f 100644 --- a/src/Core/SettingsEnums.h +++ b/src/Core/SettingsEnums.h @@ -163,7 +163,7 @@ enum class DistributedDDLOutputMode DECLARE_SETTING_ENUM(DistributedDDLOutputMode) -enum class HandleKafkaErrorMode +enum class StreamingHandleErrorMode { DEFAULT = 0, // Ignore errors with threshold. STREAM, // Put errors to stream in the virtual column named ``_error. @@ -171,7 +171,7 @@ enum class HandleKafkaErrorMode /*CUSTOM_SYSTEM_TABLE, Put errors to in a custom system table. This is not implemented now. */ }; -DECLARE_SETTING_ENUM(HandleKafkaErrorMode) +DECLARE_SETTING_ENUM(StreamingHandleErrorMode) enum class ShortCircuitFunctionEvaluation { diff --git a/src/Storages/DataLakes/IcebergMetadataParser.cpp b/src/Storages/DataLakes/IcebergMetadataParser.cpp index 3820bb08247..d57549ad838 100644 --- a/src/Storages/DataLakes/IcebergMetadataParser.cpp +++ b/src/Storages/DataLakes/IcebergMetadataParser.cpp @@ -214,7 +214,6 @@ struct IcebergMetadataParser::Impl { auto buffer = MetadataReadHelper::createReadBuffer(manifest_file, context, configuration); auto file_reader = std::make_unique(std::make_unique(*buffer)); - avro::NodePtr root_node = file_reader->dataSchema().root(); size_t leaves_num = root_node->leaves(); size_t expected_min_num = metadata.format_version == 1 ? 3 : 2; diff --git a/src/Storages/FileLog/Buffer_fwd.h b/src/Storages/FileLog/Buffer_fwd.h index ec644aa7d36..5c6416196bf 100644 --- a/src/Storages/FileLog/Buffer_fwd.h +++ b/src/Storages/FileLog/Buffer_fwd.h @@ -4,7 +4,7 @@ namespace DB { -class ReadBufferFromFileLog; +class FileLogConsumer; -using ReadBufferFromFileLogPtr = std::shared_ptr; +using ReadBufferFromFileLogPtr = std::shared_ptr; } diff --git a/src/Storages/FileLog/FileLogConsumer.cpp b/src/Storages/FileLog/FileLogConsumer.cpp new file mode 100644 index 00000000000..7268b72f793 --- /dev/null +++ b/src/Storages/FileLog/FileLogConsumer.cpp @@ -0,0 +1,165 @@ +#include +#include +#include + +#include +#include + +#include +#include + +namespace DB +{ +namespace ErrorCodes +{ + extern const int CANNOT_READ_ALL_DATA; +} + +FileLogConsumer::FileLogConsumer( + StorageFileLog & storage_, + size_t max_batch_size, + size_t poll_timeout_, + ContextPtr context_, + size_t stream_number_, + size_t max_streams_number_) + : log(&Poco::Logger::get("FileLogConsumer " + toString(stream_number_))) + , storage(storage_) + , batch_size(max_batch_size) + , poll_timeout(poll_timeout_) + , context(context_) + , stream_number(stream_number_) + , max_streams_number(max_streams_number_) +{ + current = records.begin(); + allowed = false; +} + +ReadBufferPtr FileLogConsumer::consume() +{ + if (hasMorePolledRecords()) + return getNextRecord(); + + auto new_records = pollBatch(batch_size); + if (new_records.empty()) + { + buffer_status = BufferStatus::NO_RECORD_RETURNED; + LOG_TRACE(log, "No new records to read"); + return nullptr; + } + else + { + records = std::move(new_records); + current = records.begin(); + + LOG_TRACE(log, "Polled batch of {} records. ", records.size()); + + buffer_status = BufferStatus::POLLED_OK; + allowed = true; + return getNextRecord(); + } +} + +FileLogConsumer::Records FileLogConsumer::pollBatch(size_t batch_size_) +{ + Records new_records; + new_records.reserve(batch_size_); + + readNewRecords(new_records, batch_size); + if (new_records.size() == batch_size_ || stream_out) + return new_records; + + Stopwatch watch; + while (watch.elapsedMilliseconds() < poll_timeout && new_records.size() != batch_size_) + { + readNewRecords(new_records, batch_size); + /// All ifstrem reach end, no need to wait for timeout, + /// since file status can not be updated during a streamToViews + if (stream_out) + break; + } + + return new_records; +} + +void FileLogConsumer::readNewRecords(FileLogConsumer::Records & new_records, size_t batch_size_) +{ + size_t need_records_size = batch_size_ - new_records.size(); + size_t read_records_size = 0; + + auto & file_infos = storage.getFileInfos(); + + size_t files_per_stream = file_infos.file_names.size() / max_streams_number; + size_t start = stream_number * files_per_stream; + size_t end = stream_number == max_streams_number - 1 ? file_infos.file_names.size() : (stream_number + 1) * files_per_stream; + + for (size_t i = start; i < end; ++i) + { + const auto & file_name = file_infos.file_names[i]; + + auto & file_ctx = StorageFileLog::findInMap(file_infos.context_by_name, file_name); + if (file_ctx.status == StorageFileLog::FileStatus::NO_CHANGE) + continue; + + auto & file_meta = StorageFileLog::findInMap(file_infos.meta_by_inode, file_ctx.inode); + + if (!file_ctx.reader) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Ifstream for file {} is not initialized", file_meta.file_name); + + auto & reader = file_ctx.reader.value(); + StorageFileLog::assertStreamGood(reader); + + Record record; + while (read_records_size < need_records_size) + { + /// Need to get offset before reading record from stream + auto offset = reader.tellg(); + if (static_cast(offset) >= file_meta.last_open_end) + break; + record.offset = offset; + StorageFileLog::assertStreamGood(reader); + + record.file_name = file_name; + + + std::getline(reader, record.data); + StorageFileLog::assertStreamGood(reader); + + new_records.emplace_back(record); + ++read_records_size; + } + + UInt64 current_position = reader.tellg(); + StorageFileLog::assertStreamGood(reader); + + file_meta.last_writen_position = current_position; + + /// stream reach to end + if (current_position == file_meta.last_open_end) + { + file_ctx.status = StorageFileLog::FileStatus::NO_CHANGE; + } + + /// All ifstream reach end + if (i == end - 1 && (file_ctx.status == StorageFileLog::FileStatus::NO_CHANGE)) + { + stream_out = true; + } + + if (read_records_size == need_records_size) + { + break; + } + } +} + +ReadBufferPtr FileLogConsumer::getNextRecord() +{ + if (!hasMorePolledRecords()) + return nullptr; + + auto buf = std::make_unique(current->data); + ++current; + return buf; +} + +} diff --git a/src/Storages/FileLog/FileLogConsumer.h b/src/Storages/FileLog/FileLogConsumer.h new file mode 100644 index 00000000000..d9f31521b3d --- /dev/null +++ b/src/Storages/FileLog/FileLogConsumer.h @@ -0,0 +1,82 @@ +#pragma once + +#include +#include +#include + +#include +#include + +namespace DB +{ +class FileLogConsumer +{ +public: + FileLogConsumer( + StorageFileLog & storage_, + size_t max_batch_size, + size_t poll_timeout_, + ContextPtr context_, + size_t stream_number_, + size_t max_streams_number_); + + auto pollTimeout() const { return poll_timeout; } + + bool hasMorePolledRecords() const { return current != records.end(); } + + ReadBufferPtr consume(); + + bool noRecords() { return buffer_status == BufferStatus::NO_RECORD_RETURNED; } + + auto getFileName() const { return current[-1].file_name; } + auto getOffset() const { return current[-1].offset; } + const String & getCurrentRecord() const { return current[-1].data; } + +private: + enum class BufferStatus + { + INIT, + NO_RECORD_RETURNED, + POLLED_OK, + }; + + BufferStatus buffer_status = BufferStatus::INIT; + + Poco::Logger * log; + + StorageFileLog & storage; + + bool stream_out = false; + + size_t batch_size; + size_t poll_timeout; + + ContextPtr context; + + size_t stream_number; + size_t max_streams_number; + + bool allowed = true; + + using RecordData = std::string; + struct Record + { + RecordData data; + std::string file_name; + /// Offset is the start of a row, which is needed for virtual columns. + UInt64 offset; + }; + using Records = std::vector; + + Records records; + Records::const_iterator current; + + using TaskThread = BackgroundSchedulePool::TaskHolder; + + Records pollBatch(size_t batch_size_); + + void readNewRecords(Records & new_records, size_t batch_size_); + + ReadBufferPtr getNextRecord(); +}; +} diff --git a/src/Storages/FileLog/FileLogSettings.h b/src/Storages/FileLog/FileLogSettings.h index 4f3157424db..9e64b80f57a 100644 --- a/src/Storages/FileLog/FileLogSettings.h +++ b/src/Storages/FileLog/FileLogSettings.h @@ -17,7 +17,8 @@ class ASTStorage; M(UInt64, max_threads, 0, "Number of max threads to parse files, default is 0, which means the number will be max(1, physical_cpu_cores / 4)", 0) \ M(Milliseconds, poll_directory_watch_events_backoff_init, 500, "The initial sleep value for watch directory thread.", 0) \ M(Milliseconds, poll_directory_watch_events_backoff_max, 32000, "The max sleep value for watch directory thread.", 0) \ - M(UInt64, poll_directory_watch_events_backoff_factor, 2, "The speed of backoff, exponential by default", 0) + M(UInt64, poll_directory_watch_events_backoff_factor, 2, "The speed of backoff, exponential by default", 0) \ + M(StreamingHandleErrorMode, handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for FileLog engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ #define LIST_OF_FILELOG_SETTINGS(M, ALIAS) \ FILELOG_RELATED_SETTINGS(M, ALIAS) \ diff --git a/src/Storages/FileLog/FileLogSource.cpp b/src/Storages/FileLog/FileLogSource.cpp index be97cd58de9..b1192af4ced 100644 --- a/src/Storages/FileLog/FileLogSource.cpp +++ b/src/Storages/FileLog/FileLogSource.cpp @@ -1,8 +1,8 @@ #include #include #include +#include #include -#include #include #include @@ -18,7 +18,8 @@ FileLogSource::FileLogSource( size_t max_block_size_, size_t poll_time_out_, size_t stream_number_, - size_t max_streams_number_) + size_t max_streams_number_, + StreamingHandleErrorMode handle_error_mode_) : ISource(storage_snapshot_->getSampleBlockForColumns(columns)) , storage(storage_) , storage_snapshot(storage_snapshot_) @@ -28,10 +29,11 @@ FileLogSource::FileLogSource( , poll_time_out(poll_time_out_) , stream_number(stream_number_) , max_streams_number(max_streams_number_) + , handle_error_mode(handle_error_mode_) , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized()) - , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames())) + , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtuals().getNames())) { - buffer = std::make_unique(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_); + consumer = std::make_unique(storage, max_block_size, poll_time_out, context, stream_number_, max_streams_number_); const auto & file_infos = storage.getFileInfos(); @@ -67,7 +69,7 @@ Chunk FileLogSource::generate() /// Store metas of last written chunk into disk storage.storeMetas(start, end); - if (!buffer || buffer->noRecords()) + if (!consumer || consumer->noRecords()) { /// There is no onFinish for ISource, we call it /// when no records return to close files @@ -77,29 +79,72 @@ Chunk FileLogSource::generate() MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); + EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( - storage.getFormatName(), *buffer, non_virtual_header, context, max_block_size, std::nullopt, 1); - - StreamingFormatExecutor executor(non_virtual_header, input_format); + storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); + std::optional exception_message; size_t total_rows = 0; + + auto on_error = [&](const MutableColumns & result_columns, Exception & e) + { + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + exception_message = e.message(); + for (const auto & column : result_columns) + { + // We could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); + + // All data columns will get default value in case of error. + column->insertDefault(); + } + + return 1; + } + else + { + throw std::move(e); + } + }; + + StreamingFormatExecutor executor(non_virtual_header, input_format, on_error); + size_t failed_poll_attempts = 0; Stopwatch watch; while (true) { + exception_message.reset(); size_t new_rows = 0; - if (buffer->poll()) - new_rows = executor.execute(); + if (auto buf = consumer->consume()) + new_rows = executor.execute(*buf); if (new_rows) { - auto file_name = buffer->getFileName(); - auto offset = buffer->getOffset(); + auto file_name = consumer->getFileName(); + auto offset = consumer->getOffset(); for (size_t i = 0; i < new_rows; ++i) { virtual_columns[0]->insert(file_name); virtual_columns[1]->insert(offset); + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + if (exception_message) + { + const auto & current_record = consumer->getCurrentRecord(); + virtual_columns[2]->insertData(current_record.data(), current_record.size()); + virtual_columns[3]->insertData(exception_message->data(), exception_message->size()); + } + else + { + virtual_columns[2]->insertDefault(); + virtual_columns[3]->insertDefault(); + } + } } total_rows = total_rows + new_rows; } @@ -108,7 +153,7 @@ Chunk FileLogSource::generate() ++failed_poll_attempts; } - if (!buffer->hasMorePolledRecords() + if (!consumer->hasMorePolledRecords() && ((total_rows >= max_block_size) || watch.elapsedMilliseconds() > poll_time_out || failed_poll_attempts >= MAX_FAILED_POLL_ATTEMPTS)) { diff --git a/src/Storages/FileLog/FileLogSource.h b/src/Storages/FileLog/FileLogSource.h index 51d69d23b57..281b109ff4b 100644 --- a/src/Storages/FileLog/FileLogSource.h +++ b/src/Storages/FileLog/FileLogSource.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include namespace Poco @@ -21,11 +21,12 @@ public: size_t max_block_size_, size_t poll_time_out_, size_t stream_number_, - size_t max_streams_number_); + size_t max_streams_number_, + StreamingHandleErrorMode handle_error_mode_); String getName() const override { return "FileLog"; } - bool noRecords() { return !buffer || buffer->noRecords(); } + bool noRecords() { return !consumer || consumer->noRecords(); } void onFinish(); @@ -45,8 +46,9 @@ private: size_t stream_number; size_t max_streams_number; + StreamingHandleErrorMode handle_error_mode; - std::unique_ptr buffer; + std::unique_ptr consumer; Block non_virtual_header; Block virtual_header; diff --git a/src/Storages/FileLog/ReadBufferFromFileLog.cpp b/src/Storages/FileLog/ReadBufferFromFileLog.cpp index 23db01eaefc..f2eee8abaee 100644 --- a/src/Storages/FileLog/ReadBufferFromFileLog.cpp +++ b/src/Storages/FileLog/ReadBufferFromFileLog.cpp @@ -166,6 +166,7 @@ bool ReadBufferFromFileLog::nextImpl() current_file = current->file_name; current_offset = current->offset; + current_record = current->data; ++current; diff --git a/src/Storages/FileLog/ReadBufferFromFileLog.h b/src/Storages/FileLog/ReadBufferFromFileLog.h index 5991fe29b70..6dc4cdcfbd8 100644 --- a/src/Storages/FileLog/ReadBufferFromFileLog.h +++ b/src/Storages/FileLog/ReadBufferFromFileLog.h @@ -32,6 +32,7 @@ public: auto getFileName() const { return current_file; } auto getOffset() const { return current_offset; } + const String & getCurrentRecord() const { return current_record; } private: enum class BufferStatus @@ -74,6 +75,7 @@ private: String current_file; UInt64 current_offset = 0; + String current_record; using TaskThread = BackgroundSchedulePool::TaskHolder; diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 5faccefd836..de41ede8a5c 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -346,7 +347,8 @@ Pipe StorageFileLog::read( getMaxBlockSize(), getPollTimeoutMillisecond(), stream_number, - max_streams_number)); + max_streams_number, + filelog_settings->handle_error_mode)); } return Pipe::unitePipes(std::move(pipes)); @@ -708,7 +710,8 @@ bool StorageFileLog::streamToViews() getPollMaxBatchSize(), getPollTimeoutMillisecond(), stream_number, - max_streams_number)); + max_streams_number, + filelog_settings->handle_error_mode)); } auto input= Pipe::unitePipes(std::move(pipes)); @@ -978,13 +981,17 @@ bool StorageFileLog::updateFileInfos() NamesAndTypesList StorageFileLog::getVirtuals() const { - return NamesAndTypesList{ + auto virtuals = NamesAndTypesList{ {"_filename", std::make_shared(std::make_shared())}, {"_offset", std::make_shared()}}; + + if (filelog_settings->handle_error_mode == StreamingHandleErrorMode::STREAM) + { + virtuals.push_back({"_raw_record", std::make_shared(std::make_shared())}); + virtuals.push_back({"_error", std::make_shared(std::make_shared())}); + } + + return virtuals; } -Names StorageFileLog::getVirtualColumnNames() -{ - return {"_filename", "_offset"}; -} } diff --git a/src/Storages/FileLog/StorageFileLog.h b/src/Storages/FileLog/StorageFileLog.h index c0c5ac904b5..0fd62a22a18 100644 --- a/src/Storages/FileLog/StorageFileLog.h +++ b/src/Storages/FileLog/StorageFileLog.h @@ -103,8 +103,6 @@ public: NamesAndTypesList getVirtuals() const override; - static Names getVirtualColumnNames(); - static UInt64 getInode(const String & file_name); void openFilesAndSetPos(); diff --git a/src/Storages/Kafka/KafkaConsumer.h b/src/Storages/Kafka/KafkaConsumer.h index 1c3ddd85873..f9758ff6c90 100644 --- a/src/Storages/Kafka/KafkaConsumer.h +++ b/src/Storages/Kafka/KafkaConsumer.h @@ -104,7 +104,7 @@ public: auto currentPartition() const { return current[-1].get_partition(); } auto currentTimestamp() const { return current[-1].get_timestamp(); } const auto & currentHeaderList() const { return current[-1].get_header_list(); } - String currentPayload() const { return current[-1].get_payload(); } + const cppkafka::Buffer & currentPayload() const { return current[-1].get_payload(); } void setExceptionInfo(const cppkafka::Error & err, bool with_stacktrace = true); void setExceptionInfo(const std::string & text, bool with_stacktrace = true); void setRDKafkaStat(const std::string & stat_json_string) diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h index cfe9f16c019..075e79c96f0 100644 --- a/src/Storages/Kafka/KafkaSettings.h +++ b/src/Storages/Kafka/KafkaSettings.h @@ -15,7 +15,6 @@ class ASTStorage; M(String, kafka_group_name, "", "Client group id string. All Kafka consumers sharing the same group.id belong to the same group.", 0) \ /* those are mapped to format factory settings */ \ M(String, kafka_format, "", "The message format for Kafka engine.", 0) \ - M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \ M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \ M(UInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.", 0) \ /* default is = max_insert_block_size / kafka_num_consumers */ \ @@ -29,17 +28,21 @@ class ASTStorage; /* default is stream_flush_interval_ms */ \ M(Milliseconds, kafka_flush_interval_ms, 0, "Timeout for flushing data from Kafka.", 0) \ M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \ - M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \ + M(StreamingHandleErrorMode, kafka_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ M(Bool, kafka_commit_on_select, false, "Commit messages when select query is made", 0) \ M(UInt64, kafka_max_rows_per_message, 1, "The maximum number of rows produced in one kafka message for row-based formats.", 0) \ +#define OBSOLETE_KAFKA_SETTINGS(M, ALIAS) \ + MAKE_OBSOLETE(M, Char, kafka_row_delimiter, '\0') \ + /** TODO: */ /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */ /* https://github.com/edenhill/librdkafka/blob/v1.4.2/src/rdkafka_conf.c */ -#define LIST_OF_KAFKA_SETTINGS(M, ALIAS) \ - KAFKA_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) +#define LIST_OF_KAFKA_SETTINGS(M, ALIAS) \ + KAFKA_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_KAFKA_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \ DECLARE_SETTINGS_TRAITS(KafkaSettingsTraits, LIST_OF_KAFKA_SETTINGS) diff --git a/src/Storages/Kafka/KafkaSource.cpp b/src/Storages/Kafka/KafkaSource.cpp index cd83a6a1422..1fbd7e2d705 100644 --- a/src/Storages/Kafka/KafkaSource.cpp +++ b/src/Storages/Kafka/KafkaSource.cpp @@ -46,7 +46,7 @@ KafkaSource::KafkaSource( , commit_in_suffix(commit_in_suffix_) , non_virtual_header(storage_snapshot->metadata->getSampleBlockNonMaterialized()) , virtual_header(storage_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames())) - , handle_error_mode(storage.getHandleKafkaErrorMode()) + , handle_error_mode(storage.getStreamingHandleErrorMode()) { } @@ -98,7 +98,7 @@ Chunk KafkaSource::generateImpl() // otherwise external iteration will reuse that and logic will became even more fuzzy MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); - auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM; + auto put_error_to_stream = handle_error_mode == StreamingHandleErrorMode::STREAM; EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( @@ -207,9 +207,9 @@ Chunk KafkaSource::generateImpl() { if (exception_message) { - auto payload = consumer->currentPayload(); - virtual_columns[8]->insert(payload); - virtual_columns[9]->insert(*exception_message); + const auto & payload = consumer->currentPayload(); + virtual_columns[8]->insertData(reinterpret_cast(payload.get_data()), payload.get_size()); + virtual_columns[9]->insertData(exception_message->data(), exception_message->size()); } else { diff --git a/src/Storages/Kafka/KafkaSource.h b/src/Storages/Kafka/KafkaSource.h index 3d2edd4ebd1..485a8e55b6a 100644 --- a/src/Storages/Kafka/KafkaSource.h +++ b/src/Storages/Kafka/KafkaSource.h @@ -51,7 +51,7 @@ private: const Block non_virtual_header; const Block virtual_header; - const HandleKafkaErrorMode handle_error_mode; + const StreamingHandleErrorMode handle_error_mode; Poco::Timespan max_execution_time = 0; Stopwatch total_stopwatch {CLOCK_MONOTONIC_COARSE}; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 43a3bedfb74..6c48161a273 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -268,7 +268,7 @@ StorageKafka::StorageKafka( , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value) , collection_name(collection_name_) { - if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM) + if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) { kafka_settings->input_format_allow_errors_num = 0; kafka_settings->input_format_allow_errors_ratio = 0; @@ -1056,7 +1056,7 @@ NamesAndTypesList StorageKafka::getVirtuals() const {"_timestamp_ms", std::make_shared(std::make_shared(3))}, {"_headers.name", std::make_shared(std::make_shared())}, {"_headers.value", std::make_shared(std::make_shared())}}; - if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM) + if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) { result.push_back({"_raw_message", std::make_shared()}); result.push_back({"_error", std::make_shared()}); @@ -1076,7 +1076,7 @@ Names StorageKafka::getVirtualColumnNames() const "_headers.name", "_headers.value", }; - if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM) + if (kafka_settings->kafka_handle_error_mode == StreamingHandleErrorMode::STREAM) { result.push_back({"_raw_message"}); result.push_back({"_error"}); diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 77e1370c2b7..e08baf9fc80 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -78,7 +78,7 @@ public: NamesAndTypesList getVirtuals() const override; Names getVirtualColumnNames() const; - HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; } + StreamingHandleErrorMode getStreamingHandleErrorMode() const { return kafka_settings->kafka_handle_error_mode; } struct SafeConsumers { diff --git a/src/Storages/NATS/NATSConsumer.h b/src/Storages/NATS/NATSConsumer.h index a6f950329aa..a5470433303 100644 --- a/src/Storages/NATS/NATSConsumer.h +++ b/src/Storages/NATS/NATSConsumer.h @@ -45,6 +45,7 @@ public: size_t queueSize() { return received.size(); } auto getSubject() const { return current.subject; } + const String & getCurrentMessage() const { return current.message; } /// Return read buffer containing next available message /// or nullptr if there are no messages to process. diff --git a/src/Storages/NATS/NATSSettings.h b/src/Storages/NATS/NATSSettings.h index 2482f811e50..3e3ed739d82 100644 --- a/src/Storages/NATS/NATSSettings.h +++ b/src/Storages/NATS/NATSSettings.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB { @@ -11,7 +12,6 @@ class ASTStorage; M(String, nats_url, "", "A host-port to connect to NATS server.", 0) \ M(String, nats_subjects, "", "List of subject for NATS table to subscribe/publish to.", 0) \ M(String, nats_format, "", "The message format.", 0) \ - M(Char, nats_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \ M(String, nats_schema, "", "Schema identifier (used by schema-based formats) for NATS engine", 0) \ M(UInt64, nats_num_consumers, 1, "The number of consumer channels per table.", 0) \ M(String, nats_queue_group, "", "Name for queue group of NATS subscribers.", 0) \ @@ -27,10 +27,15 @@ class ASTStorage; M(String, nats_token, "", "NATS token", 0) \ M(UInt64, nats_startup_connect_tries, 5, "Number of connect tries at startup", 0) \ M(UInt64, nats_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \ + M(StreamingHandleErrorMode, nats_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for NATS engine. Possible values: default (throw an exception after nats_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ -#define LIST_OF_NATS_SETTINGS(M, ALIAS) \ - NATS_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) +#define OBSOLETE_NATS_SETTINGS(M, ALIAS) \ + MAKE_OBSOLETE(M, Char, nats_row_delimiter, '\0') \ + +#define LIST_OF_NATS_SETTINGS(M, ALIAS) \ + NATS_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_NATS_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \ DECLARE_SETTINGS_TRAITS(NATSSettingsTraits, LIST_OF_NATS_SETTINGS) diff --git a/src/Storages/NATS/NATSSource.cpp b/src/Storages/NATS/NATSSource.cpp index 89af173533d..3fc01eacb22 100644 --- a/src/Storages/NATS/NATSSource.cpp +++ b/src/Storages/NATS/NATSSource.cpp @@ -9,10 +9,10 @@ namespace DB { -static std::pair getHeaders(const StorageSnapshotPtr & storage_snapshot) +static std::pair getHeaders(StorageNATS & storage, const StorageSnapshotPtr & storage_snapshot) { auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); - auto virtual_header = storage_snapshot->getSampleBlockForColumns({"_subject"}); + auto virtual_header = storage_snapshot->getSampleBlockForColumns(storage.getVirtuals().getNames()); return {non_virtual_header, virtual_header}; } @@ -31,8 +31,9 @@ NATSSource::NATSSource( const StorageSnapshotPtr & storage_snapshot_, ContextPtr context_, const Names & columns, - size_t max_block_size_) - : NATSSource(storage_, storage_snapshot_, getHeaders(storage_snapshot_), context_, columns, max_block_size_) + size_t max_block_size_, + StreamingHandleErrorMode handle_error_mode_) + : NATSSource(storage_, storage_snapshot_, getHeaders(storage_, storage_snapshot_), context_, columns, max_block_size_, handle_error_mode_) { } @@ -42,13 +43,15 @@ NATSSource::NATSSource( std::pair headers, ContextPtr context_, const Names & columns, - size_t max_block_size_) + size_t max_block_size_, + StreamingHandleErrorMode handle_error_mode_) : ISource(getSampleBlock(headers.first, headers.second)) , storage(storage_) , storage_snapshot(storage_snapshot_) , context(context_) , column_names(columns) , max_block_size(max_block_size_) + , handle_error_mode(handle_error_mode_) , non_virtual_header(std::move(headers.first)) , virtual_header(std::move(headers.second)) { @@ -97,16 +100,41 @@ Chunk NATSSource::generate() EmptyReadBuffer empty_buf; auto input_format = FormatFactory::instance().getInput( storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); - - StreamingFormatExecutor executor(non_virtual_header, input_format); - + std::optional exception_message; size_t total_rows = 0; + auto on_error = [&](const MutableColumns & result_columns, Exception & e) + { + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + exception_message = e.message(); + for (const auto & column : result_columns) + { + // We could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); + + // All data columns will get default value in case of error. + column->insertDefault(); + } + + return 1; + } + else + { + throw std::move(e); + } + }; + + StreamingFormatExecutor executor(non_virtual_header, input_format, on_error); while (true) { if (consumer->queueEmpty()) break; + exception_message.reset(); size_t new_rows = 0; if (auto buf = consumer->consume()) new_rows = executor.execute(*buf); @@ -115,6 +143,21 @@ Chunk NATSSource::generate() { auto subject = consumer->getSubject(); virtual_columns[0]->insertMany(subject, new_rows); + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + if (exception_message) + { + const auto & current_message = consumer->getCurrentMessage(); + virtual_columns[1]->insertData(current_message.data(), current_message.size()); + virtual_columns[2]->insertData(exception_message->data(), exception_message->size()); + + } + else + { + virtual_columns[1]->insertDefault(); + virtual_columns[2]->insertDefault(); + } + } total_rows = total_rows + new_rows; } diff --git a/src/Storages/NATS/NATSSource.h b/src/Storages/NATS/NATSSource.h index 604a8a5366f..91532442d36 100644 --- a/src/Storages/NATS/NATSSource.h +++ b/src/Storages/NATS/NATSSource.h @@ -16,7 +16,8 @@ public: const StorageSnapshotPtr & storage_snapshot_, ContextPtr context_, const Names & columns, - size_t max_block_size_); + size_t max_block_size_, + StreamingHandleErrorMode handle_error_mode_); ~NATSSource() override; @@ -37,6 +38,7 @@ private: ContextPtr context; Names column_names; const size_t max_block_size; + StreamingHandleErrorMode handle_error_mode; bool is_finished = false; const Block non_virtual_header; @@ -53,7 +55,8 @@ private: std::pair headers, ContextPtr context_, const Names & columns, - size_t max_block_size_); + size_t max_block_size_, + StreamingHandleErrorMode handle_error_mode_); }; } diff --git a/src/Storages/NATS/StorageNATS.cpp b/src/Storages/NATS/StorageNATS.cpp index a3478069356..c3fdcea4f9d 100644 --- a/src/Storages/NATS/StorageNATS.cpp +++ b/src/Storages/NATS/StorageNATS.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -156,7 +157,11 @@ ContextMutablePtr StorageNATS::addSettings(ContextPtr local_context) const auto modified_context = Context::createCopy(local_context); modified_context->setSetting("input_format_skip_unknown_fields", true); modified_context->setSetting("input_format_allow_errors_ratio", 0.); - modified_context->setSetting("input_format_allow_errors_num", nats_settings->nats_skip_broken_messages.value); + if (nats_settings->nats_handle_error_mode == StreamingHandleErrorMode::DEFAULT) + modified_context->setSetting("input_format_allow_errors_num", nats_settings->nats_skip_broken_messages.value); + else + modified_context->setSetting("input_format_allow_errors_num", Field{0}); + /// Since we are reusing the same context for all queries executed simultaneously, we don't want to used shared `analyze_count` modified_context->setSetting("max_analyze_depth", Field{0}); @@ -319,7 +324,7 @@ void StorageNATS::read( for (size_t i = 0; i < num_created_consumers; ++i) { - auto nats_source = std::make_shared(*this, storage_snapshot, modified_context, column_names, 1); + auto nats_source = std::make_shared(*this, storage_snapshot, modified_context, column_names, 1, nats_settings->nats_handle_error_mode); auto converting_dag = ActionsDAG::makeConvertingActions( nats_source->getPort().getHeader().getColumnsWithTypeAndName(), @@ -642,7 +647,7 @@ bool StorageNATS::streamToViews() for (size_t i = 0; i < num_created_consumers; ++i) { LOG_DEBUG(log, "Current queue size: {}", consumers[0]->queueSize()); - auto source = std::make_shared(*this, storage_snapshot, nats_context, column_names, block_size); + auto source = std::make_shared(*this, storage_snapshot, nats_context, column_names, block_size, nats_settings->nats_handle_error_mode); sources.emplace_back(source); pipes.emplace_back(source); @@ -743,9 +748,17 @@ void registerStorageNATS(StorageFactory & factory) NamesAndTypesList StorageNATS::getVirtuals() const { - return NamesAndTypesList{ + auto virtuals = NamesAndTypesList{ {"_subject", std::make_shared()} }; + + if (nats_settings->nats_handle_error_mode == StreamingHandleErrorMode::STREAM) + { + virtuals.push_back({"_raw_message", std::make_shared(std::make_shared())}); + virtuals.push_back({"_error", std::make_shared(std::make_shared())}); + } + + return virtuals; } } diff --git a/src/Storages/RabbitMQ/RabbitMQSettings.h b/src/Storages/RabbitMQ/RabbitMQSettings.h index 78cc8bf4df1..36e092925fd 100644 --- a/src/Storages/RabbitMQ/RabbitMQSettings.h +++ b/src/Storages/RabbitMQ/RabbitMQSettings.h @@ -14,7 +14,6 @@ namespace DB M(String, rabbitmq_format, "", "The message format.", 0) \ M(String, rabbitmq_exchange_type, "default", "The exchange type.", 0) \ M(String, rabbitmq_routing_key_list, "5672", "A string of routing keys, separated by dots.", 0) \ - M(Char, rabbitmq_row_delimiter, '\0', "The character to be considered as a delimiter.", 0) \ M(String, rabbitmq_schema, "", "Schema identifier (used by schema-based formats) for RabbitMQ engine", 0) \ M(UInt64, rabbitmq_num_consumers, 1, "The number of consumer channels per table.", 0) \ M(UInt64, rabbitmq_num_queues, 1, "The number of queues per consumer.", 0) \ @@ -35,10 +34,16 @@ namespace DB M(String, rabbitmq_password, "", "RabbitMQ password", 0) \ M(Bool, rabbitmq_commit_on_select, false, "Commit messages when select query is made", 0) \ M(UInt64, rabbitmq_max_rows_per_message, 1, "The maximum number of rows produced in one message for row-based formats.", 0) \ + M(StreamingHandleErrorMode, rabbitmq_handle_error_mode, StreamingHandleErrorMode::DEFAULT, "How to handle errors for RabbitMQ engine. Possible values: default (throw an exception after rabbitmq_skip_broken_messages broken messages), stream (save broken messages and errors in virtual columns _raw_message, _error).", 0) \ + +#define OBSOLETE_RABBITMQ_SETTINGS(M, ALIAS) \ + MAKE_OBSOLETE(M, Char, rabbitmq_row_delimiter, '\0') \ + #define LIST_OF_RABBITMQ_SETTINGS(M, ALIAS) \ - RABBITMQ_RELATED_SETTINGS(M, ALIAS) \ - LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) + RABBITMQ_RELATED_SETTINGS(M, ALIAS) \ + OBSOLETE_RABBITMQ_SETTINGS(M, ALIAS) \ + LIST_OF_ALL_FORMAT_SETTINGS(M, ALIAS) \ DECLARE_SETTINGS_TRAITS(RabbitMQSettingsTraits, LIST_OF_RABBITMQ_SETTINGS) diff --git a/src/Storages/RabbitMQ/RabbitMQSource.cpp b/src/Storages/RabbitMQ/RabbitMQSource.cpp index 879c0e1e975..793064c10f8 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.cpp +++ b/src/Storages/RabbitMQ/RabbitMQSource.cpp @@ -11,11 +11,10 @@ namespace DB { -static std::pair getHeaders(const StorageSnapshotPtr & storage_snapshot) +static std::pair getHeaders(StorageRabbitMQ & storage_, const StorageSnapshotPtr & storage_snapshot) { auto non_virtual_header = storage_snapshot->metadata->getSampleBlockNonMaterialized(); - auto virtual_header = storage_snapshot->getSampleBlockForColumns( - {"_exchange_name", "_channel_id", "_delivery_tag", "_redelivered", "_message_id", "_timestamp"}); + auto virtual_header = storage_snapshot->getSampleBlockForColumns(storage_.getVirtuals().getNames()); return {non_virtual_header, virtual_header}; } @@ -36,15 +35,17 @@ RabbitMQSource::RabbitMQSource( const Names & columns, size_t max_block_size_, UInt64 max_execution_time_, + StreamingHandleErrorMode handle_error_mode_, bool ack_in_suffix_) : RabbitMQSource( storage_, storage_snapshot_, - getHeaders(storage_snapshot_), + getHeaders(storage_, storage_snapshot_), context_, columns, max_block_size_, max_execution_time_, + handle_error_mode_, ack_in_suffix_) { } @@ -57,6 +58,7 @@ RabbitMQSource::RabbitMQSource( const Names & columns, size_t max_block_size_, UInt64 max_execution_time_, + StreamingHandleErrorMode handle_error_mode_, bool ack_in_suffix_) : ISource(getSampleBlock(headers.first, headers.second)) , storage(storage_) @@ -64,6 +66,7 @@ RabbitMQSource::RabbitMQSource( , context(context_) , column_names(columns) , max_block_size(max_block_size_) + , handle_error_mode(handle_error_mode_) , ack_in_suffix(ack_in_suffix_) , non_virtual_header(std::move(headers.first)) , virtual_header(std::move(headers.second)) @@ -131,12 +134,40 @@ Chunk RabbitMQSource::generateImpl() auto input_format = FormatFactory::instance().getInput( storage.getFormatName(), empty_buf, non_virtual_header, context, max_block_size, std::nullopt, 1); - StreamingFormatExecutor executor(non_virtual_header, input_format); + std::optional exception_message; size_t total_rows = 0; + auto on_error = [&](const MutableColumns & result_columns, Exception & e) + { + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + exception_message = e.message(); + for (const auto & column : result_columns) + { + // We could already push some rows to result_columns + // before exception, we need to fix it. + auto cur_rows = column->size(); + if (cur_rows > total_rows) + column->popBack(cur_rows - total_rows); + + // All data columns will get default value in case of error. + column->insertDefault(); + } + + return 1; + } + else + { + throw std::move(e); + } + }; + + StreamingFormatExecutor executor(non_virtual_header, input_format, on_error); + RabbitMQConsumer::CommitInfo current_commit_info; while (true) { + exception_message.reset(); size_t new_rows = 0; if (consumer->hasPendingMessages()) @@ -158,6 +189,19 @@ Chunk RabbitMQSource::generateImpl() virtual_columns[3]->insert(message.redelivered); virtual_columns[4]->insert(message.message_id); virtual_columns[5]->insert(message.timestamp); + if (handle_error_mode == StreamingHandleErrorMode::STREAM) + { + if (exception_message) + { + virtual_columns[6]->insertData(message.message.data(), message.message.size()); + virtual_columns[7]->insertData(exception_message->data(), exception_message->size()); + } + else + { + virtual_columns[6]->insertDefault(); + virtual_columns[7]->insertDefault(); + } + } } total_rows += new_rows; diff --git a/src/Storages/RabbitMQ/RabbitMQSource.h b/src/Storages/RabbitMQ/RabbitMQSource.h index 4a7f4578190..a25b3d50222 100644 --- a/src/Storages/RabbitMQ/RabbitMQSource.h +++ b/src/Storages/RabbitMQ/RabbitMQSource.h @@ -19,6 +19,7 @@ public: const Names & columns, size_t max_block_size_, UInt64 max_execution_time_, + StreamingHandleErrorMode handle_error_mode_, bool ack_in_suffix = false); ~RabbitMQSource() override; @@ -39,6 +40,7 @@ private: ContextPtr context; Names column_names; const size_t max_block_size; + StreamingHandleErrorMode handle_error_mode; bool ack_in_suffix; bool is_finished = false; @@ -61,6 +63,7 @@ private: const Names & columns, size_t max_block_size_, UInt64 max_execution_time_, + StreamingHandleErrorMode handle_error_mode_, bool ack_in_suffix); Chunk generateImpl(); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index ec552dd1032..493897e261f 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -240,7 +241,11 @@ ContextMutablePtr StorageRabbitMQ::addSettings(ContextPtr local_context) const auto modified_context = Context::createCopy(local_context); modified_context->setSetting("input_format_skip_unknown_fields", true); modified_context->setSetting("input_format_allow_errors_ratio", 0.); - modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value); + if (rabbitmq_settings->rabbitmq_handle_error_mode == StreamingHandleErrorMode::DEFAULT) + modified_context->setSetting("input_format_allow_errors_num", rabbitmq_settings->rabbitmq_skip_broken_messages.value); + else + modified_context->setSetting("input_format_allow_errors_num", Field(0)); + /// Since we are reusing the same context for all queries executed simultaneously, we don't want to used shared `analyze_count` modified_context->setSetting("max_analyze_depth", Field{0}); @@ -730,7 +735,7 @@ void StorageRabbitMQ::read( { auto rabbit_source = std::make_shared( *this, storage_snapshot, modified_context, column_names, 1, - max_execution_time_ms, rabbitmq_settings->rabbitmq_commit_on_select); + max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, rabbitmq_settings->rabbitmq_commit_on_select); auto converting_dag = ActionsDAG::makeConvertingActions( rabbit_source->getPort().getHeader().getColumnsWithTypeAndName(), @@ -1076,7 +1081,7 @@ bool StorageRabbitMQ::tryStreamToViews() for (size_t i = 0; i < num_created_consumers; ++i) { auto source = std::make_shared( - *this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, false); + *this, storage_snapshot, rabbitmq_context, column_names, block_size, max_execution_time_ms, rabbitmq_settings->rabbitmq_handle_error_mode, false); sources.emplace_back(source); pipes.emplace_back(source); @@ -1212,7 +1217,7 @@ void registerStorageRabbitMQ(StorageFactory & factory) NamesAndTypesList StorageRabbitMQ::getVirtuals() const { - return NamesAndTypesList{ + auto virtuals = NamesAndTypesList{ {"_exchange_name", std::make_shared()}, {"_channel_id", std::make_shared()}, {"_delivery_tag", std::make_shared()}, @@ -1220,6 +1225,14 @@ NamesAndTypesList StorageRabbitMQ::getVirtuals() const {"_message_id", std::make_shared()}, {"_timestamp", std::make_shared()} }; + + if (rabbitmq_settings->rabbitmq_handle_error_mode == StreamingHandleErrorMode::STREAM) + { + virtuals.push_back({"_raw_message", std::make_shared(std::make_shared())}); + virtuals.push_back({"_error", std::make_shared(std::make_shared())}); + } + + return virtuals; } } diff --git a/tests/integration/test_storage_rabbitmq/test.py b/tests/integration/test_storage_rabbitmq/test.py index 80d2050b394..aa9d8e6e8fe 100644 --- a/tests/integration/test_storage_rabbitmq/test.py +++ b/tests/integration/test_storage_rabbitmq/test.py @@ -3416,3 +3416,100 @@ def test_rabbitmq_flush_by_time(rabbitmq_cluster): ) assert int(result) == 3 + + +def test_rabbitmq_handle_error_mode_stream(rabbitmq_cluster): + instance.query( + """ + DROP TABLE IF EXISTS test.rabbitmq; + DROP TABLE IF EXISTS test.view; + DROP TABLE IF EXISTS test.data; + DROP TABLE IF EXISTS test.errors; + DROP TABLE IF EXISTS test.errors_view; + + CREATE TABLE test.rabbit (key UInt64, value UInt64) + ENGINE = RabbitMQ + SETTINGS rabbitmq_host_port = '{}:5672', + rabbitmq_exchange_name = 'select', + rabbitmq_commit_on_select = 1, + rabbitmq_format = 'JSONEachRow', + rabbitmq_row_delimiter = '\\n', + rabbitmq_handle_error_mode = 'stream'; + + + CREATE TABLE test.errors (error Nullable(String), broken_message Nullable(String)) + ENGINE = MergeTree() + ORDER BY tuple(); + + CREATE MATERIALIZED VIEW test.errors_view TO test.errors AS + SELECT _error as error, _raw_message as broken_message FROM test.rabbit where not isNull(_error); + + CREATE TABLE test.data (key UInt64, value UInt64) + ENGINE = MergeTree() + ORDER BY key; + + CREATE MATERIALIZED VIEW test.view TO test.data AS + SELECT key, value FROM test.rabbit; + """.format( + rabbitmq_cluster.rabbitmq_host + ) + ) + + credentials = pika.PlainCredentials("root", "clickhouse") + parameters = pika.ConnectionParameters( + rabbitmq_cluster.rabbitmq_ip, rabbitmq_cluster.rabbitmq_port, "/", credentials + ) + connection = pika.BlockingConnection(parameters) + channel = connection.channel() + + messages = [] + num_rows = 50 + for i in range(num_rows): + if i % 2 == 0: + messages.append(json.dumps({"key": i, "value": i})) + else: + messages.append("Broken message " + str(i)) + + for message in messages: + channel.basic_publish(exchange="select", routing_key="", body=message) + + connection.close() + # The order of messages in select * from test.rabbitmq is not guaranteed, so sleep to collect everything in one select + time.sleep(1) + + attempt = 0 + rows = 0 + while attempt < 100: + rows = int(instance.query("SELECT count() FROM test.data")) + if rows == num_rows: + break + attempt += 1 + + assert rows == num_rows + + result = instance.query("SELECT * FROM test.data ORDER by key") + expected = "0\t0\n" * (num_rows // 2) + for i in range(num_rows): + if i % 2 == 0: + expected += str(i) + "\t" + str(i) + "\n" + + assert result == expected + + attempt = 0 + errors_count = 0 + while attempt < 100: + errors_count = int(instance.query("SELECT count() FROM test.errors")) + if errors_count == num_rows: + break + attempt += 1 + + assert errors_count == num_rows / 2 + + broken_messages = instance.query("SELECT broken_message FROM test.errors order by broken_message") + expected = [] + for i in range(num_rows): + if i % 2 != 0: + expected.append("Broken message " + str(i) + "\n") + + expected = "".join(sorted(expected)) + assert broken_messages == expected diff --git a/tests/queries/0_stateless/02889_file_log_save_errors.reference b/tests/queries/0_stateless/02889_file_log_save_errors.reference new file mode 100644 index 00000000000..c4a7c1f0bda --- /dev/null +++ b/tests/queries/0_stateless/02889_file_log_save_errors.reference @@ -0,0 +1,20 @@ +Cannot parse input: expected \'{\' before: \'Error 0\' Error 0 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 1\' Error 1 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 2\' Error 2 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 3\' Error 3 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 4\' Error 4 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 5\' Error 5 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 6\' Error 6 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 7\' Error 7 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 8\' Error 8 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 9\' Error 9 a.jsonl +Cannot parse input: expected \'{\' before: \'Error 10\' Error 10 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 11\' Error 11 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 12\' Error 12 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 13\' Error 13 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 14\' Error 14 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 15\' Error 15 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 16\' Error 16 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 17\' Error 17 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 18\' Error 18 b.jsonl +Cannot parse input: expected \'{\' before: \'Error 19\' Error 19 b.jsonl diff --git a/tests/queries/0_stateless/02889_file_log_save_errors.sh b/tests/queries/0_stateless/02889_file_log_save_errors.sh new file mode 100755 index 00000000000..62f876e13db --- /dev/null +++ b/tests/queries/0_stateless/02889_file_log_save_errors.sh @@ -0,0 +1,46 @@ +#!/usr/bin/env bash + +CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CURDIR"/../shell_config.sh + +user_files_path=$(clickhouse-client --query "select _path,_file from file('nonexist.txt', 'CSV', 'val1 char')" 2>&1 | grep Exception | awk '{gsub("/nonexist.txt","",$9); print $9}') + +${CLICKHOUSE_CLIENT} --query "drop table if exists file_log;" +${CLICKHOUSE_CLIENT} --query "drop table if exists log_errors;" + +mkdir -p ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/ +rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?}/* + +for i in {0..9} +do + echo "{\"key\" : $i, \"value\" : $i}" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/a.jsonl + echo "Error $i" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/a.jsonl +done + +for i in {10..19} +do + echo "{\"key\" : $i, \"value\" : $i}" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/b.jsonl + echo "Error $i" >> ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/b.jsonl +done + +${CLICKHOUSE_CLIENT} --query "create table file_log(key UInt8, value UInt8) engine=FileLog('${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME}/', 'JSONEachRow') settings handle_error_mode='stream';" +${CLICKHOUSE_CLIENT} --query "create Materialized View log_errors engine=MergeTree order by tuple() as select _error as error, _raw_record as record, _filename as file from file_log where not isNull(_error);" + +function count() +{ + COUNT=$(${CLICKHOUSE_CLIENT} --query "select count() from log_errors;") + echo $COUNT +} + +while true; do + [[ $(count) == 20 ]] && break + sleep 1 +done + +${CLICKHOUSE_CLIENT} --query "select * from log_errors order by file, record;" +${CLICKHOUSE_CLIENT} --query "drop table file_log;" +${CLICKHOUSE_CLIENT} --query "drop table log_errors;" + +rm -rf ${user_files_path}/${CLICKHOUSE_TEST_UNIQUE_NAME:?} +