Use an independent I/O thread pool for Parquet row group prefetching

This commit is contained in:
liuneng 2024-09-26 16:35:05 +08:00
parent 341f93d1b8
commit 9ee1652e23
6 changed files with 40 additions and 14 deletions

View File

@ -216,6 +216,9 @@
M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \ M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \
M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \ M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \
M(ParquetDecoderThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat thread pool.") \ M(ParquetDecoderThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat thread pool.") \
M(ParquetDecoderIOThreads, "Number of threads in the ParquetBlockInputFormat io thread pool.") \
M(ParquetDecoderIOThreadsActive, "Number of threads in the ParquetBlockInputFormat io thread pool running a task.") \
M(ParquetDecoderIOThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat io thread pool.") \
M(ParquetEncoderThreads, "Number of threads in ParquetBlockOutputFormat thread pool.") \ M(ParquetEncoderThreads, "Number of threads in ParquetBlockOutputFormat thread pool.") \
M(ParquetEncoderThreadsActive, "Number of threads in ParquetBlockOutputFormat thread pool running a task.") \ M(ParquetEncoderThreadsActive, "Number of threads in ParquetBlockOutputFormat thread pool running a task.") \
M(ParquetEncoderThreadsScheduled, "Number of queued or active jobs in ParquetBlockOutputFormat thread pool.") \ M(ParquetEncoderThreadsScheduled, "Number of queued or active jobs in ParquetBlockOutputFormat thread pool.") \

View File

@ -72,6 +72,7 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"}, {"parallel_replicas_mode", "read_tasks", "read_tasks", "This setting was introduced as a part of making parallel replicas feature Beta"},
{"show_create_query_identifier_quoting_rule", "when_necessary", "when_necessary", "New setting."}, {"show_create_query_identifier_quoting_rule", "when_necessary", "when_necessary", "New setting."},
{"show_create_query_identifier_quoting_style", "Backticks", "Backticks", "New setting."}, {"show_create_query_identifier_quoting_style", "Backticks", "Backticks", "New setting."},
{"input_format_parquet_enable_row_group_prefetch", false, true, "Enable row group prefetching during parquet parsing. Currently, only single-threaded parsing can prefetch."},
} }
}, },
{"24.9", {"24.9",
@ -90,7 +91,6 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"}, {"join_to_sort_maximum_table_rows", 0, 10000, "The maximum number of rows in the right table to determine whether to rerange the right table by key in left or inner join"},
{"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join"}, {"allow_experimental_join_right_table_sorting", false, false, "If it is set to true, and the conditions of `join_to_sort_minimum_perkey_rows` and `join_to_sort_maximum_table_rows` are met, rerange the right table by key to improve the performance in left or inner hash join"},
{"mongodb_throw_on_unsupported_query", false, true, "New setting."}, {"mongodb_throw_on_unsupported_query", false, true, "New setting."},
{"input_format_parquet_enable_row_group_prefetch", false, true, "Enable row group prefetching during parquet parsing. Currently, only single-threaded parsing can prefetch."},
} }
}, },
{"24.8", {"24.8",

View File

@ -186,9 +186,10 @@ arrow::Status ArrowInputStreamFromReadBuffer::Close()
return arrow::Status(); return arrow::Status();
} }
RandomAccessFileFromRandomAccessReadBuffer::RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_) : in(in_), file_size(file_size_) RandomAccessFileFromRandomAccessReadBuffer::RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_, std::shared_ptr<ThreadPool> io_pool_) : in(in_), file_size(file_size_), io_pool(io_pool_)
{ {
async_runner = threadPoolCallbackRunnerUnsafe<void>(getIOThreadPool().get(), "ArrowFile"); if (io_pool)
async_runner = threadPoolCallbackRunnerUnsafe<void>(*io_pool, "ArrowFile");
} }
arrow::Result<int64_t> RandomAccessFileFromRandomAccessReadBuffer::GetSize() arrow::Result<int64_t> RandomAccessFileFromRandomAccessReadBuffer::GetSize()
@ -225,7 +226,12 @@ arrow::Result<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromRandomAccessRe
arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromRandomAccessReadBuffer::ReadAsync(const arrow::io::IOContext&, int64_t position, int64_t nbytes) arrow::Future<std::shared_ptr<arrow::Buffer>> RandomAccessFileFromRandomAccessReadBuffer::ReadAsync(const arrow::io::IOContext&, int64_t position, int64_t nbytes)
{ {
auto future = arrow::Future<std::shared_ptr<arrow::Buffer>>::Make(); auto future = arrow::Future<std::shared_ptr<arrow::Buffer>>::Make();
async_runner([this, weak_future = arrow::WeakFuture(future), position, nbytes]() mutable { asyncThreadFunction(weak_future, position, nbytes); }, {0}); if (io_pool)
{
async_runner([this, future, position, nbytes]() mutable { asyncThreadFunction(future, position, nbytes); }, {});
return future.Then([=]() {return future.result();});
}
asyncThreadFunction(future, position, nbytes);
return future; return future;
} }
@ -237,10 +243,10 @@ arrow::Status RandomAccessFileFromRandomAccessReadBuffer::Close()
return arrow::Status::OK(); return arrow::Status::OK();
} }
void RandomAccessFileFromRandomAccessReadBuffer::asyncThreadFunction( void RandomAccessFileFromRandomAccessReadBuffer::asyncThreadFunction(
arrow::WeakFuture<std::shared_ptr<arrow::Buffer>> & future, int64_t position, int64_t nbytes) arrow::Future<std::shared_ptr<arrow::Buffer>> future, int64_t position, int64_t nbytes)
{ {
auto buffer = ReadAt(position, nbytes); auto buffer = ReadAt(position, nbytes);
future.get().MarkFinished(buffer); future.MarkFinished(buffer);
} }
arrow::Status RandomAccessFileFromRandomAccessReadBuffer::Seek(int64_t) { return arrow::Status::NotImplemented(""); } arrow::Status RandomAccessFileFromRandomAccessReadBuffer::Seek(int64_t) { return arrow::Status::NotImplemented(""); }
@ -320,7 +326,8 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
std::atomic<int> & is_cancelled, std::atomic<int> & is_cancelled,
const std::string & format_name, const std::string & format_name,
const std::string & magic_bytes, const std::string & magic_bytes,
bool avoid_buffering) bool avoid_buffering,
std::shared_ptr<ThreadPool> io_pool)
{ {
bool has_file_size = isBufferWithFileSize(in); bool has_file_size = isBufferWithFileSize(in);
auto * seekable_in = dynamic_cast<SeekableReadBuffer *>(&in); auto * seekable_in = dynamic_cast<SeekableReadBuffer *>(&in);
@ -328,7 +335,7 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
if (has_file_size && seekable_in && settings.seekable_read) if (has_file_size && seekable_in && settings.seekable_read)
{ {
if (avoid_buffering && seekable_in->supportsReadAt()) if (avoid_buffering && seekable_in->supportsReadAt())
return std::make_shared<RandomAccessFileFromRandomAccessReadBuffer>(*seekable_in, getFileSizeFromReadBuffer(in)); return std::make_shared<RandomAccessFileFromRandomAccessReadBuffer>(*seekable_in, getFileSizeFromReadBuffer(in), io_pool);
if (seekable_in->checkIfActuallySeekable()) if (seekable_in->checkIfActuallySeekable())
return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*seekable_in, std::nullopt, avoid_buffering); return std::make_shared<RandomAccessFileFromSeekableReadBuffer>(*seekable_in, std::nullopt, avoid_buffering);

View File

@ -83,7 +83,7 @@ private:
class RandomAccessFileFromRandomAccessReadBuffer : public arrow::io::RandomAccessFile class RandomAccessFileFromRandomAccessReadBuffer : public arrow::io::RandomAccessFile
{ {
public: public:
explicit RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_); explicit RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_, std::shared_ptr<ThreadPool> io_pool = nullptr);
// These are thread safe. // These are thread safe.
arrow::Result<int64_t> GetSize() override; arrow::Result<int64_t> GetSize() override;
@ -102,12 +102,13 @@ public:
bool closed() const override { return !is_open; } bool closed() const override { return !is_open; }
private: private:
void asyncThreadFunction(arrow::WeakFuture<std::shared_ptr<arrow::Buffer>>& future, int64_t position, int64_t nbytes); void asyncThreadFunction(arrow::Future<std::shared_ptr<arrow::Buffer>> future, int64_t position, int64_t nbytes);
SeekableReadBuffer & in; SeekableReadBuffer & in;
ThreadPoolCallbackRunnerUnsafe<void> async_runner; ThreadPoolCallbackRunnerUnsafe<void> async_runner;
size_t file_size; size_t file_size;
bool is_open = true; bool is_open = true;
std::shared_ptr<ThreadPool> io_pool;
ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromRandomAccessReadBuffer); ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromRandomAccessReadBuffer);
}; };
@ -162,7 +163,8 @@ std::shared_ptr<arrow::io::RandomAccessFile> asArrowFile(
// read call will do a new HTTP request. Used in parquet pre-buffered reading mode, which makes // read call will do a new HTTP request. Used in parquet pre-buffered reading mode, which makes
// arrow do its own buffering and coalescing of reads. // arrow do its own buffering and coalescing of reads.
// (ReadBuffer is not a good abstraction in this case, but it works.) // (ReadBuffer is not a good abstraction in this case, but it works.)
bool avoid_buffering = false); bool avoid_buffering = false,
std::shared_ptr<ThreadPool> io_pool = nullptr);
// Reads the whole file into a memory buffer, owned by the returned RandomAccessFile. // Reads the whole file into a memory buffer, owned by the returned RandomAccessFile.
std::shared_ptr<arrow::io::RandomAccessFile> asArrowFileLoadIntoMemory( std::shared_ptr<arrow::io::RandomAccessFile> asArrowFileLoadIntoMemory(

View File

@ -32,6 +32,10 @@ namespace CurrentMetrics
extern const Metric ParquetDecoderThreads; extern const Metric ParquetDecoderThreads;
extern const Metric ParquetDecoderThreadsActive; extern const Metric ParquetDecoderThreadsActive;
extern const Metric ParquetDecoderThreadsScheduled; extern const Metric ParquetDecoderThreadsScheduled;
extern const Metric ParquetDecoderIOThreads;
extern const Metric ParquetDecoderIOThreadsActive;
extern const Metric ParquetDecoderIOThreadsScheduled;
} }
namespace DB namespace DB
@ -434,16 +438,20 @@ ParquetBlockInputFormat::ParquetBlockInputFormat(
const Block & header_, const Block & header_,
const FormatSettings & format_settings_, const FormatSettings & format_settings_,
size_t max_decoding_threads_, size_t max_decoding_threads_,
size_t max_io_threads_,
size_t min_bytes_for_seek_) size_t min_bytes_for_seek_)
: IInputFormat(header_, &buf) : IInputFormat(header_, &buf)
, format_settings(format_settings_) , format_settings(format_settings_)
, skip_row_groups(format_settings.parquet.skip_row_groups) , skip_row_groups(format_settings.parquet.skip_row_groups)
, max_decoding_threads(max_decoding_threads_) , max_decoding_threads(max_decoding_threads_)
, max_io_threads(max_io_threads_)
, min_bytes_for_seek(min_bytes_for_seek_) , min_bytes_for_seek(min_bytes_for_seek_)
, pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet.preserve_order }) , pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet.preserve_order })
{ {
if (max_decoding_threads > 1) if (max_decoding_threads > 1)
pool = std::make_unique<ThreadPool>(CurrentMetrics::ParquetDecoderThreads, CurrentMetrics::ParquetDecoderThreadsActive, CurrentMetrics::ParquetDecoderThreadsScheduled, max_decoding_threads); pool = std::make_unique<ThreadPool>(CurrentMetrics::ParquetDecoderThreads, CurrentMetrics::ParquetDecoderThreadsActive, CurrentMetrics::ParquetDecoderThreadsScheduled, max_decoding_threads);
if (supportPrefetch())
io_pool = std::make_shared<ThreadPool>(CurrentMetrics::ParquetDecoderIOThreads, CurrentMetrics::ParquetDecoderIOThreadsActive, CurrentMetrics::ParquetDecoderIOThreadsScheduled, max_io_threads);
} }
ParquetBlockInputFormat::~ParquetBlockInputFormat() ParquetBlockInputFormat::~ParquetBlockInputFormat()
@ -451,6 +459,8 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat()
is_stopped = true; is_stopped = true;
if (pool) if (pool)
pool->wait(); pool->wait();
if (io_pool)
io_pool->wait();
} }
void ParquetBlockInputFormat::initializeIfNeeded() void ParquetBlockInputFormat::initializeIfNeeded()
@ -461,7 +471,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
// Create arrow file adapter. // Create arrow file adapter.
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
// we'll need to read (which we know in advance). Use max_download_threads for that. // we'll need to read (which we know in advance). Use max_download_threads for that.
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, io_pool);
if (is_stopped) if (is_stopped)
return; return;
@ -680,7 +690,7 @@ void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx)
} }
bool ParquetBlockInputFormat::supportPrefetch() const bool ParquetBlockInputFormat::supportPrefetch() const
{ {
return max_decoding_threads == 1 && format_settings.parquet.enable_row_group_prefetch && !format_settings.parquet.use_native_reader; return max_decoding_threads == 1 && max_io_threads > 0 && format_settings.parquet.enable_row_group_prefetch && !format_settings.parquet.use_native_reader;
} }
std::shared_ptr<arrow::RecordBatchReader> ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader() std::shared_ptr<arrow::RecordBatchReader> ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader()
@ -953,7 +963,7 @@ void registerInputFormatParquet(FormatFactory & factory)
const FormatSettings & settings, const FormatSettings & settings,
const ReadSettings & read_settings, const ReadSettings & read_settings,
bool is_remote_fs, bool is_remote_fs,
size_t /* max_download_threads */, size_t max_download_threads,
size_t max_parsing_threads) size_t max_parsing_threads)
{ {
size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek;
@ -962,6 +972,7 @@ void registerInputFormatParquet(FormatFactory & factory)
sample, sample,
settings, settings,
max_parsing_threads, max_parsing_threads,
max_download_threads,
min_bytes_for_seek); min_bytes_for_seek);
}); });
factory.markFormatSupportsSubsetOfColumns("Parquet"); factory.markFormatSupportsSubsetOfColumns("Parquet");

