Merge pull request #40893 from ClickHouse/vdimir/track-tmp-disk

This commit is contained in:
Vladimir C 2022-09-30 11:27:24 +02:00 committed by GitHub
commit 895afdec45
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
34 changed files with 679 additions and 245 deletions

View File

@ -1498,7 +1498,7 @@ If not set, [tmp_path](#tmp-path) is used, otherwise it is ignored.
- `move_factor` is ignored.
- `keep_free_space_bytes` is ignored.
- `max_data_part_size_bytes` is ignored.
- Уou must have exactly one volume in that policy.
- Policy should have exactly one volume with local disks.
:::
## uncompressed_cache_size {#server-settings-uncompressed_cache_size}

View File

@ -1342,12 +1342,13 @@ TCP порт для защищённого обмена данными с кли
Если политика не задана, используется [tmp_path](#tmp-path). В противном случае `tmp_path` игнорируется.
:::note "Примечание"
- `move_factor` игнорируется.
- `keep_free_space_bytes` игнорируется.
- `max_data_part_size_bytes` игнорируется.
- В данной политике у вас должен быть ровно один том.
:::
:::note "Примечание"
- `move_factor` игнорируется.
- `keep_free_space_bytes` игнорируется.
- `max_data_part_size_bytes` игнорируется.
- В данной политике должен быть ровно один том, содержащий только локальный диски.
:::
## uncompressed_cache_size {#server-settings-uncompressed_cache_size}
Размер кеша (в байтах) для несжатых данных, используемых движками таблиц семейства [MergeTree](../../operations/server-configuration-parameters/settings.md).

View File

@ -203,7 +203,7 @@ void LocalServer::tryInitPath()
global_context->setPath(path);
global_context->setTemporaryStorage(path + "tmp");
global_context->setTemporaryStorage(path + "tmp", "", 0);
global_context->setFlagsPath(path + "flags");
global_context->setUserFilesPath(""); // user's files are everywhere

View File

@ -209,7 +209,7 @@ try
fs::remove(it->path());
}
else
LOG_DEBUG(log, "Skipped file in temporary path {}", it->path().string());
LOG_DEBUG(log, "Found unknown file in temporary path {}", it->path().string());
}
}
catch (...)
@ -971,7 +971,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
{
std::string tmp_path = config().getString("tmp_path", path / "tmp/");
std::string tmp_policy = config().getString("tmp_policy", "");
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy);
size_t tmp_max_size = config().getUInt64("tmp_max_size", 0);
const VolumePtr & volume = global_context->setTemporaryStorage(tmp_path, tmp_policy, tmp_max_size);
for (const DiskPtr & disk : volume->getDisks())
setupTmpPath(log, disk->getPath());
}

View File

@ -399,6 +399,9 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_network_bandwidth_for_user, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_network_bandwidth_for_all_users, 0, "The maximum speed of data exchange over the network in bytes per second for all concurrently running queries. Zero means unlimited.", 0) \
\
M(UInt64, max_temp_data_on_disk_size_for_user, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running user queries. Zero means unlimited.", 0)\
M(UInt64, max_temp_data_on_disk_size_for_query, 0, "The maximum amount of data consumed by temporary files on disk in bytes for all concurrently running queries. Zero means unlimited.", 0)\
\
M(UInt64, backup_threads, 16, "The maximum number of threads to execute BACKUP requests.", 0) \
M(UInt64, restore_threads, 16, "The maximum number of threads to execute RESTORE requests.", 0) \
\

View File

@ -241,4 +241,11 @@ DiskObjectStoragePtr DiskDecorator::createDiskObjectStorage()
return delegate->createDiskObjectStorage();
}
DiskPtr DiskDecorator::getNestedDisk() const
{
if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate.get()))
return decorator->getNestedDisk();
return delegate;
}
}

View File

@ -107,6 +107,8 @@ public:
bool supportsChmod() const override { return delegate->supportsChmod(); }
void chmod(const String & path, mode_t mode) override { delegate->chmod(path, mode); }
virtual DiskPtr getNestedDisk() const;
protected:
Executor & getExecutor() override;

View File

@ -331,6 +331,20 @@ void DiskRestartProxy::getRemotePathsRecursive(
return DiskDecorator::getRemotePathsRecursive(path, paths_map);
}
DiskPtr DiskRestartProxy::getNestedDisk() const
{
DiskPtr delegate_copy;
{
ReadLock lock (mutex);
delegate_copy = delegate;
}
if (const auto * decorator = dynamic_cast<const DiskDecorator *>(delegate_copy.get()))
return decorator->getNestedDisk();
return delegate_copy;
}
void DiskRestartProxy::restart(ContextPtr context)
{
/// Speed up processing unhealthy requests.

View File

@ -71,6 +71,8 @@ public:
void restart(ContextPtr context);
DiskPtr getNestedDisk() const override;
private:
friend class RestartAwareReadBuffer;
friend class RestartAwareWriteBuffer;

View File

@ -237,6 +237,7 @@ Block NativeReader::read()
else
tmp_res.insert({col.type->createColumn()->cloneResized(rows), col.type, col.name});
}
tmp_res.info = res.info;
res.swap(tmp_res);
}

View File

@ -1,4 +1,4 @@
#include <Formats/TemporaryFileStream.h>
#include <Formats/TemporaryFileStreamLegacy.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
@ -12,20 +12,20 @@ namespace DB
{
/// To read the data that was flushed into the temporary data file.
TemporaryFileStream::TemporaryFileStream(const std::string & path)
TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_)
TemporaryFileStreamLegacy::TemporaryFileStreamLegacy(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_unique<NativeReader>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
TemporaryFileStream::Stat TemporaryFileStream::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
TemporaryFileStreamLegacy::Stat TemporaryFileStreamLegacy::write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec)
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));

View File

