Merge pull request #67684 from ClickHouse/chesema-rewrite-storage-policy

rework usage of custom table's disks
This commit is contained in:
Sema Checherinda 2024-08-09 12:36:29 +00:00 committed by GitHub
commit 1e67b46b57
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 219 additions and 163 deletions

150
src/Disks/DiskFomAST.cpp Normal file
View File

@ -0,0 +1,150 @@
#include <Disks/DiskFomAST.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
std::string getOrCreateCustomDisk(DiskConfigurationPtr config, const std::string & serialization, ContextPtr context, bool attach)
{
Poco::Util::AbstractConfiguration::Keys disk_settings_keys;
config->keys(disk_settings_keys);
/// Check that no settings are defined when disk from the config is referred.
if (disk_settings_keys.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Disk function must have arguments. Invalid disk description.");
if (disk_settings_keys.size() == 1 && disk_settings_keys.front() == "name" && !attach)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Disk function `{}` must have other arguments apart from `name`, which describe disk configuration. Invalid disk description.",
serialization);
auto disk_settings_hash = sipHash128(serialization.data(), serialization.size());
std::string disk_name;
if (config->has("name"))
{
disk_name = config->getString("name");
}
else
{
/// We need a unique name for a created custom disk, but it needs to be the same
/// after table is reattached or server is restarted, so take a hash of the disk
/// configuration serialized ast as a disk name suffix.
disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX + toString(disk_settings_hash);
}
auto disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
auto result = DiskFactory::instance().create(
disk_name, *config, /* config_path */"", context, disks_map, /* attach */attach, /* custom_disk */true);
/// Mark that disk can be used without storage policy.
result->markDiskAsCustom(disk_settings_hash);
return result;
});
if (!disk->isCustomDisk())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Disk `{}` already exists and is described by the config."
" It is impossible to redefine it.",
disk_name);
if (disk->getCustomDiskSettings() != disk_settings_hash && !attach)
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"The disk `{}` is already configured as a custom disk in another table. It can't be redefined with different settings.",
disk_name);
if (!attach && !disk->isRemote())
{
static constexpr auto custom_local_disks_base_dir_in_config = "custom_local_disks_base_directory";
auto disk_path_expected_prefix = context->getConfigRef().getString(custom_local_disks_base_dir_in_config, "");
if (disk_path_expected_prefix.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Base path for custom local disks must be defined in config file by `{}`",
custom_local_disks_base_dir_in_config);
if (!pathStartsWith(disk->getPath(), disk_path_expected_prefix))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path of the custom local disk must be inside `{}` directory",
disk_path_expected_prefix);
}
return disk_name;
}
class DiskConfigurationFlattener
{
public:
struct Data
{
ContextPtr context;
bool attach;
};
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (isDiskFunction(ast))
{
const auto * function = ast->as<ASTFunction>();
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function->arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(function_args, data.context);
auto disk_setting_string = serializeAST(*function);
auto disk_name = getOrCreateCustomDisk(config, disk_setting_string, data.context, data.attach);
ast = std::make_shared<ASTLiteral>(disk_name);
}
}
};
std::string DiskFomAST::createCustomDisk(const ASTPtr & disk_function_ast, ContextPtr context, bool attach)
{
if (!isDiskFunction(disk_function_ast))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");
auto ast = disk_function_ast->clone();
using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
FlattenDiskConfigurationVisitor::Data data{context, attach};
FlattenDiskConfigurationVisitor{data}.visit(ast);
return assert_cast<const ASTLiteral &>(*ast).value.get<String>();
}
void DiskFomAST::ensureDiskIsNotCustom(const std::string & disk_name, ContextPtr context)
{
auto disk = context->getDisk(disk_name);
if (disk->isCustomDisk())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Disk name `{}` is a custom disk that is used in other table. "
"That disk could not be used by a reference by other tables. The custom disk should be fully specified with a disk function.",
disk_name);
}
}

15
src/Disks/DiskFomAST.h Normal file
View File

@ -0,0 +1,15 @@
#pragma once
#include <string>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
namespace DiskFomAST
{
void ensureDiskIsNotCustom(const std::string & name, ContextPtr context);
std::string createCustomDisk(const ASTPtr & disk_function, ContextPtr context, bool attach);
}
}

View File

