rework custom table's disk usage

This commit is contained in:
Sema Checherinda 2024-08-05 20:23:41 +02:00
parent 5d9d5bf919
commit 995187006a
14 changed files with 264 additions and 219 deletions

191
src/Disks/DiskFomAST.cpp Normal file
View File

@ -0,0 +1,191 @@
#include <Disks/DiskFomAST.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_DISK;
}
/// Looks up or registers the custom disk described by a `disk(...)` function and returns the
/// name under which it is stored in the context (always starting with `DiskSelector::CUSTOM_DISK_PREFIX`).
///
/// `config`        - disk configuration built from the function arguments.
/// `serialization` - serialized AST of the disk function; it is quoted in error messages and its
///                   SipHash128 identifies the disk settings.
/// `attach`        - when true, the "only `name` is given" check, the settings-hash comparison and
///                   the local path check are skipped.
///
/// Throws BAD_ARGUMENTS for an invalid or conflicting description and LOGICAL_ERROR when the
/// custom-disk bookkeeping is found to be inconsistent.
std::string getOrCreateCustomDisk(DiskConfigurationPtr config, const std::string & serialization, ContextPtr context, bool attach)
{
    Poco::Util::AbstractConfiguration::Keys setting_keys;
    config->keys(setting_keys);

    /// A disk function must carry at least one argument.
    if (setting_keys.empty())
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Disk function has no arguments. Invalid disk description.");

    /// A lone `name` does not describe a disk; this is rejected unless `attach` is set.
    const bool only_name_given = setting_keys.size() == 1 && setting_keys.front() == "name";
    if (only_name_given && !attach)
    {
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Disk function `{}` has to have the other arguments which describe the disk. Invalid disk description.",
            serialization);
    }

    /// Stays empty when the function has no `name` argument.
    const std::string disk_name = config->has("name") ? config->getString("name") : std::string{};

    if (!disk_name.empty())
    {
        /// User supplied names must not use the prefix that is prepended to custom disk names below.
        if (disk_name.starts_with(DiskSelector::CUSTOM_DISK_PREFIX))
            throw Exception(
                ErrorCodes::BAD_ARGUMENTS,
                "Disk name `{}` could not start with `{}`",
                disk_name, DiskSelector::CUSTOM_DISK_PREFIX);

        /// Any disk already registered under the bare name blocks the definition.
        if (auto existing_disk = context->tryGetDisk(disk_name))
        {
            /// Custom disks are registered below under a prefixed name only,
            /// so finding one under the bare name is reported as a logical error.
            if (existing_disk->isCustomDisk())
                throw Exception(
                    ErrorCodes::LOGICAL_ERROR,
                    "Disk with name `{}` already exist as a custom disk but the name does not start with `{}`",
                    disk_name,
                    DiskSelector::CUSTOM_DISK_PREFIX);

            throw Exception(ErrorCodes::BAD_ARGUMENTS, "The disk `{}` is already exist. It is impossible to redefine it.", disk_name);
        }
    }

    const auto settings_hash = sipHash128(serialization.data(), serialization.size());

    /// We need a unique name for a created custom disk, but it needs to be the same
    /// after table is reattached or server is restarted, so an unnamed disk takes a hash
    /// of the serialized disk configuration AST as its name suffix.
    const std::string prefix = toString(DiskSelector::CUSTOM_DISK_PREFIX);
    std::string custom_disk_name = disk_name.empty()
        ? prefix + "noname_" + toString(settings_hash)
        : prefix + disk_name;

    auto result_disk = context->getOrCreateDisk(custom_disk_name, [&](const DisksMap & disks_map) -> DiskPtr
    {
        auto created_disk = DiskFactory::instance().create(
            disk_name, *config, /* config_path */"", context, disks_map, /* attach */attach, /* custom_disk */true);
        /// Mark that disk can be used without storage policy and remember its settings hash.
        created_disk->markDiskAsCustom(settings_hash);
        return created_disk;
    });

    if (!result_disk->isCustomDisk())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Disk with name `{}` expected to be custom disk", disk_name);

    /// A disk found under this name must have been marked with the same settings hash.
    if (result_disk->getCustomDiskSettings() != settings_hash && !attach)
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "The disk `{}` is already configured as a custom disk in another table. It can't be redefined with different settings.",
            disk_name);

    /// The path restriction below applies only to non-remote disks and only when `attach` is false.
    if (attach || result_disk->isRemote())
        return custom_disk_name;

    static constexpr auto base_dir_config_key = "custom_local_disks_base_directory";
    auto allowed_path_prefix = context->getConfigRef().getString(base_dir_config_key, "");

    if (allowed_path_prefix.empty())
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Base path for custom local disks must be defined in config file by `{}`",
            base_dir_config_key);

    if (!pathStartsWith(result_disk->getPath(), allowed_path_prefix))
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Path of the custom local disk must be inside `{}` directory",
            allowed_path_prefix);

    return custom_disk_name;
}
class DiskConfigurationFlattener
{
public:
struct Data
{
ContextPtr context;
bool attach;
};
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (isDiskFunction(ast))
{
const auto * function = ast->as<ASTFunction>();
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function->arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(function_args, data.context);
auto disk_setting_string = serializeAST(*function);
auto disk_name = getOrCreateCustomDisk(config, disk_setting_string, data.context, data.attach);
ast = std::make_shared<ASTLiteral>(disk_name);
}
}
};
std::string DiskFomAST::createCustomDisk(const ASTPtr & disk_function_ast, ContextPtr context, bool attach)
{
if (!isDiskFunction(disk_function_ast))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");
auto ast = disk_function_ast->clone();
using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
FlattenDiskConfigurationVisitor::Data data{context, attach};
FlattenDiskConfigurationVisitor{data}.visit(ast);
auto disk_name = assert_cast<const ASTLiteral &>(*ast).value.get<String>();
return disk_name;
}
/// Validates a disk that is referenced by plain name (not by a disk function) and returns that name.
///
/// Returns `disk_name` when a disk with this name is registered in the context.
/// Throws BAD_ARGUMENTS when the name starts with `DiskSelector::CUSTOM_DISK_PREFIX`, or when only
/// a custom disk (registered under the prefixed name) exists for it.
/// Throws UNKNOWN_DISK when no disk matches at all.
std::string DiskFomAST::getConfigDefinedDisk(const std::string & disk_name, ContextPtr context)
{
    if (disk_name.starts_with(DiskSelector::CUSTOM_DISK_PREFIX))
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Disk name `{}` could not start with `{}`",
            disk_name, DiskSelector::CUSTOM_DISK_PREFIX);

    if (context->tryGetDisk(disk_name))
        return disk_name;

    /// Custom disks are registered under the prefixed name; they must not be referenced by the bare name.
    const std::string custom_disk_name = DiskSelector::CUSTOM_DISK_PREFIX + disk_name;
    if (context->tryGetDisk(custom_disk_name))
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            /// Adjacent literals are concatenated as-is, so the separating space must be written explicitly.
            "Disk name `{}` is a custom disk that is used in other table. "
            "That disk could not be used by a reference. The custom disk should be fully specified with a disk function.",
            disk_name);

    throw Exception(ErrorCodes::UNKNOWN_DISK, "Unknown disk {}", disk_name);
}
}

