diff --git a/dbms/src/Common/filesystemHelpers.cpp b/dbms/src/Common/filesystemHelpers.cpp
new file mode 100644
index 00000000000..752195456e7
--- /dev/null
+++ b/dbms/src/Common/filesystemHelpers.cpp
@@ -0,0 +1,30 @@
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+/// Returns false if `data_size` is greater than the free space Poco reports for `path`, true otherwise.
+/// When UNBUNDLED is set the check is compiled out and the function always returns true;
+/// the parameters are marked [[maybe_unused]] so that configuration builds without unused-parameter warnings.
+bool checkFreeSpace(const std::string & path [[maybe_unused]], size_t data_size [[maybe_unused]])
+{
+#if !UNBUNDLED
+    auto free_space = Poco::File(path).freeSpace();
+    if (data_size > free_space)
+        return false;
+#endif
+    return true;
+}
+
+/// Creates the directory `path` (via Poco::File::createDirectories) and returns a TemporaryFile constructed with that path.
+std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
+{
+    Poco::File(path).createDirectories();
+
+    return std::make_unique<TemporaryFile>(path);
+}
+
+}
diff --git a/dbms/src/Common/filesystemHelpers.h b/dbms/src/Common/filesystemHelpers.h
new file mode 100644
index 00000000000..6fc772374ea
--- /dev/null
+++ b/dbms/src/Common/filesystemHelpers.h
@@ -0,0 +1,19 @@
+#pragma once
+
+#include
+#include
+
+#include
+
+namespace DB
+{
+
+using TemporaryFile = Poco::TemporaryFile;
+
+/// Returns false if `data_size` exceeds the free space reported for `path` (always true when UNBUNDLED is set).
+bool checkFreeSpace(const std::string & path, size_t data_size);
+
+/// Creates the directory `path` and returns a TemporaryFile constructed with that path.
+std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
+
+}
diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
index e8acc63932c..f288e202c80 100644
--- a/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
+++ b/dbms/src/DataStreams/AggregatingBlockInputStream.cpp
@@ -74,9 +74,4 @@ Block AggregatingBlockInputStream::readImpl()
     return impl->read();
 }
 
-
-AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
-    : file_in(path), compressed_in(file_in),
-      block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {}
-
 }
diff --git a/dbms/src/DataStreams/AggregatingBlockInputStream.h b/dbms/src/DataStreams/AggregatingBlockInputStream.h
index 53469744d95..5e993949b63 100644
--- a/dbms/src/DataStreams/AggregatingBlockInputStream.h
+++ 
b/dbms/src/DataStreams/AggregatingBlockInputStream.h @@ -4,6 +4,7 @@ #include #include #include +#include namespace DB @@ -41,15 +42,6 @@ protected: bool executed = false; - /// To read the data that was flushed into the temporary data file. - struct TemporaryFileStream - { - ReadBufferFromFile file_in; - CompressedReadBuffer compressed_in; - BlockInputStreamPtr block_in; - - TemporaryFileStream(const std::string & path); - }; std::vector> temporary_inputs; /** From here we will get the completed blocks after the aggregation. */ diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp index 377c1fee4e0..a676d75de35 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.cpp @@ -4,7 +4,6 @@ #include #include #include -#include #include #include #include @@ -79,14 +78,10 @@ Block MergeSortingBlockInputStream::readImpl() */ if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) { -#if !UNBUNDLED - auto free_space = Poco::File(tmp_path).freeSpace(); - if (sum_bytes_in_blocks + min_free_disk_space > free_space) + if (!checkFreeSpace(tmp_path, sum_bytes_in_blocks + min_free_disk_space)) throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); -#endif - Poco::File(tmp_path).createDirectories(); - temporary_files.emplace_back(std::make_unique(tmp_path)); + temporary_files.emplace_back(createTemporaryFile(tmp_path)); const std::string & path = temporary_files.back()->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); diff --git a/dbms/src/DataStreams/MergeSortingBlockInputStream.h b/dbms/src/DataStreams/MergeSortingBlockInputStream.h index 4cd9315bc3c..ffc8c471270 100644 --- a/dbms/src/DataStreams/MergeSortingBlockInputStream.h +++ b/dbms/src/DataStreams/MergeSortingBlockInputStream.h @@ -1,15 +1,16 @@ #pragma once #include 
-#include #include +#include #include #include #include #include +#include #include #include @@ -114,19 +115,7 @@ private: Block header_without_constants; /// Everything below is for external sorting. - std::vector> temporary_files; - - /// For reading data from temporary file. - struct TemporaryFileStream - { - ReadBufferFromFile file_in; - CompressedReadBuffer compressed_in; - BlockInputStreamPtr block_in; - - TemporaryFileStream(const std::string & path, const Block & header_) - : file_in(path), compressed_in(file_in), block_in(std::make_shared(compressed_in, header_, 0)) {} - }; - + std::vector> temporary_files; std::vector> temporary_inputs; BlockInputStreams inputs_to_merge; diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp index 6c75ec726d4..3ce3254bed0 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.cpp @@ -101,12 +101,6 @@ Block ParallelAggregatingBlockInputStream::readImpl() } -ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path) - : file_in(path), compressed_in(file_in), - block_in(std::make_shared(compressed_in, ClickHouseRevision::get())) {} - - - void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num) { parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num], diff --git a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h index 0c93f5d1161..942c906b872 100644 --- a/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h +++ b/dbms/src/DataStreams/ParallelAggregatingBlockInputStream.h @@ -5,6 +5,7 @@ #include #include #include +#include namespace DB @@ -57,16 +58,6 @@ private: bool no_more_keys = false; std::atomic executed {false}; - - /// To read the data stored into the temporary data file. 
-    struct TemporaryFileStream
-    {
-        ReadBufferFromFile file_in;
-        CompressedReadBuffer compressed_in;
-        BlockInputStreamPtr block_in;
-
-        TemporaryFileStream(const std::string & path);
-    };
     std::vector> temporary_inputs;
 
     Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");
