Check ORC/Parquet/Arrow format magic bytes before loading the file into memory

avogar 2022-04-13 19:27:38 +00:00
parent 6a165787a6
commit 42726639f3
7 changed files with 54 additions and 8 deletions
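
The ordering is the point of the change: the magic-byte comparison runs before the fallback that copies a whole file into memory, so a mis-declared format fails fast with a clear error instead of buffering (and possibly exhausting memory on) arbitrarily large input. A minimal standalone sketch of the idea, assuming a seekable file and using illustrative names rather than ClickHouse's API:

```cpp
#include <fstream>
#include <stdexcept>
#include <string>

// Sketch only: reject a file whose first bytes do not match the magic
// expected for the requested input format, before anything buffers it.
void assertMagicBytes(std::ifstream & file, const std::string & magic, const std::string & format_name)
{
    std::string head(magic.size(), '\0');
    file.read(head.data(), static_cast<std::streamsize>(head.size()));
    if (file.gcount() != static_cast<std::streamsize>(head.size()) || head != magic)
        throw std::runtime_error("Not a " + format_name + " file");
    file.seekg(0); // rewind so the real reader still sees the magic bytes
}
```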


@@ -111,7 +111,7 @@ static std::shared_ptr<arrow::RecordBatchReader> createStreamReader(ReadBuffer &
 static std::shared_ptr<arrow::ipc::RecordBatchFileReader> createFileReader(ReadBuffer & in, const FormatSettings & format_settings, std::atomic<int> & is_stopped)
 {
-    auto arrow_file = asArrowFile(in, format_settings, is_stopped);
+    auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Arrow", ARROW_MAGIC_BYTES);
     if (is_stopped)
         return nullptr;


@@ -9,6 +9,7 @@
 #include <IO/ReadBufferFromFileDescriptor.h>
 #include <IO/WriteBufferFromString.h>
 #include <IO/copyData.h>
+#include <IO/PeekableReadBuffer.h>
 #include <arrow/buffer.h>
 #include <arrow/io/memory.h>
 #include <arrow/result.h>
@@ -22,6 +23,7 @@ namespace DB
 namespace ErrorCodes
 {
     extern const int UNKNOWN_FILE_SIZE;
+    extern const int INCORRECT_DATA;
 }

 ArrowBufferedOutputStream::ArrowBufferedOutputStream(WriteBuffer & out_) : out{out_}, is_open{true}
@@ -139,7 +141,12 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
     return arrow::Status();
 }

-std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings, std::atomic<int> & is_cancelled)
+std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
+    ReadBuffer & in,
+    const FormatSettings & settings,
+    std::atomic<int> & is_cancelled,
+    const std::string & format_name,
+    const std::string & magic_bytes)
 {
     if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
     {
@@ -158,8 +165,23 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const
     // fallback to loading the entire file in memory
     std::string file_data;
     {
+        PeekableReadBuffer buf(in);
+        std::string magic_bytes_from_data;
+        magic_bytes_from_data.resize(magic_bytes.size());
+        bool read_magic_bytes = false;
+        try
+        {
+            PeekableReadBufferCheckpoint checkpoint(buf, true);
+            buf.readStrict(magic_bytes_from_data.data(), magic_bytes_from_data.size());
+            read_magic_bytes = true;
+        }
+        catch (const Exception &) {}
+
+        if (!read_magic_bytes || magic_bytes_from_data != magic_bytes)
+            throw Exception(ErrorCodes::INCORRECT_DATA, "Not a {} file", format_name);
+
         WriteBufferFromString file_buffer(file_data);
-        copyData(in, file_buffer, is_cancelled);
+        copyData(buf, file_buffer, is_cancelled);
     }

     return std::make_shared<arrow::io::BufferReader>(arrow::Buffer::FromString(std::move(file_data)));
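
The subtlety here is that this fallback runs exactly when the input is not seekable, so the check cannot simply rewind. Wrapping the input in a PeekableReadBuffer and setting a checkpoint (the `true` argument asks for automatic rollback when the checkpoint object goes out of scope) lets the code read the magic strictly and then hand `buf`, not `in`, to copyData, so the bytes consumed by the check are replayed into `file_data`. A rough analogue for a plain forward-only stream, with names of my own:

```cpp
#include <istream>
#include <stdexcept>
#include <string>

// Analogue of the checkpoint/rollback dance for a stream that cannot seek:
// the bytes consumed by the magic check are replayed by hand.
std::string readAllCheckingMagic(std::istream & in, const std::string & magic, const std::string & format_name)
{
    std::string head(magic.size(), '\0');
    in.read(head.data(), static_cast<std::streamsize>(head.size()));
    if (in.gcount() != static_cast<std::streamsize>(head.size()) || head != magic)
        throw std::runtime_error("Not a " + format_name + " file");

    std::string data = head; // "rollback": start with the bytes already consumed
    char chunk[4096];
    while (in.read(chunk, sizeof(chunk)) || in.gcount() > 0)
        data.append(chunk, static_cast<size_t>(in.gcount()));
    return data;
}
```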


@@ -6,6 +6,10 @@
 #include <arrow/io/interfaces.h>
 #include <optional>

+#define ORC_MAGIC_BYTES "ORC"
+#define PARQUET_MAGIC_BYTES "PAR1"
+#define ARROW_MAGIC_BYTES "ARROW1"
+
 namespace DB
 {
@@ -86,7 +90,12 @@ private:
     ARROW_DISALLOW_COPY_AND_ASSIGN(ArrowInputStreamFromReadBuffer);
 };

-std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(ReadBuffer & in, const FormatSettings & settings, std::atomic<int> & is_cancelled);
+std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
+    ReadBuffer & in,
+    const FormatSettings & settings,
+    std::atomic<int> & is_cancelled,
+    const std::string & format_name,
+    const std::string & magic_bytes);

 }
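
For reference, these are genuine on-disk prefixes: ORC files begin with the three bytes `ORC`, Parquet files begin (and end) with `PAR1`, and the Arrow IPC file format opens with `ARROW1` padded by two zero bytes to an 8-byte boundary, so a 6-byte prefix test is safe. A small classifier built on the same prefixes, purely illustrative and not part of the commit:

```cpp
#include <string_view>

// Classify a buffer by the magic prefixes above; the magics differ in
// length, so each comparison uses its own size.
std::string_view guessFormatByMagic(std::string_view data)
{
    auto has_prefix = [&](std::string_view magic)
    {
        return data.size() >= magic.size() && data.substr(0, magic.size()) == magic;
    };
    if (has_prefix("ORC"))
        return "ORC";
    if (has_prefix("PAR1"))
        return "Parquet";
    if (has_prefix("ARROW1"))
        return "Arrow";
    return "unknown";
}
```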


@@ -116,7 +116,7 @@ static void getFileReaderAndSchema(
     const FormatSettings & format_settings,
     std::atomic<int> & is_stopped)
 {
-    auto arrow_file = asArrowFile(in, format_settings, is_stopped);
+    auto arrow_file = asArrowFile(in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES);
     if (is_stopped)
         return;


@@ -117,7 +117,7 @@ static void getFileReaderAndSchema(
     const FormatSettings & format_settings,
     std::atomic<int> & is_stopped)
 {
-    auto arrow_file = asArrowFile(in, format_settings, is_stopped);
+    auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
     if (is_stopped)
         return;
     THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(std::move(arrow_file), arrow::default_memory_pool(), &file_reader));
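
The head check is deliberately cheap and only necessary, not sufficient: parquet::arrow::OpenFile still does the real validation against the footer metadata (a Parquet file also ends with `PAR1`). Once a file is fully buffered anyway, a slightly stricter sanity test could look at both ends; a sketch, not something this commit does:

```cpp
#include <string_view>

// Stricter (still shallow) Parquet sanity check on a fully buffered file:
// Parquet data both starts and ends with the 4-byte magic "PAR1".
bool looksLikeParquet(std::string_view data)
{
    constexpr std::string_view magic = "PAR1";
    return data.size() >= 2 * magic.size()
        && data.substr(0, magic.size()) == magic
        && data.substr(data.size() - magic.size()) == magic;
}
```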


@@ -150,7 +150,7 @@ void HiveORCFile::prepareReader()
     in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
     auto format_settings = getFormatSettings(getContext());
     std::atomic<int> is_stopped{0};
-    auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool());
+    auto result = arrow::adapters::orc::ORCFileReader::Open(asArrowFile(*in, format_settings, is_stopped, "ORC", ORC_MAGIC_BYTES), arrow::default_memory_pool());
     THROW_ARROW_NOT_OK(result.status());
     reader = std::move(result).ValueOrDie();
 }
@@ -270,7 +270,7 @@ void HiveParquetFile::prepareReader()
     in = std::make_unique<ReadBufferFromHDFS>(namenode_url, path, getContext()->getGlobalContext()->getConfigRef());
     auto format_settings = getFormatSettings(getContext());
     std::atomic<int> is_stopped{0};
-    THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped), arrow::default_memory_pool(), &reader));
+    THROW_ARROW_NOT_OK(parquet::arrow::OpenFile(asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES), arrow::default_memory_pool(), &reader));
 }

 void HiveParquetFile::loadSplitMinMaxIndexesImpl()


@@ -1428,3 +1428,18 @@ def test_parallel_reading_with_memory_limit(started_cluster):
     # Check that server didn't crash
     result = instance.query("select 1")
     assert int(result) == 1
+
+
+def test_wrong_format_usage(started_cluster):
+    bucket = started_cluster.minio_bucket
+    instance = started_cluster.instances["dummy"]
+    instance.query(
+        f"insert into function s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native') select * from numbers(10)"
+    )
+    # input_format_allow_seeks=0 forces the load-whole-file fallback path;
+    # the tiny max_memory_usage would make that load fail if it ever started.
+    result = instance.query_and_get_error(
+        f"desc s3('http://{started_cluster.minio_host}:{started_cluster.minio_port}/{bucket}/test_wrong_format.native', 'Parquet') settings input_format_allow_seeks=0, max_memory_usage=1000"
+    )
+    assert "Not a Parquet file" in result