15
src/Disks/DiskFomAST.h Normal file
View File

@ -0,0 +1,15 @@
#pragma once
#include <string>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Helpers that turn the value of a table's `disk` setting into the name of a registered disk.
namespace DiskFomAST
{
/// Validates a disk referenced by plain name and returns that name.
/// Throws BAD_ARGUMENTS if the name starts with the custom disk prefix or refers to a custom disk,
/// and UNKNOWN_DISK if no such disk is registered.
std::string getConfigDefinedDisk(const std::string & name, ContextPtr context);
/// Creates (or finds) the custom disk described by a `disk(...)` function and returns the name
/// it is registered under. Throws BAD_ARGUMENTS if `disk_function` is not a disk function.
std::string createCustomDisk(const ASTPtr & disk_function, ContextPtr context, bool attach);
}
}

View File

@ -6,6 +6,8 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <map>
#include <sstream>
#include <string_view>
namespace DB
{
@ -18,7 +20,7 @@ using DiskSelectorPtr = std::shared_ptr<const DiskSelector>;
class DiskSelector
{
public:
static constexpr auto TMP_INTERNAL_DISK_PREFIX = "__tmp_internal_";
static constexpr auto CUSTOM_DISK_PREFIX = "__";
explicit DiskSelector(std::unordered_set<String> skip_types_ = {}) : skip_types(skip_types_) { }
DiskSelector(const DiskSelector & from) = default;

View File

@ -464,9 +464,9 @@ public:
virtual void chmod(const String & /*path*/, mode_t /*mode*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk does not support chmod"); }
/// Was disk created to be used without storage configuration?
bool isCustomDisk() const { return is_custom_disk; }
void markDiskAsCustom() { is_custom_disk = true; }
/// True when a non-zero settings hash has been stored for this disk.
bool isCustomDisk() const { return custom_disk_settings_hash != 0; }
/// Returns the stored settings hash; it is 0 unless markDiskAsCustom() stored another value.
UInt128 getCustomDiskSettings() const { return custom_disk_settings_hash; }
/// Stores the hash of the settings the disk was created with.
/// NOTE(review): a `settings_hash` equal to 0 would leave isCustomDisk() returning false - confirm callers never pass 0.
void markDiskAsCustom(UInt128 settings_hash) { custom_disk_settings_hash = settings_hash; }
virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
@ -504,7 +504,8 @@ protected:
private:
ThreadPool copying_thread_pool;
bool is_custom_disk = false;
// 0 means the disk is not custom, the disk is predefined in the config
UInt128 custom_disk_settings_hash = 0;
/// Check access to the disk.
void checkAccess();

View File

@ -13,6 +13,7 @@
#include <memory>
#include <mutex>
#include <string_view>
#include <unordered_map>
#include <unistd.h>
#include <boost/noncopyable.hpp>
@ -119,6 +120,7 @@ class StoragePolicySelector
{
public:
static constexpr auto TMP_STORAGE_POLICY_PREFIX = "__";
static_assert(std::string_view(DiskSelector::CUSTOM_DISK_PREFIX) == std::string_view(TMP_STORAGE_POLICY_PREFIX));
StoragePolicySelector(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, DiskSelectorPtr disks);

View File

@ -1,121 +0,0 @@
#include <Disks/getOrCreateDiskFromAST.h>
#include <Common/logger_useful.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
/// Builds the disk described by the `disk(...)` function, registers it in the context under
/// its name and returns that name. An already registered non-custom disk with the same name
/// is rejected with BAD_ARGUMENTS.
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context, bool attach)
{
    const auto & arguments_list = *assert_cast<const ASTExpressionList *>(function.arguments.get());
    auto config = getDiskConfigurationFromAST(arguments_list.children, context);

    std::string disk_name;
    if (!config->has("name"))
    {
        /// We need a unique name for a created custom disk, but it needs to be the same
        /// after table is reattached or server is restarted, so take a hash of the disk
        /// configuration serialized ast as a disk name suffix.
        const auto serialized_function = serializeAST(function);
        const auto settings_hash = sipHash128(serialized_function.data(), serialized_function.size());
        disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX + toString(settings_hash);
    }
    else
    {
        disk_name = config->getString("name");
    }

    auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr
    {
        auto created_disk = DiskFactory::instance().create(
            disk_name, *config, /* config_path */"", context, disks_map, /* attach */attach, /* custom_disk */true);
        /// Mark that disk can be used without storage policy.
        created_disk->markDiskAsCustom();
        return created_disk;
    });

    if (!result_disk->isCustomDisk())
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk with name `{}` already exist", disk_name);

    /// The path restriction below applies only to non-remote disks and only when `attach` is false.
    if (attach || result_disk->isRemote())
        return disk_name;

    static constexpr auto base_dir_config_key = "custom_local_disks_base_directory";
    auto allowed_path_prefix = context->getConfigRef().getString(base_dir_config_key, "");

    if (allowed_path_prefix.empty())
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Base path for custom local disks must be defined in config file by `{}`",
            base_dir_config_key);

    if (!pathStartsWith(result_disk->getPath(), allowed_path_prefix))
        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Path of the custom local disk must be inside `{}` directory",
            allowed_path_prefix);

    return disk_name;
}
class DiskConfigurationFlattener
{
public:
struct Data
{
ContextPtr context;
bool attach;
};
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (isDiskFunction(ast))
{
auto disk_name = getOrCreateDiskFromDiskAST(*ast->as<ASTFunction>(), data.context, data.attach);
ast = std::make_shared<ASTLiteral>(disk_name);
}
}
};
/// Visits children first.
using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
}
/// Entry point: flattens the given disk function (children first) into the name of the
/// resulting disk and returns it. Throws BAD_ARGUMENTS when the AST is not a disk function.
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context, bool attach)
{
    if (!isDiskFunction(disk_function))
        throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");

    /// The visitor rewrites nodes in place, so it runs on a clone and the caller's AST stays intact.
    auto flattened_ast = disk_function->clone();

    FlattenDiskConfigurationVisitor::Data visitor_data{context, attach};
    FlattenDiskConfigurationVisitor{visitor_data}.visit(flattened_ast);

    /// After the visit the root disk function has been replaced by a literal with the disk name.
    auto disk_name = assert_cast<const ASTLiteral &>(*flattened_ast).value.get<String>();
    LOG_TRACE(getLogger("getOrCreateDiskFromDiskAST"), "Result disk name: {}", disk_name);
    return disk_name;
}
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <string>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class ASTFunction;
/**
* Create a DiskPtr from disk AST function like disk(<disk_configuration>),
* add it to DiskSelector by a unique (but always the same for given configuration) disk name
* and return this name.
*/
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context, bool attach);
}