@ -6,6 +6,8 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <map>
#include <sstream>
#include <string_view>
namespace DB
{

View File

@ -464,9 +464,9 @@ public:
virtual void chmod(const String & /*path*/, mode_t /*mode*/) { throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Disk does not support chmod"); }
/// Was disk created to be used without storage configuration?
bool isCustomDisk() const { return is_custom_disk; }
void markDiskAsCustom() { is_custom_disk = true; }
bool isCustomDisk() const { return custom_disk_settings_hash != 0; }
UInt128 getCustomDiskSettings() const { return custom_disk_settings_hash; }
void markDiskAsCustom(UInt128 settings_hash) { custom_disk_settings_hash = settings_hash; }
virtual DiskPtr getDelegateDiskIfExists() const { return nullptr; }
@ -504,7 +504,8 @@ protected:
private:
ThreadPool copying_thread_pool;
bool is_custom_disk = false;
// 0 means the disk is not custom, the disk is predefined in the config
UInt128 custom_disk_settings_hash = 0;
/// Check access to the disk.
void checkAccess();

View File

@ -12,7 +12,6 @@
#include <Common/formatReadable.h>
#include <memory>
#include <mutex>
#include <unordered_map>
#include <unistd.h>
#include <boost/noncopyable.hpp>

View File

@ -1,121 +0,0 @@
#include <Disks/getOrCreateDiskFromAST.h>
#include <Common/logger_useful.h>
#include <Common/assert_cast.h>
#include <Common/filesystemHelpers.h>
#include <Disks/getDiskConfigurationFromAST.h>
#include <Disks/DiskSelector.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/isDiskFunction.h>
#include <Interpreters/Context.h>
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
std::string getOrCreateDiskFromDiskAST(const ASTFunction & function, ContextPtr context, bool attach)
{
const auto * function_args_expr = assert_cast<const ASTExpressionList *>(function.arguments.get());
const auto & function_args = function_args_expr->children;
auto config = getDiskConfigurationFromAST(function_args, context);
std::string disk_name;
if (config->has("name"))
{
disk_name = config->getString("name");
}
else
{
/// We need a unique name for a created custom disk, but it needs to be the same
/// after table is reattached or server is restarted, so take a hash of the disk
/// configuration serialized ast as a disk name suffix.
auto disk_setting_string = serializeAST(function);
disk_name = DiskSelector::TMP_INTERNAL_DISK_PREFIX
+ toString(sipHash128(disk_setting_string.data(), disk_setting_string.size()));
}
auto result_disk = context->getOrCreateDisk(disk_name, [&](const DisksMap & disks_map) -> DiskPtr {
auto disk = DiskFactory::instance().create(
disk_name, *config, /* config_path */"", context, disks_map, /* attach */attach, /* custom_disk */true);
/// Mark that disk can be used without storage policy.
disk->markDiskAsCustom();
return disk;
});
if (!result_disk->isCustomDisk())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk with name `{}` already exist", disk_name);
if (!attach && !result_disk->isRemote())
{
static constexpr auto custom_local_disks_base_dir_in_config = "custom_local_disks_base_directory";
auto disk_path_expected_prefix = context->getConfigRef().getString(custom_local_disks_base_dir_in_config, "");
if (disk_path_expected_prefix.empty())
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Base path for custom local disks must be defined in config file by `{}`",
custom_local_disks_base_dir_in_config);
if (!pathStartsWith(result_disk->getPath(), disk_path_expected_prefix))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Path of the custom local disk must be inside `{}` directory",
disk_path_expected_prefix);
}
return disk_name;
}
class DiskConfigurationFlattener
{
public:
struct Data
{
ContextPtr context;
bool attach;
};
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
static void visit(ASTPtr & ast, Data & data)
{
if (isDiskFunction(ast))
{
auto disk_name = getOrCreateDiskFromDiskAST(*ast->as<ASTFunction>(), data.context, data.attach);
ast = std::make_shared<ASTLiteral>(disk_name);
}
}
};
/// Visits children first.
using FlattenDiskConfigurationVisitor = InDepthNodeVisitor<DiskConfigurationFlattener, false>;
}
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context, bool attach)
{
if (!isDiskFunction(disk_function))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected a disk function");
auto ast = disk_function->clone();
FlattenDiskConfigurationVisitor::Data data{context, attach};
FlattenDiskConfigurationVisitor{data}.visit(ast);
auto disk_name = assert_cast<const ASTLiteral &>(*ast).value.get<String>();
LOG_TRACE(getLogger("getOrCreateDiskFromDiskAST"), "Result disk name: {}", disk_name);
return disk_name;
}
}

View File

@ -1,18 +0,0 @@
#pragma once
#include <string>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
class ASTFunction;
/**
* Create a DiskPtr from disk AST function like disk(<disk_configuration>),
* add it to DiskSelector by a unique (but always the same for given configuration) disk name
* and return this name.
*/
std::string getOrCreateDiskFromDiskAST(const ASTPtr & disk_function, ContextPtr context, bool attach);
}

