Move back TemporaryFile -> TemporaryFileOnDisk

This commit is contained in:
vdimir 2022-08-23 10:52:38 +00:00
parent 0349c85017
commit 7194df1184
No known key found for this signature in database
GPG Key ID: 6EE4CE2BEDC51862
14 changed files with 41 additions and 44 deletions

View File

@ -16,10 +16,17 @@
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Disks/IDisk.h>
namespace fs = std::filesystem;
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace DB
{
@ -35,7 +42,6 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_FILE;
}
struct statvfs getStatVFS(const String & path)
{
struct statvfs fs;
@ -48,7 +54,6 @@ struct statvfs getStatVFS(const String & path)
return fs;
}
bool enoughSpaceInDirectory(const std::string & path, size_t data_size)
{
fs::path filepath(path);
@ -61,17 +66,11 @@ bool enoughSpaceInDirectory(const std::string & path, size_t data_size)
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path)
{
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
fs::create_directories(path);
return std::make_unique<TemporaryFile>(path);
}
std::unique_ptr<TemporaryFile> createTemporaryFile(const DiskPtr & disk, std::unique_ptr<CurrentMetrics::Increment> metric_increment)
{
/// is is possible to use with disk other than DickLocal ?
disk->createDirectories(disk->getPath());
return std::make_unique<TemporaryFile>(disk, disk->getPath(), std::move(metric_increment));
}
#if !defined(OS_LINUX)
[[noreturn]]
#endif

View File

@ -7,18 +7,17 @@
#include <memory>
#include <string>
#include <sys/statvfs.h>
#include <Common/TemporaryFile.h>
#include <Poco/TemporaryFile.h>
namespace fs = std::filesystem;
namespace DB
{
using TemporaryFile = TemporaryFileOnDisk;
using TemporaryFile = Poco::TemporaryFile;
bool enoughSpaceInDirectory(const std::string & path, size_t data_size);
std::unique_ptr<TemporaryFile> createTemporaryFile(const std::string & path);
std::unique_ptr<TemporaryFile> createTemporaryFile(const DiskPtr & disk, std::unique_ptr<CurrentMetrics::Increment> metric_increment = nullptr);
// Determine what block device is responsible for specified path

View File

@ -1,5 +1,4 @@
#include <Common/TemporaryFile.h>
#include <Disks/IDisk.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
@ -11,17 +10,17 @@ namespace ProfileEvents
namespace DB
{
TemporaryFileOnDisk::TemporaryFileOnDisk(const String & prefix_)
: tmp_file(std::make_unique<Poco::TemporaryFile>(prefix_))
, filepath(tmp_file->path())
{
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, std::unique_ptr<CurrentMetrics::Increment> increment_)
: TemporaryFileOnDisk(disk_, disk_->getPath(), std::move(increment_))
{}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_, std::unique_ptr<CurrentMetrics::Increment> increment_)
: disk(disk_)
, sub_metric_increment(std::move(increment_))
{
/// is is possible to use with disk other than DickLocal ?
disk->createDirectories(prefix_);
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
/// Do not use default temporaty root path `/tmp/tmpXXXXXX`.

View File

@ -2,7 +2,7 @@
#include <Core/Types.h>
#include <memory>
#include <Poco/TemporaryFile.h>
#include <Disks/IDisk.h>
#include <Common/CurrentMetrics.h>
@ -13,18 +13,18 @@ namespace CurrentMetrics
namespace DB
{
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// This class helps with the handling of temporary files or directories.
/// A unique name for the temporary file or directory is automatically chosen based on a specified prefix.
/// Optionally can create a directory in the constructor.
/// Create a directory in the constructor.
/// The destructor always removes the temporary file or directory with all contained files.
class TemporaryFileOnDisk
{
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_ = "tmp", std::unique_ptr<CurrentMetrics::Increment> increment_ = nullptr);
explicit TemporaryFileOnDisk(const String & prefix_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, std::unique_ptr<CurrentMetrics::Value> increment_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_, std::unique_ptr<CurrentMetrics::Increment> increment_);
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
@ -34,10 +34,6 @@ public:
private:
DiskPtr disk;
/// If disk is not provided, fallback to Poco::TemporaryFile
/// TODO: it's better to use DiskLocal for that case as well
std::unique_ptr<Poco::TemporaryFile> tmp_file;
String filepath;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TotalTemporaryFiles};
@ -45,4 +41,6 @@ private:
std::unique_ptr<CurrentMetrics::Increment> sub_metric_increment;
};
using TemporaryFileOnDiskHolder = std::unique_ptr<TemporaryFileOnDisk>;
}

