Followup for TemporaryDataOnDisk

This commit is contained in:
vdimir 2022-10-05 16:35:10 +00:00 committed by Vladimir C
parent 52f57cd1ee
commit 0178307c27
13 changed files with 138 additions and 52 deletions

View File

@ -1502,6 +1502,21 @@ If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored.
- Policy should have exactly one volume with local disks.
:::
## max_temporary_data_on_disk_size {#max_temporary_data_on_disk_size}
Limit the amount of disk space consumed by temporary files in `tmp_path` for the server.
Queries that exceed this limit will fail with an exception.
Default value: `0`.
**See also**
- [max_temporary_data_on_disk_size_for_user](../../operations/settings/query-complexity.md#settings_max_temporary_data_on_disk_size_for_user)
- [max_temporary_data_on_disk_size_for_query](../../operations/settings/query-complexity.md#settings_max_temporary_data_on_disk_size_for_query)
- [tmp_path](#tmp-path)
- [tmp_policy](#tmp-policy)
- [max_server_memory_usage](#max_server_memory_usage)
## uncompressed_cache_size {#server-settings-uncompressed_cache_size}
Cache size (in bytes) for uncompressed data used by table engines from the [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md).

View File

@ -313,4 +313,19 @@ When inserting data, ClickHouse calculates the number of partitions in the inser
> “Too many partitions for single INSERT block (more than” + toString(max_parts) + “). The limit is controlled by max_partitions_per_insert_block setting. A large number of partitions is a common misconception. It will lead to severe negative performance impact, including slow server startup, slow INSERT queries and slow SELECT queries. Recommended total number of partitions for a table is under 1000..10000. Please note, that partitioning is not intended to speed up SELECT queries (ORDER BY key is sufficient to make range queries fast). Partitions are intended for data manipulation (DROP PARTITION, etc).”
## max_temporary_data_on_disk_size_for_user {#settings_max_temporary_data_on_disk_size_for_user}
The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries.
Zero means unlimited.
Default value: 0.
## max_temporary_data_on_disk_size_for_query {#settings_max_temporary_data_on_disk_size_for_query}
The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries.
Zero means unlimited.
Default value: 0.
[Original article](https://clickhouse.com/docs/en/operations/settings/query_complexity/) <!--hide-->

View File

@ -971,10 +971,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Storage with temporary data for processing of heavy queries.
{
std::string tmp_path = config().getString("tmp_path", path / "tmp/");
std::string tmp_policy = config().getString("tmp_policy", "");
size_t tmp_max_size = config().getUInt64("tmp_max_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy, tmp_max_size);
std::string temporary_path = config().getString("tmp_path", path / "tmp/");
std::string temporary_policy = config().getString("tmp_policy", "");
size_t max_size = config().getUInt64("max_temporary_data_on_disk_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(temporary_path, temporary_policy, max_size);
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}

View File

@ -33,6 +33,7 @@
M(TemporaryFilesForSort, "Number of temporary files created for external sorting") \
M(TemporaryFilesForAggregation, "Number of temporary files created for external aggregation") \
M(TemporaryFilesForJoin, "Number of temporary files created for JOIN") \
M(TemporaryFilesUnknown, "Number of temporary files created without known purpose") \
M(Read, "Number of read (read, pread, io_getevents, etc.) syscalls in fly") \
M(Write, "Number of write (write, pwrite, io_getevents, etc.) syscalls in fly") \
M(NetworkReceive, "Number of threads receiving data from network. Only ClickHouse-related network interaction is included, not by 3rd party libraries.") \

View File

@ -1,17 +1,31 @@
#include <Disks/TemporaryFileOnDisk.h>
#include <Poco/TemporaryFile.h>
#include <Common/CurrentMetrics.h>
#include <Common/logger_useful.h>
#include <filesystem>
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace CurrentMetrics
{
extern const Metric TotalTemporaryFiles;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_)
: TemporaryFileOnDisk(disk_, disk_->getPath())
: TemporaryFileOnDisk(disk_, "")
{}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope)
@ -20,33 +34,54 @@ TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::
sub_metric_increment.emplace(metric_scope);
}
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_)
TemporaryFileOnDisk::TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix)
: disk(disk_)
, metric_increment(CurrentMetrics::TotalTemporaryFiles)
{
/// is is possible to use with disk other than DickLocal ?
disk->createDirectories(prefix_);
if (!disk)
throw Exception("Disk is not specified", ErrorCodes::LOGICAL_ERROR);
if (fs::path prefix_path(prefix); prefix_path.has_parent_path())
disk->createDirectories(prefix_path.parent_path());
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
/// Do not use default temporaty root path `/tmp/tmpXXXXXX`.
/// The `dummy_prefix` is used to know what to replace with the real prefix.
String dummy_prefix = "a/";
filepath = Poco::TemporaryFile::tempName(dummy_prefix);
relative_path = Poco::TemporaryFile::tempName(dummy_prefix);
dummy_prefix += "tmp";
/// a/tmpXXXXX -> <prefix>XXXXX
assert(filepath.starts_with(dummy_prefix));
filepath.replace(0, dummy_prefix.length(), prefix_);
assert(relative_path.starts_with(dummy_prefix));
relative_path.replace(0, dummy_prefix.length(), prefix);
if (relative_path.empty())
throw Exception("Temporary file name is empty", ErrorCodes::LOGICAL_ERROR);
}
String TemporaryFileOnDisk::getPath() const
{
return std::filesystem::path(disk->getPath()) / relative_path;
}
TemporaryFileOnDisk::~TemporaryFileOnDisk()
{
try
{
if (disk && !filepath.empty() && disk->exists(filepath))
disk->removeRecursive(filepath);
if (!disk || relative_path.empty())
return;
if (!disk->exists(relative_path))
{
LOG_WARNING(&Poco::Logger::get("TemporaryFileOnDisk"), "Temporary path '{}' does not exist in '{}'", relative_path, disk->getPath());
return;
}
disk->removeRecursive(relative_path);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}

View File

@ -5,12 +5,6 @@
#include <Disks/IDisk.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric TotalTemporaryFiles;
}
namespace DB
{
using DiskPtr = std::shared_ptr<IDisk>;
@ -24,20 +18,21 @@ class TemporaryFileOnDisk
public:
explicit TemporaryFileOnDisk(const DiskPtr & disk_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, CurrentMetrics::Value metric_scope);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix_);
explicit TemporaryFileOnDisk(const DiskPtr & disk_, const String & prefix);
~TemporaryFileOnDisk();
DiskPtr getDisk() const { return disk; }
const String & getPath() const { return filepath; }
const String & path() const { return filepath; }
String getPath() const;
private:
DiskPtr disk;
String filepath;
/// Relative path in disk to the temporary file or directory
String relative_path;
CurrentMetrics::Increment metric_increment;
CurrentMetrics::Increment metric_increment{CurrentMetrics::TotalTemporaryFiles};
/// Specified if we know what for file is used (sort/aggregate/join).
std::optional<CurrentMetrics::Increment> sub_metric_increment = {};
};

View File

@ -570,7 +570,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_)
, keys_positions(calculateKeysPositions(header, params_))
, params(params_)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope) : nullptr)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope, CurrentMetrics::TemporaryFilesForAggregation) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{
/// Use query-level memory tracker
@ -1573,7 +1573,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
Stopwatch watch;
size_t rows = data_variants.size();
auto & out_stream = tmp_data->createStream(getHeader(false), CurrentMetrics::TemporaryFilesForAggregation, max_temp_file_size);
auto & out_stream = tmp_data->createStream(getHeader(false), max_temp_file_size);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.path());