@ -9,8 +9,10 @@
namespace DB
{
/// Used only in MergeJoin
/// TODO: use `TemporaryDataOnDisk` instead
/// To read the data that was flushed into the temporary data file.
struct TemporaryFileStream
struct TemporaryFileStreamLegacy
{
struct Stat
{
@ -22,8 +24,8 @@ struct TemporaryFileStream
CompressedReadBuffer compressed_in;
std::unique_ptr<NativeReader> block_in;
explicit TemporaryFileStream(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_);
explicit TemporaryFileStreamLegacy(const std::string & path);
TemporaryFileStreamLegacy(const std::string & path, const Block & header_);
/// Flush data from input stream into file for future reading
static Stat write(const std::string & path, const Block & header, QueryPipelineBuilder builder, const std::string & codec);

View File

@ -35,6 +35,7 @@
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Core/ProtocolDefines.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Parsers/ASTSelectQuery.h>
@ -59,6 +60,20 @@ namespace CurrentMetrics
extern const Metric TemporaryFilesForAggregation;
}
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int TOO_MANY_ROWS;
extern const int EMPTY_DATA_PASSED;
extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
extern const int LOGICAL_ERROR;
}
}
namespace
{
/** Collects observed HashMap-s sizes to avoid redundant intermediate resizes.
@ -311,17 +326,6 @@ size_t getMinBytesForPrefetch()
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
extern const int NOT_ENOUGH_SPACE;
extern const int TOO_MANY_ROWS;
extern const int EMPTY_DATA_PASSED;
extern const int CANNOT_MERGE_DIFFERENT_AGGREGATED_DATA_VARIANTS;
extern const int LOGICAL_ERROR;
}
AggregatedDataVariants::~AggregatedDataVariants()
{
if (aggregator && !aggregator->all_aggregates_has_trivial_destructor)
@ -566,6 +570,7 @@ Aggregator::Aggregator(const Block & header_, const Params & params_)
: header(header_)
, keys_positions(calculateKeysPositions(header, params_))
, params(params_)
, tmp_data(params.tmp_data_scope ? std::make_unique<TemporaryDataOnDisk>(params.tmp_data_scope) : nullptr)
, min_bytes_for_prefetch(getMinBytesForPrefetch())
{
/// Use query-level memory tracker
@ -1562,30 +1567,28 @@ bool Aggregator::executeOnBlock(Columns columns,
void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size) const
{
if (!tmp_data)
throw Exception("Cannot write to temporary file because temporary file is not initialized", ErrorCodes::LOGICAL_ERROR);
Stopwatch watch;
size_t rows = data_variants.size();
auto file = createTempFile(max_temp_file_size);
const auto & path = file->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeWriter block_out(compressed_buf, DBMS_TCP_PROTOCOL_VERSION, getHeader(false));
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", path);
auto & out_stream = tmp_data->createStream(getHeader(false), CurrentMetrics::TemporaryFilesForAggregation, max_temp_file_size);
ProfileEvents::increment(ProfileEvents::ExternalAggregationWritePart);
LOG_DEBUG(log, "Writing part of aggregation data into temporary file {}", out_stream.path());
/// Flush only two-level data and possibly overflow data.
#define M(NAME) \
else if (data_variants.type == AggregatedDataVariants::Type::NAME) \
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, block_out);
writeToTemporaryFileImpl(data_variants, *data_variants.NAME, out_stream);
if (false) {} // NOLINT
APPLY_FOR_VARIANTS_TWO_LEVEL(M)
#undef M
else
throw Exception("Unknown aggregated data variant.", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
throw Exception("Unknown aggregated data variant", ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT);
/// NOTE Instead of freeing up memory and creating new hash tables and arenas, you can re-use the old ones.
data_variants.init(data_variants.type);
@ -1598,62 +1601,32 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, si
data_variants.without_key = place;
}
block_out.flush();
compressed_buf.next();
file_buf.next();
auto stat = out_stream.finishWriting();
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, stat.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
double elapsed_seconds = watch.elapsedSeconds();
size_t compressed_bytes = file_buf.count();
size_t uncompressed_bytes = compressed_buf.count();
{
std::lock_guard lock(temporary_files.mutex);
temporary_files.files.emplace_back(std::move(file));
temporary_files.sum_size_uncompressed += uncompressed_bytes;
temporary_files.sum_size_compressed += compressed_bytes;
}
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, uncompressed_bytes);
double compressed_size = stat.compressed_size;
double uncompressed_size = stat.uncompressed_size;
LOG_DEBUG(log,
"Written part in {:.3f} sec., {} rows, {} uncompressed, {} compressed,"
" {:.3f} uncompressed bytes per row, {:.3f} compressed bytes per row, compression rate: {:.3f}"
" ({:.3f} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
elapsed_seconds,
rows,
ReadableSize(uncompressed_bytes),
ReadableSize(compressed_bytes),
static_cast<double>(uncompressed_bytes) / rows,
static_cast<double>(compressed_bytes) / rows,
static_cast<double>(uncompressed_bytes) / compressed_bytes,
ReadableSize(uncompressed_size),
ReadableSize(compressed_size),
static_cast<double>(uncompressed_size) / rows,
static_cast<double>(compressed_size) / rows,
static_cast<double>(uncompressed_size) / compressed_size,
static_cast<double>(rows) / elapsed_seconds,
ReadableSize(static_cast<double>(uncompressed_bytes) / elapsed_seconds),
ReadableSize(static_cast<double>(compressed_bytes) / elapsed_seconds));
ReadableSize(static_cast<double>(uncompressed_size) / elapsed_seconds),
ReadableSize(static_cast<double>(compressed_size) / elapsed_seconds));
}
TemporaryFileOnDiskHolder Aggregator::createTempFile(size_t max_temp_file_size) const
{
auto file = std::make_unique<TemporaryFileOnDisk>(params.tmp_volume->getDisk(), CurrentMetrics::TemporaryFilesForAggregation);
// enoughSpaceInDirectory() is not enough to make it right, since
// another process (or another thread of aggregator) can consume all
// space.
//
// But true reservation (IVolume::reserve()) cannot be used here since
// current_memory_usage does not takes compression into account and
// will reserve way more that actually will be used.
//
// Hence let's do a simple check.
if (max_temp_file_size > 0 && !enoughSpaceInDirectory(file->getPath(), max_temp_file_size))
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for external aggregation in '{}'", file->path());
return file;
}
template <typename Method>
Block Aggregator::convertOneBucketToBlock(
AggregatedDataVariants & data_variants,
@ -1703,7 +1676,7 @@ template <typename Method>
void Aggregator::writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
NativeWriter & out) const
TemporaryFileStream & out) const
{
size_t max_temporary_block_size_rows = 0;
size_t max_temporary_block_size_bytes = 0;

View File

@ -29,6 +29,7 @@
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/JIT/compileFunction.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -925,7 +926,7 @@ public:
/// Return empty result when aggregating without keys on empty set.
bool empty_result_for_aggregation_by_empty_set;
VolumePtr tmp_volume;
TemporaryDataOnDiskScopePtr tmp_data_scope;
/// Settings is used to determine cache size. No threads are created.
size_t max_threads;
@ -970,7 +971,7 @@ public:
size_t group_by_two_level_threshold_bytes_,
size_t max_bytes_before_external_group_by_,
bool empty_result_for_aggregation_by_empty_set_,
VolumePtr tmp_volume_,
TemporaryDataOnDiskScopePtr tmp_data_scope_,
size_t max_threads_,
size_t min_free_disk_space_,
bool compile_aggregate_expressions_,
@ -990,7 +991,7 @@ public:
, group_by_two_level_threshold_bytes(group_by_two_level_threshold_bytes_)
, max_bytes_before_external_group_by(max_bytes_before_external_group_by_)
, empty_result_for_aggregation_by_empty_set(empty_result_for_aggregation_by_empty_set_)
, tmp_volume(tmp_volume_)
, tmp_data_scope(std::move(tmp_data_scope_))
, max_threads(max_threads_)
, min_free_disk_space(min_free_disk_space_)
, compile_aggregate_expressions(compile_aggregate_expressions_)
@ -1071,25 +1072,9 @@ public:
/// For external aggregation.
void writeToTemporaryFile(AggregatedDataVariants & data_variants, size_t max_temp_file_size = 0) const;
TemporaryFileOnDiskHolder createTempFile(size_t max_temp_file_size) const;
bool hasTemporaryData() const { return tmp_data && !tmp_data->empty(); }
bool hasTemporaryFiles() const { return !temporary_files.empty(); }
struct TemporaryFiles
{
std::vector<TemporaryFileOnDiskHolder> files;
size_t sum_size_uncompressed = 0;
size_t sum_size_compressed = 0;
mutable std::mutex mutex;
bool empty() const
{
std::lock_guard lock(mutex);
return files.empty();
}
};
const TemporaryFiles & getTemporaryFiles() const { return temporary_files; }
const TemporaryDataOnDisk & getTemporaryData() const { return *tmp_data; }
/// Get data structure of the result.
Block getHeader(bool final) const;
@ -1148,7 +1133,7 @@ private:
Poco::Logger * log = &Poco::Logger::get("Aggregator");
/// For external aggregation.
mutable TemporaryFiles temporary_files;
TemporaryDataOnDiskPtr tmp_data;
size_t min_bytes_for_prefetch = 0;
@ -1251,7 +1236,7 @@ private:
void writeToTemporaryFileImpl(
AggregatedDataVariants & data_variants,
Method & method,
NativeWriter & out) const;
TemporaryFileStream & out) const;
/// Merge NULL key data from hash table `src` into `dst`.
template <typename Method, typename Table>

View File

@ -30,6 +30,7 @@
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
#include <Disks/DiskDecorator.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
@ -37,6 +38,7 @@
#include <TableFunctions/TableFunctionFactory.h>
#include <Interpreters/ActionLocksManager.h>
#include <Interpreters/ExternalLoaderXMLConfigRepository.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Core/Settings.h>
#include <Core/SettingsQuirks.h>
#include <Access/AccessControl.h>
@ -188,7 +190,7 @@ struct ContextSharedPart : boost::noncopyable
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
mutable VolumePtr tmp_volume; /// Volume for the the temporary files that occur when processing the request.
TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here.
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
@ -681,10 +683,27 @@ Strings Context::getWarnings() const
return common_warnings;
}
/// TODO: remove, use `getTempDataOnDisk`
VolumePtr Context::getTemporaryVolume() const
{
auto lock = getLock();
return shared->tmp_volume;
if (shared->temp_data_on_disk)
return shared->temp_data_on_disk->getVolume();
return nullptr;
}
TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const
{
auto lock = getLock();
if (this->temp_data_on_disk)
return this->temp_data_on_disk;
return shared->temp_data_on_disk;
}
void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_)
{
auto lock = getLock();
this->temp_data_on_disk = std::move(temp_data_on_disk_);
}
void Context::setPath(const String & path)
@ -693,7 +712,7 @@ void Context::setPath(const String & path)
shared->path = path;
if (shared->tmp_path.empty() && !shared->tmp_volume)
if (shared->tmp_path.empty() && !shared->temp_data_on_disk)
shared->tmp_path = shared->path + "tmp/";
if (shared->flags_path.empty())
@ -712,9 +731,10 @@ void Context::setPath(const String & path)
shared->user_defined_path = shared->path + "user_defined/";
}
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name)
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size)
{
std::lock_guard lock(shared->storage_policies_mutex);
VolumePtr volume;
if (policy_name.empty())
{
@ -723,21 +743,41 @@ VolumePtr Context::setTemporaryStorage(const String & path, const String & polic
shared->tmp_path += '/';
auto disk = std::make_shared<DiskLocal>("_tmp_default", shared->tmp_path, 0);
shared->tmp_volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
volume = std::make_shared<SingleDiskVolume>("_tmp_default", disk, 0);
}
else
{
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
if (tmp_policy->getVolumes().size() != 1)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Policy '{} is used temporary files, such policy should have exactly one volume", policy_name);
shared->tmp_volume = tmp_policy->getVolume(0);
"Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);
volume = tmp_policy->getVolume(0);
}
if (shared->tmp_volume->getDisks().empty())
if (volume->getDisks().empty())
throw Exception("No disks volume for temporary files", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
return shared->tmp_volume;
for (const auto & disk : volume->getDisks())
{
if (!disk)
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Temporary disk is null");
/// Check that underlying disk is local (can be wrapped in decorator)
DiskPtr disk_ptr = disk;
if (const auto * disk_decorator = dynamic_cast<const DiskDecorator *>(disk_ptr.get()))
disk_ptr = disk_decorator->getNestedDisk();
if (dynamic_cast<const DiskLocal *>(disk_ptr.get()) == nullptr)
{
const auto * disk_raw_ptr = disk_ptr.get();
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
"Disk '{}' ({}) is not local and can't be used for temporary files",
disk_ptr->getName(), typeid(*disk_raw_ptr).name());
}
}
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
return volume;
}
void Context::setFlagsPath(const String & path)
@ -2897,14 +2937,13 @@ void Context::shutdown()
}
}
// Special volumes might also use disks that require shutdown.
if (shared->tmp_volume)
/// Special volumes might also use disks that require shutdown.
auto & tmp_data = shared->temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
{
auto & disks = shared->tmp_volume->getDisks();
auto & disks = tmp_data->getVolume()->getDisks();
for (auto & disk : disks)
{
disk->shutdown();
}
}
shared->shutdown();

View File

@ -161,6 +161,8 @@ using ReadTaskCallback = std::function<String()>;
using MergeTreeReadTaskCallback = std::function<std::optional<PartitionReadResponse>(PartitionReadRequest)>;
class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
#if USE_ROCKSDB
class MergeTreeMetadataCache;
@ -362,6 +364,8 @@ private:
/// A flag, used to mark if reader needs to apply deleted rows mask.
bool apply_deleted_mask = true;
/// Temporary data for query execution accounting.
TemporaryDataOnDiskScopePtr temp_data_on_disk;
public:
/// Some counters for current query execution.
/// Most of them are workarounds and should be removed in the future.
@ -435,7 +439,10 @@ public:
/// A list of warnings about server configuration to place in `system.warnings` table.
Strings getWarnings() const;
VolumePtr getTemporaryVolume() const;
VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`
TemporaryDataOnDiskScopePtr getTempDataOnDisk() const;
void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_);
void setPath(const String & path);
void setFlagsPath(const String & path);
@ -446,7 +453,7 @@ public:
void addWarningMessage(const String & msg) const;
VolumePtr setTemporaryStorage(const String & path, const String & policy_name = "");
VolumePtr setTemporaryStorage(const String & path, const String & policy_name, size_t max_size);
using ConfigurationPtr = Poco::AutoPtr<Poco::Util::AbstractConfiguration>;

View File

@ -1453,7 +1453,7 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
this->context->getTemporaryVolume(),
this->context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription(fmt::format("Sort {} before JOIN", join_pos));
@ -2354,7 +2354,7 @@ static Aggregator::Params getAggregatorParams(
settings.empty_result_for_aggregation_by_empty_set
|| (settings.empty_result_for_aggregation_by_constant_keys_on_empty_set && keys.empty()
&& query_analyzer.hasConstAggregationKeys()),
context.getTemporaryVolume(),
context.getTempDataOnDisk(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,
@ -2616,7 +2616,7 @@ void InterpreterSelectQuery::executeWindow(QueryPlan & query_plan)
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);
sorting_step->setStepDescription("Sorting for window '" + window.window_name + "'");
@ -2675,7 +2675,7 @@ void InterpreterSelectQuery::executeOrder(QueryPlan & query_plan, InputOrderInfo
settings.max_bytes_before_remerge_sort,
settings.remerge_sort_lowered_memory_bytes_ratio,
settings.max_bytes_before_external_sort,
context->getTemporaryVolume(),
context->getTempDataOnDisk(),
settings.min_free_disk_space_for_temporary_data,
settings.optimize_sorting_by_input_stream_properties);

View File

@ -4,12 +4,13 @@
#include <Columns/ColumnLowCardinality.h>
#include <Core/SortCursor.h>
#include <Formats/TemporaryFileStream.h>
#include <Formats/TemporaryFileStreamLegacy.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/JoinUtils.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Sources/BlocksListSource.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
@ -1032,7 +1033,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
{
auto load_func = [&]() -> std::shared_ptr<Block>
{
TemporaryFileStream input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block));
TemporaryFileStreamLegacy input(flushed_right_blocks[pos]->path(), materializeBlock(right_sample_block));
return std::make_shared<Block>(input.block_in->read());
};

View File

@ -69,7 +69,7 @@ static bool isUnlimitedQuery(const IAST * ast)
}
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextPtr query_context)
ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * ast, ContextMutablePtr query_context)
{
EntryPtr res;
@ -198,7 +198,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
auto user_process_list_it = user_to_queries.find(client_info.current_user);
if (user_process_list_it == user_to_queries.end())
user_process_list_it = user_to_queries.emplace(client_info.current_user, this).first;
{
user_process_list_it = user_to_queries.emplace(std::piecewise_construct,
std::forward_as_tuple(client_info.current_user),
std::forward_as_tuple(query_context->getGlobalContext(), this)).first;
}
ProcessListForUser & user_process_list = user_process_list_it->second;
/// Actualize thread group info
@ -208,6 +212,11 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
std::lock_guard lock_thread_group(thread_group->mutex);
thread_group->performance_counters.setParent(&user_process_list.user_performance_counters);
thread_group->memory_tracker.setParent(&user_process_list.user_memory_tracker);
if (user_process_list.user_temp_data_on_disk)
{
query_context->setTempDataOnDisk(std::make_shared<TemporaryDataOnDiskScope>(
user_process_list.user_temp_data_on_disk, settings.max_temp_data_on_disk_size_for_query));
}
thread_group->query = query_;
thread_group->one_line_query = toOneLineQuery(query_);
thread_group->normalized_query_hash = normalizedQueryHash<false>(query_);
@ -556,9 +565,19 @@ ProcessList::Info ProcessList::getInfo(bool get_thread_list, bool get_profile_ev
ProcessListForUser::ProcessListForUser(ProcessList * global_process_list)
: ProcessListForUser(nullptr, global_process_list)
{}
ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list)
: user_overcommit_tracker(global_process_list, this)
{
user_memory_tracker.setOvercommitTracker(&user_overcommit_tracker);
if (global_context)
{
size_t size_limit = global_context->getSettingsRef().max_temp_data_on_disk_size_for_user;
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getTempDataOnDisk(), size_limit);
}
}

View File

@ -5,6 +5,8 @@
#include <Interpreters/CancellationCode.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/QueryPriorities.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Interpreters/Context.h>
#include <QueryPipeline/BlockIO.h>
#include <QueryPipeline/ExecutionSpeedLimits.h>
#include <Storages/IStorage_fwd.h>
@ -236,6 +238,8 @@ struct ProcessListForUser
{
explicit ProcessListForUser(ProcessList * global_process_list);
ProcessListForUser(ContextPtr global_context, ProcessList * global_process_list);
/// query_id -> ProcessListElement(s). There can be multiple queries with the same query_id as long as all queries except one are cancelled.
using QueryToElement = std::unordered_map<String, QueryStatus *>;
QueryToElement queries;
@ -244,6 +248,8 @@ struct ProcessListForUser
/// Limit and counter for memory of all simultaneously running queries of single user.
MemoryTracker user_memory_tracker{VariableContext::User};
TemporaryDataOnDiskScopePtr user_temp_data_on_disk;
UserOvercommitTracker user_overcommit_tracker;
/// Count network usage for all simultaneously running queries of single user.
@ -257,6 +263,7 @@ struct ProcessListForUser
/// Clears network bandwidth Throttler, so it will not count periods of inactivity.
void resetTrackers()
{
/// TODO: should we drop user_temp_data_on_disk here?
user_memory_tracker.reset();
if (user_throttler)
user_throttler.reset();
@ -374,7 +381,7 @@ public:
* If timeout is passed - throw an exception.
* Don't count KILL QUERY queries.
*/
EntryPtr insert(const String & query_, const IAST * ast, ContextPtr query_context);
EntryPtr insert(const String & query_, const IAST * ast, ContextMutablePtr query_context);
/// Number of currently executing queries.
size_t size() const { return processes.size(); }

View File

@ -5,7 +5,7 @@
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h>
#include <Formats/TemporaryFileStreamLegacy.h>
#include <Disks/IVolume.h>
#include <Disks/TemporaryFileOnDisk.h>
@ -39,7 +39,7 @@ namespace
TemporaryFileOnDiskHolder flushToFile(const DiskPtr & disk, const Block & header, QueryPipelineBuilder pipeline, const String & codec)
{
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, CurrentMetrics::TemporaryFilesForJoin);
auto write_stat = TemporaryFileStream::write(tmp_file->getPath(), header, std::move(pipeline), codec);
auto write_stat = TemporaryFileStreamLegacy::write(tmp_file->getPath(), header, std::move(pipeline), codec);
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, write_stat.compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, write_stat.uncompressed_bytes);

