diff --git a/src/Common/CurrentMetrics.cpp b/src/Common/CurrentMetrics.cpp index 658eaedbda1..dab10c135ef 100644 --- a/src/Common/CurrentMetrics.cpp +++ b/src/Common/CurrentMetrics.cpp @@ -216,6 +216,9 @@ M(ParquetDecoderThreads, "Number of threads in the ParquetBlockInputFormat thread pool.") \ M(ParquetDecoderThreadsActive, "Number of threads in the ParquetBlockInputFormat thread pool running a task.") \ M(ParquetDecoderThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat thread pool.") \ + M(ParquetDecoderIOThreads, "Number of threads in the ParquetBlockInputFormat io thread pool.") \ + M(ParquetDecoderIOThreadsActive, "Number of threads in the ParquetBlockInputFormat io thread pool running a task.") \ + M(ParquetDecoderIOThreadsScheduled, "Number of queued or active jobs in the ParquetBlockInputFormat io thread pool.") \ M(ParquetEncoderThreads, "Number of threads in ParquetBlockOutputFormat thread pool.") \ M(ParquetEncoderThreadsActive, "Number of threads in ParquetBlockOutputFormat thread pool running a task.") \ M(ParquetEncoderThreadsScheduled, "Number of queued or active jobs in ParquetBlockOutputFormat thread pool.") \ diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 3771d534b49..4374643e938 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -72,6 +72,7 @@ static std::initializer_list io_pool_) : in(in_), file_size(file_size_), io_pool(io_pool_) { - async_runner = threadPoolCallbackRunnerUnsafe(getIOThreadPool().get(), "ArrowFile"); + if (io_pool) + async_runner = threadPoolCallbackRunnerUnsafe(*io_pool, "ArrowFile"); } arrow::Result RandomAccessFileFromRandomAccessReadBuffer::GetSize() @@ -225,7 +226,12 @@ arrow::Result> RandomAccessFileFromRandomAccessRe arrow::Future> RandomAccessFileFromRandomAccessReadBuffer::ReadAsync(const arrow::io::IOContext&, int64_t position, int64_t nbytes) { auto future = arrow::Future>::Make(); - async_runner([this, weak_future = arrow::WeakFuture(future), position, nbytes]() mutable { asyncThreadFunction(weak_future, position, nbytes); }, {0}); + if (io_pool) + { + async_runner([this, future, position, nbytes]() mutable { asyncThreadFunction(future, position, nbytes); }, {}); + return future.Then([=]() {return future.result();}); + } + asyncThreadFunction(future, position, nbytes); return future; } @@ -237,10 +243,10 @@ arrow::Status RandomAccessFileFromRandomAccessReadBuffer::Close() return arrow::Status::OK(); } void RandomAccessFileFromRandomAccessReadBuffer::asyncThreadFunction( - arrow::WeakFuture> & future, int64_t position, int64_t nbytes) + arrow::Future> future, int64_t position, int64_t nbytes) { auto buffer = ReadAt(position, nbytes); - future.get().MarkFinished(buffer); + future.MarkFinished(buffer); } arrow::Status RandomAccessFileFromRandomAccessReadBuffer::Seek(int64_t) { return arrow::Status::NotImplemented(""); } @@ -320,7 +326,8 @@ std::shared_ptr asArrowFile( std::atomic & is_cancelled, const std::string & format_name, const std::string & magic_bytes, - bool avoid_buffering) + bool avoid_buffering, + std::shared_ptr io_pool) { bool has_file_size = isBufferWithFileSize(in); auto * seekable_in = dynamic_cast(&in); @@ -328,7 +335,7 @@ std::shared_ptr asArrowFile( if (has_file_size && seekable_in && settings.seekable_read) { if (avoid_buffering && seekable_in->supportsReadAt()) - return std::make_shared(*seekable_in, getFileSizeFromReadBuffer(in)); + return std::make_shared(*seekable_in, getFileSizeFromReadBuffer(in), io_pool); if (seekable_in->checkIfActuallySeekable()) return std::make_shared(*seekable_in, std::nullopt, avoid_buffering); diff --git a/src/Processors/Formats/Impl/ArrowBufferedStreams.h b/src/Processors/Formats/Impl/ArrowBufferedStreams.h index ddf9c884dd7..1911edc4128 100644 --- a/src/Processors/Formats/Impl/ArrowBufferedStreams.h +++ b/src/Processors/Formats/Impl/ArrowBufferedStreams.h @@ -83,7 +83,7 @@ private: class RandomAccessFileFromRandomAccessReadBuffer : public arrow::io::RandomAccessFile { public: - explicit RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_); + explicit RandomAccessFileFromRandomAccessReadBuffer(SeekableReadBuffer & in_, size_t file_size_, std::shared_ptr io_pool = nullptr); // These are thread safe. arrow::Result GetSize() override; @@ -102,12 +102,13 @@ public: bool closed() const override { return !is_open; } private: - void asyncThreadFunction(arrow::WeakFuture>& future, int64_t position, int64_t nbytes); + void asyncThreadFunction(arrow::Future> future, int64_t position, int64_t nbytes); SeekableReadBuffer & in; ThreadPoolCallbackRunnerUnsafe async_runner; size_t file_size; bool is_open = true; + std::shared_ptr io_pool; ARROW_DISALLOW_COPY_AND_ASSIGN(RandomAccessFileFromRandomAccessReadBuffer); }; @@ -162,7 +163,8 @@ std::shared_ptr asArrowFile( // read call will do a new HTTP request. Used in parquet pre-buffered reading mode, which makes // arrow do its own buffering and coalescing of reads. // (ReadBuffer is not a good abstraction in this case, but it works.) - bool avoid_buffering = false); + bool avoid_buffering = false, + std::shared_ptr io_pool = nullptr); // Reads the whole file into a memory buffer, owned by the returned RandomAccessFile. std::shared_ptr asArrowFileLoadIntoMemory( diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 35a6141b7e7..382ddf8d02f 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -32,6 +32,10 @@ namespace CurrentMetrics extern const Metric ParquetDecoderThreads; extern const Metric ParquetDecoderThreadsActive; extern const Metric ParquetDecoderThreadsScheduled; + + extern const Metric ParquetDecoderIOThreads; + extern const Metric ParquetDecoderIOThreadsActive; + extern const Metric ParquetDecoderIOThreadsScheduled; } namespace DB @@ -434,16 +438,20 @@ ParquetBlockInputFormat::ParquetBlockInputFormat( const Block & header_, const FormatSettings & format_settings_, size_t max_decoding_threads_, + size_t max_io_threads_, size_t min_bytes_for_seek_) : IInputFormat(header_, &buf) , format_settings(format_settings_) , skip_row_groups(format_settings.parquet.skip_row_groups) , max_decoding_threads(max_decoding_threads_) + , max_io_threads(max_io_threads_) , min_bytes_for_seek(min_bytes_for_seek_) , pending_chunks(PendingChunk::Compare { .row_group_first = format_settings_.parquet.preserve_order }) { if (max_decoding_threads > 1) pool = std::make_unique(CurrentMetrics::ParquetDecoderThreads, CurrentMetrics::ParquetDecoderThreadsActive, CurrentMetrics::ParquetDecoderThreadsScheduled, max_decoding_threads); + if (supportPrefetch()) + io_pool = std::make_shared(CurrentMetrics::ParquetDecoderIOThreads, CurrentMetrics::ParquetDecoderIOThreadsActive, CurrentMetrics::ParquetDecoderIOThreadsScheduled, max_io_threads); } ParquetBlockInputFormat::~ParquetBlockInputFormat() @@ -451,6 +459,8 @@ ParquetBlockInputFormat::~ParquetBlockInputFormat() is_stopped = true; if (pool) pool->wait(); + if (io_pool) + io_pool->wait(); } void ParquetBlockInputFormat::initializeIfNeeded() @@ -461,7 +471,7 @@ void ParquetBlockInputFormat::initializeIfNeeded() // Create arrow file adapter. // TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that // we'll need to read (which we know in advance). Use max_download_threads for that. - arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true); + arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true, io_pool); if (is_stopped) return; @@ -680,7 +690,7 @@ void ParquetBlockInputFormat::threadFunction(size_t row_group_batch_idx) } bool ParquetBlockInputFormat::supportPrefetch() const { - return max_decoding_threads == 1 && format_settings.parquet.enable_row_group_prefetch && !format_settings.parquet.use_native_reader; + return max_decoding_threads == 1 && max_io_threads > 0 && format_settings.parquet.enable_row_group_prefetch && !format_settings.parquet.use_native_reader; } std::shared_ptr ParquetBlockInputFormat::RowGroupPrefetchIterator::nextRowGroupReader() @@ -953,7 +963,7 @@ void registerInputFormatParquet(FormatFactory & factory) const FormatSettings & settings, const ReadSettings & read_settings, bool is_remote_fs, - size_t /* max_download_threads */, + size_t max_download_threads, size_t max_parsing_threads) { size_t min_bytes_for_seek = is_remote_fs ? read_settings.remote_read_min_bytes_for_seek : settings.parquet.local_read_min_bytes_for_seek; @@ -962,6 +972,7 @@ void registerInputFormatParquet(FormatFactory & factory) sample, settings, max_parsing_threads, + max_download_threads, min_bytes_for_seek); }); factory.markFormatSupportsSubsetOfColumns("Parquet"); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h index 179bc4fbdb7..b84b9452d93 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.h +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.h @@ -59,6 +59,7 @@ public: const Block & header, const FormatSettings & format_settings, size_t max_decoding_threads, + size_t max_io_threads, size_t min_bytes_for_seek); ~ParquetBlockInputFormat() override; @@ -293,6 +294,7 @@ private: const FormatSettings format_settings; const std::unordered_set & skip_row_groups; size_t max_decoding_threads; + size_t max_io_threads; size_t min_bytes_for_seek; const size_t max_pending_chunks_per_row_group_batch = 2; @@ -324,6 +326,7 @@ private: // These are only used when max_decoding_threads > 1. size_t row_group_batches_started = 0; std::unique_ptr pool; + std::shared_ptr io_pool; BlockMissingValues previous_block_missing_values; size_t previous_approx_bytes_read_for_chunk = 0;