Parallel decoding with one row group per thread

This commit is contained in:
Michael Kolupaev 2023-03-24 01:34:24 +00:00
parent 683077890f
commit e133633359
26 changed files with 561 additions and 186 deletions

View File

@ -140,6 +140,8 @@
M(RestartReplicaThreadsActive, "Number of threads in the RESTART REPLICA thread pool running a task.") \
M(QueryPipelineExecutorThreads, "Number of threads in the PipelineExecutor thread pool.") \
M(QueryPipelineExecutorThreadsActive, "Number of threads in the PipelineExecutor thread pool running a task.") \
M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \
M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that has been marked as broken. This metric will starts from 0 on start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \

View File

@ -323,7 +323,7 @@ using ThreadFromGlobalPool = ThreadFromGlobalPoolImpl<true>;
/// one is at GlobalThreadPool level, the other is at ThreadPool level, so tracing context will be initialized on the same thread twice.
///
/// Once the worker on ThreadPool gains the control of execution, it won't return until it's shutdown,
/// which means the tracing context initialized at underlying worker level won't be delete for a very long time.
/// which means the tracing context initialized at underlying worker level won't be deleted for a very long time.
/// This would cause wrong context for further jobs scheduled in ThreadPool.
///
/// To make sure the tracing context is correctly propagated, we explicitly disable context propagation(including initialization and de-initialization) at underlying worker level.

View File