View File

@ -0,0 +1,270 @@
#include <Interpreters/TemporaryDataOnDisk.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Formats/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Core/ProtocolDefines.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int LOGICAL_ERROR;
extern const int NOT_ENOUGH_SPACE;
}
void TemporaryDataOnDiskScope::deltaAllocAndCheck(int compressed_delta, int uncompressed_delta)
{
if (parent)
parent->deltaAllocAndCheck(compressed_delta, uncompressed_delta);
/// check that we don't go negative
if ((compressed_delta < 0 && stat.compressed_size < static_cast<size_t>(-compressed_delta)) ||
(uncompressed_delta < 0 && stat.uncompressed_size < static_cast<size_t>(-uncompressed_delta)))
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Negative temporary data size");
}
size_t new_consumprion = stat.compressed_size + compressed_delta;
if (compressed_delta > 0 && limit && new_consumprion > limit)
throw Exception(ErrorCodes::TOO_MANY_ROWS_OR_BYTES, "Limit for temporary files size exceeded");
stat.compressed_size += compressed_delta;
stat.uncompressed_size += uncompressed_delta;
}
TemporaryFileStream & TemporaryDataOnDisk::createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size)
{
DiskPtr disk;
if (max_file_size > 0)
{
auto reservation = volume->reserve(max_file_size);
if (!reservation)
throw Exception("Not enough space on temporary disk", ErrorCodes::NOT_ENOUGH_SPACE);
disk = reservation->getDisk();
}
else
{
disk = volume->getDisk();
}
auto tmp_file = std::make_unique<TemporaryFileOnDisk>(disk, metric_scope);
std::lock_guard lock(mutex);
TemporaryFileStreamPtr & tmp_stream = streams.emplace_back(std::make_unique<TemporaryFileStream>(std::move(tmp_file), header, this));
return *tmp_stream;
}
std::vector<TemporaryFileStream *> TemporaryDataOnDisk::getStreams() const
{
std::vector<TemporaryFileStream *> res;
std::lock_guard lock(mutex);
res.reserve(streams.size());
for (const auto & stream : streams)
res.push_back(stream.get());
return res;
}
bool TemporaryDataOnDisk::empty() const
{
std::lock_guard lock(mutex);
return streams.empty();
}
struct TemporaryFileStream::OutputWriter
{
OutputWriter(const String & path, const Block & header_)
: out_file_buf(path)
, out_compressed_buf(out_file_buf)
, out_writer(out_compressed_buf, DBMS_TCP_PROTOCOL_VERSION, header_)
{
}
void write(const Block & block)
{
if (finalized)
throw Exception("Cannot write to finalized stream", ErrorCodes::LOGICAL_ERROR);
out_writer.write(block);
}
void finalize()
{
if (finalized)
return;
/// if we called finalize() explicitly, and got an exception,
/// we don't want to get it again in the destructor, so set finalized flag first
finalized = true;
out_writer.flush();
out_compressed_buf.finalize();
out_file_buf.finalize();
}
~OutputWriter()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
WriteBufferFromFile out_file_buf;
CompressedWriteBuffer out_compressed_buf;
NativeWriter out_writer;
bool finalized = false;
};
struct TemporaryFileStream::InputReader
{
InputReader(const String & path, const Block & header_)
: in_file_buf(path)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, header_, DBMS_TCP_PROTOCOL_VERSION)
{
}
explicit InputReader(const String & path)
: in_file_buf(path)
, in_compressed_buf(in_file_buf)
, in_reader(in_compressed_buf, DBMS_TCP_PROTOCOL_VERSION)
{
}
Block read() { return in_reader.read(); }
ReadBufferFromFile in_file_buf;
CompressedReadBuffer in_compressed_buf;
NativeReader in_reader;
};
TemporaryFileStream::TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_)
: parent(parent_)
, header(header_)
, file(std::move(file_))
, out_writer(std::make_unique<OutputWriter>(file->path(), header))
{
}
void TemporaryFileStream::write(const Block & block)
{
if (!out_writer)
throw Exception("Writing has been finished", ErrorCodes::LOGICAL_ERROR);
updateAllocAndCheck();
out_writer->write(block);
}
TemporaryFileStream::Stat TemporaryFileStream::finishWriting()
{
if (out_writer)
{
out_writer->finalize();
/// The amount of written data can be changed after finalization, some buffers can be flushed
/// Need to update the stat
updateAllocAndCheck();
out_writer.reset();
/// reader will be created at the first read call, not to consume memory before it is needed
}
return stat;
}
bool TemporaryFileStream::isWriteFinished() const
{
assert(in_reader == nullptr || out_writer == nullptr);
return out_writer == nullptr;
}
Block TemporaryFileStream::read()
{
if (!isWriteFinished())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Writing has been not finished");
if (isFinalized())
return {};
if (!in_reader)
{
in_reader = std::make_unique<InputReader>(file->path(), header);
}
Block block = in_reader->read();
if (!block)
{
/// finalize earlier to release resources, do not wait for the destructor
this->finalize();
}
return block;
}
void TemporaryFileStream::updateAllocAndCheck()
{
assert(out_writer);
size_t new_compressed_size = out_writer->out_compressed_buf.getCompressedBytes();
size_t new_uncompressed_size = out_writer->out_compressed_buf.getUncompressedBytes();
if (unlikely(new_compressed_size < stat.compressed_size || new_uncompressed_size < stat.uncompressed_size))
{
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Temporary file {} size decreased after write: compressed: {} -> {}, uncompressed: {} -> {}",
file->path(), new_compressed_size, stat.compressed_size, new_uncompressed_size, stat.uncompressed_size);
}
parent->deltaAllocAndCheck(new_compressed_size - stat.compressed_size, new_uncompressed_size - stat.uncompressed_size);
stat.compressed_size = new_compressed_size;
stat.uncompressed_size = new_uncompressed_size;
}
bool TemporaryFileStream::isFinalized() const
{
return file == nullptr;
}
void TemporaryFileStream::finalize()
{
if (file)
{
file.reset();
parent->deltaAllocAndCheck(-stat.compressed_size, -stat.uncompressed_size);
}
if (in_reader)
in_reader.reset();
if (out_writer)
{
out_writer->finalize();
out_writer.reset();
}
}
TemporaryFileStream::~TemporaryFileStream()
{
try
{
finalize();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
assert(false); /// deltaAllocAndCheck with negative can't throw exception
}
}
}