View File

@ -30,6 +30,7 @@
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Parsers/ASTSelectQuery.h>
@ -1491,7 +1492,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
auto file = createTempFile(max_temp_file_size);
std::string path = file->getPath();
const auto & path = file->getPath();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
@ -1559,9 +1560,9 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
}
std::unique_ptr<TemporaryFile> Aggregator::createTempFile(size_t max_temp_file_size) const
std::unique_ptr<TemporaryFileOnDisk> Aggregator::createTempFile(size_t max_temp_file_size) const
{
auto file = createTemporaryFile(params.tmp_volume->getDisk(),
auto file = std::make_unique<TemporaryFileOnDisk>(params.tmp_volume->getDisk(),
std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::TemporaryFilesForAggregation));
// enoughSpaceInDirectory() is not enough to make it right, since

View File

@ -7,6 +7,7 @@
#include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
namespace ProfileEvents
@ -29,10 +30,9 @@ namespace DB
namespace
{
std::unique_ptr<TemporaryFile> flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{
auto tmp_file = createTemporaryFile(disk, std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::TemporaryFilesForJoin));
auto tmp_file = std::make_unsigned<TemporaryFileOnDisk>(disk, std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::TemporaryFilesForJoin));
auto write_stat = TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
@ -48,7 +48,7 @@ std::unique_ptr<TemporaryFile> flushToFile(const DiskPtr & disk, const Block & h
SortedBlocksWriter::SortedFiles flushToManyFiles(const DiskPtr & disk, const Block & header, QueryPipelineBuilder builder,
const String & codec, std::function<void(const Block &)> callback = [](const Block &){})
{
std::vector<std::unique_ptr<TemporaryFile>> files;
std::vector<TemporaryFileOnDiskHolder> files;
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);

View File

@ -215,7 +215,7 @@ void MergeSortingTransform::consume(Chunk chunk)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
auto files_num_counter = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::TemporaryFilesForSort);
temporary_files.emplace_back(createTemporaryFile(reservation->getDisk(), std::move(files_num_counter)));
temporary_files.emplace_back(std::make_unique<TemporaryFileOnDisk>(reservation->getDisk(), std::move(files_num_counter)));
const std::string & path = temporary_files.back()->path();
merge_sorter

View File

@ -3,6 +3,7 @@
#include <Processors/Transforms/SortingTransform.h>
#include <Core/SortDescription.h>
#include <Common/filesystemHelpers.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Common/logger_useful.h>
@ -55,7 +56,7 @@ private:
bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<TemporaryFile>> temporary_files;
std::vector<TemporaryFileOnDiskHolder> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();

View File

@ -1,7 +1,7 @@
#include <Storages/MergeTree/DataPartStorageOnDisk.h>
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Disks/IVolume.h>
#include <Common/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/ReadBufferFromFileBase.h>
#include <IO/ReadHelpers.h>

View File

@ -241,7 +241,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
}
case MergeAlgorithm::Vertical :
{
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk);
ctx->rows_sources_file = createTemporaryFile(ctx->tmp_disk->getPath());
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->writeFile(fileName(ctx->rows_sources_file->path()), DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Rewrite, global_ctx->context->getWriteSettings());
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);

View File

@ -63,7 +63,7 @@
#include <Common/SimpleIncrement.h>
#include <Common/Stopwatch.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>

View File

@ -30,7 +30,7 @@
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <cassert>
#include <chrono>

View File

@ -30,7 +30,7 @@
#include <Backups/IBackupEntriesLazyBatch.h>
#include <Backups/RestorerFromBackup.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Common/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <IO/copyData.h>

View File

@ -34,7 +34,7 @@
#include <Backups/BackupEntryFromSmallFile.h>
#include <Backups/IBackup.h>
#include <Backups/RestorerFromBackup.h>
#include <Common/TemporaryFile.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <base/insertAtEnd.h>