View File

@ -4,6 +4,7 @@
#include <memory>
#include <Poco/UUID.h>
#include <Poco/Util/Application.h>
#include "Common/Logger.h"
#include <Common/AsyncLoader.h>
#include <Common/PoolId.h>
#include <Common/SensitiveDataMasker.h>
@ -4395,6 +4396,15 @@ DiskPtr Context::getDisk(const String & name) const
return disk_selector->get(name);
}
/// Looks the disk up by name through the disk selector's tryGet(), holding the storage policies mutex.
DiskPtr Context::tryGetDisk(const String & name) const
{
    std::lock_guard lock(shared->storage_policies_mutex);
    return getDiskSelector(lock)->tryGet(name);
}
DiskPtr Context::getOrCreateDisk(const String & name, DiskCreator creator) const
{
std::lock_guard lock(shared->storage_policies_mutex);
@ -4422,9 +4432,11 @@ StoragePolicyPtr Context::getStoragePolicy(const String & name) const
StoragePolicyPtr Context::getStoragePolicyFromDisk(const String & disk_name) const
{
LOG_DEBUG(getLogger("StoragePolicy"), "getStoragePolicyFromDisk disk_name {}", disk_name);
std::lock_guard lock(shared->storage_policies_mutex);
const std::string storage_policy_name = StoragePolicySelector::TMP_STORAGE_POLICY_PREFIX + disk_name;
const std::string storage_policy_name = disk_name.starts_with(DiskSelector::CUSTOM_DISK_PREFIX) ? disk_name : StoragePolicySelector::TMP_STORAGE_POLICY_PREFIX + disk_name;
auto storage_policy_selector = getStoragePolicySelector(lock);
StoragePolicyPtr storage_policy = storage_policy_selector->tryGet(storage_policy_name);

View File

@ -1186,6 +1186,7 @@ public:
/// Provides storage disks
DiskPtr getDisk(const String & name) const;
DiskPtr tryGetDisk(const String & name) const;
using DiskCreator = std::function<DiskPtr(const DisksMap & disks_map)>;
DiskPtr getOrCreateDisk(const String & name, DiskCreator creator) const;

View File

@ -1,5 +1,4 @@
#include <Parsers/FieldFromAST.h>
#include <Disks/getOrCreateDiskFromAST.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Disks/getOrCreateDiskFromAST.h>
#include <Disks/DiskFomAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
@ -64,10 +64,14 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def, ContextPtr conte
auto ast = dynamic_cast<const FieldFromASTImpl &>(custom.getImpl()).ast;
if (ast && isDiskFunction(ast))
{
auto disk_name = getOrCreateDiskFromDiskAST(ast, context, is_attach);
LOG_TRACE(getLogger("MergeTreeSettings"), "Created custom disk {}", disk_name);
auto disk_name = DiskFomAST::createCustomDisk(ast, context, is_attach);
LOG_DEBUG(getLogger("MergeTreeSettings"), "Created custom disk {}", disk_name);
value = disk_name;
}
else
{
value = DiskFomAST::getConfigDefinedDisk(value.safeGet<String>(), context);
}
}
if (has("storage_policy"))

View File

@ -1,12 +1,4 @@
<clickhouse>
<blob_storage_log>
<database>system</database>
<table>blob_storage_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<ttl>event_date + INTERVAL 30 DAY</ttl>
</blob_storage_log>
<storage_configuration>
<disks>
<disk1>

View File

@ -38,58 +38,3 @@ def test_storage_policy_configuration_change(started_cluster):
"/etc/clickhouse-server/config.d/disks.xml",
)
node.start_clickhouse()
# Creates two tables, one after another, whose custom disks share the name
# 'not_uniq_disk_name' but use different paths, inserts a row into each and prints the
# blob storage log and the table contents. The test contains no assertions.
def test_disk_is_immutable(started_cluster):
node.query("DROP TABLE IF EXISTS test_1")
# First table: custom disk 'not_uniq_disk_name' with path ./03215_data_test_1/
node.query(
"""
create table test_1 (a Int32)
engine = MergeTree()
order by tuple()
settings
disk=disk(
name='not_uniq_disk_name',
type = object_storage,
object_storage_type = local_blob_storage,
path='./03215_data_test_1/')
"""
)
node.query("INSERT INTO test_1 VALUES (1)")
# Flush system logs before reading system.blob_storage_log.
node.query("SYSTEM FLUSH LOGS;")
print(node.query("SELECT 'test_1', * FROM system.blob_storage_log"))
print(node.query("SELECT 'test_1', * FROM test_1"))
node.query("DROP TABLE test_1 SYNC")
node.query("DROP TABLE IF EXISTS test_2")
# Second table: same disk name, different path ./03215_data_test_2/
node.query(
"""
create table test_2 (a Int32)
engine = MergeTree()
order by tuple()
settings
disk=disk(
name='not_uniq_disk_name',
type = object_storage,
object_storage_type = local_blob_storage,
path='./03215_data_test_2/')
"""
)
node.query("INSERT INTO test_2 VALUES (1)")
node.query("SYSTEM FLUSH LOGS;")
print(node.query("SELECT 'test_2', * FROM system.blob_storage_log"))
print(node.query("SELECT 'test_2', * FROM test_2"))
# Restart the server and read test_2 again.
node.restart_clickhouse()
print(node.query("SELECT 'test_2', * FROM system.blob_storage_log"))
print(node.query("SELECT 'test_2', * FROM test_2"))

View File

@ -2,13 +2,33 @@
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test1/');
settings disk=disk(name='02963_custom_disk', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test1/');
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='02963_custom_disk', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test2/'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='02963_custom_disk'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk='02963_custom_disk'; -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='s3_disk_02963'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk='s3_disk_02963';
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='s3_disk_02963', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test2/'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1',
type = object_storage,
@ -17,7 +37,7 @@ settings disk=disk(name='test1',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test2',
type = object_storage,
@ -27,7 +47,7 @@ settings disk=disk(name='test2',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test3',
type = object_storage,
@ -37,8 +57,8 @@ settings disk=disk(name='test3',
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test4',
type = object_storage,
@ -48,8 +68,8 @@ settings disk=disk(name='test4',
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test5',
type = object_storage,