View File

@ -0,0 +1,139 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Disks/IVolume.h>
namespace DB
{
class TemporaryDataOnDiskScope;
using TemporaryDataOnDiskScopePtr = std::shared_ptr<TemporaryDataOnDiskScope>;
class TemporaryDataOnDisk;
using TemporaryDataOnDiskPtr = std::unique_ptr<TemporaryDataOnDisk>;
class TemporaryFileStream;
using TemporaryFileStreamPtr = std::unique_ptr<TemporaryFileStream>;
/*
* Used to account amount of temporary data written to disk.
* If limit is set, throws exception if limit is exceeded.
* Data can be nested, so parent scope accounts all data written by children.
* Scopes are: global -> per-user -> per-query -> per-purpose (sorting, aggregation, etc).
*/
class TemporaryDataOnDiskScope : boost::noncopyable
{
public:
struct StatAtomic
{
std::atomic<size_t> compressed_size;
std::atomic<size_t> uncompressed_size;
};
explicit TemporaryDataOnDiskScope(VolumePtr volume_, size_t limit_)
: volume(std::move(volume_)), limit(limit_)
{}
explicit TemporaryDataOnDiskScope(TemporaryDataOnDiskScopePtr parent_, size_t limit_)
: parent(std::move(parent_)), volume(parent->volume), limit(limit_)
{}
/// TODO: remove
/// Refactor all code that uses volume directly to use TemporaryDataOnDisk.
VolumePtr getVolume() const { return volume; }
protected:
void deltaAllocAndCheck(int compressed_delta, int uncompressed_delta);
TemporaryDataOnDiskScopePtr parent = nullptr;
VolumePtr volume;
StatAtomic stat;
size_t limit = 0;
};
/*
* Holds the set of temporary files.
* New file stream is created with `createStream`.
* Streams are owned by this object and will be deleted when it is deleted.
* It's a leaf node in temorarty data scope tree.
*/
class TemporaryDataOnDisk : private TemporaryDataOnDiskScope
{
friend class TemporaryFileStream; /// to allow it to call `deltaAllocAndCheck` to account data
public:
using TemporaryDataOnDiskScope::StatAtomic;
explicit TemporaryDataOnDisk(TemporaryDataOnDiskScopePtr parent_)
: TemporaryDataOnDiskScope(std::move(parent_), 0)
{}
/// If max_file_size > 0, then check that there's enough space on the disk and throw an exception in case of lack of free space
TemporaryFileStream & createStream(const Block & header, CurrentMetrics::Value metric_scope, size_t max_file_size = 0);
std::vector<TemporaryFileStream *> getStreams() const;
bool empty() const;
const StatAtomic & getStat() const { return stat; }
private:
mutable std::mutex mutex;
std::vector<TemporaryFileStreamPtr> streams TSA_GUARDED_BY(mutex);
};
/*
* Data can be written into this stream and then read.
* After finish writing, call `finishWriting` and then `read` to read the data.
* Account amount of data written to disk in parent scope.
*/
class TemporaryFileStream : boost::noncopyable
{
public:
struct Stat
{
/// Statistics for file
/// Non-atomic because we don't allow to `read` or `write` into single file from multiple threads
size_t compressed_size = 0;
size_t uncompressed_size = 0;
};
TemporaryFileStream(TemporaryFileOnDiskHolder file_, const Block & header_, TemporaryDataOnDisk * parent_);
void write(const Block & block);
Stat finishWriting();
bool isWriteFinished() const;
Block read();
const String & path() const { return file->getPath(); }
Block getHeader() const { return header; }
~TemporaryFileStream();
private:
void updateAllocAndCheck();
/// Finalize everything, close reader and writer, delete file
void finalize();
bool isFinalized() const;
TemporaryDataOnDisk * parent;
Block header;
TemporaryFileOnDiskHolder file;
Stat stat;
struct OutputWriter;
std::unique_ptr<OutputWriter> out_writer;
struct InputReader;
std::unique_ptr<InputReader> in_reader;
};
}

