From 42726639f34567b22e24ae4bafb506c1ef3b808c Mon Sep 17 00:00:00 2001 From: avogar Date: Wed, 13 Apr 2022 19:27:38 +0000 Subject: [PATCH] Check ORC/Parquet/Arrow format magic bytes before loading file in memory --- .../Formats/Impl/ArrowBlockInputFormat.cpp | 2 +- .../Formats/Impl/ArrowBufferedStreams.cpp | 26 +++++++++++++++++-- .../Formats/Impl/ArrowBufferedStreams.h | 11 +++++++- .../Formats/Impl/ORCBlockInputFormat.cpp | 2 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 2 +- src/Storages/Hive/HiveFile.cpp | 4 +-- tests/integration/test_storage_s3/test.py | 15 +++++++++++ 7 files changed, 54 insertions(+), 8 deletions(-) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 792ebd09392..07331d82bb8 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -111,7 +111,7 @@ static std::shared_ptr createStreamReader(ReadBuffer & static std::shared_ptr createFileReader(ReadBuffer & in, const FormatSettings & format_settings, std::atomic & is_stopped) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped); + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Arrow", ARROW_MAGIC_BYTES); if (is_stopped) return nullptr; diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp index 484a3a17f8f..8573a560d02 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.cpp @@ -9,6 +9,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +23,7 @@ namespace DB namespace ErrorCodes { extern const int UNKNOWN_FILE_SIZE; + extern const int INCORRECT_DATA; } ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true} @@ -139,7 +141,12 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close() return arrow::Status(); } -std::shared_ptr asArrowFile(ReadBuffer & in, const FormatSettings & settings, std::atomic & is_cancelled) +std::shared_ptr asArrowFile( + ReadBuffer & in, + const FormatSettings & settings, + std::atomic & is_cancelled, + const std::string & format_name, + const std::string & magic_bytes) { if (auto * fd_in = dynamic_cast(&in)) { @@ -158,8 +165,23 @@ std::shared_ptr asArrowFile(ReadBuffer & in, const // fallback to loading the entire file in memory std::string file_data; { + PeekableReadBuffer buf(in); + std::string magic_bytes_from_data; + magic_bytes_from_data.resize(magic_bytes.size()); + bool read_magic_bytes = false; + try + { + PeekableReadBufferCheckpoint checkpoint(buf, true); + buf.readStrict(magic_bytes_from_data.data(), magic_bytes_from_data.size()); + read_magic_bytes = true; + } + catch (const Exception &) {} + + if (!read_magic_bytes || magic_bytes_from_data != magic_bytes) + throw Exception(ErrorCodes::INCORRECT_DATA, "Not a {} file", format_name); + WriteBufferFromString file_buffer(file_data); - copyData(in, file_buffer, is_cancelled); + copyData(buf, file_buffer, is_cancelled); } return std::make_shared(arrow::Buffer::FromString(std::move(file_data))); diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.h b/src/Processors/Formats/Impl/ArrowBufferedStreams.h index e06eab04f1b..4ad0ecdf012 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.h +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.h @@ -6,6 +6,10 @@ #include #include +#define ORC_MAGIC_BYTES "ORC" +#define PARQUET_MAGIC_BYTES "PAR1" +#define ARROW_MAGIC_BYTES "ARROW1" + namespace DB { @@ -86,7 +90,12 @@ private: ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer); }; -std::shared_ptr asArrowFile(ReadBuffer & in, const FormatSettings & settings, std::atomic & is_cancelled); +std::shared_ptr asArrowFile( + ReadBuffer & in, + const FormatSettings & settings, + std::atomic & is_cancelled, + const std::string & format_name, + const std::string & magic_bytes); } diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index 333129aee81..1531c0d2794 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -116,7 +116,7 @@ static void getFileReaderAndSchema( const FormatSettings & format_settings, std::atomic & is_stopped) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped); + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES); if (is_stopped) return; diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index af16d30bcfe..86987c665e0 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -117,7 +117,7 @@ static void getFileReaderAndSchema( const FormatSettings & format_settings, std::atomic & is_stopped) { - auto arrow_file = asArrowFile(in, format_settings, is_stopped); + auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES); if (is_stopped) return; THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(std::move(arrow_file), arrow::default_memory_pool(), &file_reader)); diff --git a/src/Storages/Hive/HiveFile.cpp b/src/Storages/Hive/HiveFile.cpp index 02c92770274..57acbdd577b 100644 --- a/src/Storages/Hive/HiveFile.cpp +++ b/src/Storages/Hive/HiveFile.cpp @@ -150,7 +150,7 @@ void HiveORCFile::prepareReader() in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); auto format_settings = getFormatSettings(getContext()); std::atomic is_stopped{0}; - auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool()); + auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES), arrow::default_memory_pool()); THROW_ARROW_NOT_OK(result.status()); reader = std::move(result).ValueOrDie(); } @@ -270,7 +270,7 @@ void HiveParquetFile::prepareReader() in = std::make_unique(namenode_url, path, getContext()->getGlobalContext()->getConfigRef()); auto format_settings = getFormatSettings(getContext()); std::atomic is_stopped{0}; - THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader)); + THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool(), &reader)); } void HiveParquetFile::loadSplitMinMaxIndexesImpl() diff --git a/tests/integration/test_storage_s3/test.py b/tests/integration/test_storage_s3/test.py index 18cf7cc67ab..9b0cc3cdea8 100644 --- a/tests/integration/test_storage_s3/test.py +++ b/tests/integration/test_storage_s3/test.py @@ -1428,3 +1428,18 @@ def test_parallel_reading_with_memory_limit(started_cluster): # Check that server didn't crash result = instance.query("select 1") assert int(result) == 1 + + +def test_wrong_format_usage(started_cluster): + bucket = started_cluster.minio_bucket + instance = started_cluster.instances["dummy"] + + instance.query( + f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native') select * from numbers(10)" + ) + + result = instance.query_and_get_error( + f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native', 'Parquet') settings input_format_allow_seeks=0, max_memory_usage=1000" + ) + + assert "Not a Parquet file" in result