From 909d5ad2b5a82613fbe8c27b8d7188e3a5385097 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Thu, 18 Mar 2021 13:26:32 +0800
Subject: [PATCH 01/18] Handle errors for Kafka engine

---
 src/Common/ErrorCodes.cpp | 1 +
 src/Core/Settings.h | 1 +
 src/Core/SettingsEnums.cpp | 4 ++
 src/Core/SettingsEnums.h | 10 ++++-
 src/Formats/FormatFactory.cpp | 5 ++-
 src/Formats/FormatFactory.h | 3 +-
 src/Processors/Formats/IRowInputFormat.h | 1 +
 src/Storages/Kafka/KafkaBlockInputStream.cpp | 42 ++++++++++++++++---
 src/Storages/Kafka/KafkaBlockInputStream.h | 1 +
 .../Kafka/ReadBufferFromKafkaConsumer.h | 1 +
 src/Storages/Kafka/StorageKafka.cpp | 28 ++++++++++++-
 src/Storages/Kafka/StorageKafka.h | 4 ++
 12 files changed, 91 insertions(+), 10 deletions(-)

diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp
index 918bc301754..4a664124b94 100644
--- a/src/Common/ErrorCodes.cpp
+++ b/src/Common/ErrorCodes.cpp
@@ -547,6 +547,7 @@
 M(577, INVALID_SHARD_ID) \
 M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \
 M(579, INCORRECT_PART_TYPE) \
+ M(580, UNKNOWN_HANDLE_KAFKA_ERROR_MODE) \
 \
 M(998, POSTGRESQL_CONNECTION_FAILURE) \
 M(999, KEEPER_EXCEPTION) \

diff --git a/src/Core/Settings.h b/src/Core/Settings.h
index 0b47cf1f2f7..140f34becc0 100644
--- a/src/Core/Settings.h
+++ b/src/Core/Settings.h
@@ -454,6 +454,7 @@ class IColumn;
 M(Bool, optimize_group_by_function_keys, true, "Eliminates functions of other keys in GROUP BY section", 0) \
 M(UInt64, query_plan_max_optimizations_to_apply, 10000, "Limit the total number of optimizations applied to query plan. If zero, ignored. If limit reached, throw exception", 0) \
 M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
+ M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \
 // End of COMMON_SETTINGS
 // Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp
index 64ba51d1c68..e91be3690da 100644
--- a/src/Core/SettingsEnums.cpp
+++ b/src/Core/SettingsEnums.cpp
@@ -13,6 +13,7 @@ namespace ErrorCodes
 extern const int BAD_ARGUMENTS;
 extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL;
 extern const int UNKNOWN_UNION;
+ extern const int UNKNOWN_HANDLE_KAFKA_ERROR_MODE;
 }

@@ -108,4 +109,7 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS,
 {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT},
 {"never_throw", DistributedDDLOutputMode::NEVER_THROW}})

+IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::UNKNOWN_HANDLE_KAFKA_ERROR_MODE,
+ {{"default", HandleKafkaErrorMode::DEFAULT},
+ {"stream", HandleKafkaErrorMode::STREAM}})
 }

diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h
index 7615b185a61..b44202b9446 100644
--- a/src/Core/SettingsEnums.h
+++ b/src/Core/SettingsEnums.h
@@ -138,7 +138,6 @@ enum class UnionMode

 DECLARE_SETTING_ENUM(UnionMode)

-
 enum class DistributedDDLOutputMode
 {
 NONE,
@@ -149,4 +148,13 @@ enum class DistributedDDLOutputMode

 DECLARE_SETTING_ENUM(DistributedDDLOutputMode)

+enum class HandleKafkaErrorMode
+{
+ DEFAULT = 0, // Ignore errors whith threshold.
+ STREAM, // Put errors to stream in the virtual column named `_error`.
+ /*FIXED_SYSTEM_TABLE, Put errors in a fixed system table like system.kafka_errors. This is not implemented now. */
+ /*CUSTOM_SYSTEM_TABLE, Put errors in a custom system table.
This is not implemented now. */ +}; + +DECLARE_SETTING_ENUM(HandleKafkaErrorMode) } diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index f7f32cf9b6f..eecf97aa848 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -269,7 +269,8 @@ InputFormatPtr FormatFactory::getInputFormat( const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & _format_settings) const + const std::optional & _format_settings, + const bool sync_after_error) const { const auto & input_getter = getCreators(name).input_processor_creator; if (!input_getter) @@ -289,7 +290,7 @@ InputFormatPtr FormatFactory::getInputFormat( params.allow_errors_ratio = format_settings.input_allow_errors_ratio; params.max_execution_time = settings.max_execution_time; params.timeout_overflow_mode = settings.timeout_overflow_mode; - + params.sync_after_error = sync_after_error; auto format = input_getter(buf, sample, params, format_settings); /// It's a kludge. Because I cannot remove context from values format. diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 4fa7e9a0c01..349120aaad4 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -133,7 +133,8 @@ public: const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & format_settings = std::nullopt) const; + const std::optional & format_settings = std::nullopt, + const bool sync_after_error = false) const; /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done. OutputFormatPtr getOutputFormatParallelIfPossible( diff --git a/src/Processors/Formats/IRowInputFormat.h b/src/Processors/Formats/IRowInputFormat.h index c802bd3066b..7061db75765 100644 --- a/src/Processors/Formats/IRowInputFormat.h +++ b/src/Processors/Formats/IRowInputFormat.h @@ -29,6 +29,7 @@ struct RowInputFormatParams Poco::Timespan max_execution_time = 0; OverflowMode timeout_overflow_mode = OverflowMode::THROW; + bool sync_after_error = false; }; bool isParseError(int code); diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index bf985902b4d..3d6de219034 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -35,8 +35,8 @@ KafkaBlockInputStream::KafkaBlockInputStream( , max_block_size(max_block_size_) , commit_in_suffix(commit_in_suffix_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) - , virtual_header(metadata_snapshot->getSampleBlockForColumns( - {"_topic", "_key", "_offset", "_partition", "_timestamp", "_timestamp_ms", "_headers.name", "_headers.value"}, storage.getVirtuals(), storage.getStorageID())) + , virtual_header(metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) + , handle_error_mode(context_->getSettings().handle_kafka_error_mode) { } @@ -78,20 +78,23 @@ Block KafkaBlockInputStream::readImpl() // now it's one-time usage InputStream // one block of the needed size (or with desired flush timeout) is formed in one internal iteration // otherwise external iteration will reuse that and logic will became even more fuzzy - MutableColumns result_columns = non_virtual_header.cloneEmptyColumns(); MutableColumns virtual_columns = virtual_header.cloneEmptyColumns(); + auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM; + auto input_format = 
FormatFactory::instance().getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size); + storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size, {}, put_error_to_stream); InputPort port(input_format->getPort().getHeader(), input_format.get()); connect(input_format->getPort(), port); port.setNeeded(); + std::string exception_message; auto read_kafka_message = [&] { size_t new_rows = 0; + exception_message.clear(); while (true) { @@ -100,7 +103,22 @@ Block KafkaBlockInputStream::readImpl() switch (status) { case IProcessor::Status::Ready: - input_format->work(); + try + { + input_format->work(); + } + catch (const Exception & e) + { + if (put_error_to_stream) + { + exception_message = e.message(); + new_rows++; + } + else + { + throw e; + } + } break; case IProcessor::Status::Finished: @@ -138,6 +156,11 @@ Block KafkaBlockInputStream::readImpl() { auto new_rows = buffer->poll() ? read_kafka_message() : 0; + if (!exception_message.empty()) + { + for (size_t i = 0; i < result_columns.size(); ++i) + result_columns[i]->insertDefault(); + } if (new_rows) { // In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl() @@ -189,6 +212,15 @@ Block KafkaBlockInputStream::readImpl() } virtual_columns[6]->insert(headers_names); virtual_columns[7]->insert(headers_values); + if (put_error_to_stream) + { + auto payload = buffer->currentPayload(); + virtual_columns[8]->insert(payload); + if (exception_message.empty()) + virtual_columns[9]->insertDefault(); + else + virtual_columns[9]->insert(exception_message); + } } total_rows = total_rows + new_rows; diff --git a/src/Storages/Kafka/KafkaBlockInputStream.h b/src/Storages/Kafka/KafkaBlockInputStream.h index 517df6ecaf7..c77f17803e7 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.h +++ b/src/Storages/Kafka/KafkaBlockInputStream.h @@ -51,6 +51,7 @@ private: const Block non_virtual_header; const Block virtual_header; + const HandleKafkaErrorMode handle_error_mode; }; } diff --git a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h index 1d889655941..49d3df0e180 100644 --- a/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h +++ b/src/Storages/Kafka/ReadBufferFromKafkaConsumer.h @@ -63,6 +63,7 @@ public: auto currentPartition() const { return current[-1].get_partition(); } auto currentTimestamp() const { return current[-1].get_timestamp(); } const auto & currentHeaderList() const { return current[-1].get_header_list(); } + String currentPayload() const { return current[-1].get_payload(); } private: using Messages = std::vector; diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 45e4ec538a1..027331a8330 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -760,7 +760,7 @@ void registerStorageKafka(StorageFactory & factory) NamesAndTypesList StorageKafka::getVirtuals() const { - return NamesAndTypesList{ + auto result = NamesAndTypesList{ {"_topic", std::make_shared()}, {"_key", std::make_shared()}, {"_offset", std::make_shared()}, @@ -770,6 +770,32 @@ NamesAndTypesList StorageKafka::getVirtuals() const {"_headers.name", std::make_shared(std::make_shared())}, {"_headers.value", std::make_shared(std::make_shared())} }; + if (handle_error_mode == HandleKafkaErrorMode::STREAM) + { + result.push_back({"_raw_message", std::make_shared()}); + result.push_back({"_error", std::make_shared()}); + } + return result; +} + +Names StorageKafka::getVirtualColumnNames() const 
+{ + auto result = Names { + "_topic", + "_key", + "_offset", + "_partition", + "_timestamp", + "_timestamp_ms", + "_headers.name", + "_headers.value", + }; + if (handle_error_mode == HandleKafkaErrorMode::STREAM) + { + result.push_back({"_raw_message"}); + result.push_back({"_error"}); + } + return result; } } diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h index 53871990810..72690418c18 100644 --- a/src/Storages/Kafka/StorageKafka.h +++ b/src/Storages/Kafka/StorageKafka.h @@ -64,6 +64,7 @@ public: const auto & getFormatName() const { return format_name; } NamesAndTypesList getVirtuals() const override; + Names getVirtualColumnNames() const; protected: StorageKafka( const StorageID & table_id_, @@ -112,6 +113,9 @@ private: std::mutex thread_statuses_mutex; std::list> thread_statuses; + /// Handle error mode + HandleKafkaErrorMode handle_error_mode; + SettingsChanges createSettingsAdjustments(); ConsumerBufferPtr createReadBuffer(const size_t consumer_number); From 85cfb927e60eb4958287eccb89eae4987aa32f75 Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 18 Mar 2021 16:54:39 +0800 Subject: [PATCH 02/18] move HandleKafkaErrorMode to KafkaSettings --- src/Common/ErrorCodes.cpp | 1 - src/Core/SettingsEnums.cpp | 3 ++- src/Storages/Kafka/KafkaBlockInputStream.cpp | 2 +- src/Storages/Kafka/KafkaSettings.h | 3 ++- src/Storages/Kafka/StorageKafka.cpp | 4 ++-- src/Storages/Kafka/StorageKafka.h | 1 + 6 files changed, 8 insertions(+), 6 deletions(-) diff --git a/src/Common/ErrorCodes.cpp b/src/Common/ErrorCodes.cpp index 4a664124b94..918bc301754 100644 --- a/src/Common/ErrorCodes.cpp +++ b/src/Common/ErrorCodes.cpp @@ -547,7 +547,6 @@ M(577, INVALID_SHARD_ID) \ M(578, INVALID_FORMAT_INSERT_QUERY_WITH_DATA) \ M(579, INCORRECT_PART_TYPE) \ - M(580, UNKNOWN_HANDLE_KAFKA_ERROR_MODE) \ \ M(998, POSTGRESQL_CONNECTION_FAILURE) \ M(999, KEEPER_EXCEPTION) \ diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index e91be3690da..7f2b2d6056a 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -13,7 +13,6 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL; extern const int UNKNOWN_UNION; - extern const int UNKNOWN_HANDLE_KAFKA_ERROR_MODE; } @@ -110,6 +109,8 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS, {"never_throw", DistributedDDLOutputMode::NEVER_THROW}}) IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::UNKNOWN_HANDLE_KAFKA_ERROR_MODE, + +IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::BAD_ARGUMENTS, {{"default", HandleKafkaErrorMode::DEFAULT}, {"stream", HandleKafkaErrorMode::STREAM}}) } diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index 3d6de219034..70b6d3680f4 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -36,7 +36,7 @@ KafkaBlockInputStream::KafkaBlockInputStream( , commit_in_suffix(commit_in_suffix_) , non_virtual_header(metadata_snapshot->getSampleBlockNonMaterialized()) , virtual_header(metadata_snapshot->getSampleBlockForColumns(storage.getVirtualColumnNames(), storage.getVirtuals(), storage.getStorageID())) - , handle_error_mode(context_->getSettings().handle_kafka_error_mode) + , handle_error_mode(storage.getHandleKafkaErrorMode()) { } diff --git a/src/Storages/Kafka/KafkaSettings.h b/src/Storages/Kafka/KafkaSettings.h index 1df10d16339..1010c486abb 100644 --- 
a/src/Storages/Kafka/KafkaSettings.h
+++ b/src/Storages/Kafka/KafkaSettings.h
@@ -29,7 +29,8 @@ class ASTStorage;
 M(Char, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.", 0) \
 M(String, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine", 0) \
 M(UInt64, kafka_skip_broken_messages, 0, "Skip at least this number of broken messages from Kafka topic per block", 0) \
- M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0)
+ M(Bool, kafka_thread_per_consumer, false, "Provide independent thread for each consumer", 0) \
+ M(HandleKafkaErrorMode, kafka_handle_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Possible values: default, stream.", 0) \

 /** TODO: */
 /* https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md */

diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp
index 027331a8330..a26604257fb 100644
--- a/src/Storages/Kafka/StorageKafka.cpp
+++ b/src/Storages/Kafka/StorageKafka.cpp
@@ -770,7 +770,7 @@ NamesAndTypesList StorageKafka::getVirtuals() const
 {"_headers.name", std::make_shared(std::make_shared())},
 {"_headers.value", std::make_shared(std::make_shared())}
 };
- if (handle_error_mode == HandleKafkaErrorMode::STREAM)
+ if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
 {
 result.push_back({"_raw_message", std::make_shared()});
 result.push_back({"_error", std::make_shared()});
@@ -790,7 +790,7 @@ Names StorageKafka::getVirtualColumnNames() const
 "_headers.name",
 "_headers.value",
 };
- if (handle_error_mode == HandleKafkaErrorMode::STREAM)
+ if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM)
 {
 result.push_back({"_raw_message"});
 result.push_back({"_error"});

diff --git a/src/Storages/Kafka/StorageKafka.h b/src/Storages/Kafka/StorageKafka.h
index 72690418c18..74526ed3c87 100644
--- a/src/Storages/Kafka/StorageKafka.h
+++ b/src/Storages/Kafka/StorageKafka.h
@@ -65,6 +65,7 @@ public:

 NamesAndTypesList getVirtuals() const override;
 Names getVirtualColumnNames() const;
+ HandleKafkaErrorMode getHandleKafkaErrorMode() const { return kafka_settings->kafka_handle_error_mode; }
 protected:
 StorageKafka(
 const StorageID & table_id_,

From cad54d4e2cadaaf73f8816fa3073c8fc4597367a Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Fri, 19 Mar 2021 11:49:30 +0800
Subject: [PATCH 03/18] reset the parser if exception caught.
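When kafka_handle_error_mode is set to 'stream', a parsing failure must not
abort consumption: the broken message should surface through the _raw_message
and _error virtual columns while the stream keeps going. That only works if
the parser is left in a clean state after it throws, and any rows it pushed
before failing are rolled back. A condensed sketch of the recovery path this
patch adds (names follow the diff below; illustrative, not line-for-line
identical to the final code):

    try
    {
        new_rows = read_kafka_message();
    }
    catch (Exception & e)
    {
        if (!put_error_to_stream)
            throw;                        // default mode: propagate as before

        input_format->resetParser();      // drop partially consumed parser state
        exception_message = e.message();  // later written to the _error column
        for (auto & column : result_columns)
        {
            // the parser may have pushed partial rows before throwing
            if (column->size() > total_rows)
                column->popBack(column->size() - total_rows);
            // the data columns get default values for the broken message
            column->insertDefault();
        }
        new_rows = 1;                     // one placeholder row per broken message
    }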
---
 src/Storages/Kafka/KafkaBlockInputStream.cpp | 75 ++++++++++++--------
 1 file changed, 46 insertions(+), 29 deletions(-)

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 70b6d3680f4..0041184c19d 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -90,12 +90,10 @@ Block KafkaBlockInputStream::readImpl()
 connect(input_format->getPort(), port);
 port.setNeeded();

- std::string exception_message;
+ std::optional exception_message;
 auto read_kafka_message = [&]
 {
 size_t new_rows = 0;
- exception_message.clear();
-
 while (true)
 {
 auto status = input_format->prepare();
@@ -103,22 +101,7 @@ Block KafkaBlockInputStream::readImpl()
 switch (status)
 {
 case IProcessor::Status::Ready:
- try
- {
- input_format->work();
- }
- catch (const Exception & e)
- {
- if (put_error_to_stream)
- {
- exception_message = e.message();
- new_rows++;
- }
- else
- {
- throw e;
- }
- }
+ input_format->work();
 break;

 case IProcessor::Status::Finished:
@@ -154,13 +137,42 @@ Block KafkaBlockInputStream::readImpl()

 while (true)
 {
- auto new_rows = buffer->poll() ? read_kafka_message() : 0;
-
- if (!exception_message.empty())
+ size_t new_rows = 0;
+ exception_message.reset();
+ if (buffer->poll())
 {
- for (size_t i = 0; i < result_columns.size(); ++i)
- result_columns[i]->insertDefault();
+ try
+ {
+ new_rows = read_kafka_message();
+ }
+ catch (Exception & e)
+ {
+ if (put_error_to_stream)
+ {
+ input_format->resetParser();
+ exception_message = e.message();
+ for (size_t i = 0, s = result_columns.size(); i < s; ++i)
+ {
+ // read_kafka_message could already push some rows to result_columns
+ // before exception, we need to fix it.
+ auto cur_rows = result_columns[i]->size();
+ if (cur_rows > total_rows)
+ {
+ result_columns[i]->popBack(cur_rows - total_rows);
+ }
+ // all data columns will get defaul value in case of error
+ result_columns[i]->insertDefault();
+ }
+ new_rows = 1;
+ }
+ else
+ {
+ e.addMessage("while parsing Kafka message (topic: {}, partition: {}, offset: {})'", buffer->currentTopic(), buffer->currentPartition(), buffer->currentOffset());
+ throw;
+ }
+ }
 }
+
 if (new_rows)
 {
 // In read_kafka_message(), ReadBufferFromKafkaConsumer::nextImpl()
@@ -214,12 +226,17 @@ Block KafkaBlockInputStream::readImpl()
 virtual_columns[7]->insert(headers_values);
 if (put_error_to_stream)
 {
- auto payload = buffer->currentPayload();
- virtual_columns[8]->insert(payload);
- if (exception_message.empty())
- virtual_columns[9]->insertDefault();
+ if (exception_message)
+ {
+ auto payload = buffer->currentPayload();
+ virtual_columns[8]->insert(payload);
+ virtual_columns[9]->insert(*exception_message);
+ }
 else
- virtual_columns[9]->insert(exception_message);
+ {
+ virtual_columns[8]->insertDefault();
+ virtual_columns[9]->insertDefault();
+ }
 }
 }

From 278d2d7df00ce7ab1f6f2ee07a30c30d8f56588c Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Fri, 19 Mar 2021 16:32:20 +0800
Subject: [PATCH 04/18] fix typo

---
 src/Core/SettingsEnums.h | 2 +-
 src/Storages/Kafka/KafkaBlockInputStream.cpp | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/src/Core/SettingsEnums.h b/src/Core/SettingsEnums.h
index b44202b9446..f0dd10aacfb 100644
--- a/src/Core/SettingsEnums.h
+++ b/src/Core/SettingsEnums.h
@@ -150,7 +150,7 @@ DECLARE_SETTING_ENUM(DistributedDDLOutputMode)

 enum class HandleKafkaErrorMode
 {
- DEFAULT = 0, // Ignore errors whith threshold.
+ DEFAULT = 0, // Ignore errors with threshold.
 STREAM, // Put errors to stream in the virtual column named `_error`.
 /*FIXED_SYSTEM_TABLE, Put errors in a fixed system table like system.kafka_errors. This is not implemented now. */
 /*CUSTOM_SYSTEM_TABLE, Put errors in a custom system table. This is not implemented now. */

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index 0041184c19d..c5b52a842f6 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -160,7 +160,7 @@ Block KafkaBlockInputStream::readImpl()
 {
 result_columns[i]->popBack(cur_rows - total_rows);
 }
- // all data columns will get defaul value in case of error
+ // all data columns will get default value in case of error
 result_columns[i]->insertDefault();
 }
 new_rows = 1;

From ec54f2a3617e269ca781e446b5971dd9a1efcf62 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Fri, 19 Mar 2021 20:58:32 +0800
Subject: [PATCH 05/18] fix compilation error

---
 src/Storages/Kafka/KafkaBlockInputStream.cpp | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp
index c5b52a842f6..24a5f5e3923 100644
--- a/src/Storages/Kafka/KafkaBlockInputStream.cpp
+++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp
@@ -151,17 +151,17 @@ Block KafkaBlockInputStream::readImpl()
 {
 input_format->resetParser();
 exception_message = e.message();
- for (size_t i = 0, s = result_columns.size(); i < s; ++i)
+ for (auto & column : result_columns)
 {
 // read_kafka_message could already push some rows to result_columns
 // before exception, we need to fix it.
- auto cur_rows = result_columns[i]->size();
+ auto cur_rows = column->size();
 if (cur_rows > total_rows)
 {
- result_columns[i]->popBack(cur_rows - total_rows);
+ column->popBack(cur_rows - total_rows);
 }
 // all data columns will get default value in case of error
- result_columns[i]->insertDefault();
+ column->insertDefault();
 }
 new_rows = 1;

From cf19ab76dc4c34adb2fb01288f34e93972da677e Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Mon, 22 Mar 2021 19:42:25 +0800
Subject: [PATCH 06/18] skip to the next message before throwing a parsing
 exception

---
 src/Processors/Formats/IRowInputFormat.cpp | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp
index 75a9abf6845..a0f70304197 100644
--- a/src/Processors/Formats/IRowInputFormat.cpp
+++ b/src/Processors/Formats/IRowInputFormat.cpp
@@ -108,6 +108,12 @@ Chunk IRowInputFormat::generate()
 if (!isParseError(e.code()))
 throw;

+ if (params.sync_after_error)
+ {
+ syncAfterError();
+ throw;
+ }
+
 if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0)
 throw;

From 17a5fc6e0e1168c882e71db858e6986940dc23b4 Mon Sep 17 00:00:00 2001
From: Peng Jian
Date: Mon, 22 Mar 2021 19:54:09 +0800
Subject: [PATCH 07/18] add test

---
 tests/integration/test_storage_kafka/test.py | 58 ++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py
index 9b2f54a49a0..72fc4f94cb9 100644
--- a/tests/integration/test_storage_kafka/test.py
+++ b/tests/integration/test_storage_kafka/test.py
@@ -2533,6 +2533,64 @@ def test_kafka_csv_with_thread_per_consumer(kafka_cluster):
 kafka_check_result(result, True)

+def random_string(size=8):
+ return ''.join(random.choices(string.ascii_uppercase + string.digits, k=size))
+
+@pytest.mark.timeout(180)
+def test_kafka_engine_put_errors_to_stream(kafka_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.kafka; + DROP TABLE IF EXISTS test.kafka_data; + DROP TABLE IF EXISTS test.kafka_errors; + CREATE TABLE test.kafka (i Int64, s String) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'json', + kafka_group_name = 'json', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 128, + kafka_handle_error_mode = 'stream'; + CREATE MATERIALIZED VIEW test.kafka_data (i Int64, s String) + ENGINE = MergeTree + ORDER BY i + AS SELECT i, s FROM test.kafka WHERE length(_error) == 0; + CREATE MATERIALIZED VIEW test.kafka_errors (topic String, partition Int64, offset Int64, raw String, error String) + ENGINE = MergeTree + ORDER BY (topic, offset) + AS SELECT + _topic AS topic, + _partition AS partition, + _offset AS offset, + _raw_message AS raw, + _error AS error + FROM test.kafka WHERE length(_error) > 0; + ''') + + messages = [] + for i in range(128): + if i % 2 == 0: + messages.append(json.dumps({'i': i, 's': random_string(8)})) + else: + # Unexpected json content for table test.kafka. + messages.append(json.dumps({'i': 'n_' + random_string(4), 's': random_string(8)})) + + kafka_produce('json', messages) + + while True: + total_rows = instance.query('SELECT count() FROM test.kafka_data', ignore_error=True) + if total_rows == '64\n': + break + + while True: + total_error_rows = instance.query('SELECT count() FROM test.kafka_errors', ignore_error=True) + if total_error_rows == '64\n': + break + + instance.query(''' + DROP TABLE test.kafka; + DROP TABLE test.kafka_data; + DROP TABLE test.kafka_errors; + ''') if __name__ == '__main__': cluster.start() From cc56302e8bb6c0ee0d9d48062a24c13530ca4b8e Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Mon, 22 Mar 2021 20:21:21 +0800 Subject: [PATCH 08/18] fix test case --- tests/integration/test_storage_kafka/test.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 72fc4f94cb9..7c69d865594 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -6,6 +6,7 @@ import subprocess import threading import time import io +import string import avro.schema import avro.io From 86a76bbfe42c28fbc538e48cca1308f272afea7d Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Wed, 31 Mar 2021 17:38:46 +0800 Subject: [PATCH 09/18] fix compilation error --- src/Core/SettingsEnums.cpp | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/Core/SettingsEnums.cpp b/src/Core/SettingsEnums.cpp index 7f2b2d6056a..26c2bd9b6af 100644 --- a/src/Core/SettingsEnums.cpp +++ b/src/Core/SettingsEnums.cpp @@ -108,8 +108,6 @@ IMPLEMENT_SETTING_ENUM(DistributedDDLOutputMode, ErrorCodes::BAD_ARGUMENTS, {"null_status_on_timeout", DistributedDDLOutputMode::NULL_STATUS_ON_TIMEOUT}, {"never_throw", DistributedDDLOutputMode::NEVER_THROW}}) -IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::UNKNOWN_HANDLE_KAFKA_ERROR_MODE, - IMPLEMENT_SETTING_ENUM(HandleKafkaErrorMode, ErrorCodes::BAD_ARGUMENTS, {{"default", HandleKafkaErrorMode::DEFAULT}, {"stream", HandleKafkaErrorMode::STREAM}}) From 36e4a8e2dd985da6d3dd4a284f8a4dc288c4d23e Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Wed, 31 Mar 2021 19:17:35 +0800 Subject: [PATCH 10/18] add test --- .../01781_general_format_for_parser.reference | 4 ++++ .../0_stateless/01781_general_format_for_parser.sh | 13 +++++++++++++ 2 files changed, 17 insertions(+) create mode 
100755 tests/queries/0_stateless/01781_general_format_for_parser.reference create mode 100755 tests/queries/0_stateless/01781_general_format_for_parser.sh diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.reference b/tests/queries/0_stateless/01781_general_format_for_parser.reference new file mode 100755 index 00000000000..be47ac9f63d --- /dev/null +++ b/tests/queries/0_stateless/01781_general_format_for_parser.reference @@ -0,0 +1,4 @@ +040501610555496e7438010101010101620655496e743136020002000200 +02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de +adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca +febabecafebabecafebabecafebabe diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.sh b/tests/queries/0_stateless/01781_general_format_for_parser.sh new file mode 100755 index 00000000000..104570d50b6 --- /dev/null +++ b/tests/queries/0_stateless/01781_general_format_for_parser.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) +# shellcheck source=../shell_config.sh +. "$CUR_DIR"/../shell_config.sh + +$RESULT="result.dat" +$REFERENCE="01781_general_format_for_parser.reference" +$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd > $RESULT + + +cmp --silent $RESULT $REFERENCE && echo 'OK' || echo 'FAIL' +rm -rf $RESULT From e30c07db201c39d1eb1d3d26d69a2f05795e439b Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Wed, 31 Mar 2021 20:22:36 +0800 Subject: [PATCH 11/18] add test cases --- tests/integration/test_storage_kafka/test.py | 72 +++++++++++++++++++ .../01781_general_format_for_parser.reference | 4 -- .../01781_general_format_for_parser.sh | 16 +++-- 3 files changed, 83 insertions(+), 9 deletions(-) delete mode 100755 tests/queries/0_stateless/01781_general_format_for_parser.reference diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 7c69d865594..f8d869b8533 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2593,6 +2593,78 @@ def test_kafka_engine_put_errors_to_stream(kafka_cluster): DROP TABLE test.kafka_errors; ''') +def gen_normal_json(): + return '{"i":1000, "s":"ABC123abc"}' + +def gen_malformed_json(): + return '{"i":"n1000", "s":"1000"}' + +def gen_message_with_jsons(jsons = 10, malformed = 0): + s = io.StringIO() + for i in range (jsons): + if malformed and random.randint(0,1) == 1: + s.write(gen_malformed_json()) + else: + s.write(gen_normal_json()) + s.write(' ') + return s.getvalue() + + +def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_cluster): + instance.query(''' + DROP TABLE IF EXISTS test.kafka; + DROP TABLE IF EXISTS test.kafka_data; + DROP TABLE IF EXISTS test.kafka_errors; + CREATE TABLE test.kafka (i Int64, s String) + ENGINE = Kafka + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = 'json', + kafka_group_name = 'json', + kafka_format = 'JSONEachRow', + kafka_max_block_size = 100, + kafka_poll_max_batch_size = 1, + kafka_handle_error_mode = 'stream'; + CREATE MATERIALIZED VIEW test.kafka_data (i Int64, s String) + ENGINE = MergeTree + ORDER BY i + AS SELECT i, s FROM test.kafka WHERE length(_error) == 0; + CREATE MATERIALIZED VIEW test.kafka_errors (topic String, partition Int64, offset Int64, raw String, error String) + ENGINE = MergeTree + ORDER BY (topic, offset) + AS SELECT + 
_topic AS topic, + _partition AS partition, + _offset AS offset, + _raw_message AS raw, + _error AS error + FROM test.kafka WHERE length(_error) > 0; + ''') + + messages = [] + for i in range(128): + if i % 2 == 0: + messages.append(gen_message_with_jsons(10, 1)) + else: + messages.append(gen_message_with_jsons(10, 0)) + + kafka_produce('json', messages) + + while True: + total_rows = instance.query('SELECT count() FROM test.kafka_data', ignore_error=True) + if total_rows == '640\n': + break + + while True: + total_error_rows = instance.query('SELECT count() FROM test.kafka_errors', ignore_error=True) + if total_error_rows == '64\n': + break + + instance.query(''' + DROP TABLE test.kafka; + DROP TABLE test.kafka_data; + DROP TABLE test.kafka_errors; + ''') + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.reference b/tests/queries/0_stateless/01781_general_format_for_parser.reference deleted file mode 100755 index be47ac9f63d..00000000000 --- a/tests/queries/0_stateless/01781_general_format_for_parser.reference +++ /dev/null @@ -1,4 +0,0 @@ -040501610555496e7438010101010101620655496e743136020002000200 -02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de -adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca -febabecafebabecafebabecafebabe diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.sh b/tests/queries/0_stateless/01781_general_format_for_parser.sh index 104570d50b6..e642c7ad277 100755 --- a/tests/queries/0_stateless/01781_general_format_for_parser.sh +++ b/tests/queries/0_stateless/01781_general_format_for_parser.sh @@ -4,10 +4,16 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . 
"$CUR_DIR"/../shell_config.sh -$RESULT="result.dat" -$REFERENCE="01781_general_format_for_parser.reference" -$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd > $RESULT +TEST_OUTPUT=`$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd -ps` +EXPECTED_OUTPUT=` +echo '040501610555496e7438010101010101620655496e743136020002000200 +02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de +adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca +febabecafebabecafebabecafebabe'` -cmp --silent $RESULT $REFERENCE && echo 'OK' || echo 'FAIL' -rm -rf $RESULT +if [ "$TEST_OUTPUT" = "$EXPECTED_OUTPUT" ]; then + echo 'OK' +else + echo 'FAIL' +fi From 26b5482b4daff66be596b35ffdbffbdd0d2e8f4b Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Wed, 31 Mar 2021 22:25:51 +0800 Subject: [PATCH 12/18] remove the flag in the parser --- src/Formats/FormatFactory.cpp | 4 +--- src/Formats/FormatFactory.h | 3 +-- src/Processors/Formats/IInputFormat.cpp | 4 +--- src/Processors/Formats/IRowInputFormat.cpp | 6 ------ src/Processors/Formats/IRowInputFormat.h | 1 - src/Storages/Kafka/KafkaBlockInputStream.cpp | 2 +- src/Storages/Kafka/StorageKafka.cpp | 5 +++++ 7 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index eecf97aa848..596d63ba7d7 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -269,8 +269,7 @@ InputFormatPtr FormatFactory::getInputFormat( const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & _format_settings, - const bool sync_after_error) const + const std::optional & _format_settings) const { const auto & input_getter = getCreators(name).input_processor_creator; if (!input_getter) @@ -290,7 +289,6 @@ InputFormatPtr FormatFactory::getInputFormat( params.allow_errors_ratio = format_settings.input_allow_errors_ratio; params.max_execution_time = settings.max_execution_time; params.timeout_overflow_mode = settings.timeout_overflow_mode; - params.sync_after_error = sync_after_error; auto format = input_getter(buf, sample, params, format_settings); /// It's a kludge. Because I cannot remove context from values format. diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h index 349120aaad4..4fa7e9a0c01 100644 --- a/src/Formats/FormatFactory.h +++ b/src/Formats/FormatFactory.h @@ -133,8 +133,7 @@ public: const Block & sample, const Context & context, UInt64 max_block_size, - const std::optional & format_settings = std::nullopt, - const bool sync_after_error = false) const; + const std::optional & format_settings = std::nullopt) const; /// Checks all preconditions. Returns ordinary format if parallel formatting cannot be done. OutputFormatPtr getOutputFormatParallelIfPossible( diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index 069d25564b1..160a2125468 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -18,9 +18,7 @@ IInputFormat::IInputFormat(Block header, ReadBuffer & in_) void IInputFormat::resetParser() { - if (in.hasPendingData()) - throw Exception("Unread data in IInputFormat::resetParser. 
Most likely it's a bug.", ErrorCodes::LOGICAL_ERROR); - + in.ignoreAll(); // those are protected attributes from ISource (I didn't want to propagate resetParser up there) finished = false; got_exception = false; diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index a0f70304197..75a9abf6845 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -108,12 +108,6 @@ Chunk IRowInputFormat::generate() if (!isParseError(e.code())) throw; - if (params.sync_after_error) - { - syncAfterError(); - throw; - } - if (params.allow_errors_num == 0 && params.allow_errors_ratio == 0) throw; diff --git a/src/Processors/Formats/IRowInputFormat.h b/src/Processors/Formats/IRowInputFormat.h index 7061db75765..c802bd3066b 100644 --- a/src/Processors/Formats/IRowInputFormat.h +++ b/src/Processors/Formats/IRowInputFormat.h @@ -29,7 +29,6 @@ struct RowInputFormatParams Poco::Timespan max_execution_time = 0; OverflowMode timeout_overflow_mode = OverflowMode::THROW; - bool sync_after_error = false; }; bool isParseError(int code); diff --git a/src/Storages/Kafka/KafkaBlockInputStream.cpp b/src/Storages/Kafka/KafkaBlockInputStream.cpp index 24a5f5e3923..6b6158ff62a 100644 --- a/src/Storages/Kafka/KafkaBlockInputStream.cpp +++ b/src/Storages/Kafka/KafkaBlockInputStream.cpp @@ -84,7 +84,7 @@ Block KafkaBlockInputStream::readImpl() auto put_error_to_stream = handle_error_mode == HandleKafkaErrorMode::STREAM; auto input_format = FormatFactory::instance().getInputFormat( - storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size, {}, put_error_to_stream); + storage.getFormatName(), *buffer, non_virtual_header, *context, max_block_size); InputPort port(input_format->getPort().getHeader(), input_format.get()); connect(input_format->getPort(), port); diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index a26604257fb..88bc177ef0b 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -190,6 +190,11 @@ StorageKafka::StorageKafka( , settings_adjustments(createSettingsAdjustments()) , thread_per_consumer(kafka_settings->kafka_thread_per_consumer.value) { + if (kafka_settings->kafka_handle_error_mode == HandleKafkaErrorMode::STREAM) + { + kafka_settings->input_format_allow_errors_num = 0; + kafka_settings->input_format_allow_errors_ratio = 0; + } StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); setInMemoryMetadata(storage_metadata); From 1e033e8817dbe88aeb2c4c43b798c0dce0a55d8d Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 1 Apr 2021 10:12:37 +0800 Subject: [PATCH 13/18] fix code style --- src/Processors/Formats/IInputFormat.cpp | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/Processors/Formats/IInputFormat.cpp b/src/Processors/Formats/IInputFormat.cpp index 160a2125468..5594e04dc74 100644 --- a/src/Processors/Formats/IInputFormat.cpp +++ b/src/Processors/Formats/IInputFormat.cpp @@ -5,11 +5,6 @@ namespace DB { -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - IInputFormat::IInputFormat(Block header, ReadBuffer & in_) : ISource(std::move(header)), in(in_) { From 19f92f521419e84ccd56aa1ae6c7a6f9dbfc92ab Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 1 Apr 2021 11:41:42 +0800 Subject: [PATCH 14/18] fix test case --- .../0_stateless/01781_general_format_for_parser.sh | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git 
a/tests/queries/0_stateless/01781_general_format_for_parser.sh b/tests/queries/0_stateless/01781_general_format_for_parser.sh index e642c7ad277..5148e73f525 100755 --- a/tests/queries/0_stateless/01781_general_format_for_parser.sh +++ b/tests/queries/0_stateless/01781_general_format_for_parser.sh @@ -4,13 +4,9 @@ CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) # shellcheck source=../shell_config.sh . "$CUR_DIR"/../shell_config.sh -TEST_OUTPUT=`$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | xxd -ps` +TEST_OUTPUT=`$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | md5sum | cut -d ' ' -f1` -EXPECTED_OUTPUT=` -echo '040501610555496e7438010101010101620655496e743136020002000200 -02000200016306537472696e6704deadbeef04deadbeef04deadbeef04de -adbeef04deadbeef01640e4669786564537472696e67283429cafebabeca -febabecafebabecafebabecafebabe'` +EXPECTED_OUTPUT=`echo 'd7255e6863ef058aa60064fab921fb2e'` if [ "$TEST_OUTPUT" = "$EXPECTED_OUTPUT" ]; then echo 'OK' From fd5907a3328394d0c3ef5514026acc5d27221f48 Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 1 Apr 2021 14:20:18 +0800 Subject: [PATCH 15/18] add reference for 01781_general_format_for_parser test case --- .../0_stateless/01781_general_format_for_parser.reference | 1 + 1 file changed, 1 insertion(+) create mode 100644 tests/queries/0_stateless/01781_general_format_for_parser.reference diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.reference b/tests/queries/0_stateless/01781_general_format_for_parser.reference new file mode 100644 index 00000000000..d86bac9de59 --- /dev/null +++ b/tests/queries/0_stateless/01781_general_format_for_parser.reference @@ -0,0 +1 @@ +OK From 604ba9db601bb05693097aad0924ef2504dec50e Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Wed, 7 Apr 2021 20:15:15 +0800 Subject: [PATCH 16/18] add test case --- tests/integration/test_storage_kafka/test.py | 247 ++++++++++++++++++ .../01781_general_format_for_parser.reference | 1 - .../01781_general_format_for_parser.sh | 15 -- 3 files changed, 247 insertions(+), 16 deletions(-) delete mode 100644 tests/queries/0_stateless/01781_general_format_for_parser.reference delete mode 100755 tests/queries/0_stateless/01781_general_format_for_parser.sh diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index f8d869b8533..d77a7c56cf4 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2665,6 +2665,253 @@ def test_kafka_engine_put_errors_to_stream_with_random_malformed_json(kafka_clus DROP TABLE test.kafka_errors; ''') +@pytest.mark.timeout(120) +def test_kafka_formats_with_broken_message(kafka_cluster): + # data was dumped from clickhouse itself in a following manner + # clickhouse-client --format=Native --query='SELECT toInt64(number) as id, toUInt16( intDiv( id, 65536 ) ) as blockNo, reinterpretAsString(19777) as val1, toFloat32(0.5) as val2, toUInt8(1) as val3 from numbers(100) ORDER BY id' | xxd -ps | tr -d '\n' | sed 's/\(..\)/\\x\1/g' + + all_formats = { + ## Text formats ## + # dumped with clickhouse-client ... 
| perl -pe 's/\n/\\n/; s/\t/\\t/g;' + 'JSONEachRow': { + 'data_sample': [ + '{"id":"0","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n', + '{"id":"1","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"2","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"3","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"4","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"5","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"6","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"7","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"8","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"9","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"10","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"11","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"12","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"13","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"14","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n{"id":"15","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n', + '{"id":"0","blockNo":0,"val1":"AM","val2":0.5,"val3":1}\n', + # broken message + '{"id":"0","blockNo":"BAD","val1":"AM","val2":0.5,"val3":1}', + ], + 'expected':'''{"raw_message":"{\\"id\\":\\"0\\",\\"blockNo\\":\\"BAD\\",\\"val1\\":\\"AM\\",\\"val2\\":0.5,\\"val3\\":1}","error":"Cannot parse input: expected '\\"' before: 'BAD\\",\\"val1\\":\\"AM\\",\\"val2\\":0.5,\\"val3\\":1}': (while reading the value of key blockNo)"}''', + 'supports_empty_value': True, + 'printable': True, + }, + # JSONAsString doesn't fit to that test, and tested separately + 'JSONCompactEachRow': { + 'data_sample': [ + '["0", 0, "AM", 0.5, 1]\n', + '["1", 0, "AM", 0.5, 1]\n["2", 0, "AM", 0.5, 1]\n["3", 0, "AM", 0.5, 1]\n["4", 0, "AM", 0.5, 1]\n["5", 0, "AM", 0.5, 1]\n["6", 0, "AM", 0.5, 1]\n["7", 0, "AM", 0.5, 1]\n["8", 0, "AM", 0.5, 1]\n["9", 0, "AM", 0.5, 1]\n["10", 0, "AM", 0.5, 1]\n["11", 0, "AM", 0.5, 1]\n["12", 0, "AM", 0.5, 1]\n["13", 0, "AM", 0.5, 1]\n["14", 0, "AM", 0.5, 1]\n["15", 0, "AM", 0.5, 1]\n', + '["0", 0, "AM", 0.5, 1]\n', + # broken message + '["0", "BAD", "AM", 0.5, 1]', + ], + 'expected':'''{"raw_message":"[\\"0\\", \\"BAD\\", \\"AM\\", 0.5, 1]","error":"Cannot parse input: expected '\\"' before: 'BAD\\", \\"AM\\", 0.5, 1]': (while reading the value of key blockNo)"}''', + 'supports_empty_value': True, + 'printable':True, + }, + 'JSONCompactEachRowWithNamesAndTypes': { + 'data_sample': [ + '["id", "blockNo", "val1", "val2", "val3"]\n["Int64", "UInt16", "String", "Float32", "UInt8"]\n["0", 0, "AM", 0.5, 1]\n', + '["id", "blockNo", "val1", "val2", "val3"]\n["Int64", "UInt16", "String", "Float32", "UInt8"]\n["1", 0, "AM", 0.5, 1]\n["2", 0, "AM", 0.5, 1]\n["3", 0, "AM", 0.5, 1]\n["4", 0, "AM", 0.5, 1]\n["5", 0, "AM", 0.5, 1]\n["6", 0, "AM", 0.5, 1]\n["7", 0, "AM", 0.5, 1]\n["8", 0, "AM", 0.5, 1]\n["9", 0, "AM", 0.5, 1]\n["10", 0, "AM", 0.5, 1]\n["11", 0, "AM", 0.5, 1]\n["12", 0, "AM", 0.5, 1]\n["13", 0, "AM", 0.5, 1]\n["14", 0, "AM", 0.5, 1]\n["15", 0, "AM", 0.5, 1]\n', + '["id", "blockNo", "val1", "val2", "val3"]\n["Int64", "UInt16", "String", "Float32", "UInt8"]\n["0", 0, "AM", 0.5, 1]\n', + # broken message + '["0", "BAD", "AM", 0.5, 1]', + ], + 'expected':'''{"raw_message":"[\\"0\\", \\"BAD\\", \\"AM\\", 0.5, 1]","error":"Cannot parse JSON string: expected opening quote"}''', + 'printable':True, + }, + 'TSKV': { + 'data_sample': [ + 'id=0\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\n', + 
'id=1\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=2\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=3\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=4\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=5\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=6\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=7\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=8\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=9\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=10\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=11\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=12\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=13\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=14\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\nid=15\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\n', + 'id=0\tblockNo=0\tval1=AM\tval2=0.5\tval3=1\n', + # broken message + 'id=0\tblockNo=BAD\tval1=AM\tval2=0.5\tval3=1\n', + ], + 'expected':'{"raw_message":"id=0\\tblockNo=BAD\\tval1=AM\\tval2=0.5\\tval3=1\\n","error":"Found garbage after field in TSKV format: blockNo: (at row 1)\\n"}', + 'printable':True, + }, + 'CSV': { + 'data_sample': [ + '0,0,"AM",0.5,1\n', + '1,0,"AM",0.5,1\n2,0,"AM",0.5,1\n3,0,"AM",0.5,1\n4,0,"AM",0.5,1\n5,0,"AM",0.5,1\n6,0,"AM",0.5,1\n7,0,"AM",0.5,1\n8,0,"AM",0.5,1\n9,0,"AM",0.5,1\n10,0,"AM",0.5,1\n11,0,"AM",0.5,1\n12,0,"AM",0.5,1\n13,0,"AM",0.5,1\n14,0,"AM",0.5,1\n15,0,"AM",0.5,1\n', + '0,0,"AM",0.5,1\n', + # broken message + '0,"BAD","AM",0.5,1\n', + ], + 'expected':'''{"raw_message":"0,\\"BAD\\",\\"AM\\",0.5,1\\n","error":"Cannot parse input: expected '\\"' before: 'BAD\\",\\"AM\\",0.5,1\\\\n': Could not print diagnostic info because two last rows aren't in buffer (rare case)\\n"}''', + 'printable':True, + 'supports_empty_value': True, + }, + 'TSV': { + 'data_sample': [ + '0\t0\tAM\t0.5\t1\n', + '1\t0\tAM\t0.5\t1\n2\t0\tAM\t0.5\t1\n3\t0\tAM\t0.5\t1\n4\t0\tAM\t0.5\t1\n5\t0\tAM\t0.5\t1\n6\t0\tAM\t0.5\t1\n7\t0\tAM\t0.5\t1\n8\t0\tAM\t0.5\t1\n9\t0\tAM\t0.5\t1\n10\t0\tAM\t0.5\t1\n11\t0\tAM\t0.5\t1\n12\t0\tAM\t0.5\t1\n13\t0\tAM\t0.5\t1\n14\t0\tAM\t0.5\t1\n15\t0\tAM\t0.5\t1\n', + '0\t0\tAM\t0.5\t1\n', + # broken message + '0\tBAD\tAM\t0.5\t1\n', + ], + 'expected':'''{"raw_message":"0\\tBAD\\tAM\\t0.5\\t1\\n","error":"Cannot parse input: expected '\\\\t' before: 'BAD\\\\tAM\\\\t0.5\\\\t1\\\\n': Could not print diagnostic info because two last rows aren't in buffer (rare case)\\n"}''', + 'supports_empty_value': True, + 'printable':True, + }, + 'CSVWithNames': { + 'data_sample': [ + '"id","blockNo","val1","val2","val3"\n0,0,"AM",0.5,1\n', + '"id","blockNo","val1","val2","val3"\n1,0,"AM",0.5,1\n2,0,"AM",0.5,1\n3,0,"AM",0.5,1\n4,0,"AM",0.5,1\n5,0,"AM",0.5,1\n6,0,"AM",0.5,1\n7,0,"AM",0.5,1\n8,0,"AM",0.5,1\n9,0,"AM",0.5,1\n10,0,"AM",0.5,1\n11,0,"AM",0.5,1\n12,0,"AM",0.5,1\n13,0,"AM",0.5,1\n14,0,"AM",0.5,1\n15,0,"AM",0.5,1\n', + '"id","blockNo","val1","val2","val3"\n0,0,"AM",0.5,1\n', + # broken message + '"id","blockNo","val1","val2","val3"\n0,"BAD","AM",0.5,1\n', + ], + 'expected':'''{"raw_message":"\\"id\\",\\"blockNo\\",\\"val1\\",\\"val2\\",\\"val3\\"\\n0,\\"BAD\\",\\"AM\\",0.5,1\\n","error":"Cannot parse input: expected '\\"' before: 'BAD\\",\\"AM\\",0.5,1\\\\n': Could not print diagnostic info because two last rows aren't in buffer (rare case)\\n"}''', + 'printable':True, + }, + 'Values': { + 'data_sample': [ + "(0,0,'AM',0.5,1)", + "(1,0,'AM',0.5,1),(2,0,'AM',0.5,1),(3,0,'AM',0.5,1),(4,0,'AM',0.5,1),(5,0,'AM',0.5,1),(6,0,'AM',0.5,1),(7,0,'AM',0.5,1),(8,0,'AM',0.5,1),(9,0,'AM',0.5,1),(10,0,'AM',0.5,1),(11,0,'AM',0.5,1),(12,0,'AM',0.5,1),(13,0,'AM',0.5,1),(14,0,'AM',0.5,1),(15,0,'AM',0.5,1)", + "(0,0,'AM',0.5,1)", + 
# broken message + "(0,'BAD','AM',0.5,1)", + ], + 'expected':r'''{"raw_message":"(0,'BAD','AM',0.5,1)","error":"Cannot parse string 'BAD' as UInt16: syntax error at begin of string. Note: there are toUInt16OrZero and toUInt16OrNull functions, which returns zero\/NULL instead of throwing exception.: while executing 'FUNCTION CAST(assumeNotNull(_dummy_0) :: 2, 'UInt16' :: 1) -> CAST(assumeNotNull(_dummy_0), 'UInt16') UInt16 : 4'"}''', + 'supports_empty_value': True, + 'printable':True, + }, + 'TSVWithNames': { + 'data_sample': [ + 'id\tblockNo\tval1\tval2\tval3\n0\t0\tAM\t0.5\t1\n', + 'id\tblockNo\tval1\tval2\tval3\n1\t0\tAM\t0.5\t1\n2\t0\tAM\t0.5\t1\n3\t0\tAM\t0.5\t1\n4\t0\tAM\t0.5\t1\n5\t0\tAM\t0.5\t1\n6\t0\tAM\t0.5\t1\n7\t0\tAM\t0.5\t1\n8\t0\tAM\t0.5\t1\n9\t0\tAM\t0.5\t1\n10\t0\tAM\t0.5\t1\n11\t0\tAM\t0.5\t1\n12\t0\tAM\t0.5\t1\n13\t0\tAM\t0.5\t1\n14\t0\tAM\t0.5\t1\n15\t0\tAM\t0.5\t1\n', + 'id\tblockNo\tval1\tval2\tval3\n0\t0\tAM\t0.5\t1\n', + # broken message + 'id\tblockNo\tval1\tval2\tval3\n0\tBAD\tAM\t0.5\t1\n', + ], + 'expected':'''{"raw_message":"id\\tblockNo\\tval1\\tval2\\tval3\\n0\\tBAD\\tAM\\t0.5\\t1\\n","error":"Cannot parse input: expected '\\\\t' before: 'BAD\\\\tAM\\\\t0.5\\\\t1\\\\n': Could not print diagnostic info because two last rows aren't in buffer (rare case)\\n"}''', + 'supports_empty_value': True, + 'printable':True, + }, + 'TSVWithNamesAndTypes': { + 'data_sample': [ + 'id\tblockNo\tval1\tval2\tval3\nInt64\tUInt16\tString\tFloat32\tUInt8\n0\t0\tAM\t0.5\t1\n', + 'id\tblockNo\tval1\tval2\tval3\nInt64\tUInt16\tString\tFloat32\tUInt8\n1\t0\tAM\t0.5\t1\n2\t0\tAM\t0.5\t1\n3\t0\tAM\t0.5\t1\n4\t0\tAM\t0.5\t1\n5\t0\tAM\t0.5\t1\n6\t0\tAM\t0.5\t1\n7\t0\tAM\t0.5\t1\n8\t0\tAM\t0.5\t1\n9\t0\tAM\t0.5\t1\n10\t0\tAM\t0.5\t1\n11\t0\tAM\t0.5\t1\n12\t0\tAM\t0.5\t1\n13\t0\tAM\t0.5\t1\n14\t0\tAM\t0.5\t1\n15\t0\tAM\t0.5\t1\n', + 'id\tblockNo\tval1\tval2\tval3\nInt64\tUInt16\tString\tFloat32\tUInt8\n0\t0\tAM\t0.5\t1\n', + # broken message + 'id\tblockNo\tval1\tval2\tval3\nInt64\tUInt16\tString\tFloat32\tUInt8\n0\tBAD\tAM\t0.5\t1\n', + ], + 'expected':'''{"raw_message":"id\\tblockNo\\tval1\\tval2\\tval3\\nInt64\\tUInt16\\tString\\tFloat32\\tUInt8\\n0\\tBAD\\tAM\\t0.5\\t1\\n","error":"Cannot parse input: expected '\\\\t' before: 'BAD\\\\tAM\\\\t0.5\\\\t1\\\\n': Could not print diagnostic info because two last rows aren't in buffer (rare case)\\n"}''', + 'printable':True, + }, + 'Native': { + 'data_sample': [ + b'\x05\x01\x02\x69\x64\x05\x49\x6e\x74\x36\x34\x00\x00\x00\x00\x00\x00\x00\x00\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x06\x55\x49\x6e\x74\x31\x36\x00\x00\x04\x76\x61\x6c\x31\x06\x53\x74\x72\x69\x6e\x67\x02\x41\x4d\x04\x76\x61\x6c\x32\x07\x46\x6c\x6f\x61\x74\x33\x32\x00\x00\x00\x3f\x04\x76\x61\x6c\x33\x05\x55\x49\x6e\x74\x38\x01', + 
b'\x05\x0f\x02\x69\x64\x05\x49\x6e\x74\x36\x34\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x03\x00\x00\x00\x00\x00\x00\x00\x04\x00\x00\x00\x00\x00\x00\x00\x05\x00\x00\x00\x00\x00\x00\x00\x06\x00\x00\x00\x00\x00\x00\x00\x07\x00\x00\x00\x00\x00\x00\x00\x08\x00\x00\x00\x00\x00\x00\x00\x09\x00\x00\x00\x00\x00\x00\x00\x0a\x00\x00\x00\x00\x00\x00\x00\x0b\x00\x00\x00\x00\x00\x00\x00\x0c\x00\x00\x00\x00\x00\x00\x00\x0d\x00\x00\x00\x00\x00\x00\x00\x0e\x00\x00\x00\x00\x00\x00\x00\x0f\x00\x00\x00\x00\x00\x00\x00\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x06\x55\x49\x6e\x74\x31\x36\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x04\x76\x61\x6c\x31\x06\x53\x74\x72\x69\x6e\x67\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x02\x41\x4d\x04\x76\x61\x6c\x32\x07\x46\x6c\x6f\x61\x74\x33\x32\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x00\x00\x00\x3f\x04\x76\x61\x6c\x33\x05\x55\x49\x6e\x74\x38\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01', + b'\x05\x01\x02\x69\x64\x05\x49\x6e\x74\x36\x34\x00\x00\x00\x00\x00\x00\x00\x00\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x06\x55\x49\x6e\x74\x31\x36\x00\x00\x04\x76\x61\x6c\x31\x06\x53\x74\x72\x69\x6e\x67\x02\x41\x4d\x04\x76\x61\x6c\x32\x07\x46\x6c\x6f\x61\x74\x33\x32\x00\x00\x00\x3f\x04\x76\x61\x6c\x33\x05\x55\x49\x6e\x74\x38\x01', + # broken message + b'\x05\x01\x02\x69\x64\x05\x49\x6e\x74\x36\x34\x00\x00\x00\x00\x00\x00\x00\x00\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x06\x53\x74\x72\x69\x6e\x67\x03\x42\x41\x44\x04\x76\x61\x6c\x31\x06\x53\x74\x72\x69\x6e\x67\x02\x41\x4d\x04\x76\x61\x6c\x32\x07\x46\x6c\x6f\x61\x74\x33\x32\x00\x00\x00\x3f\x04\x76\x61\x6c\x33\x05\x55\x49\x6e\x74\x38\x01', + ], + 'expected':'''{"raw_message":"050102696405496E743634000000000000000007626C6F636B4E6F06537472696E67034241440476616C3106537472696E6702414D0476616C3207466C6F617433320000003F0476616C330555496E743801","error":"Cannot convert: String to UInt16"}''', + 'printable':False, + }, + 'RowBinary': { + 'data_sample': [ + b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + b'\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x09\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + 
b'\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + # broken message + b'\x00\x00\x00\x00\x00\x00\x00\x00\x03\x42\x41\x44\x02\x41\x4d\x00\x00\x00\x3f\x01', + ], + 'expected':'{"raw_message":"00000000000000000342414402414D0000003F01","error":"Cannot read all data. Bytes read: 9. Bytes expected: 65.: (at row 1)\\n"}', + 'printable':False, + }, + 'RowBinaryWithNamesAndTypes': { + 'data_sample': [ + b'\x05\x02\x69\x64\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x04\x76\x61\x6c\x31\x04\x76\x61\x6c\x32\x04\x76\x61\x6c\x33\x05\x49\x6e\x74\x36\x34\x06\x55\x49\x6e\x74\x31\x36\x06\x53\x74\x72\x69\x6e\x67\x07\x46\x6c\x6f\x61\x74\x33\x32\x05\x55\x49\x6e\x74\x38\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + b'\x05\x02\x69\x64\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x04\x76\x61\x6c\x31\x04\x76\x61\x6c\x32\x04\x76\x61\x6c\x33\x05\x49\x6e\x74\x36\x34\x06\x55\x49\x6e\x74\x31\x36\x06\x53\x74\x72\x69\x6e\x67\x07\x46\x6c\x6f\x61\x74\x33\x32\x05\x55\x49\x6e\x74\x38\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x09\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0b\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0d\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01\x0f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + b'\x05\x02\x69\x64\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x04\x76\x61\x6c\x31\x04\x76\x61\x6c\x32\x04\x76\x61\x6c\x33\x05\x49\x6e\x74\x36\x34\x06\x55\x49\x6e\x74\x31\x36\x06\x53\x74\x72\x69\x6e\x67\x07\x46\x6c\x6f\x61\x74\x33\x32\x05\x55\x49\x6e\x74\x38\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x02\x41\x4d\x00\x00\x00\x3f\x01', + # broken message + b'\x05\x02\x69\x64\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x04\x76\x61\x6c\x31\x04\x76\x61\x6c\x32\x04\x76\x61\x6c\x33\x05\x49\x6e\x74\x36\x34\x06\x53\x74\x72\x69\x6e\x67\x06\x53\x74\x72\x69\x6e\x67\x07\x46\x6c\x6f\x61\x74\x33\x32\x05\x55\x49\x6e\x74\x38\x00\x00\x00\x00\x00\x00\x00\x00\x03\x42\x41\x44\x02\x41\x4d\x00\x00\x00\x3f\x01', + ], + 'expected':'{"raw_message":"0502696407626C6F636B4E6F0476616C310476616C320476616C3305496E74363406537472696E6706537472696E6707466C6F617433320555496E743800000000000000000342414402414D0000003F01","error":"Cannot read all data. Bytes read: 9. 
Bytes expected: 65.: (at row 1)\\n"}', + 'printable':False, + }, + 'ORC': { + 'data_sample': [ + b'\x4f\x52\x43\x11\x00\x00\x0a\x06\x12\x04\x08\x01\x50\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x30\x00\x00\xe3\x12\xe7\x62\x65\x00\x01\x21\x3e\x0e\x46\x25\x0e\x2e\x46\x03\x21\x46\x03\x09\xa6\x00\x06\x00\x32\x00\x00\xe3\x92\xe4\x62\x65\x00\x01\x21\x01\x0e\x46\x25\x2e\x2e\x26\x47\x5f\x21\x20\x96\x60\x09\x60\x00\x00\x36\x00\x00\xe3\x92\xe1\x62\x65\x00\x01\x21\x61\x0e\x46\x23\x5e\x2e\x46\x03\x21\x66\x03\x3d\x53\x29\x10\x11\xc0\x00\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x02\x10\x02\x18\x02\x50\x00\x05\x00\x00\xff\x00\x03\x00\x00\x30\x07\x00\x00\x40\x00\x80\x05\x00\x00\x41\x4d\x07\x00\x00\x42\x00\x80\x03\x00\x00\x0a\x07\x00\x00\x42\x00\x80\x05\x00\x00\xff\x01\x88\x00\x00\x4d\xca\xc1\x0a\x80\x30\x0c\x03\xd0\x2e\x6b\xcb\x98\x17\xf1\x14\x50\xfc\xff\xcf\xb4\x66\x1e\x3c\x84\x47\x9a\xce\x1c\xb9\x1b\xb7\xf9\xda\x48\x09\x9e\xb2\xf3\x92\xce\x5b\x86\xf6\x56\x7f\x21\x41\x2f\x51\xa6\x7a\xd7\x1d\xe5\xea\xae\x3d\xca\xd5\x83\x71\x60\xd8\x17\xfc\x62\x0f\xa8\x00\x00\xe3\x4a\xe6\x62\xe1\x60\x0c\x60\xe0\xe2\xe3\x60\x14\x62\xe3\x60\x10\x60\x90\x60\x08\x60\x88\x60\xe5\x12\xe0\x60\x54\xe2\xe0\x62\x34\x10\x62\x34\x90\x60\x02\x8a\x70\x71\x09\x01\x45\xb8\xb8\x98\x1c\x7d\x85\x80\x58\x82\x05\x28\xc6\xcd\x25\xca\xc1\x68\xc4\x0b\x52\xc5\x6c\xa0\x67\x2a\x05\x22\xc0\x4a\x21\x86\x31\x09\x30\x81\xb5\xb2\x02\x00\x36\x01\x00\x25\x8c\xbd\x0a\xc2\x30\x14\x85\x73\x6f\x92\xf6\x92\x6a\x09\x01\x21\x64\x92\x4e\x75\x91\x58\x71\xc9\x64\x27\x5d\x2c\x1d\x5d\xfd\x59\xc4\x42\x37\x5f\xc0\x17\xe8\x23\x9b\xc6\xe1\x3b\x70\x0f\xdf\xb9\xc4\xf5\x17\x5d\x41\x5c\x4f\x60\x37\xeb\x53\x0d\x55\x4d\x0b\x23\x01\xb9\x90\x2e\xbf\x0f\xe3\xe3\xdd\x8d\x0e\x5f\x4f\x27\x3e\xb7\x61\x97\xb2\x49\xb9\xaf\x90\x20\x92\x27\x32\x2a\x6b\xf4\xf3\x0d\x1e\x82\x20\xe8\x59\x28\x09\x4c\x46\x4c\x33\xcb\x7a\x76\x95\x41\x47\x9f\x14\x78\x03\xde\x62\x6c\x54\x30\xb1\x51\x0a\xdb\x8b\x89\x58\x11\xbb\x22\xac\x08\x9a\xe5\x6c\x71\xbf\x3d\xb8\x39\x92\xfa\x7f\x86\x1a\xd3\x54\x1e\xa7\xee\xcc\x7e\x08\x9e\x01\x10\x01\x18\x80\x80\x10\x22\x02\x00\x0c\x28\x57\x30\x06\x82\xf4\x03\x03\x4f\x52\x43\x18', + 
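# the next sample packs 15 rows (ids 1-15) into a single ORC file, mirroring the 15-row samples used for the other formats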
b'\x4f\x52\x43\x11\x00\x00\x0a\x06\x12\x04\x08\x0f\x50\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x0f\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x30\x00\x00\xe3\x12\xe7\x62\x65\x00\x01\x21\x3e\x0e\x7e\x25\x0e\x2e\x46\x43\x21\x46\x4b\x09\xad\x00\x06\x00\x33\x00\x00\x0a\x17\x0a\x03\x00\x00\x00\x12\x10\x08\x0f\x22\x0a\x0a\x02\x41\x4d\x12\x02\x41\x4d\x18\x3c\x50\x00\x3a\x00\x00\xe3\x92\xe1\x62\x65\x00\x01\x21\x61\x0e\x7e\x23\x5e\x2e\x46\x03\x21\x66\x03\x3d\x53\x29\x66\x73\x3d\xd3\x00\x06\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x0f\x12\x06\x08\x02\x10\x02\x18\x1e\x50\x00\x05\x00\x00\x0c\x00\x2b\x00\x00\x31\x32\x33\x34\x35\x36\x37\x38\x39\x31\x30\x31\x31\x31\x32\x31\x33\x31\x34\x31\x35\x09\x00\x00\x06\x01\x03\x02\x09\x00\x00\xc0\x0e\x00\x00\x07\x00\x00\x42\x00\x80\x05\x00\x00\x41\x4d\x0a\x00\x00\xe3\xe2\x42\x01\x00\x09\x00\x00\xc0\x0e\x02\x00\x05\x00\x00\x0c\x01\x94\x00\x00\x2d\xca\xc1\x0e\x80\x30\x08\x03\xd0\xc1\x60\x2e\xf3\x62\x76\x6a\xe2\x0e\xfe\xff\x57\x5a\x3b\x0f\xe4\x51\xe8\x68\xbd\x5d\x05\xe7\xf8\x34\x40\x3a\x6e\x59\xb1\x64\xe0\x91\xa9\xbf\xb1\x97\xd2\x95\x9d\x1e\xca\x55\x3a\x6d\xb4\xd2\xdd\x0b\x74\x9a\x74\xf7\x12\x39\xbd\x97\x7f\x7c\x06\xbb\xa6\x8d\x97\x17\xb4\x00\x00\xe3\x4a\xe6\x62\xe1\xe0\x0f\x60\xe0\xe2\xe3\xe0\x17\x62\xe3\x60\x10\x60\x90\x60\x08\x60\x88\x60\xe5\x12\xe0\xe0\x57\xe2\xe0\x62\x34\x14\x62\xb4\x94\xd0\x02\x8a\xc8\x73\x09\x01\x45\xb8\xb8\x98\x1c\x7d\x85\x80\x58\xc2\x06\x28\x26\xc4\x25\xca\xc1\x6f\xc4\xcb\xc5\x68\x20\xc4\x6c\xa0\x67\x2a\xc5\x6c\xae\x67\x0a\x14\xe6\x87\x1a\xc6\x24\xc0\x24\x21\x07\x32\x0c\x00\x4a\x01\x00\xe3\x60\x16\x58\xc3\x24\xc5\xcd\xc1\x2c\x30\x89\x51\xc2\x4b\xc1\x57\x83\x5f\x49\x83\x83\x47\x88\x95\x91\x89\x99\x85\x55\x8a\x3d\x29\x27\x3f\x39\xdb\x2f\x5f\x8a\x29\x33\x45\x8a\xa5\x2c\x31\xc7\x10\x4c\x1a\x81\x49\x63\x25\x26\x0e\x46\x20\x66\x07\x63\x36\x0e\x3e\x0d\x26\x03\x10\x9f\xd1\x80\xdf\x8a\x85\x83\x3f\x80\xc1\x8a\x8f\x83\x5f\x88\x8d\x83\x41\x80\x41\x82\x21\x80\x21\x82\xd5\x4a\x80\x83\x5f\x89\x83\x8b\xd1\x50\x88\xd1\x52\x42\x0b\x28\x22\x6f\x25\x04\x14\xe1\xe2\x62\x72\xf4\x15\x02\x62\x09\x1b\xa0\x98\x90\x95\x28\x07\xbf\x11\x2f\x17\xa3\x81\x10\xb3\x81\x9e\xa9\x14\xb3\xb9\x9e\x29\x50\x98\x1f\x6a\x18\x93\x00\x93\x84\x1c\xc8\x30\x87\x09\x7e\x1e\x0c\x00\x08\xa8\x01\x10\x01\x18\x80\x80\x10\x22\x02\x00\x0c\x28\x5d\x30\x06\x82\xf4\x03\x03\x4f\x52\x43\x18', + 
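# one more well-formed single-row sample, then the deliberately corrupted message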
b'\x4f\x52\x43\x11\x00\x00\x0a\x06\x12\x04\x08\x01\x50\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x30\x00\x00\xe3\x12\xe7\x62\x65\x00\x01\x21\x3e\x0e\x46\x25\x0e\x2e\x46\x03\x21\x46\x03\x09\xa6\x00\x06\x00\x32\x00\x00\xe3\x92\xe4\x62\x65\x00\x01\x21\x01\x0e\x46\x25\x2e\x2e\x26\x47\x5f\x21\x20\x96\x60\x09\x60\x00\x00\x36\x00\x00\xe3\x92\xe1\x62\x65\x00\x01\x21\x61\x0e\x46\x23\x5e\x2e\x46\x03\x21\x66\x03\x3d\x53\x29\x10\x11\xc0\x00\x00\x2b\x00\x00\x0a\x13\x0a\x03\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x02\x10\x02\x18\x02\x50\x00\x05\x00\x00\xff\x00\x03\x00\x00\x30\x07\x00\x00\x40\x00\x80\x05\x00\x00\x41\x4d\x07\x00\x00\x42\x00\x80\x03\x00\x00\x0a\x07\x00\x00\x42\x00\x80\x05\x00\x00\xff\x01\x88\x00\x00\x4d\xca\xc1\x0a\x80\x30\x0c\x03\xd0\x2e\x6b\xcb\x98\x17\xf1\x14\x50\xfc\xff\xcf\xb4\x66\x1e\x3c\x84\x47\x9a\xce\x1c\xb9\x1b\xb7\xf9\xda\x48\x09\x9e\xb2\xf3\x92\xce\x5b\x86\xf6\x56\x7f\x21\x41\x2f\x51\xa6\x7a\xd7\x1d\xe5\xea\xae\x3d\xca\xd5\x83\x71\x60\xd8\x17\xfc\x62\x0f\xa8\x00\x00\xe3\x4a\xe6\x62\xe1\x60\x0c\x60\xe0\xe2\xe3\x60\x14\x62\xe3\x60\x10\x60\x90\x60\x08\x60\x88\x60\xe5\x12\xe0\x60\x54\xe2\xe0\x62\x34\x10\x62\x34\x90\x60\x02\x8a\x70\x71\x09\x01\x45\xb8\xb8\x98\x1c\x7d\x85\x80\x58\x82\x05\x28\xc6\xcd\x25\xca\xc1\x68\xc4\x0b\x52\xc5\x6c\xa0\x67\x2a\x05\x22\xc0\x4a\x21\x86\x31\x09\x30\x81\xb5\xb2\x02\x00\x36\x01\x00\x25\x8c\xbd\x0a\xc2\x30\x14\x85\x73\x6f\x92\xf6\x92\x6a\x09\x01\x21\x64\x92\x4e\x75\x91\x58\x71\xc9\x64\x27\x5d\x2c\x1d\x5d\xfd\x59\xc4\x42\x37\x5f\xc0\x17\xe8\x23\x9b\xc6\xe1\x3b\x70\x0f\xdf\xb9\xc4\xf5\x17\x5d\x41\x5c\x4f\x60\x37\xeb\x53\x0d\x55\x4d\x0b\x23\x01\xb9\x90\x2e\xbf\x0f\xe3\xe3\xdd\x8d\x0e\x5f\x4f\x27\x3e\xb7\x61\x97\xb2\x49\xb9\xaf\x90\x20\x92\x27\x32\x2a\x6b\xf4\xf3\x0d\x1e\x82\x20\xe8\x59\x28\x09\x4c\x46\x4c\x33\xcb\x7a\x76\x95\x41\x47\x9f\x14\x78\x03\xde\x62\x6c\x54\x30\xb1\x51\x0a\xdb\x8b\x89\x58\x11\xbb\x22\xac\x08\x9a\xe5\x6c\x71\xbf\x3d\xb8\x39\x92\xfa\x7f\x86\x1a\xd3\x54\x1e\xa7\xee\xcc\x7e\x08\x9e\x01\x10\x01\x18\x80\x80\x10\x22\x02\x00\x0c\x28\x57\x30\x06\x82\xf4\x03\x03\x4f\x52\x43\x18', + # broken message + 
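# here blockNo is an ORC String column holding 'BAD', so the cast to UInt16 fails and the message should surface through the _error/_raw_message virtual columns instead of the data stream (cf. 'expected' below)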
b'\x4f\x52\x43\x0a\x0b\x0a\x03\x00\x00\x00\x12\x04\x08\x01\x50\x00\x0a\x15\x0a\x05\x00\x00\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x0a\x12\x0a\x06\x00\x00\x00\x00\x00\x00\x12\x08\x08\x01\x42\x02\x08\x06\x50\x00\x0a\x12\x0a\x06\x00\x00\x00\x00\x00\x00\x12\x08\x08\x01\x42\x02\x08\x04\x50\x00\x0a\x29\x0a\x04\x00\x00\x00\x00\x12\x21\x08\x01\x1a\x1b\x09\x00\x00\x00\x00\x00\x00\xe0\x3f\x11\x00\x00\x00\x00\x00\x00\xe0\x3f\x19\x00\x00\x00\x00\x00\x00\xe0\x3f\x50\x00\x0a\x15\x0a\x05\x00\x00\x00\x00\x00\x12\x0c\x08\x01\x12\x06\x08\x02\x10\x02\x18\x02\x50\x00\xff\x80\xff\x80\xff\x00\xff\x80\xff\x03\x42\x41\x44\xff\x80\xff\x02\x41\x4d\xff\x80\x00\x00\x00\x3f\xff\x80\xff\x01\x0a\x06\x08\x06\x10\x00\x18\x0d\x0a\x06\x08\x06\x10\x01\x18\x17\x0a\x06\x08\x06\x10\x02\x18\x14\x0a\x06\x08\x06\x10\x03\x18\x14\x0a\x06\x08\x06\x10\x04\x18\x2b\x0a\x06\x08\x06\x10\x05\x18\x17\x0a\x06\x08\x00\x10\x00\x18\x02\x0a\x06\x08\x00\x10\x01\x18\x02\x0a\x06\x08\x01\x10\x01\x18\x02\x0a\x06\x08\x00\x10\x02\x18\x02\x0a\x06\x08\x02\x10\x02\x18\x02\x0a\x06\x08\x01\x10\x02\x18\x03\x0a\x06\x08\x00\x10\x03\x18\x02\x0a\x06\x08\x02\x10\x03\x18\x02\x0a\x06\x08\x01\x10\x03\x18\x02\x0a\x06\x08\x00\x10\x04\x18\x02\x0a\x06\x08\x01\x10\x04\x18\x04\x0a\x06\x08\x00\x10\x05\x18\x02\x0a\x06\x08\x01\x10\x05\x18\x02\x12\x04\x08\x00\x10\x00\x12\x04\x08\x00\x10\x00\x12\x04\x08\x00\x10\x00\x12\x04\x08\x00\x10\x00\x12\x04\x08\x00\x10\x00\x12\x04\x08\x00\x10\x00\x1a\x03\x47\x4d\x54\x0a\x59\x0a\x04\x08\x01\x50\x00\x0a\x0c\x08\x01\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x0a\x08\x08\x01\x42\x02\x08\x06\x50\x00\x0a\x08\x08\x01\x42\x02\x08\x04\x50\x00\x0a\x21\x08\x01\x1a\x1b\x09\x00\x00\x00\x00\x00\x00\xe0\x3f\x11\x00\x00\x00\x00\x00\x00\xe0\x3f\x19\x00\x00\x00\x00\x00\x00\xe0\x3f\x50\x00\x0a\x0c\x08\x01\x12\x06\x08\x02\x10\x02\x18\x02\x50\x00\x08\x03\x10\xec\x02\x1a\x0c\x08\x03\x10\x8e\x01\x18\x1d\x20\xc1\x01\x28\x01\x22\x2e\x08\x0c\x12\x05\x01\x02\x03\x04\x05\x1a\x02\x69\x64\x1a\x07\x62\x6c\x6f\x63\x6b\x4e\x6f\x1a\x04\x76\x61\x6c\x31\x1a\x04\x76\x61\x6c\x32\x1a\x04\x76\x61\x6c\x33\x20\x00\x28\x00\x30\x00\x22\x08\x08\x04\x20\x00\x28\x00\x30\x00\x22\x08\x08\x08\x20\x00\x28\x00\x30\x00\x22\x08\x08\x08\x20\x00\x28\x00\x30\x00\x22\x08\x08\x05\x20\x00\x28\x00\x30\x00\x22\x08\x08\x01\x20\x00\x28\x00\x30\x00\x30\x01\x3a\x04\x08\x01\x50\x00\x3a\x0c\x08\x01\x12\x06\x08\x00\x10\x00\x18\x00\x50\x00\x3a\x08\x08\x01\x42\x02\x08\x06\x50\x00\x3a\x08\x08\x01\x42\x02\x08\x04\x50\x00\x3a\x21\x08\x01\x1a\x1b\x09\x00\x00\x00\x00\x00\x00\xe0\x3f\x11\x00\x00\x00\x00\x00\x00\xe0\x3f\x19\x00\x00\x00\x00\x00\x00\xe0\x3f\x50\x00\x3a\x0c\x08\x01\x12\x06\x08\x02\x10\x02\x18\x02\x50\x00\x40\x90\x4e\x48\x01\x08\xd5\x01\x10\x00\x18\x80\x80\x04\x22\x02\x00\x0b\x28\x5b\x30\x06\x82\xf4\x03\x03\x4f\x52\x43\x18', + ], + 
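# 'expected' is the JSONEachRow row the errors materialized view should emit for the broken message; the sample is binary, so 'printable' is False and raw_message is compared as hex(_raw_message)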
'expected':r'''{"raw_message":"4F52430A0B0A030000001204080150000A150A050000000000120C0801120608001000180050000A120A06000000000000120808014202080650000A120A06000000000000120808014202080450000A290A0400000000122108011A1B09000000000000E03F11000000000000E03F19000000000000E03F50000A150A050000000000120C080112060802100218025000FF80FF80FF00FF80FF03424144FF80FF02414DFF800000003FFF80FF010A0608061000180D0A060806100118170A060806100218140A060806100318140A0608061004182B0A060806100518170A060800100018020A060800100118020A060801100118020A060800100218020A060802100218020A060801100218030A060800100318020A060802100318020A060801100318020A060800100418020A060801100418040A060800100518020A060801100518021204080010001204080010001204080010001204080010001204080010001204080010001A03474D540A590A04080150000A0C0801120608001000180050000A0808014202080650000A0808014202080450000A2108011A1B09000000000000E03F11000000000000E03F19000000000000E03F50000A0C080112060802100218025000080310EC021A0C0803108E01181D20C1012801222E080C120501020304051A0269641A07626C6F636B4E6F1A0476616C311A0476616C321A0476616C33200028003000220808042000280030002208080820002800300022080808200028003000220808052000280030002208080120002800300030013A04080150003A0C0801120608001000180050003A0808014202080650003A0808014202080450003A2108011A1B09000000000000E03F11000000000000E03F19000000000000E03F50003A0C08011206080210021802500040904E480108D5011000188080042202000B285B300682F403034F524318","error":"Cannot parse string 'BAD' as UInt16: syntax error at begin of string. Note: there are toUInt16OrZero and toUInt16OrNull functions, which returns zero\/NULL instead of throwing exception."}''', + 'printable':False, + } + } + + for format_name, format_opts in list(all_formats.items()): + print(('Set up {}'.format(format_name))) + topic_name = 'format_tests_{}'.format(format_name) + data_sample = format_opts['data_sample'] + data_prefix = [] + raw_message = '_raw_message' + # prepend empty value when supported + if format_opts.get('supports_empty_value', False): + data_prefix = data_prefix + [''] + if format_opts.get('printable', False) == False: + raw_message = 'hex(_raw_message)' + kafka_produce(topic_name, data_prefix + data_sample) + instance.query(''' + DROP TABLE IF EXISTS test.kafka_{format_name}; + + CREATE TABLE test.kafka_{format_name} ( + id Int64, + blockNo UInt16, + val1 String, + val2 Float32, + val3 UInt8 + ) ENGINE = Kafka() + SETTINGS kafka_broker_list = 'kafka1:19092', + kafka_topic_list = '{topic_name}', + kafka_group_name = '{topic_name}_group', + kafka_format = '{format_name}', + kafka_handle_error_mode = 'stream', + kafka_flush_interval_ms = 1000 {extra_settings}; + + DROP TABLE IF EXISTS test.kafka_data_{format_name}_mv; + CREATE MATERIALIZED VIEW test.kafka_data_{format_name}_mv Engine=Log AS + SELECT *, _topic, _partition, _offset FROM test.kafka_{format_name} + WHERE length(_error) = 0; + + DROP TABLE IF EXISTS test.kafka_errors_{format_name}_mv; + CREATE MATERIALIZED VIEW test.kafka_errors_{format_name}_mv Engine=Log AS + SELECT {raw_message} as raw_message, _error as error, _topic as topic, _partition as partition, _offset as offset FROM test.kafka_{format_name} + WHERE length(_error) > 0; + '''.format(topic_name=topic_name, format_name=format_name, raw_message=raw_message, + extra_settings=format_opts.get('extra_settings') or '')) + + #DROP TABLE IF EXISTS test.kafka_errors_{format_name}_mv; + + for format_name, format_opts in list(all_formats.items()): + print(('Checking {}'.format(format_name))) + topic_name = 'format_tests_{}'.format(format_name) + # 
shift offsets by 1 if format supports empty value + offsets = [1, 2, 3] if format_opts.get('supports_empty_value', False) else [0, 1, 2] + result = instance.query('SELECT * FROM test.kafka_data_{format_name}_mv;'.format(format_name=format_name)) + expected = '''\ +0 0 AM 0.5 1 {topic_name} 0 {offset_0} +1 0 AM 0.5 1 {topic_name} 0 {offset_1} +2 0 AM 0.5 1 {topic_name} 0 {offset_1} +3 0 AM 0.5 1 {topic_name} 0 {offset_1} +4 0 AM 0.5 1 {topic_name} 0 {offset_1} +5 0 AM 0.5 1 {topic_name} 0 {offset_1} +6 0 AM 0.5 1 {topic_name} 0 {offset_1} +7 0 AM 0.5 1 {topic_name} 0 {offset_1} +8 0 AM 0.5 1 {topic_name} 0 {offset_1} +9 0 AM 0.5 1 {topic_name} 0 {offset_1} +10 0 AM 0.5 1 {topic_name} 0 {offset_1} +11 0 AM 0.5 1 {topic_name} 0 {offset_1} +12 0 AM 0.5 1 {topic_name} 0 {offset_1} +13 0 AM 0.5 1 {topic_name} 0 {offset_1} +14 0 AM 0.5 1 {topic_name} 0 {offset_1} +15 0 AM 0.5 1 {topic_name} 0 {offset_1} +0 0 AM 0.5 1 {topic_name} 0 {offset_2} +'''.format(topic_name=topic_name, offset_0=offsets[0], offset_1=offsets[1], offset_2=offsets[2]) + print(('Checking result\n {result} \n expected \n {expected}\n'.format(result=str(result), expected=str(expected)))) + assert TSV(result) == TSV(expected), 'Proper result for format: {}'.format(format_name) + errors_result = instance.query('SELECT raw_message, error FROM test.kafka_errors_{format_name}_mv format JSONEachRow'.format(format_name=format_name)) + errors_expected = format_opts['expected'] + print(errors_result.strip()) + print(errors_expected.strip()) + assert errors_result.strip() == errors_expected.strip(), 'Proper errors for format: {}'.format(format_name) + if __name__ == '__main__': cluster.start() input("Cluster created, press any key to destroy...") diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.reference b/tests/queries/0_stateless/01781_general_format_for_parser.reference deleted file mode 100644 index d86bac9de59..00000000000 --- a/tests/queries/0_stateless/01781_general_format_for_parser.reference +++ /dev/null @@ -1 +0,0 @@ -OK diff --git a/tests/queries/0_stateless/01781_general_format_for_parser.sh b/tests/queries/0_stateless/01781_general_format_for_parser.sh deleted file mode 100755 index 5148e73f525..00000000000 --- a/tests/queries/0_stateless/01781_general_format_for_parser.sh +++ /dev/null @@ -1,15 +0,0 @@ -#!/usr/bin/env bash - -CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -# shellcheck source=../shell_config.sh -. 
"$CUR_DIR"/../shell_config.sh - -TEST_OUTPUT=`$CLICKHOUSE_LOCAL --query "select toUInt8(1) as a, toUInt16(2) as b, '\xDE\xAD\xBE\xEF' as c, toFixedString('\xCA\xFE\xBA\xBE',4) as d from numbers(5) format Native" | md5sum | cut -d ' ' -f1` - -EXPECTED_OUTPUT=`echo 'd7255e6863ef058aa60064fab921fb2e'` - -if [ "$TEST_OUTPUT" = "$EXPECTED_OUTPUT" ]; then - echo 'OK' -else - echo 'FAIL' -fi From 16e971ee5254574a57cb0a091ad739eab69d222c Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 8 Apr 2021 07:19:57 +0800 Subject: [PATCH 17/18] fix test case --- tests/integration/test_storage_kafka/test.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index d77a7c56cf4..6c06d7401bd 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2859,8 +2859,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster): val3 UInt8 ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = '{topic_name}', - kafka_group_name = '{topic_name}_group', + kafka_topic_list = 'stream_{topic_name}', + kafka_group_name = 'stream_{topic_name}_group', kafka_format = '{format_name}', kafka_handle_error_mode = 'stream', kafka_flush_interval_ms = 1000 {extra_settings}; @@ -2877,8 +2877,6 @@ def test_kafka_formats_with_broken_message(kafka_cluster): '''.format(topic_name=topic_name, format_name=format_name, raw_message=raw_message, extra_settings=format_opts.get('extra_settings') or '')) - #DROP TABLE IF EXISTS test.kafka_errors_{format_name}_mv; - for format_name, format_opts in list(all_formats.items()): print(('Checking {}'.format(format_name))) topic_name = 'format_tests_{}'.format(format_name) From f73faf1578008f356de44f9ddd2777e1ca817718 Mon Sep 17 00:00:00 2001 From: Peng Jian Date: Thu, 8 Apr 2021 14:26:32 +0800 Subject: [PATCH 18/18] fix test case --- tests/integration/test_storage_kafka/test.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_storage_kafka/test.py b/tests/integration/test_storage_kafka/test.py index 6c06d7401bd..99617367bb1 100644 --- a/tests/integration/test_storage_kafka/test.py +++ b/tests/integration/test_storage_kafka/test.py @@ -2836,9 +2836,10 @@ def test_kafka_formats_with_broken_message(kafka_cluster): } } + topic_name_prefix = 'format_tests_4_stream_' for format_name, format_opts in list(all_formats.items()): print(('Set up {}'.format(format_name))) - topic_name = 'format_tests_{}'.format(format_name) + topic_name = topic_name_prefix + '{}'.format(format_name) data_sample = format_opts['data_sample'] data_prefix = [] raw_message = '_raw_message' @@ -2859,8 +2860,8 @@ def test_kafka_formats_with_broken_message(kafka_cluster): val3 UInt8 ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'kafka1:19092', - kafka_topic_list = 'stream_{topic_name}', - kafka_group_name = 'stream_{topic_name}_group', + kafka_topic_list = '{topic_name}', + kafka_group_name = '{topic_name}', kafka_format = '{format_name}', kafka_handle_error_mode = 'stream', kafka_flush_interval_ms = 1000 {extra_settings}; @@ -2879,7 +2880,7 @@ def test_kafka_formats_with_broken_message(kafka_cluster): for format_name, format_opts in list(all_formats.items()): print(('Checking {}'.format(format_name))) - topic_name = 'format_tests_{}'.format(format_name) + topic_name = topic_name_prefix + '{}'.format(format_name) # shift offsets by 1 if format supports empty value offsets = [1, 2, 3] if 
format_opts.get('supports_empty_value', False) else [0, 1, 2] result = instance.query('SELECT * FROM test.kafka_data_{format_name}_mv;'.format(format_name=format_name))