View File

@ -1033,7 +1033,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{
auto load_func = [&]() -> std::shared_ptr<Block>
{
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block));
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->getPath(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read());
};

View File

@ -264,7 +264,7 @@ SortedBlocksWriter::SortedFiles SortedBlocksWriter::finishMerge(std::function<vo
Pipe SortedBlocksWriter::streamFromFile(const TmpFilePtr & file) const
{
return Pipe(std::make_shared<TemporaryFileLazySource>(file->path(), materializeBlock(sample_block)));
return Pipe(std::make_shared<TemporaryFileLazySource>(file->getPath(), materializeBlock(sample_block)));
}

View File

@ -41,7 +41,7 @@ void TemporaryDataOnDiskScope::deltaAllocAndCheck(int compressed_delta, int unco
stat.uncompressed_size += uncompressed_delta;
}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size)
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, size_t max_file_size)
{
DiskPtr disk;
if (max_file_size > 0)
@ -56,7 +56,7 @@ TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, Cu
disk = volume->getDisk();
}
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, metric_scope);
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
@ -94,9 +94,9 @@ struct TemporaryFileStream::OutputWriter
if (finalized)
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
out_writer.write(block);
num_rows += block.rows();
}
void finalize()
{
if (finalized)
@ -127,6 +127,8 @@ struct TemporaryFileStream::OutputWriter
CompressedWriteBuffer out_compressed_buf;
NativeWriter out_writer;
std::atomic_size_t num_rows = 0;
bool finalized = false;
};
@ -157,7 +159,7 @@ TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(file->path(), header))
, out_writer(std::make_unique<OutputWriter>(file->getPath(), header))
{
}
@ -172,6 +174,9 @@ void TemporaryFileStream::write(const Block & block)
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
if (isWriteFinished())
return stat;
if (out_writer)
{
out_writer->finalize();
@ -196,19 +201,19 @@ Block TemporaryFileStream::read()
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isFinalized())
if (isEof())
return {};
if (!in_reader)
{
in_reader = std::make_unique<InputReader>(file->path(), header);
in_reader = std::make_unique<InputReader>(file->getPath(), header);
}
Block block = in_reader->read();
if (!block)
{
/// finalize earlier to release resources, do not wait for the destructor
this->finalize();
this->release();
}
return block;
}
@ -223,20 +228,21 @@ void TemporaryFileStream::updateAllocAndCheck()
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
file->path(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
file->getPath(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
}
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
stat.num_rows = out_writer->num_rows;
}
bool TemporaryFileStream::isFinalized() const
bool TemporaryFileStream::isEof() const
{
return file == nullptr;
}
void TemporaryFileStream::finalize()
void TemporaryFileStream::release()
{
if (file)
{
@ -258,7 +264,7 @@ TemporaryFileStream::~TemporaryFileStream()
{
try
{
finalize();
release();
}
catch (...)
{

View File

@ -5,6 +5,13 @@
#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesUnknown;
}
namespace DB
{
@ -18,7 +25,6 @@ using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
/*
* Used to account amount of temporary data written to disk.
* If limit is set, throws exception if limit is exceeded.
@ -65,15 +71,21 @@ protected:
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
public:
using TemporaryDataOnDiskScope::StatAtomic;
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), 0)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
{}
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_, CurrentMetrics::Value metric_scope)
: TemporaryDataOnDiskScope(std::move(parent_), /* limit_ = */ 0)
, current_metric_scope(metric_scope)
{}
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size = 0);
TemporaryFileStream & createStream(const Block & header, size_t max_file_size = 0);
std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
@ -83,6 +95,8 @@ public:
private:
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
typename CurrentMetrics::Value current_metric_scope = CurrentMetrics::TemporaryFilesUnknown;
};
/*
@ -99,6 +113,7 @@ public:
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
size_t num_rows = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
@ -109,17 +124,19 @@ public:
Block read();
const String & path() const { return file->getPath(); }
const String path() const { return file->getPath(); }
Block getHeader() const { return header; }
/// Read finished and file released
bool isEof() const;
~TemporaryFileStream();
private:
void updateAllocAndCheck();
/// Finalize everything, close reader and writer, delete file
void finalize();
bool isFinalized() const;
/// Release everything, close reader and writer, delete file
void release();
TemporaryDataOnDisk * parent;

View File

@ -9,6 +9,12 @@
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Common/JSONBuilder.h>
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
@ -197,7 +203,7 @@ void SortingStep::mergeSorting(QueryPipelineBuilder & pipeline, const SortDescri
max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort,
std::make_unique<TemporaryDataOnDisk>(tmp_data),
std::make_unique<TemporaryDataOnDisk>(tmp_data, CurrentMetrics::TemporaryFilesForSort),
min_free_disk_space);
});
}

View File

@ -22,10 +22,6 @@ namespace ProfileEvents
extern const Event ExternalProcessingUncompressedBytesTotal;
}
namespace CurrentMetrics
{
extern const Metric TemporaryFilesForSort;
}
namespace DB
{
@ -171,7 +167,7 @@ void MergeSortingTransform::consume(Chunk chunk)
{
/// If there's less free disk space than reserve_size, an exception will be thrown
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
auto & tmp_stream = tmp_data->createStream(header_without_constants, CurrentMetrics::TemporaryFilesForSort, reserve_size);
auto & tmp_stream = tmp_data->createStream(header_without_constants, reserve_size);
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);