View File

@ -1,5 +1,4 @@
#include <Parsers/FieldFromAST.h>
#include <Disks/getOrCreateDiskFromAST.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>

View File

@ -1,6 +1,6 @@
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Disks/getOrCreateDiskFromAST.h>
#include <Disks/DiskFomAST.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTFunction.h>
@ -59,15 +59,19 @@ void MergeTreeSettings::loadFromQuery(ASTStorage & storage_def, ContextPtr conte
CustomType custom;
if (name == "disk")
{
ASTPtr value_as_custom_ast = nullptr;
if (value.tryGet<CustomType>(custom) && 0 == strcmp(custom.getTypeName(), "AST"))
value_as_custom_ast = dynamic_cast<const FieldFromASTImpl &>(custom.getImpl()).ast;
if (value_as_custom_ast && isDiskFunction(value_as_custom_ast))
{
auto ast = dynamic_cast<const FieldFromASTImpl &>(custom.getImpl()).ast;
if (ast && isDiskFunction(ast))
{
auto disk_name = getOrCreateDiskFromDiskAST(ast, context, is_attach);
LOG_TRACE(getLogger("MergeTreeSettings"), "Created custom disk {}", disk_name);
value = disk_name;
}
auto disk_name = DiskFomAST::createCustomDisk(value_as_custom_ast, context, is_attach);
LOG_DEBUG(getLogger("MergeTreeSettings"), "Created custom disk {}", disk_name);
value = disk_name;
}
else
{
DiskFomAST::ensureDiskIsNotCustom(value.safeGet<String>(), context);
}
if (has("storage_policy"))

View File

@ -373,7 +373,7 @@ def test_merge_tree_setting_override(start_cluster):
CREATE TABLE {TABLE_NAME} (a Int32)
ENGINE = MergeTree()
ORDER BY tuple()
SETTINGS disk = 'kek', storage_policy = 's3';
SETTINGS disk = 's3', storage_policy = 's3';
"""
)
)

View File

@ -13,7 +13,7 @@ DROP TABLE IF EXISTS test;
CREATE TABLE test (a Int32, b String)
ENGINE = MergeTree() ORDER BY tuple()
SETTINGS disk = disk(name = 's3_disk', type = cache, max_size = '100Ki', path = ${CLICKHOUSE_TEST_UNIQUE_NAME}, disk = s3_disk);
""" 2>&1 | grep -q "Disk with name \`s3_disk\` already exist" && echo 'OK' || echo 'FAIL'
""" 2>&1 | grep -q "Disk \`s3_disk\` already exists and is described by the config" && echo 'OK' || echo 'FAIL'
disk_name="${CLICKHOUSE_TEST_UNIQUE_NAME}"

View File

@ -2,13 +2,33 @@
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test1/');
settings disk=disk(name='02963_custom_disk', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test1/');
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='02963_custom_disk', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test2/'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='02963_custom_disk'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk='02963_custom_disk'; -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='s3_disk_02963'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk='s3_disk_02963';
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='s3_disk_02963', type = object_storage, object_storage_type = local_blob_storage, path='./02963_test2/'); -- { serverError BAD_ARGUMENTS }
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test1',
type = object_storage,
@ -17,7 +37,7 @@ settings disk=disk(name='test1',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test2',
type = object_storage,
@ -27,7 +47,7 @@ settings disk=disk(name='test2',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test3',
type = object_storage,
@ -37,8 +57,8 @@ settings disk=disk(name='test3',
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test4',
type = object_storage,
@ -48,8 +68,8 @@ settings disk=disk(name='test4',
endpoint = 'http://localhost:11111/test/common/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
drop table test;
drop table if exists test;
create table test (a Int32) engine = MergeTree() order by tuple()
settings disk=disk(name='test5',
type = object_storage,

View File

@ -46,7 +46,12 @@ ${CLICKHOUSE_CLIENT} --query "drop table if exists test_s3_mt_dst"
${CLICKHOUSE_CLIENT} -m --query "
create table test_s3_mt_dst (a Int32, b Int64, c Int64) engine = MergeTree() partition by intDiv(a, 1000) order by tuple(a, b)
settings disk = '03008_s3_plain_rewritable'
settings disk = disk(
name = 03008_s3_plain_rewritable,
type = s3_plain_rewritable,
endpoint = 'http://localhost:11111/test/03008_test_s3_mt/',
access_key_id = clickhouse,
secret_access_key = clickhouse);
"
${CLICKHOUSE_CLIENT} -m --query "