diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index d97473d0a36..96354443b1f 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -219,7 +219,11 @@ struct ContextSharedPart : boost::noncopyable ConfigurationPtr config; /// Global configuration settings. String tmp_path; /// Path to the temporary files that occur when processing the request. - TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here. + + /// All temporary files that occur when processing requests are accounted here. + /// Child scopes for more fine-grained accounting are created per user/query/etc. + /// Initialized once during server startup. + TemporaryDataOnDiskScopePtr root_temp_data_on_disk; mutable std::unique_ptr embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::unique_ptr external_dictionaries_loader; @@ -752,25 +756,35 @@ Strings Context::getWarnings() const } /// TODO: remove, use `getTempDataOnDisk` -VolumePtr Context::getTemporaryVolume() const +VolumePtr Context::getGlobalTemporaryVolume() const { auto lock = getLock(); - if (shared->temp_data_on_disk) - return shared->temp_data_on_disk->getVolume(); + /// Calling this method we just bypass the `temp_data_on_disk` and write to the file on the volume directly. + /// Volume is the same for `root_temp_data_on_disk` (always set) and `temp_data_on_disk` (if it's set). 
+ if (shared->root_temp_data_on_disk) + return shared->root_temp_data_on_disk->getVolume(); return nullptr; } TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const { - auto lock = getLock(); if (this->temp_data_on_disk) return this->temp_data_on_disk; - return shared->temp_data_on_disk; + + auto lock = getLock(); + return shared->root_temp_data_on_disk; +} + +TemporaryDataOnDiskScopePtr Context::getSharedTempDataOnDisk() const +{ + auto lock = getLock(); + return shared->root_temp_data_on_disk; } void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_) { - auto lock = getLock(); + /// It's set from `ProcessList::insert` in `executeQueryImpl` before query execution + /// so no races with `getTempDataOnDisk` which is called from query execution. this->temp_data_on_disk = std::move(temp_data_on_disk_); } @@ -780,7 +794,7 @@ void Context::setPath(const String & path) shared->path = path; - if (shared->tmp_path.empty() && !shared->temp_data_on_disk) + if (shared->tmp_path.empty() && !shared->root_temp_data_on_disk) shared->tmp_path = shared->path + "tmp/"; if (shared->flags_path.empty()) @@ -836,6 +850,11 @@ static VolumePtr createLocalSingleDiskVolume(const std::string & path) void Context::setTemporaryStoragePath(const String & path, size_t max_size) { + auto lock = getLock(); + + if (shared->root_temp_data_on_disk) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set"); + shared->tmp_path = path; if (!shared->tmp_path.ends_with('/')) shared->tmp_path += '/'; @@ -847,17 +866,23 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size) setupTmpPath(shared->log, disk->getPath()); } - shared->temp_data_on_disk = std::make_shared(volume, max_size); + shared->root_temp_data_on_disk = std::make_shared(volume, max_size); } void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size) { - std::lock_guard lock(shared->storage_policies_mutex); + StoragePolicyPtr tmp_policy; + { 
+ /// lock is required only for accessing `shared->merge_tree_storage_policy_selector` + /// StoragePolicy itself is immutable. + std::lock_guard storage_policies_lock(shared->storage_policies_mutex); + tmp_policy = getStoragePolicySelector(storage_policies_lock)->get(policy_name); + } - StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name); if (tmp_policy->getVolumes().size() != 1) - throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, + throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name); + VolumePtr volume = tmp_policy->getVolume(0); if (volume->getDisks().empty()) @@ -882,12 +907,21 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s setupTmpPath(shared->log, disk->getPath()); } - shared->temp_data_on_disk = std::make_shared(volume, max_size); -} + auto lock = getLock(); + if (shared->root_temp_data_on_disk) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set"); + + shared->root_temp_data_on_disk = std::make_shared(volume, max_size); +} void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size) { + auto lock = getLock(); + + if (shared->root_temp_data_on_disk) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set"); + auto disk_ptr = getDisk(cache_disk_name); if (!disk_ptr) throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name); @@ -904,7 +938,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t shared->tmp_path = file_cache->getBasePath(); VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path); - shared->temp_data_on_disk = std::make_shared(volume, file_cache.get(), max_size); + shared->root_temp_data_on_disk = std::make_shared(volume, file_cache.get(), max_size); } void Context::setFlagsPath(const String & path) @@ -3460,7 +3494,7 @@ void 
Context::shutdown() } /// Special volumes might also use disks that require shutdown. - auto & tmp_data = shared->temp_data_on_disk; + auto & tmp_data = shared->root_temp_data_on_disk; if (tmp_data && tmp_data->getVolume()) { auto & disks = tmp_data->getVolume()->getDisks(); diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 99e83046323..d5ade8c02c7 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -472,9 +472,10 @@ public: /// A list of warnings about server configuration to place in `system.warnings` table. Strings getWarnings() const; - VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk` + VolumePtr getGlobalTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk` TemporaryDataOnDiskScopePtr getTempDataOnDisk() const; + TemporaryDataOnDiskScopePtr getSharedTempDataOnDisk() const; void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_); void setPath(const String & path); diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp index 80b2fe5302c..37bce592f37 100644 --- a/src/Interpreters/JoinedTables.cpp +++ b/src/Interpreters/JoinedTables.cpp @@ -308,7 +308,7 @@ std::shared_ptr JoinedTables::makeTableJoin(const ASTSelectQuery & se auto settings = context->getSettingsRef(); MultiEnum join_algorithm = settings.join_algorithm; - auto table_join = std::make_shared(settings, context->getTemporaryVolume()); + auto table_join = std::make_shared(settings, context->getGlobalTemporaryVolume()); const ASTTablesInSelectQueryElement * ast_join = select_query.join(); const auto & table_to_join = ast_join->table_expression->as(); diff --git a/src/Interpreters/MergeJoin.cpp b/src/Interpreters/MergeJoin.cpp index a5ab6b25d02..07f5ae31ed6 100644 --- a/src/Interpreters/MergeJoin.cpp +++ b/src/Interpreters/MergeJoin.cpp @@ -1045,7 +1045,7 @@ std::shared_ptr MergeJoin::loadRightBlock(size_t pos) const void MergeJoin::initRightTableWriter() { - disk_writer = 
std::make_unique(size_limits, table_join->getTemporaryVolume(), + disk_writer = std::make_unique(size_limits, table_join->getGlobalTemporaryVolume(), right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge, table_join->temporaryFilesCodec()); disk_writer->addBlocks(right_blocks); diff --git a/src/Interpreters/ProcessList.cpp b/src/Interpreters/ProcessList.cpp index d66d4bdea64..51053bd2884 100644 --- a/src/Interpreters/ProcessList.cpp +++ b/src/Interpreters/ProcessList.cpp @@ -625,7 +625,7 @@ ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList * if (global_context) { size_t size_limit = global_context->getSettingsRef().max_temporary_data_on_disk_size_for_user; - user_temp_data_on_disk = std::make_shared(global_context->getTempDataOnDisk(), size_limit); + user_temp_data_on_disk = std::make_shared(global_context->getSharedTempDataOnDisk(), size_limit); } } diff --git a/src/Interpreters/TableJoin.h b/src/Interpreters/TableJoin.h index 84390adc0df..95471885a2a 100644 --- a/src/Interpreters/TableJoin.h +++ b/src/Interpreters/TableJoin.h @@ -209,7 +209,7 @@ public: JoinStrictness strictness() const { return table_join.strictness; } bool sameStrictnessAndKind(JoinStrictness, JoinKind) const; const SizeLimits & sizeLimits() const { return size_limits; } - VolumePtr getTemporaryVolume() { return tmp_volume; } + VolumePtr getGlobalTemporaryVolume() { return tmp_volume; } bool isEnabledAlgorithm(JoinAlgorithm val) const { diff --git a/src/Planner/PlannerJoinTree.cpp b/src/Planner/PlannerJoinTree.cpp index 0479170eba1..ab7086c820f 100644 --- a/src/Planner/PlannerJoinTree.cpp +++ b/src/Planner/PlannerJoinTree.cpp @@ -994,7 +994,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_ } } - auto table_join = std::make_shared(settings, query_context->getTemporaryVolume()); + auto table_join = std::make_shared(settings, query_context->getGlobalTemporaryVolume()); table_join->getTableJoin() = 
join_node.toASTTableJoin()->as(); table_join->getTableJoin().kind = join_kind; diff --git a/src/Server/HTTPHandler.cpp b/src/Server/HTTPHandler.cpp index bfdc067f733..16718a0a218 100644 --- a/src/Server/HTTPHandler.cpp +++ b/src/Server/HTTPHandler.cpp @@ -623,7 +623,7 @@ void HTTPHandler::processQuery( if (buffer_until_eof) { - const std::string tmp_path(server.context()->getTemporaryVolume()->getDisk()->getPath()); + const std::string tmp_path(server.context()->getGlobalTemporaryVolume()->getDisk()->getPath()); const std::string tmp_path_template(fs::path(tmp_path) / "http_buffers/"); auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index d1dfa96b87c..d1e062be92a 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -273,7 +273,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() ctx->compression_codec = global_ctx->data->getCompressionCodecForPart( global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge); - ctx->tmp_disk = global_ctx->context->getTemporaryVolume()->getDisk(); + ctx->tmp_disk = global_ctx->context->getGlobalTemporaryVolume()->getDisk(); switch (global_ctx->chosen_merge_algorithm) { diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index c568178a469..d469113d358 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -406,7 +406,7 @@ namespace void StorageMemory::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & /* partitions */) { - auto temp_disk = backup_entries_collector.getContext()->getTemporaryVolume()->getDisk(0); + auto temp_disk = backup_entries_collector.getContext()->getGlobalTemporaryVolume()->getDisk(0); auto max_compress_block_size = 
backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size; backup_entries_collector.addBackupEntries(std::make_shared( backup_entries_collector.getContext(), @@ -426,7 +426,7 @@ void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const S if (!restorer.isNonEmptyTableAllowed() && total_size_bytes) RestorerFromBackup::throwTableIsNotEmpty(getStorageID()); - auto temp_disk = restorer.getContext()->getTemporaryVolume()->getDisk(0); + auto temp_disk = restorer.getContext()->getGlobalTemporaryVolume()->getDisk(0); restorer.addDataRestoreTask( [storage = std::static_pointer_cast(shared_from_this()), backup, data_path_in_backup, temp_disk]