diff --git a/src/Common/EventFD.cpp b/src/Common/EventFD.cpp index fafcdc675d9..b5701a24407 100644 --- a/src/Common/EventFD.cpp +++ b/src/Common/EventFD.cpp @@ -1,4 +1,6 @@ +#if defined(OS_LINUX) + #include #include #include @@ -57,3 +59,5 @@ EventFD::~EventFD() } } + +#endif diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 27149144719..616f8620084 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -631,7 +631,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, allow_unrestricted_reads_from_keeper, false, "Allow unrestricted (without condition on path) reads from system.zookeeper table, can be handy, but is not safe for zookeeper", 0) \ M(Bool, allow_deprecated_database_ordinary, false, "Allow to create databases with deprecated Ordinary engine", 0) \ M(Bool, allow_deprecated_syntax_for_merge_tree, false, "Allow to create *MergeTree tables with deprecated engine definition syntax", 0) \ - M(Bool, allow_asynchronous_read_from_io_pool_for_merge_tree, true, "Use background I/O pool to read from MergeTree tables. This setting may increase performance for I/O bound queries", 0) \ + M(Bool, allow_asynchronous_read_from_io_pool_for_merge_tree, false, "Use background I/O pool to read from MergeTree tables. This setting may increase performance for I/O bound queries", 0) \ \ M(Bool, force_grouping_standard_compatibility, true, "Make GROUPING function to return 1 when argument is not used as an aggregation key", 0) \ \ diff --git a/src/Processors/QueryPlan/ReadFromMergeTree.cpp b/src/Processors/QueryPlan/ReadFromMergeTree.cpp index 323ee40badf..f89b3bf7d3e 100644 --- a/src/Processors/QueryPlan/ReadFromMergeTree.cpp +++ b/src/Processors/QueryPlan/ReadFromMergeTree.cpp @@ -65,7 +65,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings( .checksum_on_read = settings.checksum_on_read, .read_in_order = query_info.input_order_info != nullptr, .apply_deleted_mask = context->applyDeletedMask(), - .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree, + .use_asynchronous_read_from_pool = settings.allow_asynchronous_read_from_io_pool_for_merge_tree && settings.max_streams_to_max_threads_ratio > 1, }; } diff --git a/src/Storages/MergeTree/MergeTreeSource.cpp b/src/Storages/MergeTree/MergeTreeSource.cpp index 5d4f047eb22..3057a631458 100644 --- a/src/Storages/MergeTree/MergeTreeSource.cpp +++ b/src/Storages/MergeTree/MergeTreeSource.cpp @@ -11,8 +11,10 @@ MergeTreeSource::MergeTreeSource(MergeTreeSelectAlgorithmPtr algorithm_) : ISource(algorithm_->getHeader()) , algorithm(std::move(algorithm_)) { +#if defined(OS_LINUX) if (algorithm->getSettings().use_asynchronous_read_from_pool) async_reading_state = std::make_unique(); +#endif } MergeTreeSource::~MergeTreeSource() = default; @@ -27,6 +29,7 @@ void MergeTreeSource::onCancel() algorithm->cancel(); } +#if defined(OS_LINUX) struct MergeTreeSource::AsyncReadingState { /// NotStarted -> InProgress -> IsFinished -> NotStarted ... @@ -137,10 +140,11 @@ private: ThreadPoolCallbackRunner callback_runner; std::shared_ptr control; }; - +#endif ISource::Status MergeTreeSource::prepare() { +#if defined(OS_LINUX) if (!async_reading_state) return ISource::prepare(); @@ -153,6 +157,7 @@ ISource::Status MergeTreeSource::prepare() if (async_reading_state && async_reading_state->getStage() == AsyncReadingState::Stage::InProgress) return ISource::Status::Async; +#endif return ISource::prepare(); } @@ -172,39 +177,44 @@ std::optional MergeTreeSource::reportProgress(ChunkAndProgress chunk) std::optional MergeTreeSource::tryGenerate() { - if (!async_reading_state) - return reportProgress(algorithm->read()); - - if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished) - return reportProgress(async_reading_state->getResult()); - - chassert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted); - - /// It is important to store control into job. - /// Otherwise, race between job and ~MergeTreeBaseSelectProcessor is possible. - auto job = [this, control = async_reading_state->start()]() mutable +#if defined(OS_LINUX) + if (async_reading_state) { - auto holder = std::move(control); + if (async_reading_state->getStage() == AsyncReadingState::Stage::IsFinished) + return reportProgress(async_reading_state->getResult()); - try + chassert(async_reading_state->getStage() == AsyncReadingState::Stage::NotStarted); + + /// It is important to store control into job. + /// Otherwise, race between job and ~MergeTreeBaseSelectProcessor is possible. + auto job = [this, control = async_reading_state->start()]() mutable { - holder->setResult(algorithm->read()); - } - catch (...) - { - holder->setException(std::current_exception()); - } - }; + auto holder = std::move(control); - async_reading_state->schedule(std::move(job)); + try + { + holder->setResult(algorithm->read()); + } + catch (...) + { + holder->setException(std::current_exception()); + } + }; - return Chunk(); + async_reading_state->schedule(std::move(job)); + + return Chunk(); + } +#endif + + return reportProgress(algorithm->read()); } - +#if defined(OS_LINUX) int MergeTreeSource::schedule() { return async_reading_state->getFD(); } +#endif } diff --git a/src/Storages/MergeTree/MergeTreeSource.h b/src/Storages/MergeTree/MergeTreeSource.h index cfc3ae53fca..bba0c0af80e 100644 --- a/src/Storages/MergeTree/MergeTreeSource.h +++ b/src/Storages/MergeTree/MergeTreeSource.h @@ -18,7 +18,10 @@ public: std::string getName() const override; Status prepare() override; + +#if defined(OS_LINUX) int schedule() override; +#endif protected: std::optional tryGenerate() override; @@ -28,8 +31,10 @@ protected: private: MergeTreeSelectAlgorithmPtr algorithm; +#if defined(OS_LINUX) struct AsyncReadingState; std::unique_ptr async_reading_state; +#endif std::optional reportProgress(ChunkAndProgress chunk); };