View File

@ -177,7 +177,7 @@ void AggregatingStep::transformPipeline(QueryPipelineBuilder & pipeline, const B
transform_params->params.group_by_two_level_threshold_bytes,
transform_params->params.max_bytes_before_external_group_by,
transform_params->params.empty_result_for_aggregation_by_empty_set,
transform_params->params.tmp_volume,
transform_params->params.tmp_data_scope,
transform_params->params.max_threads,
transform_params->params.min_free_disk_space,
transform_params->params.compile_aggregate_expressions,

View File

@ -12,6 +12,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
static ITransformingStep::Traits getTraits(size_t limit)
{
return ITransformingStep::Traits
@ -37,7 +42,7 @@ SortingStep::SortingStep(
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_,
bool optimize_sorting_by_input_stream_properties_)
: ITransformingStep(input_stream, input_stream.header, getTraits(limit_))
@ -49,10 +54,13 @@ SortingStep::SortingStep(
, max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_volume(tmp_volume_)
, tmp_data(tmp_data_)
, min_free_disk_space(min_free_disk_space_)
, optimize_sorting_by_input_stream_properties(optimize_sorting_by_input_stream_properties_)
{
if (max_bytes_before_external_sort && tmp_data == nullptr)
throw Exception("Temporary data storage for external sorting is not provided", ErrorCodes::LOGICAL_ERROR);
/// TODO: check input_stream is partially sorted by the same description.
output_stream->sort_description = result_description;
output_stream->sort_scope = DataStream::SortScope::Global;
@ -189,7 +197,7 @@ void SortingStep::mergeSorting(QueryPipelineBuilder & pipeline, const SortDescri
max_bytes_before_remerge / pipeline.getNumStreams(),
remerge_lowered_memory_bytes_ratio,
max_bytes_before_external_sort,
tmp_volume,
std::make_unique<TemporaryDataOnDisk>(tmp_data),
min_free_disk_space);
});
}

View File

@ -2,7 +2,7 @@
#include <Processors/QueryPlan/ITransformingStep.h>
#include <Core/SortDescription.h>
#include <QueryPipeline/SizeLimits.h>
#include <Disks/IVolume.h>
#include <Interpreters/TemporaryDataOnDisk.h>
namespace DB
{
@ -21,7 +21,7 @@ public:
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
TemporaryDataOnDiskScopePtr tmp_data_,
size_t min_free_disk_space_,
bool optimize_sorting_by_input_stream_properties_);
@ -85,7 +85,8 @@ private:
size_t max_bytes_before_remerge = 0;
double remerge_lowered_memory_bytes_ratio = 0;
size_t max_bytes_before_external_sort = 0;
VolumePtr tmp_volume;
TemporaryDataOnDiskScopePtr tmp_data = nullptr;
size_t min_free_disk_space = 0;
const bool optimize_sorting_by_input_stream_properties = false;
};