View File

@ -59,6 +59,7 @@ public:
const Block & header, const Block & header,
const FormatSettings & format_settings, const FormatSettings & format_settings,
size_t max_decoding_threads, size_t max_decoding_threads,
size_t max_io_threads,
size_t min_bytes_for_seek); size_t min_bytes_for_seek);
~ParquetBlockInputFormat() override; ~ParquetBlockInputFormat() override;
@ -293,6 +294,7 @@ private:
const FormatSettings format_settings; const FormatSettings format_settings;
const std::unordered_set<int> & skip_row_groups; const std::unordered_set<int> & skip_row_groups;
size_t max_decoding_threads; size_t max_decoding_threads;
size_t max_io_threads;
size_t min_bytes_for_seek; size_t min_bytes_for_seek;
const size_t max_pending_chunks_per_row_group_batch = 2; const size_t max_pending_chunks_per_row_group_batch = 2;
@ -324,6 +326,7 @@ private:
// These are only used when max_decoding_threads > 1. // These are only used when max_decoding_threads > 1.
size_t row_group_batches_started = 0; size_t row_group_batches_started = 0;
std::unique_ptr<ThreadPool> pool; std::unique_ptr<ThreadPool> pool;
std::shared_ptr<ThreadPool> io_pool;
BlockMissingValues previous_block_missing_values; BlockMissingValues previous_block_missing_values;
size_t previous_approx_bytes_read_for_chunk = 0; size_t previous_approx_bytes_read_for_chunk = 0;