Add ability to use multiple disks/volumes for temporary data

This patch adds <tmp_policy> config directive, that will define the
policy to use for storing temporary files, if it is not set (default)
the <tmp_path> will be used.

Also tmp_policy has some limitations:
- move_factor              is ignored
- keep_free_space_bytes    is ignored
- max_data_part_size_bytes is ignored
- must have exactly one volume
This commit is contained in:
Azat Khuzhin 2020-01-19 17:26:28 +03:00
parent 1fa6adacbe
commit 88bfb788a9
23 changed files with 194 additions and 72 deletions

View File

@ -111,7 +111,7 @@ void LocalServer::tryInitPath()
/// In case of empty path set paths to helpful directories
std::string cd = Poco::Path::current();
context->setTemporaryPath(cd + "tmp");
context->setTemporaryStorage(cd + "tmp");
context->setFlagsPath(cd + "flags");
context->setUserFilesPath(""); // user's files are everywhere
}

View File

@ -17,6 +17,7 @@
#include <Common/setThreadName.h>
#include <Common/config.h>
#include <Common/SettingsChanges.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedWriteBuffer.h>
#include <IO/ReadBufferFromIStream.h>
@ -351,7 +352,8 @@ void HTTPHandler::processQuery(
if (buffer_until_eof)
{
std::string tmp_path_template = context.getTemporaryPath() + "http_buffers/";
const std::string tmp_path(context.getTemporaryVolume()->getNextDisk()->getPath());
const std::string tmp_path_template(tmp_path + "http_buffers/");
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
{

View File

@ -77,6 +77,31 @@ namespace CurrentMetrics
extern const Metric VersionInteger;
}
namespace
{
void setupTmpPath(Logger * log, const std::string & path)
{
LOG_DEBUG(log, "Setting up " << path << " to store temporary data in it");
Poco::File(path).createDirectories();
/// Clearing old temporary files.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(path); it != dir_end; ++it)
{
if (it->isFile() && startsWith(it.name(), "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
else
LOG_DEBUG(log, "Skipped file in temporary path " << it->path());
}
}
}
namespace DB
{
@ -331,22 +356,14 @@ int Server::main(const std::vector<std::string> & /*args*/)
DateLUT::instance();
LOG_TRACE(log, "Initialized DateLUT with time zone '" << DateLUT::instance().getTimeZone() << "'.");
/// Directory with temporary data for processing of heavy queries.
/// Storage with temporary data for processing of heavy queries.
{
std::string tmp_path = config().getString("tmp_path", path + "tmp/");
global_context->setTemporaryPath(tmp_path);
Poco::File(tmp_path).createDirectories();
/// Clearing old temporary files.
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(tmp_path); it != dir_end; ++it)
{
if (it->isFile() && startsWith(it.name(), "tmp"))
{
LOG_DEBUG(log, "Removing old temporary file " << it->path());
it->remove();
}
}
std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy);
for (const DiskPtr & disk : volume->disks)
setupTmpPath(log, disk->getPath());
}
/** Directory with 'flags': files indicating temporary settings for the server set by system administrator.

View File

@ -133,6 +133,17 @@
<!-- Path to temporary data for processing hard queries. -->
<tmp_path>/var/lib/clickhouse/tmp/</tmp_path>
<!-- Policy from the <storage_configuration> for the temporary files.
If not set <tmp_path> is used, otherwise <tmp_path> is ignored.
Notes:
- move_factor is ignored
- keep_free_space_bytes is ignored
- max_data_part_size_bytes is ignored
- you must have exactly one volume in that policy
-->
<!-- <tmp_policy>tmp</tmp_policy> -->
<!-- Directory with user provided files that are accessible by 'file' table function. -->
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>

View File

@ -7,6 +7,7 @@
#include <IO/WriteBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Interpreters/sortBlock.h>
#include <Disks/DiskSpaceMonitor.h>
namespace ProfileEvents
@ -21,10 +22,10 @@ namespace DB
MergeSortingBlockInputStream::MergeSortingBlockInputStream(
const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_, size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_, size_t min_free_disk_space_)
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_, size_t min_free_disk_space_)
: description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_),
max_bytes_before_remerge(max_bytes_before_remerge_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_),
max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_),
min_free_disk_space(min_free_disk_space_)
{
children.push_back(input);
@ -78,10 +79,14 @@ Block MergeSortingBlockInputStream::readImpl()
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
size_t size = sum_bytes_in_blocks + min_free_disk_space;
auto reservation = tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
const std::string tmp_path(reservation->getDisk()->getPath());
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);

View File

@ -18,6 +18,9 @@ namespace DB
struct TemporaryFileStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
@ -77,7 +80,7 @@ public:
MergeSortingBlockInputStream(const BlockInputStreamPtr & input, SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSorting"; }
@ -97,7 +100,7 @@ private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
VolumePtr tmp_volume;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingBlockInputStream");

View File

@ -111,6 +111,12 @@ Volume::Volume(
<< " < " << formatReadableSizeWithBinarySuffix(MIN_PART_SIZE) << ")");
}
DiskPtr Volume::getNextDisk()
{
size_t start_from = last_used.fetch_add(1u, std::memory_order_relaxed);
size_t index = start_from % disks.size();
return disks[index];
}
ReservationPtr Volume::reserve(UInt64 expected_size)
{

View File

@ -67,6 +67,13 @@ public:
const String & config_prefix,
const DiskSelector & disk_selector);
/// Next disk (round-robin)
///
/// - Used with policy for temporary data
/// - Ignores all limitations
/// - Shares last access with reserve()
DiskPtr getNextDisk();
/// Uses Round-robin to choose disk for reservation.
/// Returns valid reservation or nullptr if there is no space left on any disk.
ReservationPtr reserve(UInt64 bytes) override;

View File

@ -28,6 +28,7 @@
#include <common/config_common.h>
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <Disks/DiskSpaceMonitor.h>
namespace ProfileEvents
@ -681,22 +682,25 @@ bool Aggregator::executeOnBlock(Columns columns, UInt64 num_rows, AggregatedData
&& current_memory_usage > static_cast<Int64>(params.max_bytes_before_external_group_by)
&& worth_convert_to_two_level)
{
if (!enoughSpaceInDirectory(params.tmp_path, current_memory_usage + params.min_free_disk_space))
throw Exception("Not enough space for external aggregation in " + params.tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
size_t size = current_memory_usage + params.min_free_disk_space;
auto reservation = params.tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external aggregation in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
writeToTemporaryFile(result);
const std::string tmp_path(reservation->getDisk()->getPath());
writeToTemporaryFile(result, tmp_path);
}
return true;
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path)
{
Stopwatch watch;
size_t rows = data_variants.size();
auto file = createTemporaryFile(params.tmp_path);
auto file = createTemporaryFile(tmp_path);
const std::string & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
@ -753,6 +757,10 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
<< (uncompressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. uncompressed, "
<< (compressed_bytes / elapsed_seconds / 1048576.0) << " MiB/sec. compressed)");
}
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants)
{
return writeToTemporaryFile(data_variants, params.tmp_volume->getNextDisk()->getPath());
}
template <typename Method>

View File

@ -46,6 +46,8 @@ namespace ErrorCodes
class IBlockOutputStream;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
/** Different data structures that can be used for aggregation
* For efficiency, the aggregation data itself is put into the pool.
@ -860,7 +862,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
const std::string tmp_path;
VolumePtr tmp_volume;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
@ -873,7 +875,7 @@ public:
size_t group_by_two_level_threshold_, size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
const std::string & tmp_path_, size_t max_threads_,
VolumePtr tmp_volume_, size_t max_threads_,
size_t min_free_disk_space_)
: src_header(src_header_),
keys(keys_), aggregates(aggregates_), keys_size(keys.size()), aggregates_size(aggregates.size()),
@ -881,7 +883,7 @@ public:
group_by_two_level_threshold(group_by_two_level_threshold_), group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_),
max_bytes_before_external_group_by(max_bytes_before_external_group_by_),
empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_),
tmp_path(tmp_path_), max_threads(max_threads_),
tmp_volume(tmp_volume_), max_threads(max_threads_),
min_free_disk_space(min_free_disk_space_)
{
}
@ -889,7 +891,7 @@ public:
/// Only parameters that matter during merge.
Params(const Block & intermediate_header_,
const ColumnNumbers & keys_, const AggregateDescriptions & aggregates_, bool overflow_row_, size_t max_threads_)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, "", max_threads_, 0)
: Params(Block(), keys_, aggregates_, overflow_row_, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, max_threads_, 0)
{
intermediate_header = intermediate_header_;
}
@ -955,6 +957,7 @@ public:
void setCancellationHook(const CancellationHook cancellation_hook);
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, const String & tmp_path);
void writeToTemporaryFile(AggregatedDataVariants & data_variants);
bool hasTemporaryFiles() const { return !temporary_files.empty(); }

View File

@ -19,14 +19,14 @@ namespace ErrorCodes
extern const int PARAMETER_OUT_OF_BOUND;
}
AnalyzedJoin::AnalyzedJoin(const Settings & settings, const String & tmp_path_)
AnalyzedJoin::AnalyzedJoin(const Settings & settings, VolumePtr tmp_volume_)
: size_limits(SizeLimits{settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode})
, default_max_bytes(settings.default_max_bytes_in_join)
, join_use_nulls(settings.join_use_nulls)
, partial_merge_join(settings.partial_merge_join)
, partial_merge_join_optimizations(settings.partial_merge_join_optimizations)
, partial_merge_join_rows_in_right_blocks(settings.partial_merge_join_rows_in_right_blocks)
, tmp_path(tmp_path_)
, tmp_volume(tmp_volume_)
{}
void AnalyzedJoin::addUsingKey(const ASTPtr & ast)

View File

@ -21,6 +21,9 @@ class Block;
struct Settings;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class AnalyzedJoin
{
/** Query of the form `SELECT expr(x) AS k FROM t1 ANY LEFT JOIN (SELECT expr(x) AS k FROM t2) USING k`
@ -61,10 +64,10 @@ class AnalyzedJoin
/// Original name -> name. Only ranamed columns.
std::unordered_map<String, String> renames;
String tmp_path;
VolumePtr tmp_volume;
public:
AnalyzedJoin(const Settings &, const String & tmp_path);
AnalyzedJoin(const Settings &, VolumePtr tmp_volume);
/// for StorageJoin
AnalyzedJoin(SizeLimits limits, bool use_nulls, ASTTableJoin::Kind kind, ASTTableJoin::Strictness strictness,
@ -81,7 +84,7 @@ public:
ASTTableJoin::Kind kind() const { return table_join.kind; }
ASTTableJoin::Strictness strictness() const { return table_join.strictness; }
const SizeLimits & sizeLimits() const { return size_limits; }
const String & getTemporaryPath() const { return tmp_path; }
VolumePtr getTemporaryVolume() { return tmp_volume; }
bool forceNullableRight() const { return join_use_nulls && isLeftOrFull(table_join.kind); }
bool forceNullableLeft() const { return join_use_nulls && isRightOrFull(table_join.kind); }

View File

@ -22,6 +22,7 @@
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Disks/DiskLocal.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
#include <Core/Settings.h>
@ -95,6 +96,7 @@ namespace ErrorCodes
extern const int SCALAR_ALREADY_EXISTS;
extern const int UNKNOWN_SCALAR;
extern const int NOT_ENOUGH_PRIVILEGES;
extern const int UNKNOWN_POLICY;
}
@ -123,12 +125,14 @@ struct ContextShared
String interserver_scheme; /// http or https
String path; /// Path to the data directory, with a slash at the end.
String tmp_path; /// The path to the temporary files that occur when processing the request.
String flags_path; /// Path to the directory with some control flags for server maintenance.
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
Databases databases; /// List of databases and tables in them.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
@ -151,9 +155,9 @@ struct ContextShared
std::unique_ptr<DDLWorker> ddl_worker; /// Process ddl commands from zk.
/// Rules for selecting the compression settings, depending on the size of the part.
mutable std::unique_ptr<CompressionCodecSelector> compression_codec_selector;
/// Storage disk chooser
/// Storage disk chooser for MergeTree engines
mutable std::unique_ptr<DiskSelector> merge_tree_disk_selector;
/// Storage policy chooser
/// Storage policy chooser for MergeTree engines
mutable std::unique_ptr<StoragePolicySelector> merge_tree_storage_policy_selector;
std::optional<MergeTreeSettings> merge_tree_settings; /// Settings of MergeTree* engines.
@ -527,12 +531,6 @@ String Context::getPath() const
return shared->path;
}
String Context::getTemporaryPath() const
{
auto lock = getLock();
return shared->tmp_path;
}
String Context::getFlagsPath() const
{
auto lock = getLock();
@ -551,13 +549,19 @@ String Context::getDictionariesLibPath() const
return shared->dictionaries_lib_path;
}
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
}
void Context::setPath(const String & path)
{
auto lock = getLock();
shared->path = path;
if (shared->tmp_path.empty())
if (shared->tmp_path.empty() && !shared->tmp_volume)
shared->tmp_path = shared->path + "tmp/";
if (shared->flags_path.empty())
@ -570,10 +574,31 @@ void Context::setPath(const String & path)
shared->dictionaries_lib_path = shared->path + "dictionaries_lib/";
}
void Context::setTemporaryPath(const String & path)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
{
auto lock = getLock();
if (policy_name.empty())
{
shared->tmp_path = path;
if (!shared->tmp_path.ends_with('/'))
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<Volume>("_tmp_default", std::vector<DiskPtr>{disk}, 0);
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector()[policy_name];
if (tmp_policy->getVolumes().size() != 1)
throw Exception("Policy " + policy_name + " is used temporary files, such policy should have exactly one volume", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
shared->tmp_volume = tmp_policy->getVolume(0);
}
if (!shared->tmp_volume->disks.size())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;
}
void Context::setFlagsPath(const String & path)

View File

@ -91,6 +91,9 @@ class StoragePolicySelector;
class IOutputFormat;
using OutputFormatPtr = std::shared_ptr<IOutputFormat>;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
#if USE_EMBEDDED_COMPILER
class CompiledExpressionCache;
@ -195,17 +198,19 @@ public:
~Context();
String getPath() const;
String getTemporaryPath() const;
String getFlagsPath() const;
String getUserFilesPath() const;
String getDictionariesLibPath() const;
VolumePtr getTemporaryVolume() const;
void setPath(const String & path);
void setTemporaryPath(const String & path);
void setFlagsPath(const String & path);
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;
/// Global application configuration settings.

View File

@ -1873,7 +1873,7 @@ void InterpreterSelectQuery::executeAggregation(Pipeline & pipeline, const Expre
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
/// If there are several sources, then we perform parallel aggregation
if (pipeline.streams.size() > 1)
@ -1939,7 +1939,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPipeline & pipeline, const
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold : SettingUInt64(0),
allow_to_use_two_level_group_by ? settings.group_by_two_level_threshold_bytes : SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, final);
@ -2165,7 +2165,7 @@ void InterpreterSelectQuery::executeRollupOrCube(Pipeline & pipeline, Modificato
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
if (modificator == Modificator::ROLLUP)
pipeline.firstStream() = std::make_shared<RollupBlockInputStream>(pipeline.firstStream(), params);
@ -2194,7 +2194,7 @@ void InterpreterSelectQuery::executeRollupOrCube(QueryPipeline & pipeline, Modif
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryPath(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
auto transform_params = std::make_shared<AggregatingTransformParams>(params, true);
@ -2278,7 +2278,7 @@ void InterpreterSelectQuery::executeOrder(Pipeline & pipeline, InputSortingInfoP
sorting_stream, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort,
settings.max_bytes_before_external_sort / pipeline.streams.size(),
context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
context->getTemporaryVolume(), settings.min_free_disk_space_for_temporary_data);
stream = merging_stream;
});
@ -2360,7 +2360,8 @@ void InterpreterSelectQuery::executeOrder(QueryPipeline & pipeline, InputSorting
return std::make_shared<MergeSortingTransform>(
header, output_order_descr, settings.max_block_size, limit,
settings.max_bytes_before_remerge_sort / pipeline.getNumStreams(),
settings.max_bytes_before_external_sort, context->getTemporaryPath(), settings.min_free_disk_space_for_temporary_data);
settings.max_bytes_before_external_sort, context->getTemporaryVolume(),
settings.min_free_disk_space_for_temporary_data);
});
/// If there are several streams, we merge them into one

View File

@ -13,6 +13,7 @@
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/ConcatBlockInputStream.h>
#include <Disks/DiskSpaceMonitor.h>
namespace DB
{
@ -386,6 +387,8 @@ void MiniLSM::insert(const BlocksList & blocks)
if (blocks.empty())
return;
const std::string path(volume->getNextDisk()->getPath());
SortedFiles sorted_blocks;
if (blocks.size() > 1)
{
@ -414,6 +417,7 @@ void MiniLSM::merge(std::function<void(const Block &)> callback)
BlockInputStreams inputs = makeSortedInputStreams(sorted_files, sample_block);
MergingSortedBlockInputStream sorted_stream(inputs, sort_description, rows_in_block);
const std::string path(volume->getNextDisk()->getPath());
SortedFiles out;
flushStreamToFiles(path, sample_block, sorted_stream, out, callback);
@ -463,7 +467,7 @@ MergeJoin::MergeJoin(std::shared_ptr<AnalyzedJoin> table_join_, const Block & ri
makeSortAndMerge(table_join->keyNamesLeft(), left_sort_description, left_merge_description);
makeSortAndMerge(table_join->keyNamesRight(), right_sort_description, right_merge_description);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryPath(), right_sample_block, right_sort_description, max_rows_in_right_block);
lsm = std::make_unique<MiniLSM>(table_join->getTemporaryVolume(), right_sample_block, right_sort_description, max_rows_in_right_block);
}
void MergeJoin::setTotals(const Block & totals_block)

View File

@ -17,20 +17,23 @@ class AnalyzedJoin;
class MergeJoinCursor;
struct MergeJoinEqualRange;
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
struct MiniLSM
{
using SortedFiles = std::vector<std::unique_ptr<TemporaryFile>>;
const String & path;
VolumePtr volume;
const Block & sample_block;
const SortDescription & sort_description;
const size_t rows_in_block;
const size_t max_size;
std::vector<SortedFiles> sorted_files;
MiniLSM(const String & path_, const Block & sample_block_, const SortDescription & description,
MiniLSM(VolumePtr volume_, const Block & sample_block_, const SortDescription & description,
size_t rows_in_block_, size_t max_size_ = 16)
: path(path_)
: volume(volume_)
, sample_block(sample_block_)
, sort_description(description)
, rows_in_block(rows_in_block_)

View File

@ -816,7 +816,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(
SyntaxAnalyzerResult result;
result.storage = storage;
result.source_columns = source_columns_;
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings, context.getTemporaryPath()); /// TODO: move to select_query logic
result.analyzed_join = std::make_shared<AnalyzedJoin>(settings, context.getTemporaryVolume()); /// TODO: move to select_query logic
if (storage)
collectSourceColumns(storage->getColumns(), result.source_columns, (select_query != nullptr));

View File

@ -79,7 +79,7 @@ int main(int argc, char ** argv)
Aggregator::Params params(
stream->getHeader(), {0, 1}, aggregate_descriptions,
false, 0, OverflowMode::THROW, 0, 0, 0, false, "", 1, 0);
false, 0, OverflowMode::THROW, 0, 0, 0, false, nullptr, 1, 0);
Aggregator aggregator(params);

View File

@ -8,6 +8,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <Disks/DiskSpaceMonitor.h>
namespace ProfileEvents
@ -95,11 +96,11 @@ MergeSortingTransform::MergeSortingTransform(
const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_path(tmp_path_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_), tmp_volume(tmp_volume_)
, min_free_disk_space(min_free_disk_space_) {}
Processors MergeSortingTransform::expandPipeline()
@ -172,10 +173,14 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
if (!enoughSpaceInDirectory(tmp_path, sum_bytes_in_blocks + min_free_disk_space))
throw Exception("Not enough space for external sort in " + tmp_path, ErrorCodes::NOT_ENOUGH_SPACE);
size_t size = sum_bytes_in_blocks + min_free_disk_space;
auto reservation = tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
const std::string tmp_path(reservation->getDisk()->getPath());
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
merge_sorter = std::make_unique<MergeSorter>(std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);

View File

@ -9,6 +9,9 @@
namespace DB
{
class Volume;
using VolumePtr = std::shared_ptr<Volume>;
class MergeSortingTransform : public SortingTransform
{
public:
@ -17,7 +20,7 @@ public:
const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_,
size_t max_bytes_before_remerge_,
size_t max_bytes_before_external_sort_, const std::string & tmp_path_,
size_t max_bytes_before_external_sort_, VolumePtr tmp_volume_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSortingTransform"; }
@ -32,7 +35,7 @@ protected:
private:
size_t max_bytes_before_remerge;
size_t max_bytes_before_external_sort;
const std::string tmp_path;
VolumePtr tmp_volume;
size_t min_free_disk_space;
Logger * log = &Logger::get("MergeSortingTransform");

View File

@ -27,6 +27,8 @@
#include <Processors/Transforms/MergingAggregatedTransform.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/DiskLocal.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/AutoPtr.h>
#include <Common/CurrentThread.h>
@ -187,6 +189,8 @@ try
auto & factory = AggregateFunctionFactory::instance();
auto cur_path = Poco::Path().absolute().toString();
auto disk = std::make_shared<DiskLocal>("tmp", cur_path, 0);
auto tmp_volume = std::make_shared<Volume>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_one_stream = [&](String msg, size_t num_threads, bool two_level, bool external)
{
@ -228,7 +232,7 @@ try
group_by_two_level_threshold_bytes,
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
tmp_volume,
1, /// max_threads
0
);
@ -301,7 +305,7 @@ try
group_by_two_level_threshold_bytes,
max_bytes_before_external_group_by,
false, /// empty_result_for_aggregation_by_empty_set
cur_path, /// tmp_path
tmp_volume,
1, /// max_threads
0
);

View File

@ -1,6 +1,8 @@
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
#include <Disks/DiskSpaceMonitor.h>
#include <Disks/DiskLocal.h>
#include <Processors/IProcessor.h>
#include <Processors/ISource.h>
@ -116,7 +118,10 @@ try
Logger::root().setChannel(channel);
Logger::root().setLevel("trace");
auto execute_chain = [](
auto disk = std::make_shared<DiskLocal>("tmp", ".", 0);
auto tmp_volume = std::make_shared<Volume>("tmp", std::vector<DiskPtr>{disk}, 0);
auto execute_chain = [tmp_volume](
String msg,
UInt64 source_block_size,
UInt64 blocks_count,
@ -133,7 +138,9 @@ try
SortDescription description = {{0, 1, 1}};
auto transform = std::make_shared<MergeSortingTransform>(
source->getPort().getHeader(), description,
max_merged_block_size, limit, max_bytes_before_remerge, max_bytes_before_external_sort, ".", 0);
max_merged_block_size, limit,
max_bytes_before_remerge, max_bytes_before_external_sort,
tmp_volume, 0);
auto sink = std::make_shared<CheckSortedSink>();
connect(source->getPort(), transform->getInputs().front());