View File

@ -1,5 +1,5 @@
#include <Processors/Sources/TemporaryFileLazySource.h>
#include <Formats/TemporaryFileStream.h>
#include <Formats/TemporaryFileStreamLegacy.h>
namespace DB
{
@ -18,7 +18,7 @@ Chunk TemporaryFileLazySource::generate()
return {};
if (!stream)
stream = std::make_unique<TemporaryFileStream>(path, header);
stream = std::make_unique<TemporaryFileStreamLegacy>(path, header);
auto block = stream->block_in->read();
if (!block)

View File

@ -5,7 +5,7 @@
namespace DB
{
struct TemporaryFileStream;
struct TemporaryFileStreamLegacy;
class TemporaryFileLazySource : public ISource
{
@ -22,7 +22,7 @@ private:
Block header;
bool done;
std::unique_ptr<TemporaryFileStream> stream;
std::unique_ptr<TemporaryFileStreamLegacy> stream;
};
}

View File

@ -34,7 +34,7 @@ TTLAggregationAlgorithm::TTLAggregationAlgorithm(
0,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
storage_.getContext()->getTemporaryVolume(),
storage_.getContext()->getTempDataOnDisk(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,

View File

@ -53,33 +53,29 @@ namespace
class SourceFromNativeStream : public ISource
{
public:
SourceFromNativeStream(const Block & header, const std::string & path)
: ISource(header), file_in(path), compressed_in(file_in),
block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{
}
explicit SourceFromNativeStream(TemporaryFileStream * tmp_stream_)
: ISource(tmp_stream_->getHeader())
, tmp_stream(tmp_stream_)
{}
String getName() const override { return "SourceFromNativeStream"; }
Chunk generate() override
{
if (!block_in)
if (!tmp_stream)
return {};
auto block = block_in->read();
auto block = tmp_stream->read();
if (!block)
{
block_in.reset();
tmp_stream = nullptr;
return {};
}
return convertToChunk(block);
}
private:
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
std::unique_ptr<NativeReader> block_in;
TemporaryFileStream * tmp_stream;
};
}
@ -564,7 +560,7 @@ void AggregatingTransform::initGenerate()
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));
if (params->aggregator.hasTemporaryFiles())
if (params->aggregator.hasTemporaryData())
{
if (variants.isConvertibleToTwoLevel())
variants.convertToTwoLevel();
@ -577,7 +573,7 @@ void AggregatingTransform::initGenerate()
if (many_data->num_finished.fetch_add(1) + 1 < many_data->variants.size())
return;
if (!params->aggregator.hasTemporaryFiles())
if (!params->aggregator.hasTemporaryData())
{
auto prepared_data = params->aggregator.prepareVariantsToMerge(many_data->variants);
auto prepared_data_ptr = std::make_shared<ManyAggregatedDataVariants>(std::move(prepared_data));
@ -604,25 +600,27 @@ void AggregatingTransform::initGenerate()
}
}
const auto & files = params->aggregator.getTemporaryFiles();
Pipe pipe;
const auto & tmp_data = params->aggregator.getTemporaryData();
Pipe pipe;
{
auto header = params->aggregator.getHeader(false);
Pipes pipes;
for (const auto & file : files.files)
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(header, file->path())));
for (auto * tmp_stream : tmp_data.getStreams())
pipes.emplace_back(Pipe(std::make_unique<SourceFromNativeStream>(tmp_stream)));
pipe = Pipe::unitePipes(std::move(pipes));
}
size_t num_streams = tmp_data.getStreams().size();
size_t compressed_size = tmp_data.getStat().compressed_size;
size_t uncompressed_size = tmp_data.getStat().uncompressed_size;
LOG_DEBUG(
log,
"Will merge {} temporary files of size {} compressed, {} uncompressed.",
files.files.size(),
ReadableSize(files.sum_size_compressed),
ReadableSize(files.sum_size_uncompressed));
num_streams,
ReadableSize(compressed_size),
ReadableSize(uncompressed_size));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);