diff --git a/dbms/src/DataStreams/TemporaryFileStream.h b/dbms/src/DataStreams/TemporaryFileStream.h
new file mode 100644
index 00000000000..1e55f2c4aae
--- /dev/null
+++ b/dbms/src/DataStreams/TemporaryFileStream.h
@@ -0,0 +1,36 @@
+#pragma once
+
+#include
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+/// To read the data that was flushed into the temporary data file.
+/// The members are chained (file_in -> compressed_in -> block_in): each one is constructed from the
+/// member declared before it, so the declaration order below must not be changed.
+struct TemporaryFileStream
+{
+    ReadBufferFromFile file_in;
+    CompressedReadBuffer compressed_in;
+    BlockInputStreamPtr block_in;
+
+    /// Opens `path`; the block stream is created with ClickHouseRevision::get().
+    explicit TemporaryFileStream(const std::string & path)
+        : file_in(path)
+        , compressed_in(file_in)
+        , block_in(std::make_shared(compressed_in, ClickHouseRevision::get()))
+    {}
+
+    /// Opens `path`; the block stream is created with the given header and 0 as its last argument.
+    TemporaryFileStream(const std::string & path, const Block & header_)
+        : file_in(path)
+        , compressed_in(file_in)
+        , block_in(std::make_shared(compressed_in, header_, 0))
+    {}
+};
+
+}
diff --git a/dbms/src/IO/WriteBufferFromTemporaryFile.cpp b/dbms/src/IO/WriteBufferFromTemporaryFile.cpp
index c5a6bc04350..007b8a4283e 100644
--- a/dbms/src/IO/WriteBufferFromTemporaryFile.cpp
+++ b/dbms/src/IO/WriteBufferFromTemporaryFile.cpp
@@ -1,7 +1,6 @@
 #include
 
 #include
-#include
 #include
 
 
@@ -15,17 +14,14 @@ namespace ErrorCodes
 }
 
 
-WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file_)
+WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file_)
     : WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_))
 {}
 
 
 WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & tmp_dir)
 {
-    Poco::File(tmp_dir).createDirectories();
-
-    /// NOTE: 
std::make_shared cannot use protected constructors - return Ptr{new WriteBufferFromTemporaryFile(std::make_unique(tmp_dir))}; + return Ptr{new WriteBufferFromTemporaryFile(createTemporaryFile(tmp_dir))}; } @@ -45,11 +41,11 @@ public: return std::make_shared(fd, file_name, std::move(origin->tmp_file)); } - ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr && tmp_file_) + ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr && tmp_file_) : ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_)) {} - std::unique_ptr tmp_file; + std::unique_ptr tmp_file; }; diff --git a/dbms/src/IO/WriteBufferFromTemporaryFile.h b/dbms/src/IO/WriteBufferFromTemporaryFile.h index 12b5f04a576..8b724eb937c 100644 --- a/dbms/src/IO/WriteBufferFromTemporaryFile.h +++ b/dbms/src/IO/WriteBufferFromTemporaryFile.h @@ -1,7 +1,7 @@ #include #include #include -#include +#include namespace DB @@ -20,13 +20,13 @@ public: protected: - WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file); + WriteBufferFromTemporaryFile(std::unique_ptr && tmp_file); std::shared_ptr getReadBufferImpl() override; protected: - std::unique_ptr tmp_file; + std::unique_ptr tmp_file; friend class ReadBufferFromTemporaryWriteBuffer; }; diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 6b83246b83a..337b5258afa 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -646,11 +646,8 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData && current_memory_usage > static_cast(params.max_bytes_before_external_group_by) && worth_convert_to_two_level) { -#if !UNBUNDLED - auto free_space = Poco::File(params.tmp_path).freeSpace(); - if (current_memory_usage + params.min_free_disk_space > free_space) + if (!checkFreeSpace(params.tmp_path, current_memory_usage + params.min_free_disk_space)) throw Exception("Not enough space for 
external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); -#endif writeToTemporaryFile(result); } @@ -664,8 +661,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) Stopwatch watch; size_t rows = data_variants.size(); - Poco::File(params.tmp_path).createDirectories(); - auto file = std::make_unique(params.tmp_path); + auto file = createTemporaryFile(params.tmp_path); const std::string & path = file->path(); WriteBufferFromFile file_buf(path); CompressedWriteBuffer compressed_buf(file_buf); diff --git a/dbms/src/Interpreters/Aggregator.h b/dbms/src/Interpreters/Aggregator.h index 34c9b1a6bfd..a1369e2fa44 100644 --- a/dbms/src/Interpreters/Aggregator.h +++ b/dbms/src/Interpreters/Aggregator.h @@ -4,8 +4,6 @@ #include #include -#include - #include #include @@ -18,6 +16,7 @@ #include #include #include +#include #include #include diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp index 25338b8f1d2..e3ae7e90540 100644 --- a/dbms/src/Processors/Transforms/MergeSortingTransform.cpp +++ b/dbms/src/Processors/Transforms/MergeSortingTransform.cpp @@ -168,14 +168,10 @@ void MergeSortingTransform::consume(Chunk chunk) */ if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort) { -#if !UNBUNDLED - auto free_space = Poco::File(tmp_path).freeSpace(); - if (sum_bytes_in_blocks + min_free_disk_space > free_space) + if (!checkFreeSpace(tmp_path, sum_bytes_in_blocks + min_free_disk_space)) throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE); -#endif - Poco::File(tmp_path).createDirectories(); - temporary_files.emplace_back(std::make_unique(tmp_path)); + temporary_files.emplace_back(createTemporaryFile(tmp_path)); const std::string & path = temporary_files.back()->path(); merge_sorter = std::make_unique(std::move(chunks), description, max_merged_block_size, limit); auto 
current_processor = std::make_shared(header_without_constants, log, path); diff --git a/dbms/src/Processors/Transforms/MergeSortingTransform.h b/dbms/src/Processors/Transforms/MergeSortingTransform.h index dcc9b4d5a29..ee51f29565a 100644 --- a/dbms/src/Processors/Transforms/MergeSortingTransform.h +++ b/dbms/src/Processors/Transforms/MergeSortingTransform.h @@ -1,7 +1,7 @@ #pragma once #include #include -#include +#include #include #include #include @@ -52,7 +52,7 @@ private: bool remerge_is_useful = true; /// Everything below is for external sorting. - std::vector> temporary_files; + std::vector> temporary_files; /// Merge all accumulated blocks to keep no more than limit rows. void remerge();