mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-23 16:12:01 +00:00
Merge pull request #48219 from ClickHouse/vdimir/lock_order_47072
This commit is contained in:
commit
e7a5c96353
@ -219,7 +219,11 @@ struct ContextSharedPart : boost::noncopyable
|
||||
ConfigurationPtr config; /// Global configuration settings.
|
||||
|
||||
String tmp_path; /// Path to the temporary files that occur when processing the request.
|
||||
TemporaryDataOnDiskScopePtr temp_data_on_disk; /// Temporary files that occur when processing the request accounted here.
|
||||
|
||||
/// All temporary files that occur when processing the requests accounted here.
|
||||
/// Child scopes for more fine-grained accounting are created per user/query/etc.
|
||||
/// Initialized once during server startup.
|
||||
TemporaryDataOnDiskScopePtr root_temp_data_on_disk;
|
||||
|
||||
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
|
||||
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
|
||||
@ -752,25 +756,35 @@ Strings Context::getWarnings() const
|
||||
}
|
||||
|
||||
/// TODO: remove, use `getTempDataOnDisk`
|
||||
VolumePtr Context::getTemporaryVolume() const
|
||||
VolumePtr Context::getGlobalTemporaryVolume() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (shared->temp_data_on_disk)
|
||||
return shared->temp_data_on_disk->getVolume();
|
||||
/// Calling this method we just bypass the `temp_data_on_disk` and write to the file on the volume directly.
|
||||
/// Volume is the same for `root_temp_data_on_disk` (always set) and `temp_data_on_disk` (if it's set).
|
||||
if (shared->root_temp_data_on_disk)
|
||||
return shared->root_temp_data_on_disk->getVolume();
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
TemporaryDataOnDiskScopePtr Context::getTempDataOnDisk() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
if (this->temp_data_on_disk)
|
||||
return this->temp_data_on_disk;
|
||||
return shared->temp_data_on_disk;
|
||||
|
||||
auto lock = getLock();
|
||||
return shared->root_temp_data_on_disk;
|
||||
}
|
||||
|
||||
TemporaryDataOnDiskScopePtr Context::getSharedTempDataOnDisk() const
|
||||
{
|
||||
auto lock = getLock();
|
||||
return shared->root_temp_data_on_disk;
|
||||
}
|
||||
|
||||
void Context::setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_)
|
||||
{
|
||||
auto lock = getLock();
|
||||
/// It's set from `ProcessList::insert` in `executeQueryImpl` before query execution
|
||||
/// so no races with `getTempDataOnDisk` which is called from query execution.
|
||||
this->temp_data_on_disk = std::move(temp_data_on_disk_);
|
||||
}
|
||||
|
||||
@ -780,7 +794,7 @@ void Context::setPath(const String & path)
|
||||
|
||||
shared->path = path;
|
||||
|
||||
if (shared->tmp_path.empty() && !shared->temp_data_on_disk)
|
||||
if (shared->tmp_path.empty() && !shared->root_temp_data_on_disk)
|
||||
shared->tmp_path = shared->path + "tmp/";
|
||||
|
||||
if (shared->flags_path.empty())
|
||||
@ -836,6 +850,11 @@ static VolumePtr createLocalSingleDiskVolume(const std::string & path)
|
||||
|
||||
void Context::setTemporaryStoragePath(const String & path, size_t max_size)
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
if (shared->root_temp_data_on_disk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
|
||||
|
||||
shared->tmp_path = path;
|
||||
if (!shared->tmp_path.ends_with('/'))
|
||||
shared->tmp_path += '/';
|
||||
@ -847,17 +866,23 @@ void Context::setTemporaryStoragePath(const String & path, size_t max_size)
|
||||
setupTmpPath(shared->log, disk->getPath());
|
||||
}
|
||||
|
||||
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
|
||||
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
|
||||
}
|
||||
|
||||
void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_size)
|
||||
{
|
||||
std::lock_guard lock(shared->storage_policies_mutex);
|
||||
StoragePolicyPtr tmp_policy;
|
||||
{
|
||||
/// lock in required only for accessing `shared->merge_tree_storage_policy_selector`
|
||||
/// StoragePolicy itself is immutable.
|
||||
std::lock_guard storage_policies_lock(shared->storage_policies_mutex);
|
||||
tmp_policy = getStoragePolicySelector(storage_policies_lock)->get(policy_name);
|
||||
}
|
||||
|
||||
StoragePolicyPtr tmp_policy = getStoragePolicySelector(lock)->get(policy_name);
|
||||
if (tmp_policy->getVolumes().size() != 1)
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG,
|
||||
"Policy '{}' is used temporary files, such policy should have exactly one volume", policy_name);
|
||||
|
||||
VolumePtr volume = tmp_policy->getVolume(0);
|
||||
|
||||
if (volume->getDisks().empty())
|
||||
@ -882,12 +907,21 @@ void Context::setTemporaryStoragePolicy(const String & policy_name, size_t max_s
|
||||
setupTmpPath(shared->log, disk->getPath());
|
||||
}
|
||||
|
||||
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
|
||||
}
|
||||
auto lock = getLock();
|
||||
|
||||
if (shared->root_temp_data_on_disk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
|
||||
|
||||
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, max_size);
|
||||
}
|
||||
|
||||
void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t max_size)
|
||||
{
|
||||
auto lock = getLock();
|
||||
|
||||
if (shared->root_temp_data_on_disk)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Temporary storage is already set");
|
||||
|
||||
auto disk_ptr = getDisk(cache_disk_name);
|
||||
if (!disk_ptr)
|
||||
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Disk '{}' is not found", cache_disk_name);
|
||||
@ -904,7 +938,7 @@ void Context::setTemporaryStorageInCache(const String & cache_disk_name, size_t
|
||||
|
||||
shared->tmp_path = file_cache->getBasePath();
|
||||
VolumePtr volume = createLocalSingleDiskVolume(shared->tmp_path);
|
||||
shared->temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
|
||||
shared->root_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(volume, file_cache.get(), max_size);
|
||||
}
|
||||
|
||||
void Context::setFlagsPath(const String & path)
|
||||
@ -3460,7 +3494,7 @@ void Context::shutdown()
|
||||
}
|
||||
|
||||
/// Special volumes might also use disks that require shutdown.
|
||||
auto & tmp_data = shared->temp_data_on_disk;
|
||||
auto & tmp_data = shared->root_temp_data_on_disk;
|
||||
if (tmp_data && tmp_data->getVolume())
|
||||
{
|
||||
auto & disks = tmp_data->getVolume()->getDisks();
|
||||
|
@ -472,9 +472,10 @@ public:
|
||||
/// A list of warnings about server configuration to place in `system.warnings` table.
|
||||
Strings getWarnings() const;
|
||||
|
||||
VolumePtr getTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`
|
||||
VolumePtr getGlobalTemporaryVolume() const; /// TODO: remove, use `getTempDataOnDisk`
|
||||
|
||||
TemporaryDataOnDiskScopePtr getTempDataOnDisk() const;
|
||||
TemporaryDataOnDiskScopePtr getSharedTempDataOnDisk() const;
|
||||
void setTempDataOnDisk(TemporaryDataOnDiskScopePtr temp_data_on_disk_);
|
||||
|
||||
void setPath(const String & path);
|
||||
|
@ -308,7 +308,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
|
||||
|
||||
auto settings = context->getSettingsRef();
|
||||
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
|
||||
auto table_join = std::make_shared<TableJoin>(settings, context->getTemporaryVolume());
|
||||
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());
|
||||
|
||||
const ASTTablesInSelectQueryElement * ast_join = select_query.join();
|
||||
const auto & table_to_join = ast_join->table_expression->as<ASTTableExpression &>();
|
||||
|
@ -1045,7 +1045,7 @@ std::shared_ptr<Block> MergeJoin::loadRightBlock(size_t pos) const
|
||||
|
||||
void MergeJoin::initRightTableWriter()
|
||||
{
|
||||
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getTemporaryVolume(),
|
||||
disk_writer = std::make_unique<SortedBlocksWriter>(size_limits, table_join->getGlobalTemporaryVolume(),
|
||||
right_sample_block, right_sort_description, max_rows_in_right_block, max_files_to_merge,
|
||||
table_join->temporaryFilesCodec());
|
||||
disk_writer->addBlocks(right_blocks);
|
||||
|
@ -625,7 +625,7 @@ ProcessListForUser::ProcessListForUser(ContextPtr global_context, ProcessList *
|
||||
if (global_context)
|
||||
{
|
||||
size_t size_limit = global_context->getSettingsRef().max_temporary_data_on_disk_size_for_user;
|
||||
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getTempDataOnDisk(), size_limit);
|
||||
user_temp_data_on_disk = std::make_shared<TemporaryDataOnDiskScope>(global_context->getSharedTempDataOnDisk(), size_limit);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -209,7 +209,7 @@ public:
|
||||
JoinStrictness strictness() const { return table_join.strictness; }
|
||||
bool sameStrictnessAndKind(JoinStrictness, JoinKind) const;
|
||||
const SizeLimits & sizeLimits() const { return size_limits; }
|
||||
VolumePtr getTemporaryVolume() { return tmp_volume; }
|
||||
VolumePtr getGlobalTemporaryVolume() { return tmp_volume; }
|
||||
|
||||
bool isEnabledAlgorithm(JoinAlgorithm val) const
|
||||
{
|
||||
|
@ -994,7 +994,7 @@ JoinTreeQueryPlan buildQueryPlanForJoinNode(const QueryTreeNodePtr & join_table_
|
||||
}
|
||||
}
|
||||
|
||||
auto table_join = std::make_shared<TableJoin>(settings, query_context->getTemporaryVolume());
|
||||
auto table_join = std::make_shared<TableJoin>(settings, query_context->getGlobalTemporaryVolume());
|
||||
table_join->getTableJoin() = join_node.toASTTableJoin()->as<ASTTableJoin &>();
|
||||
table_join->getTableJoin().kind = join_kind;
|
||||
|
||||
|
@ -623,7 +623,7 @@ void HTTPHandler::processQuery(
|
||||
|
||||
if (buffer_until_eof)
|
||||
{
|
||||
const std::string tmp_path(server.context()->getTemporaryVolume()->getDisk()->getPath());
|
||||
const std::string tmp_path(server.context()->getGlobalTemporaryVolume()->getDisk()->getPath());
|
||||
const std::string tmp_path_template(fs::path(tmp_path) / "http_buffers/");
|
||||
|
||||
auto create_tmp_disk_buffer = [tmp_path_template] (const WriteBufferPtr &)
|
||||
|
@ -273,7 +273,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
ctx->compression_codec = global_ctx->data->getCompressionCodecForPart(
|
||||
global_ctx->merge_list_element_ptr->total_size_bytes_compressed, global_ctx->new_data_part->ttl_infos, global_ctx->time_of_merge);
|
||||
|
||||
ctx->tmp_disk = global_ctx->context->getTemporaryVolume()->getDisk();
|
||||
ctx->tmp_disk = global_ctx->context->getGlobalTemporaryVolume()->getDisk();
|
||||
|
||||
switch (global_ctx->chosen_merge_algorithm)
|
||||
{
|
||||
|
@ -406,7 +406,7 @@ namespace
|
||||
|
||||
void StorageMemory::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & /* partitions */)
|
||||
{
|
||||
auto temp_disk = backup_entries_collector.getContext()->getTemporaryVolume()->getDisk(0);
|
||||
auto temp_disk = backup_entries_collector.getContext()->getGlobalTemporaryVolume()->getDisk(0);
|
||||
auto max_compress_block_size = backup_entries_collector.getContext()->getSettingsRef().max_compress_block_size;
|
||||
backup_entries_collector.addBackupEntries(std::make_shared<MemoryBackup>(
|
||||
backup_entries_collector.getContext(),
|
||||
@ -426,7 +426,7 @@ void StorageMemory::restoreDataFromBackup(RestorerFromBackup & restorer, const S
|
||||
if (!restorer.isNonEmptyTableAllowed() && total_size_bytes)
|
||||
RestorerFromBackup::throwTableIsNotEmpty(getStorageID());
|
||||
|
||||
auto temp_disk = restorer.getContext()->getTemporaryVolume()->getDisk(0);
|
||||
auto temp_disk = restorer.getContext()->getGlobalTemporaryVolume()->getDisk(0);
|
||||
|
||||
restorer.addDataRestoreTask(
|
||||
[storage = std::static_pointer_cast<StorageMemory>(shared_from_this()), backup, data_path_in_backup, temp_disk]
|
||||
|
Loading…
Reference in New Issue
Block a user