View File

@ -30,21 +30,15 @@ namespace CurrentMetrics
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
}
class BufferingToFileTransform : public IAccumulatingTransform
{
public:
BufferingToFileTransform(const Block & header, Poco::Logger * log_, std::string path_)
: IAccumulatingTransform(header, header), log(log_)
, path(std::move(path_)), file_buf_out(path), compressed_buf_out(file_buf_out)
, out_stream(std::make_unique<NativeWriter>(compressed_buf_out, 0, header))
BufferingToFileTransform(const Block & header, TemporaryFileStream & tmp_stream_, Poco::Logger * log_)
: IAccumulatingTransform(header, header)
, tmp_stream(tmp_stream_)
, log(log_)
{
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", path);
LOG_INFO(log, "Sorting and writing part of data into temporary file {}", tmp_stream.path());
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
}
@ -52,71 +46,37 @@ public:
void consume(Chunk chunk) override
{
out_stream->write(getInputPort().getHeader().cloneWithColumns(chunk.detachColumns()));
Block block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
tmp_stream.write(block);
}
Chunk generate() override
{
if (out_stream)
if (!tmp_stream.isWriteFinished())
{
out_stream->flush();
compressed_buf_out.next();
file_buf_out.next();
auto stat = tmp_stream.finishWriting();
auto stat = updateWriteStat();
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, stat.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, stat.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, stat.uncompressed_size);
LOG_INFO(log, "Done writing part of data into temporary file {}, compressed {}, uncompressed {} ",
path, ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
out_stream.reset();
file_in = std::make_unique<ReadBufferFromFile>(path);
compressed_in = std::make_unique<CompressedReadBuffer>(*file_in);
block_in = std::make_unique<NativeReader>(*compressed_in, getOutputPort().getHeader(), 0);
tmp_stream.path(), ReadableSize(static_cast<double>(stat.compressed_size)), ReadableSize(static_cast<double>(stat.uncompressed_size)));
}
if (!block_in)
return {};
auto block = block_in->read();
Block block = tmp_stream.read();
if (!block)
{
block_in.reset();
return {};
}
UInt64 num_rows = block.rows();
return Chunk(block.getColumns(), num_rows);
}
private:
struct Stat
{
size_t compressed_size = 0;
size_t uncompressed_size = 0;
};
Stat updateWriteStat()
{
Stat res{compressed_buf_out.getCompressedBytes(), compressed_buf_out.getUncompressedBytes()};
ProfileEvents::increment(ProfileEvents::ExternalProcessingCompressedBytesTotal, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalProcessingUncompressedBytesTotal, res.uncompressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortCompressedBytes, res.compressed_size);
ProfileEvents::increment(ProfileEvents::ExternalSortUncompressedBytes, res.uncompressed_size);
return res;
}
TemporaryFileStream & tmp_stream;
Poco::Logger * log;
std::string path;
WriteBufferFromFile file_buf_out;
CompressedWriteBuffer compressed_buf_out;
std::unique_ptr<NativeWriter> out_stream;
std::unique_ptr<ReadBufferFromFile> file_in;
std::unique_ptr<CompressedReadBuffer> compressed_in;
std::unique_ptr<NativeReader> block_in;
};
MergeSortingTransform::MergeSortingTransform(
@ -128,13 +88,13 @@ MergeSortingTransform::MergeSortingTransform(
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
TemporaryDataOnDiskPtr tmp_data_,
size_t min_free_disk_space_)
: SortingTransform(header, description_, max_merged_block_size_, limit_, increase_sort_description_compile_attempts)
, max_bytes_before_remerge(max_bytes_before_remerge_)
, remerge_lowered_memory_bytes_ratio(remerge_lowered_memory_bytes_ratio_)
, max_bytes_before_external_sort(max_bytes_before_external_sort_)
, tmp_volume(tmp_volume_)
, tmp_data(std::move(tmp_data_))
, min_free_disk_space(min_free_disk_space_)
{
}
@ -209,17 +169,12 @@ void MergeSortingTransform::consume(Chunk chunk)
*/
if (max_bytes_before_external_sort && sum_bytes_in_blocks > max_bytes_before_external_sort)
{
size_t size = sum_bytes_in_blocks + min_free_disk_space;
auto reservation = tmp_volume->reserve(size);
if (!reservation)
throw Exception("Not enough space for external sort in temporary storage", ErrorCodes::NOT_ENOUGH_SPACE);
/// If there's less free disk space than reserve_size, an exception will be thrown
size_t reserve_size = sum_bytes_in_blocks + min_free_disk_space;
auto & tmp_stream = tmp_data->createStream(header_without_constants, CurrentMetrics::TemporaryFilesForSort, reserve_size);
temporary_files.emplace_back(std::make_unique<TemporaryFileOnDisk>(reservation->getDisk(), CurrentMetrics::TemporaryFilesForSort));
const std::string & path = temporary_files.back()->path();
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, log, path);
merge_sorter = std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
auto current_processor = std::make_shared<BufferingToFileTransform>(header_without_constants, tmp_stream, log);
processors.emplace_back(current_processor);
@ -261,13 +216,14 @@ void MergeSortingTransform::generate()
{
if (!generated_prefix)
{
if (temporary_files.empty())
size_t num_tmp_files = tmp_data ? tmp_data->getStreams().size() : 0;
if (num_tmp_files == 0)
merge_sorter
= std::make_unique<MergeSorter>(header_without_constants, std::move(chunks), description, max_merged_block_size, limit);
else
{
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);
LOG_INFO(log, "There are {} temporary sorted parts to merge", temporary_files.size());
LOG_INFO(log, "There are {} temporary sorted parts to merge", num_tmp_files);
processors.emplace_back(std::make_shared<MergeSorterSource>(
header_without_constants, std::move(chunks), description, max_merged_block_size, limit));

View File

@ -4,6 +4,7 @@
#include <Core/SortDescription.h>
#include <Common/filesystemHelpers.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Common/logger_useful.h>
@ -28,7 +29,7 @@ public:
size_t max_bytes_before_remerge_,
double remerge_lowered_memory_bytes_ratio_,
size_t max_bytes_before_external_sort_,
VolumePtr tmp_volume_,
TemporaryDataOnDiskPtr tmp_data_,
size_t min_free_disk_space_);
String getName() const override { return "MergeSortingTransform"; }
@ -44,7 +45,7 @@ private:
size_t max_bytes_before_remerge;
double remerge_lowered_memory_bytes_ratio;
size_t max_bytes_before_external_sort;
VolumePtr tmp_volume;
TemporaryDataOnDiskPtr tmp_data;
size_t min_free_disk_space;
size_t sum_rows_in_blocks = 0;
@ -55,9 +56,6 @@ private:
/// If remerge doesn't save memory at least several times, mark it as useless and don't do it anymore.
bool remerge_is_useful = true;
/// Everything below is for external sorting.
std::vector<TemporaryFileOnDiskHolder> temporary_files;
/// Merge all accumulated blocks to keep no more than limit rows.
void remerge();

View File

@ -310,7 +310,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
settings.group_by_two_level_threshold_bytes,
settings.max_bytes_before_external_group_by,
settings.empty_result_for_aggregation_by_empty_set,
context->getTemporaryVolume(),
context->getTempDataOnDisk(),
settings.max_threads,
settings.min_free_disk_space_for_temporary_data,
settings.compile_aggregate_expressions,

View File

@ -30,7 +30,7 @@ SELECT intDiv(number, 2) AS k, count(), argMax(toString(number), number) FROM (S
SELECT '*** External aggregation.';
SET max_bytes_before_external_group_by=1000000;
SET max_bytes_before_external_group_by = 1000000;
SET group_by_two_level_threshold = 100000;
SELECT '**** totals_mode = after_having_auto';