diff --git a/src/Common/filesystemHelpers.cpp b/src/Common/filesystemHelpers.cpp index 610608cd312..ecc3efae726 100644 --- a/src/Common/filesystemHelpers.cpp +++ b/src/Common/filesystemHelpers.cpp @@ -16,6 +16,7 @@ #include #include #include +#include namespace fs = std::filesystem; @@ -62,6 +63,12 @@ std::unique_ptr createTemporaryFile(const std::string & path) return std::make_unique(path); } +std::unique_ptr createTemporaryFile(const DiskPtr & disk) +{ + disk->createDirectories(disk->getPath()); + return std::make_unique(disk); +} + #if !defined(OS_LINUX) [[noreturn]] #endif diff --git a/src/Common/filesystemHelpers.h b/src/Common/filesystemHelpers.h index f96fe269eab..314652ffbdb 100644 --- a/src/Common/filesystemHelpers.h +++ b/src/Common/filesystemHelpers.h @@ -7,17 +7,18 @@ #include #include #include -#include +#include namespace fs = std::filesystem; namespace DB { -using TemporaryFile = Poco::TemporaryFile; +using TemporaryFile = TemporaryFileOnDisk; bool enoughSpaceInDirectory(const std::string & path, size_t data_size); std::unique_ptr createTemporaryFile(const std::string & path); +std::unique_ptr createTemporaryFile(const DiskPtr & disk); // Determine what block device is responsible for specified path #if !defined(OS_LINUX) diff --git a/src/Disks/TemporaryFileOnDisk.cpp b/src/Disks/TemporaryFileOnDisk.cpp index 93c3662f6ed..a0e957f7ef7 100644 --- a/src/Disks/TemporaryFileOnDisk.cpp +++ b/src/Disks/TemporaryFileOnDisk.cpp @@ -6,6 +6,11 @@ namespace DB { +TemporaryFileOnDisk::TemporaryFileOnDisk(const String & prefix_) + : tmp_file(std::make_unique(prefix_)) +{ +} + TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_) : disk(disk_) { diff --git a/src/Disks/TemporaryFileOnDisk.h b/src/Disks/TemporaryFileOnDisk.h index b82cb7d2254..10b863d5e37 100644 --- a/src/Disks/TemporaryFileOnDisk.h +++ b/src/Disks/TemporaryFileOnDisk.h @@ -2,6 +2,7 @@ #include #include +#include namespace DB { @@ -16,14 +17,20 @@ class TemporaryFileOnDisk { public: explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp"); + explicit TemporaryFileOnDisk(const String & prefix_ = "tmp"); ~TemporaryFileOnDisk(); DiskPtr getDisk() const { return disk; } const String & getPath() const { return filepath; } + const String & path() const { return filepath; } private: DiskPtr disk; String filepath; + + /// If disk is not provided, fallback to Poco::TemporaryFile + /// TODO: it's better to use DiskLocal for that case + std::unique_ptr tmp_file; }; } diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 1d0c4364a01..be7c74b03a9 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -1470,8 +1471,7 @@ bool Aggregator::executeOnBlock(Columns columns, { size_t size = current_memory_usage + params.min_free_disk_space; - std::string tmp_path = params.tmp_volume->getDisk()->getPath(); - + auto file = createTemporaryFile(params.tmp_volume->getDisk()); // enoughSpaceInDirectory() is not enough to make it right, since // another process (or another thread of aggregator) can consume all // space. @@ -1481,23 +1481,22 @@ bool Aggregator::executeOnBlock(Columns columns, // will reserve way more that actually will be used. // // Hence let's do a simple check. - if (!enoughSpaceInDirectory(tmp_path, size)) - throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); + if (!enoughSpaceInDirectory(file->path(), size)) + throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path()); - writeToTemporaryFile(result, tmp_path); + writeToTemporaryFile(result, std::move(file)); } return true; } -void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const +void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, std::unique_ptr file) const { Stopwatch watch; size_t rows = data_variants.size(); - auto file = createTemporaryFile(tmp_path); - const std::string & path = file->path(); + std::string path = file->getPath(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false)); @@ -1565,8 +1564,8 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const { - String tmp_path = params.tmp_volume->getDisk()->getPath(); - return writeToTemporaryFile(data_variants, tmp_path); + auto file = createTemporaryFile(params.tmp_volume->getDisk()); + return writeToTemporaryFile(data_variants, std::move(file)); } @@ -2832,7 +2831,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool { size_t size = current_memory_usage + params.min_free_disk_space; - std::string tmp_path = params.tmp_volume->getDisk()->getPath(); + auto file = createTemporaryFile(params.tmp_volume->getDisk()); // enoughSpaceInDirectory() is not enough to make it right, since // another process (or another thread of aggregator) can consume all @@ -2843,10 +2842,10 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool // will reserve way more that actually will be used. // // Hence let's do a simple check. - if (!enoughSpaceInDirectory(tmp_path, size)) - throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); + if (!enoughSpaceInDirectory(file->path(), size)) + throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path()); - writeToTemporaryFile(result, tmp_path); + writeToTemporaryFile(result, std::move(file)); } return true; diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index 3e8b25c1a8c..074072439b3 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1058,14 +1058,14 @@ public: std::vector convertBlockToTwoLevel(const Block & block) const; /// For external aggregation. - void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const; + void writeToTemporaryFile(AggregatedDataVariants & data_variants, std::unique_ptr file) const; void writeToTemporaryFile(AggregatedDataVariants & data_variants) const; bool hasTemporaryFiles() const { return !temporary_files.empty(); } struct TemporaryFiles { - std::vector> files; + std::vector> files; size_t sum_size_uncompressed = 0; size_t sum_size_compressed = 0; mutable std::mutex mutex; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 9b87e4dbbef..76f199d8e41 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -2198,7 +2198,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc query_info.syntax_analyzer_result); } } - else + else if (optimize_aggregation_in_order) { if (query_info.projection) { diff --git a/src/Interpreters/SortedBlocksWriter.cpp b/src/Interpreters/SortedBlocksWriter.cpp index c2a6f513224..ae54bdd953d 100644 --- a/src/Interpreters/SortedBlocksWriter.cpp +++ b/src/Interpreters/SortedBlocksWriter.cpp @@ -15,16 +15,14 @@ namespace DB namespace { -std::unique_ptr flushToFile(const String & tmp_path, const Block & header, QueryPipelineBuilder pipeline, const String & codec) +std::unique_ptr flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec) { - auto tmp_file = createTemporaryFile(tmp_path); - - TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec); - + auto tmp_file = createTemporaryFile(disk); + TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec); return tmp_file; } -SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipelineBuilder builder, +SortedBlocksWriter::SortedFiles flushToManyFiles(const DiskPtr & disk, const Block & header, QueryPipelineBuilder builder, const String & codec, std::function callback = [](const Block &){}) { std::vector> files; @@ -42,7 +40,7 @@ SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const QueryPipelineBuilder one_block_pipeline; Chunk chunk(block.getColumns(), block.rows()); one_block_pipeline.init(Pipe(std::make_shared(block.cloneEmpty(), std::move(chunk)))); - auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec); + auto tmp_file = flushToFile(disk, header, std::move(one_block_pipeline), codec); files.emplace_back(std::move(tmp_file)); } @@ -116,8 +114,6 @@ void SortedBlocksWriter::insert(Block && block) SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & blocks) const { - const std::string path = getPath(); - Pipes pipes; pipes.reserve(blocks.size()); for (const auto & block : blocks) @@ -142,7 +138,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc pipeline.addTransform(std::move(transform)); } - return flushToFile(path, sample_block, std::move(pipeline), codec); + return flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec); } SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() @@ -197,7 +193,7 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge() pipeline.addTransform(std::move(transform)); } - new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec)); + new_files.emplace_back(flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec)); } } @@ -230,7 +226,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::functiongetDisk(), sample_block, std::move(pipeline), codec, callback); } Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const @@ -238,11 +234,6 @@ Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const return Pipe(std::make_shared(file->path(), materializeBlock(sample_block))); } -String SortedBlocksWriter::getPath() const -{ - return volume->getDisk()->getPath(); -} - Block SortedBlocksBuffer::exchange(Block && block) { diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 81840f0cffb..5361b691b67 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -241,7 +241,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() } case MergeAlgorithm::Vertical : { - ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath()); + ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk); ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings()); ctx->rows_sources_write_buf = std::make_unique(*ctx->rows_sources_uncompressed_write_buf);