Use TemporaryFileOnDisk instead of Poco::TemporaryFile

This commit is contained in:
vdimir 2022-08-15 16:42:50 +00:00
parent 7e871dd540
commit cd4038d511
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
9 changed files with 47 additions and 37 deletions

View File

@ -16,6 +16,7 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Disks/IDisk.h>
namespace fs = std::filesystem;
@ -62,6 +63,12 @@ std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
return std::make_unique<TemporaryFile>(path);
}
std::unique_ptr<TemporaryFile> createTemporaryFile(const DiskPtr & disk)
{
disk->createDirectories(disk->getPath());
return std::make_unique<TemporaryFile>(disk);
}
#if !defined(OS_LINUX)
[[noreturn]]
#endif

View File

@ -7,17 +7,18 @@
#include <memory>
#include <string>
#include <sys/statvfs.h>
#include <Poco/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
namespace fs = std::filesystem;
namespace DB
{
using TemporaryFile = Poco::TemporaryFile;
using TemporaryFile = TemporaryFileOnDisk;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
std::unique_ptr<TemporaryFile> createTemporaryFile(const DiskPtr & disk);
// Determine what block device is responsible for specified path
#if !defined(OS_LINUX)

View File

@ -6,6 +6,11 @@
namespace DB
{
TemporaryFileOnDisk::TemporaryFileOnDisk(const String & prefix_)
: tmp_file(std::make_unique<Poco::TemporaryFile>(prefix_))
{
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_)
: disk(disk_)
{

View File

@ -2,6 +2,7 @@
#include <Core/Types.h>
#include <memory>
#include <Poco/TemporaryFile.h>
namespace DB
{
@ -16,14 +17,20 @@ class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp");
explicit TemporaryFileOnDisk(const String & prefix_ = "tmp");
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
const String & getPath() const { return filepath; }
const String & path() const { return filepath; }
private:
DiskPtr disk;
String filepath;
/// If disk is not provided, fallback to Poco::TemporaryFile
/// TODO: it's better to use DiskLocal for that case
std::unique_ptr<Poco::TemporaryFile> tmp_file;
};
}

View File

@ -23,6 +23,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/JSONBuilder.h>
#include <Common/filesystemHelpers.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <IO/Operators.h>
@ -1470,8 +1471,7 @@ bool Aggregator::executeOnBlock(Columns columns,
{
size_t size = current_memory_usage + params.min_free_disk_space;
std::string tmp_path = params.tmp_volume->getDisk()->getPath();
auto file = createTemporaryFile(params.tmp_volume->getDisk());
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
@ -1481,23 +1481,22 @@ bool Aggregator::executeOnBlock(Columns columns,
// will reserve way more that actually will be used.
//
// Hence let's do a simple check.
if (!enoughSpaceInDirectory(tmp_path, size))
throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
if (!enoughSpaceInDirectory(file->path(), size))
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path());
writeToTemporaryFile(result, tmp_path);
writeToTemporaryFile(result, std::move(file));
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, std::unique_ptr<TemporaryFile> file) const
{
Stopwatch watch;
size_t rows = data_variants.size();
auto file = createTemporaryFile(tmp_path);
const std::string & path = file->path();
std::string path = file->getPath();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
@ -1565,8 +1564,8 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants) const
{
String tmp_path = params.tmp_volume->getDisk()->getPath();
return writeToTemporaryFile(data_variants, tmp_path);
auto file = createTemporaryFile(params.tmp_volume->getDisk());
return writeToTemporaryFile(data_variants, std::move(file));
}
@ -2832,7 +2831,7 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
{
size_t size = current_memory_usage + params.min_free_disk_space;
std::string tmp_path = params.tmp_volume->getDisk()->getPath();
auto file = createTemporaryFile(params.tmp_volume->getDisk());
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
@ -2843,10 +2842,10 @@ bool Aggregator::mergeOnBlock(Block block, AggregatedDataVariants & result, bool
// will reserve way more that actually will be used.
//
// Hence let's do a simple check.
if (!enoughSpaceInDirectory(tmp_path, size))
throw Exception("Not enough space for external aggregation in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
if (!enoughSpaceInDirectory(file->path(), size))
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path());
writeToTemporaryFile(result, tmp_path);
writeToTemporaryFile(result, std::move(file));
}
return true;

View File

@ -1058,14 +1058,14 @@ public:
std::vector<Block> convertBlockToTwoLevel(const Block & block) const;
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants, std::unique_ptr<TemporaryFile> file) const;
void writeToTemporaryFile(AggregatedDataVariants & data_variants) const;
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
struct TemporaryFiles
{
std::vector<std::unique_ptr<Poco::TemporaryFile>> files;
std::vector<std::unique_ptr<TemporaryFile>> files;
size_t sum_size_uncompressed = 0;
size_t sum_size_compressed = 0;
mutable std::mutex mutex;

View File

@ -2198,7 +2198,7 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
query_info.syntax_analyzer_result);
}
}
else
else if (optimize_aggregation_in_order)
{
if (query_info.projection)
{

View File

@ -15,16 +15,14 @@ namespace DB
namespace
{
std::unique_ptr<TemporaryFile> flushToFile(const String & tmp_path, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
std::unique_ptr<TemporaryFile> flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{
auto tmp_file = createTemporaryFile(tmp_path);
TemporaryFileStream::write(tmp_file->path(), header, std::move(pipeline), codec);
auto tmp_file = createTemporaryFile(disk);
TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec);
return tmp_file;
}
SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const Block & header, QueryPipelineBuilder builder,
SortedBlocksWriter::SortedFiles flushToManyFiles(const DiskPtr & disk, const Block & header, QueryPipelineBuilder builder,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{
std::vector<std::unique_ptr<TemporaryFile>> files;
@ -42,7 +40,7 @@ SortedBlocksWriter::SortedFiles flushToManyFiles(const String & tmp_path, const
QueryPipelineBuilder one_block_pipeline;
Chunk chunk(block.getColumns(), block.rows());
one_block_pipeline.init(Pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), std::move(chunk))));
auto tmp_file = flushToFile(tmp_path, header, std::move(one_block_pipeline), codec);
auto tmp_file = flushToFile(disk, header, std::move(one_block_pipeline), codec);
files.emplace_back(std::move(tmp_file));
}
@ -116,8 +114,6 @@ void SortedBlocksWriter::insert(Block && block)
SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & blocks) const
{
const std::string path = getPath();
Pipes pipes;
pipes.reserve(blocks.size());
for (const auto & block : blocks)
@ -142,7 +138,7 @@ SortedBlocksWriter::TmpFilePtr SortedBlocksWriter::flush(const BlocksList & bloc
pipeline.addTransform(std::move(transform));
}
return flushToFile(path, sample_block, std::move(pipeline), codec);
return flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec);
}
SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
@ -197,7 +193,7 @@ SortedBlocksWriter::PremergedFiles SortedBlocksWriter::premerge()
pipeline.addTransform(std::move(transform));
}
new_files.emplace_back(flushToFile(getPath(), sample_block, std::move(pipeline), codec));
new_files.emplace_back(flushToFile(volume->getDisk(), sample_block, std::move(pipeline), codec));
}
}
@ -230,7 +226,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
pipeline.addTransform(std::move(transform));
}
return flushToManyFiles(getPath(), sample_block, std::move(pipeline), codec, callback);
return flushToManyFiles(volume->getDisk(), sample_block, std::move(pipeline), codec, callback);
}
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
@ -238,11 +234,6 @@ Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
}
String SortedBlocksWriter::getPath() const
{
return volume->getDisk()->getPath();
}
Block SortedBlocksBuffer::exchange(Block && block)
{

View File

@ -241,7 +241,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
case MergeAlgorithm::Vertical :
{
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath());
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk);
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings());
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);