This commit is contained in:
avogar 2023-08-21 12:43:11 +00:00
parent 47304bf7aa
commit 584bd57f5c
14 changed files with 28 additions and 136 deletions

2
contrib/arrow vendored

@ -1 +1 @@
Subproject commit 1f1b3d35fb6eb73e6492d3afd8a85cde848d174f
Subproject commit 1d93838f69a802639ca144ea5704a98e2481810d

View File

@ -25,10 +25,6 @@ public:
virtual NamesAndTypesList readSchema() = 0;
/// Some formats like Parquet contains number of rows in metadata
/// and we can read it once during schema inference and reuse it later for fast count;
virtual std::optional<size_t> readNumberOrRows() { return std::nullopt; }
/// True if order of columns is important in format.
/// Exceptions: JSON, TSKV.
virtual bool hasStrictOrderOfColumns() const { return true; }

View File

@ -172,49 +172,23 @@ ArrowSchemaReader::ArrowSchemaReader(ReadBuffer & in_, bool stream_, const Forma
{
}
void ArrowSchemaReader::initializeIfNeeded()
{
if (file_reader || stream_reader)
return;
if (stream)
stream_reader = createStreamReader(in);
else
{
std::atomic<int> is_stopped = 0;
file_reader = createFileReader(in, format_settings, is_stopped);
}
}
NamesAndTypesList ArrowSchemaReader::readSchema()
{
initializeIfNeeded();
std::shared_ptr<arrow::Schema> schema;
if (stream)
schema = stream_reader->schema();
schema = createStreamReader(in)->schema();
else
schema = file_reader->schema();
{
std::atomic<int> is_stopped = 0;
schema = createFileReader(in, format_settings, is_stopped)->schema();
}
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, stream ? "ArrowStream" : "Arrow", format_settings.arrow.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}
std::optional<size_t> ArrowSchemaReader::readNumberOrRows()
{
if (stream)
return std::nullopt;
auto rows = file_reader->CountRows();
if (!rows.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", rows.status().ToString());
return *rows;
}
return header.getNamesAndTypesList();}
void registerInputFormatArrow(FormatFactory & factory)
{

View File

@ -66,16 +66,9 @@ public:
NamesAndTypesList readSchema() override;
std::optional<size_t> readNumberOrRows() override;
private:
void initializeIfNeeded();
bool stream;
const FormatSettings format_settings;
std::shared_ptr<arrow::RecordBatchReader> stream_reader;
// The following fields are used only for Arrow format
std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
};
}

View File

@ -28,7 +28,6 @@ public:
void setReadBuffer(ReadBuffer & in_) override;
void resetParser() override;
// size_t countRows(size_t max_block_size) override;
protected:
CSVRowInputFormat(const Block & header_, std::shared_ptr<PeekableReadBuffer> in_, const Params & params_,

View File

@ -147,30 +147,17 @@ ORCSchemaReader::ORCSchemaReader(ReadBuffer & in_, const FormatSettings & format
{
}
void ORCSchemaReader::initializeIfNeeded()
{
if (file_reader)
return;
std::atomic<int> is_stopped = 0;
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
}
NamesAndTypesList ORCSchemaReader::readSchema()
{
initializeIfNeeded();
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::shared_ptr<arrow::Schema> schema;
std::atomic<int> is_stopped = 0;
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "ORC", format_settings.orc.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
return getNamesAndRecursivelyNullableTypes(header);
return header.getNamesAndTypesList();
}
std::optional<size_t> ORCSchemaReader::readNumberOrRows()
{
initializeIfNeeded();
return file_reader->NumberOfRows();
}
return header.getNamesAndTypesList();}
void registerInputFormatORC(FormatFactory & factory)
{

View File

@ -69,13 +69,8 @@ public:
ORCSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
NamesAndTypesList readSchema() override;
std::optional<size_t> readNumberOrRows() override;
private:
void initializeIfNeeded();
std::unique_ptr<arrow::adapters::orc::ORCFileReader> file_reader;
std::shared_ptr<arrow::Schema> schema;
const FormatSettings format_settings;
};

View File

@ -388,19 +388,12 @@ ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings
{
}
void ParquetSchemaReader::initializeIfNeeded()
{
if (arrow_file)
return;
std::atomic<int> is_stopped{0};
arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
metadata = parquet::ReadMetaData(arrow_file);
}
NamesAndTypesList ParquetSchemaReader::readSchema()
{
initializeIfNeeded();
std::atomic<int> is_stopped{0};
auto file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
auto metadata = parquet::ReadMetaData(file);
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
@ -412,12 +405,6 @@ NamesAndTypesList ParquetSchemaReader::readSchema()
return header.getNamesAndTypesList();
}
std::optional<size_t> ParquetSchemaReader::readNumberOrRows()
{
initializeIfNeeded();
return metadata->num_rows();
}
void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerRandomAccessInputFormat(

View File

@ -286,14 +286,9 @@ public:
ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_);
NamesAndTypesList readSchema() override;
std::optional<size_t> readNumberOrRows() override;
private:
void initializeIfNeeded();
const FormatSettings format_settings;
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
std::shared_ptr<parquet::FileMetaData> metadata;
};
}

View File

@ -41,10 +41,10 @@ bool RawBLOBRowInputFormat::readRow(MutableColumns & columns, RowReadExtension &
size_t RawBLOBRowInputFormat::countRows(size_t)
{
if (in->eof())
if (done_count_rows)
return 0;
in->ignoreAll();
done_count_rows = true;
return 1;
}

View File

@ -21,9 +21,12 @@ public:
String getName() const override { return "RawBLOBRowInputFormat"; }
private:
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool supportsCountRows() const override { return true; }
size_t countRows(size_t max_block_size) override;
bool readRow(MutableColumns & columns, RowReadExtension &) override;
bool done_count_rows = false;
};
class RawBLOBSchemaReader: public IExternalSchemaReader

View File

@ -1,36 +0,0 @@
#pragma once
#include <Processors/ISource.h>
namespace DB
{
class ConstChunkGenerator : public ISource
{
public:
ConstChunkGenerator(Block header, size_t total_num_rows, size_t max_block_size_)
: ISource(std::move(header))
, remaining_rows(total_num_rows), max_block_size(max_block_size_)
{
}
String getName() const override { return "ConstChunkGenerator"; }
protected:
Chunk generate() override
{
if (!remaining_rows)
return {};
size_t num_rows = std::min(max_block_size, remaining_rows);
remaining_rows -= num_rows;
return cloneConstWithDefault(Chunk{getPort().getHeader().getColumns(), 0}, num_rows);
}
private:
size_t remaining_rows;
size_t max_block_size;
};
}

View File

@ -37,7 +37,6 @@
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Processors/Sources/NullSource.h>
#include <Processors/Sources/ConstChunkGenerator.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ResizeProcessor.h>

View File

@ -1,17 +1,17 @@
#pragma once
#include <Poco/URI.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/ISource.h>
#include <Formats/FormatSettings.h>
#include <IO/CompressionMethod.h>
#include <IO/HTTPHeaderEntries.h>
#include <IO/ReadWriteBufferFromHTTP.h>
#include <Processors/ISource.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Storages/Cache/SchemaCache.h>
#include <IO/HTTPHeaderEntries.h>
#include <Storages/IStorage.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/StorageFactory.h>
#include <Storages/Cache/SchemaCache.h>
#include <Storages/StorageConfiguration.h>
#include <Storages/prepareReadingFromFormat.h>
#include <Poco/URI.h>
namespace DB