refactoring: remove copy-paste

This commit is contained in:
chertus 2019-10-01 21:51:33 +03:00
parent 48ab9afce6
commit c46cc3ff1a
15 changed files with 96 additions and 78 deletions

View File

@ -0,0 +1,27 @@
#include <Common/filesystemHelpers.h>
#include <common/config_common.h>
#include <Poco/File.h>
#include <Poco/Path.h>
namespace DB
{
bool checkFreeSpace(const std::string & path, size_t data_size)
{
#if !UNBUNDLED
auto free_space = Poco::File(path).freeSpace();
if (data_size > free_space)
return false;
#endif
return true;
}
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
{
Poco::File(path).createDirectories();
/// NOTE: std::make_shared cannot use protected constructors
return std::make_unique<TemporaryFile>(path);
}
}

View File

@ -0,0 +1,16 @@
#pragma once
#include <string>
#include <memory>
#include <Poco/TemporaryFile.h>
namespace DB
{
using TemporaryFile = Poco::TemporaryFile;
bool checkFreeSpace(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
}

View File

@ -74,9 +74,4 @@ Block AggregatingBlockInputStream::readImpl()
return impl->read();
}
AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
}

View File

@ -4,6 +4,7 @@
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
@ -41,15 +42,6 @@ protected:
bool executed = false;
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path);
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
/** From here we will get the completed blocks after the aggregation. */

View File

@ -4,7 +4,6 @@
#include <DataStreams/copyData.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <common/config_common.h>
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
@ -79,14 +78,10 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
if (!checkFreeSpace(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);

View File

@ -1,15 +1,16 @@
#pragma once
#include <queue>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -114,19 +115,7 @@ private:
Block header_without_constants;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
/// For reading data from temporary file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0)) {}
};
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
BlockInputStreams inputs_to_merge;

View File

@ -101,12 +101,6 @@ Block ParallelAggregatingBlockInputStream::readImpl()
}
ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
void ParallelAggregatingBlockInputStream::Handler::onBlock(Block & block, size_t thread_num)
{
parent.aggregator.executeOnBlock(block, *parent.many_data[thread_num],

View File

@ -5,6 +5,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <DataStreams/TemporaryFileStream.h>
namespace DB
@ -57,16 +58,6 @@ private:
bool no_more_keys = false;
std::atomic<bool> executed {false};
/// To read the data stored into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path);
};
std::vector<std::unique_ptr<TemporaryFileStream>> temporary_inputs;
Logger * log = &Logger::get("ParallelAggregatingBlockInputStream");

View File

@ -0,0 +1,32 @@
#pragma once
#include <Common/ClickHouseRevision.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
namespace DB
{
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get()))
{}
TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
{}
};
}

View File

@ -1,7 +1,6 @@
#include <IO/WriteBufferFromTemporaryFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Poco/Path.h>
#include <fcntl.h>
@ -15,17 +14,14 @@ namespace ErrorCodes
}
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<Poco::TemporaryFile> && tmp_file_)
WriteBufferFromTemporaryFile::WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file_)
: WriteBufferFromFile(tmp_file_->path(), DBMS_DEFAULT_BUFFER_SIZE, O_RDWR | O_TRUNC | O_CREAT, 0600), tmp_file(std::move(tmp_file_))
{}
WriteBufferFromTemporaryFile::Ptr WriteBufferFromTemporaryFile::create(const std::string & tmp_dir)
{
Poco::File(tmp_dir).createDirectories();
/// NOTE: std::make_shared cannot use protected constructors
return Ptr{new WriteBufferFromTemporaryFile(std::make_unique<Poco::TemporaryFile>(tmp_dir))};
return Ptr{new WriteBufferFromTemporaryFile(createTemporaryFile(tmp_dir))};
}
@ -45,11 +41,11 @@ public:
return std::make_shared<ReadBufferFromTemporaryWriteBuffer>(fd, file_name, std::move(origin->tmp_file));
}
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<Poco::TemporaryFile> && tmp_file_)
ReadBufferFromTemporaryWriteBuffer(int fd_, const std::string & file_name_, std::unique_ptr<TemporaryFile> && tmp_file_)
: ReadBufferFromFile(fd_, file_name_), tmp_file(std::move(tmp_file_))
{}
std::unique_ptr<Poco::TemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
};

View File

@ -1,7 +1,7 @@
#include <IO/WriteBuffer.h>
#include <IO/IReadableWriteBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <Poco/TemporaryFile.h>
#include <Common/filesystemHelpers.h>
namespace DB
@ -20,13 +20,13 @@ public:
protected:
WriteBufferFromTemporaryFile(std::unique_ptr<Poco::TemporaryFile> && tmp_file);
WriteBufferFromTemporaryFile(std::unique_ptr<TemporaryFile> && tmp_file);
std::shared_ptr<ReadBuffer> getReadBufferImpl() override;
protected:
std::unique_ptr<Poco::TemporaryFile> tmp_file;
std::unique_ptr<TemporaryFile> tmp_file;
friend class ReadBufferFromTemporaryWriteBuffer;
};

View File

@ -646,11 +646,8 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
#if !UNBUNDLED
auto free_space = Poco::File(params.tmp_path).freeSpace();
if (current_memory_usage + params.min_free_disk_space > free_space)
if (!checkFreeSpace(params.tmp_path, current_memory_usage + params.min_free_disk_space))
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
writeToTemporaryFile(result);
}
@ -664,8 +661,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
Stopwatch watch;
size_t rows = data_variants.size();
Poco::File(params.tmp_path).createDirectories();
auto file = std::make_unique<Poco::TemporaryFile>(params.tmp_path);
auto file = createTemporaryFile(params.tmp_path);
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);

View File

@ -4,8 +4,6 @@
#include <memory>
#include <functional>
#include <Poco/TemporaryFile.h>
#include <common/logger_useful.h>
#include <common/StringRef.h>
@ -18,6 +16,7 @@
#include <Common/LRUCache.h>
#include <Common/ColumnsHashing.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/SizeLimits.h>

View File

@ -168,14 +168,10 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
#if !UNBUNDLED
auto free_space = Poco::File(tmp_path).freeSpace();
if (sum_bytes_in_blocks + min_free_disk_space > free_space)
if (!checkFreeSpace(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
#endif
Poco::File(tmp_path).createDirectories();
temporary_files.emplace_back(std::make_unique<Poco::TemporaryFile>(tmp_path));
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);

View File

@ -1,7 +1,7 @@
#pragma once
#include <Processors/Transforms/SortingTransform.h>
#include <Core/SortDescription.h>
#include <Poco/TemporaryFile.h>
#include <Common/filesystemHelpers.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
#include <DataStreams/IBlockInputStream.h>
@ -52,7 +52,7 @@ private:
bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();