@ -241,7 +241,7 @@ InputFormatPtr FormatFactory::getInput(
InputFormatPtr FormatFactory::getInputMultistream(
const String & name,
ParallelReadBuffer::ReadBufferFactoryPtr buf_factory,
SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
@ -270,7 +270,7 @@ InputFormatPtr FormatFactory::getInputMultistream(
InputFormatPtr FormatFactory::getInputImpl(
const String & name,
// exactly one of the following two is nullptr
ParallelReadBuffer::ReadBufferFactoryPtr buf_factory,
SeekableReadBufferFactoryPtr buf_factory,
ReadBuffer * _buf,
const Block & sample,
ContextPtr context,

View File

@ -92,8 +92,21 @@ private:
// and get some information about projections (min/max/count per column per row group).
//
// TODO: This doesn't seem good. Rethink.
// * Rename multistream to random-access.
// * Pass max_parsing_threads to InputCreator too. Or require random-access formats to be
// created with this function, not InputCreator, and add a ReadBuf* param here.
// * Pass max_download_buffer_size to both creators. Maybe it should be in ReadSettings?
// * Do something about the seekable_read situation in StorageURL.
// Probably add something like checkIfActuallySeekable() in SeekableReadBuffer or in
// SeekableReadBufferFactory.
// (There's also the problem that if the http server doesn't support ranges, we'll read
// the file twice: for schema, then for data. But I think we should ignore that, even
// if we end up solving the related problem of caching metadata.)
// * Would be good to distinguish between max_download_threads = 0 vs 1. 0 would mean
// read inline, on normal threads. 1 would mean read from one IO thread, so that IO can
// overlap with processing.
using MultistreamInputCreator = std::function<InputFormatPtr(
ParallelReadBuffer::ReadBufferFactoryPtr,
SeekableReadBufferFactoryPtr buf_factory,
const Block & header,
const FormatSettings & settings,
const ReadSettings& read_settings,
@ -164,7 +177,7 @@ public:
// Prefer this over getInput() when reading from random-access source like file or HTTP.
InputFormatPtr getInputMultistream(
const String & name,
ParallelReadBuffer::ReadBufferFactoryPtr buf_factory,
SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
ContextPtr context,
UInt64 max_block_size,
@ -266,7 +279,7 @@ private:
InputFormatPtr getInputImpl(
const String & name,
// exactly one of the following two is nullptr
ParallelReadBuffer::ReadBufferFactoryPtr buf_factory,
SeekableReadBufferFactoryPtr buf_factory,
ReadBuffer * buf,
const Block & sample,
ContextPtr context,

View File

@ -213,12 +213,14 @@ struct FormatSettings
std::unordered_set<int> skip_row_groups = {};
bool output_string_as_string = false;
bool output_fixed_string_as_fixed_byte_array = true;
// TODO: This should probably be shared among all formats, and interact with
// https://github.com/ClickHouse/ClickHouse/issues/38755
bool preserve_order = false;
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
// TODO: Use ReadSettings.remote_read_min_bytes_for_seek instead.
size_t min_bytes_for_seek = 4 * 1024 * 1024;
// Limit on read size, but not on memory usage; many reads can be buffered at once.
// TODO: Remove and use max_download_buffer_size instead.
size_t max_bytes_to_read_at_once = 128 * 1024 * 1024;
} parquet;

View File

@ -51,7 +51,7 @@ struct ParallelReadBuffer::ReadWorker
};
ParallelReadBuffer::ParallelReadBuffer(
std::unique_ptr<ReadBufferFactory> reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers_, size_t range_step_)
std::unique_ptr<SeekableReadBufferFactory> reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers_, size_t range_step_)
: SeekableReadBuffer(nullptr, 0)
, max_working_readers(max_working_readers_)
, schedule(std::move(schedule_))

View File

@ -11,7 +11,7 @@ namespace DB
/**
* Reads from multiple ReadBuffers in parallel.
* Preserves order of readers obtained from ReadBufferFactory.
* Preserves order of readers obtained from SeekableReadBufferFactory.
*
* It consumes multiple readers and yields data from them in the order they were passed.
* Each working reader saves segments of data to an internal queue.
@ -29,20 +29,7 @@ private:
bool nextImpl() override;
public:
class ReadBufferFactory : public WithFileSize
{
public:
~ReadBufferFactory() override = default;
// We usually call setReadUntilPosition() and seek() on the returned buffer before reading.
// So it's recommended that the returned implementation be lazy, i.e. don't start reading
// before the first call to nextImpl().
virtual std::unique_ptr<SeekableReadBuffer> getReader() = 0;
};
using ReadBufferFactoryPtr = std::unique_ptr<ReadBufferFactory>;
ParallelReadBuffer(ReadBufferFactoryPtr reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers, size_t range_step_);
ParallelReadBuffer(SeekableReadBufferFactoryPtr reader_factory_, ThreadPoolCallbackRunner<void> schedule_, size_t max_working_readers, size_t range_step_);
~ParallelReadBuffer() override { finishAndWait(); }
@ -50,8 +37,8 @@ public:
size_t getFileSize();
off_t getPosition() override;
const ReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
ReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
const SeekableReadBufferFactory & getReadBufferFactory() const { return *reader_factory; }
SeekableReadBufferFactory & getReadBufferFactory() { return *reader_factory; }
private:
/// Reader in progress with a list of read segments
@ -81,7 +68,7 @@ private:
ThreadPoolCallbackRunner<void> schedule;
std::unique_ptr<ReadBufferFactory> reader_factory;
std::unique_ptr<SeekableReadBufferFactory> reader_factory;
size_t range_step;
size_t next_range_start{0};

View File

@ -218,8 +218,8 @@ public:
* Behavior in weird cases is currently implementation-defined:
* - setReadUntilPosition() below current position,
* - setReadUntilPosition() above the end of the file,
* - seek() to a position above the until position (even if you setUntilPosition() to a higher
* value right after the seek!),
* - seek() to a position above the until position (even if you setReadUntilPosition() to a
* higher value right after the seek!),
*
* Typical implementations discard any current buffers and connections, even if the position is
* adjusted only a little.

View File

@ -300,7 +300,7 @@ bool ReadBufferFromS3::atEndOfRequestedRangeGuess() {
if (read_until_position)
return getPosition() >= read_until_position;
if (file_size)
return getPosition() >= file_size;
return getPosition() >= static_cast<off_t>(*file_size);
return false;
}

View File

@ -92,7 +92,7 @@ private:
};
/// Creates separate ReadBufferFromS3 for sequence of ranges of particular object
class ReadBufferS3Factory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
class ReadBufferS3Factory : public SeekableReadBufferFactory, public WithFileName
{
public:
explicit ReadBufferS3Factory(

View File

@ -534,9 +534,7 @@ namespace detail
next_callback(count());
if (read_range.end && getOffset() > read_range.end.value())
{
return false;
}
if (impl)
{
@ -699,7 +697,8 @@ namespace detail
}
}
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
@ -715,30 +714,43 @@ namespace detail
until = std::max(until, 1ul);
if (read_range.end && *read_range.end + 1 == until)
return;
if (impl) {
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
read_range.end = until - 1;
read_range.begin = getPosition();
resetWorkingBuffer();
if (impl) {
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
void setReadUntilEnd() override
{
if (!read_range.end)
return;
if (impl) {
if (!atEndOfRequestedRangeGuess())
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
read_range.end.reset();
read_range.begin = getPosition();
resetWorkingBuffer();
if (impl) {
ProfileEvents::increment(ProfileEvents::ReadBufferSeekCancelConnection);
impl.reset();
}
}
bool supportsRightBoundedReads() const override { return true; }
// Best-effort guess of whether destroying `impl` right now would waste no work,
// i.e. whether everything that was requested has already been consumed.
// Used only to decide whether a dropped connection should be counted in metrics.
bool atEndOfRequestedRangeGuess()
{
    // Nothing is open, so nothing can be wasted.
    if (!impl)
        return true;

    // A bounded read is finished once the position has moved past read_range.end.
    if (read_range.end)
        return getPosition() > static_cast<off_t>(*read_range.end);

    // Unbounded read: finished only if the file size is known and has been reached.
    return file_size && getPosition() >= static_cast<off_t>(*file_size);
}
std::string getResponseCookie(const std::string & name, const std::string & def) const
{
for (const auto & cookie : cookies)
@ -829,7 +841,7 @@ public:
}
};
class RangedReadWriteBufferFromHTTPFactory : public ParallelReadBuffer::ReadBufferFactory, public WithFileName
class RangedReadWriteBufferFromHTTPFactory : public SeekableReadBufferFactory, public WithFileName
{
using OutStreamCallback = ReadWriteBufferFromHTTP::OutStreamCallback;

View File

@ -52,8 +52,22 @@ public:
virtual bool isIntegratedWithFilesystemCache() const { return false; }
};
// Abstract factory of SeekableReadBuffer-s. Useful for reading in parallel: every call to
// getReader() returns a separately owned buffer (std::unique_ptr), so each consumer can
// seek and read on its own.
// NOTE(review): presumably all returned readers refer to the same underlying file, whose
// size is what WithFileSize reports — confirm against the implementations.
class SeekableReadBufferFactory : public WithFileSize
{
public:
~SeekableReadBufferFactory() override = default;
// Creates a new reader.
// We usually call setReadUntilPosition() and seek() on the returned buffer before reading.
// So it's recommended that the returned implementation be lazy, i.e. don't start reading
// before the first call to nextImpl().
virtual std::unique_ptr<SeekableReadBuffer> getReader() = 0;
};
using SeekableReadBufferPtr = std::shared_ptr<SeekableReadBuffer>;
using SeekableReadBufferFactoryPtr = std::unique_ptr<SeekableReadBufferFactory>;
/// Wraps a reference to a SeekableReadBuffer into a unique pointer to SeekableReadBuffer.
/// This function is like wrapReadBufferReference() but for SeekableReadBuffer.
std::unique_ptr<SeekableReadBuffer> wrapSeekableReadBufferReference(SeekableReadBuffer & ref);

View File

@ -5,14 +5,15 @@
namespace DB
{
IInputFormat::IInputFormat(Block header, ReadBuffer & in_)
: ISource(std::move(header)), in(&in_)
IInputFormat::IInputFormat(Block header, ReadBuffer * in_)
: ISource(std::move(header)), in(in_)
{
column_mapping = std::make_shared<ColumnMapping>();
}
void IInputFormat::resetParser()
{
chassert(in);
in->ignoreAll();
// those are protected attributes from ISource (I didn't want to propagate resetParser up there)
finished = false;
@ -23,6 +24,7 @@ void IInputFormat::resetParser()
/// Replaces the buffer this format reads from with `in_`.
/// Asserts that the format currently has a read buffer: formats that were constructed
/// with a null buffer do not support swapping it.
void IInputFormat::setReadBuffer(ReadBuffer & in_)
{
chassert(in); // not supported by random-access formats
in = &in_;
}

View File

@ -18,10 +18,11 @@ class IInputFormat : public ISource
{
protected:
ReadBuffer * in [[maybe_unused]];
ReadBuffer * in [[maybe_unused]] = nullptr;
public:
IInputFormat(Block header, ReadBuffer & in_);
// ReadBuffer can be nullptr for random-access formats.
IInputFormat(Block header, ReadBuffer * in_);
/** In some use cases (hello Kafka) we need to read a lot of tiny streams in exactly the same format.
* The recreating of parser for each small stream takes too long, so we introduce a method
@ -32,7 +33,7 @@ public:
virtual void resetParser();
virtual void setReadBuffer(ReadBuffer & in_);
ReadBuffer & getReadBuffer() const { return *in; }
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }
virtual const BlockMissingValues & getMissingValues() const
{

View File

@ -54,7 +54,7 @@ bool isParseError(int code)
}
IRowInputFormat::IRowInputFormat(Block header, ReadBuffer & in_, Params params_)
: IInputFormat(std::move(header), in_), serializations(getPort().getHeader().getSerializations()), params(params_)
: IInputFormat(std::move(header), &in_), serializations(getPort().getHeader().getSerializations()), params(params_)
{
}

View File

@ -24,7 +24,7 @@ namespace ErrorCodes
}
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IInputFormat(header_, in_), stream{stream_}, format_settings(format_settings_)
: IInputFormat(header_, &in_), stream{stream_}, format_settings(format_settings_)
{
}

View File

@ -13,7 +13,6 @@
#include <arrow/io/memory.h>
#include <arrow/result.h>
#include <Core/Settings.h>
#include <Common/logger_useful.h>
#include <sys/stat.h>
@ -51,7 +50,6 @@ arrow::Status ArrowBufferedOutputStream::Write(const void * data, int64_t length
RandomAccessFileFromSeekableReadBuffer::RandomAccessFileFromSeekableReadBuffer(ReadBuffer & in_, std::optional<off_t> file_size_, bool avoid_buffering_)
: in{in_}, seekable_in{dynamic_cast<SeekableReadBuffer &>(in_)}, file_size{file_size_}, is_open{true}, avoid_buffering(avoid_buffering_)
{
LOG_INFO(&Poco::Logger::get("asdqwe"), "asdqwe creating with size {}", file_size.value_or(0));
}
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
@ -66,7 +64,6 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::GetSize()
arrow::Status RandomAccessFileFromSeekableReadBuffer::Close()
{
LOG_INFO(&Poco::Logger::get("asdqwe"), "asdqwe closing");
is_open = false;
return arrow::Status::OK();
}
@ -78,7 +75,6 @@ arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Tell() const
arrow::Result<int64_t> RandomAccessFileFromSeekableReadBuffer::Read(int64_t nbytes, void * out)
{
LOG_INFO(&Poco::Logger::get("asdqwe"), "asdqwe read {}", nbytes);
if (avoid_buffering)
in.setReadUntilPosition(seekable_in.getPosition() + nbytes);
return in.readBig(reinterpret_cast<char *>(out), nbytes);
@ -103,7 +99,6 @@ arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromSeekableReadBu
arrow::Status RandomAccessFileFromSeekableReadBuffer::Seek(int64_t position)
{
LOG_INFO(&Poco::Logger::get("asdqwe"), "asdqwe seek {}", position);
if (avoid_buffering) {
// Seeking to a position above a previous setReadUntilPosition() confuses some of the
// ReadBuffer implementations.
@ -150,6 +145,59 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
return arrow::Status();
}
/// Stores a reference to `factory`; readers are created from it on demand in ReadAt().
/// The factory is not owned, so it has to stay alive for as long as this object is used.
RandomAccessFileFromManyReadBuffers::RandomAccessFileFromManyReadBuffers(SeekableReadBufferFactory & factory)
    : buf_factory(factory)
{
}
/// Reports the file size as provided by the reader factory.
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::GetSize()
{
    // Spell out the integer conversion that the return statement otherwise does implicitly.
    const auto size_in_bytes = static_cast<int64_t>(buf_factory.getFileSize());
    return size_in_bytes;
}
/// Reads up to `nbytes` bytes starting at `position` into `out` and returns the number of
/// bytes actually read. A reader is borrowed from the pool (or created if the pool is empty)
/// for the duration of the call; the mutex is held only while touching the pool, not while
/// reading.
arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::ReadAt(int64_t position, int64_t nbytes, void * out)
{
    std::unique_ptr<SeekableReadBuffer> reader;
    {
        std::lock_guard guard(mutex);
        if (free_bufs.empty())
        {
            reader = buf_factory.getReader();
        }
        else
        {
            reader = std::move(free_bufs.back());
            free_bufs.pop_back();
        }
    }

    reader->seek(position, SEEK_SET);
    reader->setReadUntilPosition(position + nbytes);
    const size_t bytes_read = reader->readBig(reinterpret_cast<char *>(out), nbytes);

    // Seeking to a position above a previous setReadUntilPosition() confuses some of the
    // ReadBuffer implementations, so drop the bound before the reader is reused.
    reader->setReadUntilEnd();

    {
        std::lock_guard guard(mutex);
        free_bufs.push_back(std::move(reader));
    }

    return static_cast<int64_t>(bytes_read);
}
/// Allocating overload: reads up to `nbytes` bytes at `position` into a freshly allocated
/// buffer. If fewer bytes were read than requested, the buffer is shrunk to the actual size.
arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::ReadAt(int64_t position, int64_t nbytes)
{
    ARROW_ASSIGN_OR_RAISE(auto result_buffer, arrow::AllocateResizableBuffer(nbytes))
    ARROW_ASSIGN_OR_RAISE(int64_t actual_size, ReadAt(position, nbytes, result_buffer->mutable_data()))

    if (actual_size < nbytes)
        RETURN_NOT_OK(result_buffer->Resize(actual_size));

    return result_buffer;
}
/// "Asynchronous" read that is actually performed synchronously on the calling thread:
/// the returned future is already finished with the result of ReadAt().
arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::ReadAsync(
    const arrow::io::IOContext &, int64_t position, int64_t nbytes)
{
    using BufferFuture = arrow::Future<std::shared_ptr<arrow::Buffer>>;
    return BufferFuture::MakeFinished(ReadAt(position, nbytes));
}
/// Marks the file as closed. Expected to be called at most once (checked by chassert).
arrow::Status RandomAccessFileFromManyReadBuffers::Close()
{
    chassert(is_open);
    is_open = false;

    return arrow::Status::OK();
}
// The position-based part of the arrow file interface is not supported by this class.
// Every method below reports a NotImplemented error.

arrow::Status RandomAccessFileFromManyReadBuffers::Seek(int64_t)
{
    return arrow::Status::NotImplemented("");
}

arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::Tell() const
{
    return arrow::Status::NotImplemented("");
}

arrow::Result<int64_t> RandomAccessFileFromManyReadBuffers::Read(int64_t, void *)
{
    return arrow::Status::NotImplemented("");
}

arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromManyReadBuffers::Read(int64_t)
{
    return arrow::Status::NotImplemented("");
}
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
ReadBuffer & in,
const FormatSettings & settings,
@ -160,11 +208,11 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
{
if (auto * fd_in = dynamic_cast<ReadBufferFromFileDescriptor *>(&in))
{
struct stat stat;
auto res = ::fstat(fd_in->getFD(), &stat);
// if fd is a regular file i.e. not stdin
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size, avoid_buffering);
struct stat stat;
auto res = ::fstat(fd_in->getFD(), &stat);
// if fd is a regular file i.e. not stdin
if (res == 0 && S_ISREG(stat.st_mode))
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*fd_in, stat.st_size, avoid_buffering);
}
else if (dynamic_cast<SeekableReadBuffer *>(&in) && isBufferWithFileSize(in))
{

View File

@ -18,6 +18,7 @@ class ReadBuffer;
class WriteBuffer;
class SeekableReadBuffer;
class SeekableReadBufferFactory;
struct FormatSettings;
class ArrowBufferedOutputStream : public arrow::io::OutputStream
@ -77,6 +78,39 @@ private:
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromSeekableReadBuffer);
};
// Thread-safe.
// Maintains a pool of SeekableReadBuffer-s. For each ReadAt(), takes a buffer, seeks it, and reads.
// Buffers are created lazily from the factory when the pool has no free buffer, and are
// returned to the pool after the read.
class RandomAccessFileFromManyReadBuffers : public arrow::io::RandomAccessFile
{
public:
// Keeps a reference to `factory` (not owned); it must outlive this object.
explicit RandomAccessFileFromManyReadBuffers(SeekableReadBufferFactory & factory);
// These are thread safe.
// GetSize() returns the file size reported by the factory.
arrow::Result<int64_t> GetSize() override;
// Both ReadAt() overloads return at most `nbytes` bytes starting at `position`.
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> ReadAt(int64_t position, int64_t nbytes) override;
// Performs the read synchronously and returns an already finished future.
arrow::Future<std::shared_ptr<arrow::Buffer>> ReadAsync(const arrow::io::IOContext&, int64_t position,
int64_t nbytes) override;
// Position-based (stateful) API. It is not supported here, and arrow shouldn't call these.
// They always return a NotImplemented error.
arrow::Status Seek(int64_t) override;
arrow::Result<int64_t> Tell() const override;
arrow::Result<int64_t> Read(int64_t, void*) override;
arrow::Result<std::shared_ptr<arrow::Buffer>> Read(int64_t) override;
arrow::Status Close() override;
bool closed() const override { return !is_open; }
private:
SeekableReadBufferFactory & buf_factory;
// NOTE(review): plain bool, accessed by Close()/closed() without taking `mutex` — confirm
// these are never called concurrently with each other.
bool is_open = true;
// Protects free_bufs.
std::mutex mutex;
// Buffers that are currently not used by any ReadAt() call.
std::vector<std::unique_ptr<SeekableReadBuffer>> free_bufs;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromManyReadBuffers);
};
class ArrowInputStreamFromReadBuffer : public arrow::io::InputStream
{
public:
@ -102,7 +136,7 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
const std::string & format_name,
const std::string & magic_bytes,
// If true, we'll use ReadBuffer::setReadUntilPosition() to avoid buffering and readahead as
// much buffering as possible. For HTTP or S3 ReadBuffer, this means that each RandomAccessFile
// much as possible. For HTTP or S3 ReadBuffer, this means that each RandomAccessFile
// read call will do a new HTTP request. Used in parquet pre-buffered reading mode, which makes
// arrow do its own buffering and coalescing of reads.
// (ReadBuffer is not a good abstraction in this case, but it works.)

View File

@ -72,7 +72,7 @@ void JSONColumnsReaderBase::skipColumn()
JSONColumnsBlockInputFormatBase::JSONColumnsBlockInputFormatBase(
ReadBuffer & in_, const Block & header_, const FormatSettings & format_settings_, std::unique_ptr<JSONColumnsReaderBase> reader_)
: IInputFormat(header_, in_)
: IInputFormat(header_, &in_)
, format_settings(format_settings_)
, fields(header_.getNamesAndTypes())
, serializations(header_.getSerializations())

View File

@ -16,7 +16,7 @@ class NativeInputFormat final : public IInputFormat
{
public:
NativeInputFormat(ReadBuffer & buf, const Block & header_, const FormatSettings & settings)
: IInputFormat(header_, buf)
: IInputFormat(header_, &buf)
, reader(std::make_unique<NativeReader>(
buf,
header_,

View File

@ -22,7 +22,7 @@ namespace ErrorCodes
}
ORCBlockInputFormat::ORCBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
: IInputFormat(std::move(header_), &in_), format_settings(format_settings_), skip_stripes(format_settings.orc.skip_stripes)
{
}

View File

@ -94,7 +94,7 @@ public:
};
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), params.in)
: IInputFormat(std::move(params.header), &params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, format_name(params.format_name)

View File

@ -11,12 +11,18 @@
#include <arrow/io/api.h>
#include <arrow/status.h>
#include <parquet/arrow/reader.h>
#include <parquet/arrow/schema.h>
#include <parquet/file_reader.h>
#include "ArrowBufferedStreams.h"
#include "ArrowColumnToCHColumn.h"
#include "ArrowFieldIndexUtil.h"
#include <DataTypes/NestedUtils.h>
namespace CurrentMetrics
{
extern const Metric ParquetDecoderThreads;
extern const Metric ParquetDecoderThreadsActive;
}
namespace DB
{
@ -34,75 +40,58 @@ namespace ErrorCodes
throw Exception::createDeprecated(_s.ToString(), ErrorCodes::BAD_ARGUMENTS); \
} while (false)
ParquetBlockInputFormat::ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_)
: IInputFormat(std::move(header_), in_), format_settings(format_settings_), skip_row_groups(format_settings.parquet.skip_row_groups)
/// Creates a Parquet input format.
///
/// @param buf                    Sequential read buffer; passed on to IInputFormat (may be nullptr).
/// @param buf_factory_           Factory of seekable readers for random-access reading.
///                               NOTE(review): presumably exactly one of `buf` and `buf_factory_`
///                               is non-null — confirm against the callers.
/// @param header_                Block describing the columns to produce.
/// @param format_settings_       Format settings; the parquet-specific part provides
///                               skip_row_groups and preserve_order.
/// @param max_decoding_threads_  A decoding thread pool is created only if this is > 1.
/// @param min_bytes_for_seek_    Stored as-is for later use when setting up readers.
ParquetBlockInputFormat::ParquetBlockInputFormat(
    ReadBuffer * buf,
    SeekableReadBufferFactoryPtr buf_factory_,
    const Block & header_,
    const FormatSettings & format_settings_,
    size_t max_decoding_threads_,
    size_t min_bytes_for_seek_)
    // header_ is a const reference: std::move() on it cannot move and would silently copy,
    // so pass it directly and let the copy be explicit.
    : IInputFormat(header_, buf)
    , buf_factory(std::move(buf_factory_))
    , format_settings(format_settings_)
    , skip_row_groups(format_settings.parquet.skip_row_groups)
    , max_decoding_threads(max_decoding_threads_)
    , min_bytes_for_seek(min_bytes_for_seek_)
    , to_deliver(ChunkToDeliver::Compare { .row_group_first = format_settings_.parquet.preserve_order })
{
    // With a single decoding thread no pool is created.
    if (max_decoding_threads > 1)
        pool.emplace(CurrentMetrics::ParquetDecoderThreads, CurrentMetrics::ParquetDecoderThreadsActive, max_decoding_threads);
}
Chunk ParquetBlockInputFormat::generate()
{
Chunk res;
block_missing_values.clear();
ParquetBlockInputFormat::~ParquetBlockInputFormat() = default;
if (!file_reader)
prepareFileReader();
void ParquetBlockInputFormat::initializeIfNeeded() {
if (std::exchange(is_initialized, true))
return;
while (true)
{
if (!current_record_batch_reader && !prepareRowGroupReader())
return {};
if (is_stopped)
return {};
if (buf_factory)
arrow_file = std::make_shared<RandomAccessFileFromManyReadBuffers>(*buf_factory);
else
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
auto batch = current_record_batch_reader->Next();
if (!batch.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString());
if (!*batch)
{
++row_group_current;
current_record_batch_reader.reset();
continue;
}
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &block_missing_values : nullptr;
arrow_column_to_ch_column->arrowTableToCHChunk(res, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
return res;
}
}
void ParquetBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
current_record_batch_reader.reset();
column_indices.clear();
row_group_current = 0;
block_missing_values.clear();
}
const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
{
return block_missing_values;
}
static void getFileReaderAndSchema(
ReadBuffer & in,
std::unique_ptr<parquet::arrow::FileReader> & file_reader,
std::shared_ptr<arrow::Schema> & schema,
const FormatSettings & format_settings,
std::atomic<int> & is_stopped)
{
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
if (is_stopped)
return;
metadata = parquet::ReadMetaData(arrow_file);
std::shared_ptr<arrow::Schema> schema;
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
row_groups.resize(metadata->num_row_groups());
ArrowFieldIndexUtil field_util(
format_settings.parquet.case_insensitive_column_matching,
format_settings.parquet.allow_missing_columns);
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
}
void ParquetBlockInputFormat::prepareRowGroupReader(size_t row_group_idx) {
auto & row_group = row_groups[row_group_idx];
parquet::ArrowReaderProperties properties;
properties.set_use_threads(false);
properties.set_batch_size(format_settings.parquet.max_block_size);
// When reading a row group, arrow will:
// 1. Before reading anything, look at all the byte ranges it'll need to read from the file
@ -116,66 +105,245 @@ static void getFileReaderAndSchema(
// So the memory usage of a "SELECT *" will be at least the compressed size of a row group
// (typically hundreds of MB).
//
// With this coalescing, we don't need any readahead on our side, hence avoid_buffering above.
// With this coalescing, we don't need any readahead on our side, hence avoid_buffering in
// asArrowFile().
//
// This adds one unnecessary copy. We should probably do coalescing and prefetch scheduling on
// our side instead.
properties.set_pre_buffer(true);
auto cache_options = arrow::io::CacheOptions::LazyDefaults();
cache_options.hole_size_limit = format_settings.parquet.min_bytes_for_seek;
cache_options.hole_size_limit = min_bytes_for_seek;
cache_options.range_size_limit = format_settings.parquet.max_bytes_to_read_at_once;
properties.set_cache_options(cache_options);
parquet::arrow::FileReaderBuilder builder;
THROW_ARROW_NOT_OK(builder.Open(std::move(arrow_file)));
THROW_ARROW_NOT_OK(
builder.Open(arrow_file, /* not to be confused with ArrowReaderProperties */ parquet::default_reader_properties(), metadata));
builder.properties(properties);
// TODO: Pass custom memory_pool() to enable memory accounting with non-jemalloc allocators.
THROW_ARROW_NOT_OK(builder.Build(&file_reader));
THROW_ARROW_NOT_OK(builder.Build(&row_group.file_reader));
THROW_ARROW_NOT_OK(file_reader->GetSchema(&schema));
}
THROW_ARROW_NOT_OK(
row_group.file_reader->GetRecordBatchReader({static_cast<int>(row_group_idx)}, column_indices, &row_group.record_batch_reader));
void ParquetBlockInputFormat::prepareFileReader()
{
std::shared_ptr<arrow::Schema> schema;
getFileReaderAndSchema(*in, file_reader, schema, format_settings, is_stopped);
if (is_stopped)
return;
row_group_total = file_reader->num_row_groups();
row_group_current = 0;
arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
row_group.arrow_column_to_ch_column = std::make_unique<ArrowColumnToCHColumn>(
getPort().getHeader(),
"Parquet",
format_settings.parquet.import_nested,
format_settings.parquet.allow_missing_columns,
format_settings.null_as_default,
format_settings.parquet.case_insensitive_column_matching);
ArrowFieldIndexUtil field_util(
format_settings.parquet.case_insensitive_column_matching,
format_settings.parquet.allow_missing_columns);
column_indices = field_util.findRequiredIndices(getPort().getHeader(), *schema);
}
bool ParquetBlockInputFormat::prepareRowGroupReader()
void ParquetBlockInputFormat::scheduleRowGroup(size_t row_group_idx)
{
file_reader->set_batch_size(format_settings.parquet.max_block_size);
chassert(!mutex.try_lock());
chassert(!row_groups[row_group_idx].running);
while (row_group_current < row_group_total && skip_row_groups.contains(row_group_current))
++row_group_current;
if (row_group_current >= row_group_total)
row_groups[row_group_idx].running = true;
pool->scheduleOrThrowOnError(
[this, row_group_idx, thread_group = CurrentThread::getGroup()]()
{
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
try
{
setThreadName("ParquetDecoder");
threadFunction(row_group_idx);
}
catch (...)
{
std::lock_guard lock(mutex);
background_exception = std::current_exception();
condvar.notify_all();
}
});
}
/// Body of a decoding task for one row group. Keeps decoding chunks until the row group has
/// max_pending_chunks_per_row_group decoded-but-undelivered chunks, the format is stopped,
/// or decodeOneChunk() returns false. Clears the `running` flag before returning.
void ParquetBlockInputFormat::threadFunction(size_t row_group_idx)
{
    std::unique_lock lock(mutex);

    auto & row_group = row_groups[row_group_idx];
    chassert(!row_group.done);

    for (;;)
    {
        const bool has_room = row_group.chunks_decoded - row_group.chunks_delivered < max_pending_chunks_per_row_group;
        if (!has_room || is_stopped)
            break;

        // The lock is handed over so that decodeOneChunk() can manage it itself.
        if (!decodeOneChunk(row_group_idx, lock))
            break;
    }

    row_group.running = false;
}
bool ParquetBlockInputFormat::decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock)
{
// TODO: Do most reading on IO threads. Separate max_download_threads from max_parsing_threads.
auto & row_group = row_groups[row_group_idx];
chassert(!row_group.done);
chassert(lock.owns_lock());
lock.unlock();
auto end_of_row_group = [&] {
lock.lock();
row_group.done = true;
row_group.arrow_column_to_ch_column.reset();
row_group.record_batch_reader.reset();
row_group.file_reader.reset();
// We may be able to schedule more work now, but can't call scheduleMoreWorkIfNeeded() right
// here because we're running on the same thread pool, so it may deadlock if thread limit is
// reached. Wake up generate() instead.
condvar.notify_all();
};
if (!row_group.record_batch_reader)
{
if (skip_row_groups.contains(static_cast<int>(row_group_idx)))
{
// Pretend that the row group is empty.
// (This happens kind of late. We could avoid scheduling the row group on a thread in
// the first place. But the skip_row_groups feature is mostly unused, so it's better to
// be a little inefficient than to add a bunch of extra mostly-dead code for this.)
end_of_row_group();
return false;
}
else
{
prepareRowGroupReader(row_group_idx);
}
}
auto batch = row_group.record_batch_reader->Next();
if (!batch.ok())
throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", batch.status().ToString());
if (!*batch)
{
end_of_row_group();
return false;
}
// Read row groups one at a time. They're normally hundreds of MB each.
// If we ever encounter parquet files with lots of tiny row groups, we could detect this here
// and group them together to reduce number of seeks.
auto tmp_table = arrow::Table::FromRecordBatches({*batch});
auto read_status = file_reader->GetRecordBatchReader({row_group_current}, column_indices, &current_record_batch_reader);
if (!read_status.ok())
throw DB::ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading Parquet data: {}", read_status.ToString());
ChunkToDeliver res = {.chunk_idx = row_group.chunks_decoded, .row_group_idx = row_group_idx};
/// If defaults_for_omitted_fields is true, calculate the default values from default expression for omitted fields.
/// Otherwise fill the missing columns with zero values of its type.
BlockMissingValues * block_missing_values_ptr = format_settings.defaults_for_omitted_fields ? &res.block_missing_values : nullptr;
row_group.arrow_column_to_ch_column->arrowTableToCHChunk(res.chunk, *tmp_table, (*tmp_table)->num_rows(), block_missing_values_ptr);
lock.lock();
++row_group.chunks_decoded;
to_deliver.push(std::move(res));
condvar.notify_all();
return true;
}
/// Advances the "retired" watermark and, when a thread pool is in use, schedules more decoding.
///
/// Called from generate() while it holds `mutex`.
///
/// `row_group_touched` - if set, index of a row group from which a chunk has just been delivered;
/// that row group gets a new decoding task if it is not finished, not already running, and is
/// below its pending-chunks limit.
void ParquetBlockInputFormat::scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched)
{
    // Retire leading row groups that are fully decoded and fully delivered.
    for (; row_groups_retired < row_groups.size(); ++row_groups_retired)
    {
        const auto & oldest = row_groups[row_groups_retired];
        const bool fully_delivered = oldest.done && oldest.chunks_delivered >= oldest.chunks_decoded;
        if (!fully_delivered)
            break;
    }

    // Without a pool, decoding is driven directly by generate().
    if (!pool)
        return;

    // Start new row groups while fewer than max_decoding_threads are started but not retired.
    while (row_groups_started < row_groups.size() && row_groups_started - row_groups_retired < max_decoding_threads)
    {
        // The counter is bumped before the call, so it is already updated while scheduling runs.
        const size_t next_idx = row_groups_started;
        ++row_groups_started;
        scheduleRowGroup(next_idx);
    }

    if (!row_group_touched)
        return;

    const size_t touched_idx = *row_group_touched;
    const auto & touched = row_groups[touched_idx];
    if (touched.done || touched.running)
        return;

    if (touched.chunks_decoded - touched.chunks_delivered < max_pending_chunks_per_row_group)
        scheduleRowGroup(touched_idx);
}
/// Returns the next decoded chunk, or an empty Chunk when the format was stopped or all row groups
/// have been fully delivered.
///
/// Holds `mutex` for the duration of the call (released only inside condvar.wait() and
/// decodeOneChunk()). With a thread pool, waits on `condvar` until a decoding task produces
/// something; without one, decodes chunks inline on the calling thread.
///
/// Rethrows an exception captured from a background decoding task, after marking the format stopped.
Chunk ParquetBlockInputFormat::generate()
{
    initializeIfNeeded();

    std::unique_lock lock(mutex);

    while (true)
    {
        if (background_exception)
        {
            is_stopped = true;
            std::rethrow_exception(background_exception);
        }
        if (is_stopped)
            return {};

        scheduleMoreWorkIfNeeded();

        // In preserve_order mode only chunks of the oldest non-retired row group may be delivered.
        if (!to_deliver.empty() && (!format_settings.parquet.preserve_order || to_deliver.top().row_group_idx == row_groups_retired))
        {
            // priority_queue::top() is const; the element is popped right away, so moving out of it is safe.
            ChunkToDeliver chunk = std::move(const_cast<ChunkToDeliver&>(to_deliver.top()));
            to_deliver.pop();

            // `chunk` is a local that dies at the end of this scope: move instead of deep-copying.
            previous_block_missing_values = std::move(chunk.block_missing_values);

            auto & row_group = row_groups[chunk.row_group_idx];
            chassert(chunk.chunk_idx == row_group.chunks_delivered);
            ++row_group.chunks_delivered;

            // Delivering a chunk frees a pending slot in this row group; maybe resume decoding it.
            scheduleMoreWorkIfNeeded(chunk.row_group_idx);

            return std::move(chunk.chunk);
        }

        if (row_groups_retired == row_groups.size())
            return {};

        if (pool)
            condvar.wait(lock);
        else
            decodeOneChunk(row_groups_retired, lock);
    }
}
/// Returns the format to its initial, not-yet-initialized state.
///
/// First raises the stop flag and waits for all tasks in the pool (if any), and only then drops
/// the per-file state and decoding progress. The stop flag is lowered again afterwards.
void ParquetBlockInputFormat::resetParser()
{
    // Stop background decoding before touching anything the tasks use.
    is_stopped = true;
    if (pool)
        pool->wait();

    // Per-file state.
    arrow_file.reset();
    metadata.reset();
    column_indices.clear();

    // Decoding progress. The queue is drained element by element so that its comparator is kept.
    row_groups.clear();
    for (; !to_deliver.empty(); to_deliver.pop())
    {
    }
    previous_block_missing_values.clear();
    background_exception = nullptr;
    row_groups_started = 0;
    row_groups_retired = 0;

    is_initialized = false;
    is_stopped = false;

    IInputFormat::resetParser();
}
/// Returns the missing-values info that generate() saved for the chunk it delivered most recently.
const BlockMissingValues & ParquetBlockInputFormat::getMissingValues() const
{
return previous_block_missing_values;
}
ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: ISchemaReader(in_), format_settings(format_settings_)
{
@ -183,10 +351,14 @@ ParquetSchemaReader::ParquetSchemaReader(ReadBuffer & in_, const FormatSettings
NamesAndTypesList ParquetSchemaReader::readSchema()
{
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::atomic<int> is_stopped{0};
auto file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
auto metadata = parquet::ReadMetaData(file);
std::shared_ptr<arrow::Schema> schema;
std::atomic<int> is_stopped = 0;
getFileReaderAndSchema(in, file_reader, schema, format_settings, is_stopped);
THROW_ARROW_NOT_OK(parquet::arrow::FromParquetSchema(metadata->schema(), &schema));
auto header = ArrowColumnToCHColumn::arrowSchemaToCHHeader(
*schema, "Parquet", format_settings.parquet.skip_columns_with_unsupported_types_in_schema_inference);
if (format_settings.schema_inference_make_columns_nullable)
@ -198,12 +370,31 @@ void registerInputFormatParquet(FormatFactory & factory)
{
factory.registerInputFormat(
"Parquet",
[](ReadBuffer &buf,
const Block &sample,
const RowInputFormatParams &,
const FormatSettings & settings)
[](ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams &,
const FormatSettings & settings)
{
return std::make_shared<ParquetBlockInputFormat>(buf, sample, settings);
// TODO: Propagate the last two from settings.
return std::make_shared<ParquetBlockInputFormat>(&buf, nullptr, sample, settings, 1, 4 * DBMS_DEFAULT_BUFFER_SIZE);
},
[](SeekableReadBufferFactoryPtr buf_factory,
const Block & sample,
const FormatSettings & settings,
const ReadSettings& read_settings,
bool is_remote_fs,
ThreadPoolCallbackRunner<void> /* io_schedule */,
size_t /* max_download_threads */,
size_t max_parsing_threads)
{
size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : 8 * 1024;
return std::make_shared<ParquetBlockInputFormat>(
nullptr,
std::move(buf_factory),
sample,
settings,
max_parsing_threads,
min_bytes_for_seek);
});
factory.markFormatSupportsSubcolumns("Parquet");
factory.markFormatSupportsSubsetOfColumns("Parquet");

View File

@ -6,28 +6,33 @@
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
namespace parquet { class FileMetaData; }
namespace parquet::arrow { class FileReader; }
namespace arrow { class Buffer; class RecordBatchReader;}
namespace arrow::io { class RandomAccessFile; }
namespace DB
{
class ArrowColumnToCHColumn;
class SeekableReadBufferFactory;
// Parquet files contain a metadata block with the following information:
// * list of columns,
// * list of "row groups",
// * for each column in each row group:
// - byte range for the data,
// - min, max, count.
// - min, max, count,
// - (note that we *don't* have a reliable estimate of the decompressed+decoded size; the
// metadata has decompressed size, but decoded size is sometimes much bigger because of
// dictionary encoding)
//
// This information could be used for:
// (1) Precise reads - only reading the byte ranges we need, instead of filling the whole
// arbitrarily-sized buffer inside ReadBuffer. We know in advance exactly what ranges we'll
// need to read.
// (2) Skipping row groups based on WHERE conditions.
// (3) Skipping parsing of individual rows based on PREWHERE.
// (3) Skipping decoding of individual pages based on PREWHERE.
// (4) Projections. I.e. for queries that only request min/max/count, we can report the
// min/max/count from metadata. This can be done per row group. I.e. for row groups that
// fully pass the WHERE conditions we'll use min/max/count from metadata, for row groups that
@ -36,14 +41,21 @@ class ArrowColumnToCHColumn;
// without reading data.
//
// For (1), we need the IInputFormat to be in control of reading, with its own implementation of
// parallel reading+parsing, instead of using ParallelParsingInputFormat, ParallelReadBuffer,
// AsynchronousReadIndirectBufferFromRemoteFS, ReadBufferFromRemoteFSGather.
// parallel reading+decoding, instead of using ParallelReadBuffer and ParallelParsingInputFormat.
// That's what MultistreamInputCreator in FormatFactory is about.
class ParquetBlockInputFormat : public IInputFormat
{
public:
ParquetBlockInputFormat(ReadBuffer & in_, Block header_, const FormatSettings & format_settings_);
ParquetBlockInputFormat(
// exactly one of these two is nullptr
ReadBuffer * buf,
std::unique_ptr<SeekableReadBufferFactory> buf_factory,
const Block & header,
const FormatSettings & format_settings,
size_t max_decoding_threads,
size_t min_bytes_for_seek);
~ParquetBlockInputFormat() override;
void resetParser() override;
@ -54,27 +66,85 @@ public:
private:
Chunk generate() override;
void prepareFileReader();
bool prepareRowGroupReader(); // false if at end
// Raises the stop flag. generate() returns an empty chunk once it observes the flag, and the
// decoding loop in threadFunction() stops taking new chunks.
void onCancel() override
{
is_stopped = 1;
}
std::unique_ptr<parquet::arrow::FileReader> file_reader;
int row_group_total = 0;
int row_group_current = 0;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
BlockMissingValues block_missing_values;
void initializeIfNeeded();
void prepareRowGroupReader(size_t row_group_idx);
// The lock must be locked when calling and when returning.
bool decodeOneChunk(size_t row_group_idx, std::unique_lock<std::mutex> & lock);
void scheduleMoreWorkIfNeeded(std::optional<size_t> row_group_touched = std::nullopt);
void scheduleRowGroup(size_t row_group_idx);
void threadFunction(size_t row_group_idx);
// Decoding state of one row group. Lives in the `row_groups` vector, indexed by row group.
struct RowGroupState {
// Cleared at the end of threadFunction(); checked before re-scheduling the row group.
bool running = false;
bool done = false; // all chunks were decoded
size_t chunks_decoded = 0; // added to to_deliver
size_t chunks_delivered = 0; // removed from to_deliver
// These are only used by the decoding thread, so don't require locking the mutex.
// They are released once the row group is done.
std::unique_ptr<parquet::arrow::FileReader> file_reader;
std::shared_ptr<arrow::RecordBatchReader> record_batch_reader;
std::unique_ptr<ArrowColumnToCHColumn> arrow_column_to_ch_column;
};
// A decoded chunk waiting in `to_deliver` until generate() hands it out.
struct ChunkToDeliver
{
    Chunk chunk;
    BlockMissingValues block_missing_values;
    size_t chunk_idx; // within row group
    size_t row_group_idx;

    // Ordering for std::priority_queue. The comparison is "greater than", so top() is the element
    // with the smallest key.
    // In ordered mode (row_group_first) the key is (row group idx, chunk idx): chunks are delivered
    // strictly in order of increasing row group idx.
    // In unordered mode the key is (chunk idx, row group idx), which interleaves chunks from
    // different row groups.
    struct Compare
    {
        bool row_group_first = false;

        bool operator()(const ChunkToDeliver & a, const ChunkToDeliver & b) const
        {
            if (row_group_first)
                return std::tie(a.row_group_idx, a.chunk_idx) > std::tie(b.row_group_idx, b.chunk_idx);

            return std::tie(a.chunk_idx, a.row_group_idx) > std::tie(b.chunk_idx, b.row_group_idx);
        }
    };
};
std::unique_ptr<SeekableReadBufferFactory> buf_factory;
const FormatSettings format_settings;
const std::unordered_set<int> & skip_row_groups;
// Reads a single row group.
std::shared_ptr<arrow::RecordBatchReader> current_record_batch_reader;
size_t max_decoding_threads;
size_t min_bytes_for_seek;
const size_t max_pending_chunks_per_row_group = 2;
// RandomAccessFile is thread safe, so we share it among threads.
// FileReader is not, so each thread creates its own.
std::shared_ptr<arrow::io::RandomAccessFile> arrow_file;
std::shared_ptr<parquet::FileMetaData> metadata;
// indices of columns to read from Parquet file
std::vector<int> column_indices;
std::mutex mutex;
std::condition_variable condvar; // notified after adding to to_deliver
std::vector<RowGroupState> row_groups;
std::priority_queue<ChunkToDeliver, std::vector<ChunkToDeliver>, ChunkToDeliver::Compare> to_deliver;
size_t row_groups_retired = 0;
BlockMissingValues previous_block_missing_values;
// These are only used when max_decoding_threads > 1.
size_t row_groups_started = 0;
std::optional<ThreadPool> pool;
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
};
class ParquetSchemaReader : public ISchemaReader

View File

@ -45,7 +45,7 @@ ValuesBlockInputFormat::ValuesBlockInputFormat(
const Block & header_,
const RowInputFormatParams & params_,
const FormatSettings & format_settings_)
: IInputFormat(header_, *buf_), buf(std::move(buf_)),
: IInputFormat(header_, buf_.get()), buf(std::move(buf_)),
params(params_), format_settings(format_settings_), num_columns(header_.columns()),
parser_type_for_column(num_columns, ParserType::Streaming),
attempts_to_deduce_template(num_columns), attempts_to_deduce_template_cached(num_columns),

View File

@ -314,7 +314,7 @@ namespace
struct ReadBufferInfo {
// Exactly one of these two is nullptr.
std::unique_ptr<ReadBuffer> buf;
ParallelReadBuffer::ReadBufferFactoryPtr buf_factory;
SeekableReadBufferFactoryPtr buf_factory;
// TODO: This is currently not always used and not always assigned. Rethink.
// Something like this is required to make Parquet format work when the HTTP
@ -417,8 +417,7 @@ namespace
else
LOG_TRACE(&Poco::Logger::get("StorageURLSource"), "HTTP Range is not supported");
if (buf_info.seekable_read && res.getStatus() == Poco::Net::HTTPResponse::HTTP_PARTIAL_CONTENT
&& res.hasContentLength())
if (buf_info.seekable_read && res.hasContentLength())
{
LOG_TRACE(
&Poco::Logger::get("StorageURLSource"),