Merge pull request #44204 from kssenii/use-new-named-collections-code

Apply new code of named collections (from #43147) to external table engines part 1
This commit is contained in:
Kseniia Sumarokova 2022-12-16 10:09:01 +01:00 committed by GitHub
commit 8e1e431316
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 286 additions and 72 deletions

View File

@ -37,7 +37,7 @@
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Storages/registerStorages.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Formats/registerFormats.h>

View File

@ -60,7 +60,7 @@
#include <Storages/System/attachInformationSchemaTables.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/registerFunctions.h>

View File

@ -106,6 +106,7 @@ if (TARGET ch_contrib::nats_io)
endif()
add_headers_and_sources(dbms Storages/MeiliSearch)
add_headers_and_sources(dbms Storages/NamedCollections)
if (TARGET ch_contrib::amqp_cpp)
add_headers_and_sources(dbms Storages/RabbitMQ)

View File

@ -3,7 +3,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -4,7 +4,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -3,7 +3,7 @@
#include <Access/ContextAccess.h>
#include <Interpreters/Context.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
namespace DB

View File

@ -1,4 +1,4 @@
#include <Storages/NamedCollectionConfiguration.h>
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Common/Exception.h>
#include <Common/SettingsChanges.h>
@ -35,19 +35,30 @@ template <typename T> T getConfigValueOrDefault(
return *default_value;
}
if constexpr (std::is_same_v<T, String>)
return config.getString(path);
else if constexpr (std::is_same_v<T, UInt64>)
return config.getUInt64(path);
else if constexpr (std::is_same_v<T, Int64>)
return config.getInt64(path);
else if constexpr (std::is_same_v<T, Float64>)
return config.getDouble(path);
else
try
{
if constexpr (std::is_same_v<T, String>)
return config.getString(path);
else if constexpr (std::is_same_v<T, UInt64>)
return config.getUInt64(path);
else if constexpr (std::is_same_v<T, Int64>)
return config.getInt64(path);
else if constexpr (std::is_same_v<T, Float64>)
return config.getDouble(path);
else
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Unsupported type in getConfigValueOrDefault(). "
"Supported types are String, UInt64, Int64, Float64");
}
catch (const Poco::SyntaxException &)
{
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Unsupported type in getConfigValueOrDefault(). "
"Supported types are String, UInt64, Int64, Float64");
ErrorCodes::BAD_ARGUMENTS,
"Cannot extract {} from {}",
toString(magic_enum::enum_name(Field::TypeToEnum<NearestFieldType<T>>::value)),
path);
}
}
template<typename T> void setConfigValue(

View File

@ -1,4 +1,4 @@
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/FieldVisitorToString.h>
#include <Common/logger_useful.h>
@ -13,8 +13,8 @@
#include <Parsers/parseQuery.h>
#include <Parsers/ParserCreateQuery.h>
#include <Interpreters/Context.h>
#include <Storages/NamedCollections.h>
#include <Storages/NamedCollectionConfiguration.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
namespace fs = std::filesystem;
@ -78,7 +78,7 @@ public:
/// (`enumerate_result` == <collection_path>).
const bool collection_is_empty = enumerate_result.size() == 1
&& *enumerate_result.begin() == collection_prefix;
std::set<std::string> keys;
std::set<std::string, std::less<>> keys;
if (!collection_is_empty)
{
/// Skip collection prefix and add +1 to avoid '.' in the beginning.
@ -296,7 +296,7 @@ private:
const auto config = NamedCollectionConfiguration::createConfiguration(
collection_name, query.changes);
std::set<std::string> keys;
std::set<std::string, std::less<>> keys;
for (const auto & [name, _] : query.changes)
keys.insert(name);

View File

@ -3,8 +3,8 @@
#include <Interpreters/Context.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Storages/NamedCollectionConfiguration.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollectionConfiguration.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <ranges>
@ -234,6 +234,16 @@ public:
return keys;
}
/// Iterator positioned at the first stored key.
Keys::const_iterator begin() const
{
    return keys.cbegin();
}
/// Past-the-end iterator of the stored keys.
Keys::const_iterator end() const
{
    return keys.cend();
}
std::string dumpStructure() const
{
/// Convert a collection config like
@ -375,6 +385,22 @@ NamedCollection::Keys NamedCollection::getKeys() const
return pimpl->getKeys();
}
/// Returns an iterator to the first key of the collection.
/// With `Locked == false` the collection mutex is held while the iterator is obtained;
/// with `Locked == true` the mutex is not touched (presumably the caller already holds it - confirm).
/// In both cases no lock is held once the function has returned.
template <bool Locked> NamedCollection::const_iterator NamedCollection::begin() const
{
    if constexpr (Locked)
    {
        return pimpl->begin();
    }
    else
    {
        std::lock_guard guard(mutex);
        return pimpl->begin();
    }
}
/// Returns the past-the-end iterator of the collection keys.
/// With `Locked == false` the collection mutex is held while the iterator is obtained;
/// with `Locked == true` the mutex is not touched (presumably the caller already holds it - confirm).
/// In both cases no lock is held once the function has returned.
template <bool Locked> NamedCollection::const_iterator NamedCollection::end() const
{
    if constexpr (Locked)
    {
        return pimpl->end();
    }
    else
    {
        std::lock_guard guard(mutex);
        return pimpl->end();
    }
}
std::string NamedCollection::dumpStructure() const
{
std::lock_guard lock(mutex);
@ -417,4 +443,8 @@ template void NamedCollection::setOrUpdate<Float64, false>(const NamedCollection
template void NamedCollection::remove<true>(const Key & key);
template void NamedCollection::remove<false>(const Key & key);
/// Explicit instantiations of begin()/end() for both values of the bool template parameter.
/// The member templates are defined in this source file, so every variant that is used
/// has to be instantiated here.
template NamedCollection::const_iterator NamedCollection::begin<true>() const;
template NamedCollection::const_iterator NamedCollection::begin<false>() const;
template NamedCollection::const_iterator NamedCollection::end<true>() const;
template NamedCollection::const_iterator NamedCollection::end<false>() const;
}

View File

@ -1,7 +1,7 @@
#pragma once
#include <Interpreters/Context.h>
#include <Storages/NamedCollections_fwd.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollections_fwd.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
namespace Poco { namespace Util { class AbstractConfiguration; } }
@ -22,7 +22,7 @@ class NamedCollection
{
public:
using Key = std::string;
using Keys = std::set<Key>;
using Keys = std::set<Key, std::less<>>;
using SourceId = NamedCollectionUtils::SourceId;
static MutableNamedCollectionPtr create(
@ -49,6 +49,13 @@ public:
Keys getKeys() const;
/// Iterator types are those of the `Keys` container.
using iterator = typename Keys::iterator;
using const_iterator = typename Keys::const_iterator;
/// Iterator accessors declared on top of the `Keys` iterator types.
/// The bool template parameter defaults to false.
/// NOTE(review): it looks like `true` is meant for callers that already hold the collection lock - confirm against the definition.
template <bool locked = false> const_iterator begin() const;
template <bool locked = false> const_iterator end() const;
std::string dumpStructure() const;
bool isMutable() const { return is_mutable; }

View File

@ -0,0 +1,112 @@
#include "NamedCollectionsHelpers.h"
#include <Storages/NamedCollections/NamedCollections.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
/// Looks up the named collection whose name is given by the first argument.
///
/// Returns nullptr when the argument list is empty or when its first element is not
/// an identifier; otherwise returns the result of NamedCollectionFactory::tryGet()
/// for the identifier's name.
///
/// The arguments are only inspected, so they are accepted by const reference
/// instead of copying the whole list on every call.
NamedCollectionPtr tryGetNamedCollectionFromASTs(const ASTs & asts)
{
    if (asts.empty())
        return nullptr;

    /// Only the first argument can name a collection.
    const auto * identifier = asts[0]->as<ASTIdentifier>();
    if (!identifier)
        return nullptr;

    const auto & collection_name = identifier->name();
    return NamedCollectionFactory::instance().tryGet(collection_name);
}
/// Interprets `ast` as a `key = value` expression.
///
/// Returns std::nullopt when `ast` is not a call of the function `equals` or when that
/// call does not have exactly two arguments. Otherwise both sides are evaluated to
/// literals using the global context: the left side must yield a String (the key),
/// the right side's literal value is returned as is.
std::optional<std::pair<std::string, Field>> getKeyValueFromAST(ASTPtr ast)
{
    const auto * equals_function = ast->as<ASTFunction>();
    const bool is_equals_call = equals_function && equals_function->name == "equals";
    if (!is_equals_call)
        return std::nullopt;

    const auto & operands = assert_cast<const ASTExpressionList *>(equals_function->arguments.get())->children;
    if (operands.size() != 2)
        return std::nullopt;

    /// Left operand: the key.
    auto key_literal = evaluateConstantExpressionOrIdentifierAsLiteral(
        operands[0], Context::getGlobalContextInstance());
    auto key = checkAndGetLiteralArgument<String>(key_literal, "key");

    /// Right operand: the overriding value.
    auto value_literal = evaluateConstantExpressionOrIdentifierAsLiteral(
        operands[1], Context::getGlobalContextInstance());
    auto value = value_literal->as<ASTLiteral>()->value;

    return std::pair{key, value};
}
}
/// Resolves a named collection from engine / table function arguments.
///
/// The first argument is looked up as a collection name; nullptr is returned when there
/// are no arguments or no collection is found. If the name is the only argument,
/// the found collection itself is returned. Otherwise a duplicate of the collection is
/// returned in which every `key = value` argument has been applied as a String value;
/// arguments of any other shape are skipped.
NamedCollectionPtr tryGetNamedCollectionWithOverrides(ASTs asts)
{
    if (asts.empty())
        return nullptr;

    auto base_collection = tryGetNamedCollectionFromASTs(asts);

    /// Either nothing was found, or there is nothing to override.
    if (!base_collection || asts.size() == 1)
        return base_collection;

    auto overridden_collection = base_collection->duplicate();
    for (const auto & argument : asts)
    {
        const auto key_value = getKeyValueFromAST(argument);
        if (key_value)
            overridden_collection->setOrUpdate<String>(key_value->first, toString(key_value->second));
    }

    return overridden_collection;
}
/// Checks the set of keys of `collection` against the allowed ones.
///
/// Throws Exception with code BAD_ARGUMENTS if the collection contains a key that is
/// neither in `required_keys` nor in `optional_keys`, or if some key from
/// `required_keys` is absent. Unexpected keys are reported before missing ones.
void validateNamedCollection(
    const NamedCollection & collection,
    const std::unordered_set<std::string_view> & required_keys,
    const std::unordered_set<std::string_view> & optional_keys)
{
    const auto present_keys = collection.getKeys();

    /// First pass: every present key has to be known.
    for (const auto & present_key : present_keys)
    {
        const bool is_known = required_keys.contains(present_key) || optional_keys.contains(present_key);
        if (is_known)
            continue;

        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Unexpected key `{}` in named collection. Required keys: {}, optional keys: {}",
            present_key, fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
    }

    /// Second pass: every required key has to be present.
    for (const auto & required_key : required_keys)
    {
        if (present_keys.contains(required_key))
            continue;

        throw Exception(
            ErrorCodes::BAD_ARGUMENTS,
            "Key `{}` is required, but not specified. Required keys: {}, optional keys: {}",
            required_key, fmt::join(required_keys, ", "), fmt::join(optional_keys, ", "));
    }
}
}

View File

@ -0,0 +1,18 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <Storages/NamedCollections/NamedCollections_fwd.h>
#include <unordered_set>
#include <string_view>
namespace DB
{
/// Resolves a named collection from a list of AST arguments.
/// NOTE(review): the definition is not in this header; presumably the first argument names
/// the collection and the remaining ones override its values - confirm against the implementation.
NamedCollectionPtr tryGetNamedCollectionWithOverrides(ASTs asts);
/// Validates `collection` against a set of required keys and a set of optional keys.
/// NOTE(review): presumably reports unknown keys and missing required keys by throwing -
/// confirm the exact behaviour and error code against the implementation.
void validateNamedCollection(
const NamedCollection & collection,
const std::unordered_set<std::string_view> & required_keys,
const std::unordered_set<std::string_view> & optional_keys);
}

View File

@ -28,6 +28,8 @@
#include <Storages/getVirtualsForStorage.h>
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollections/NamedCollectionsHelpers.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/ReadFromStorageProgress.h>
#include <IO/ReadBufferFromS3.h>
@ -64,8 +66,6 @@
namespace fs = std::filesystem;
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
namespace ProfileEvents
{
extern const Event S3DeleteObjects;
@ -75,6 +75,28 @@ namespace ProfileEvents
namespace DB
{
static const String PARTITION_ID_WILDCARD = "{_partition_id}";
/// Keys that a named collection used for S3 configuration is required to provide.
static const std::unordered_set<std::string_view> required_configuration_keys = {
    "url",
};

/// Keys that a named collection used for S3 configuration may additionally provide.
/// Declared const, like the required set: it is a fixed lookup table that is only read
/// by the visible code, so it should not be a mutable global.
static const std::unordered_set<std::string_view> optional_configuration_keys = {
    "format",
    "compression",
    "compression_method",
    "structure",
    "access_key_id",
    "secret_access_key",
    "filename",
    "use_environment_credentials",
    "max_single_read_retries",
    "min_upload_part_size",
    "upload_part_size_multiply_factor",
    "upload_part_size_multiply_parts_count_threshold",
    "max_single_part_upload_size",
    "max_connections",
};
namespace ErrorCodes
{
extern const int CANNOT_PARSE_TEXT;
@ -1175,48 +1197,60 @@ void StorageS3::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration
upd.auth_settings.use_insecure_imds_request.value_or(ctx->getConfigRef().getBool("s3.use_insecure_imds_request", false)));
}
void StorageS3::processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args)
void StorageS3::processNamedCollectionResult(StorageS3Configuration & configuration, const NamedCollection & collection)
{
for (const auto & [arg_name, arg_value] : key_value_args)
{
if (arg_name == "access_key_id")
configuration.auth_settings.access_key_id = checkAndGetLiteralArgument<String>(arg_value, "access_key_id");
else if (arg_name == "secret_access_key")
configuration.auth_settings.secret_access_key = checkAndGetLiteralArgument<String>(arg_value, "secret_access_key");
else if (arg_name == "filename")
configuration.url = std::filesystem::path(configuration.url) / checkAndGetLiteralArgument<String>(arg_value, "filename");
else if (arg_name == "use_environment_credentials")
configuration.auth_settings.use_environment_credentials = checkAndGetLiteralArgument<UInt8>(arg_value, "use_environment_credentials");
else if (arg_name == "max_single_read_retries")
configuration.request_settings.max_single_read_retries = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_read_retries");
else if (arg_name == "min_upload_part_size")
configuration.request_settings.min_upload_part_size = checkAndGetLiteralArgument<UInt64>(arg_value, "min_upload_part_size");
else if (arg_name == "upload_part_size_multiply_factor")
configuration.request_settings.upload_part_size_multiply_factor = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_factor");
else if (arg_name == "upload_part_size_multiply_parts_count_threshold")
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = checkAndGetLiteralArgument<UInt64>(arg_value, "upload_part_size_multiply_parts_count_threshold");
else if (arg_name == "max_single_part_upload_size")
configuration.request_settings.max_single_part_upload_size = checkAndGetLiteralArgument<UInt64>(arg_value, "max_single_part_upload_size");
else if (arg_name == "max_connections")
configuration.request_settings.max_connections = checkAndGetLiteralArgument<UInt64>(arg_value, "max_connections");
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
arg_name);
}
}
validateNamedCollection(collection, required_configuration_keys, optional_configuration_keys);
std::string filename;
for (const auto & key : collection)
{
if (key == "url")
configuration.url = collection.get<String>(key);
else if (key == "access_key_id")
configuration.auth_settings.access_key_id = collection.get<String>(key);
else if (key == "secret_access_key")
configuration.auth_settings.secret_access_key = collection.get<String>(key);
else if (key == "filename")
filename = collection.get<String>(key);
else if (key == "format")
configuration.format = collection.get<String>(key);
else if (key == "compression" || key == "compression_method")
configuration.compression_method = collection.get<String>(key);
else if (key == "structure")
configuration.structure = collection.get<String>(key);
else if (key == "use_environment_credentials")
configuration.auth_settings.use_environment_credentials = collection.get<UInt64>(key);
else if (key == "max_single_read_retries")
configuration.request_settings.max_single_read_retries = collection.get<UInt64>(key);
else if (key == "min_upload_part_size")
configuration.request_settings.min_upload_part_size = collection.get<UInt64>(key);
else if (key == "upload_part_size_multiply_factor")
configuration.request_settings.upload_part_size_multiply_factor = collection.get<UInt64>(key);
else if (key == "upload_part_size_multiply_parts_count_threshold")
configuration.request_settings.upload_part_size_multiply_parts_count_threshold = collection.get<UInt64>(key);
else if (key == "max_single_part_upload_size")
configuration.request_settings.max_single_part_upload_size = collection.get<UInt64>(key);
else if (key == "max_connections")
configuration.request_settings.max_connections = collection.get<UInt64>(key);
else
throw Exception(
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown configuration key `{}` for StorageS3, "
"expected: url, [access_key_id, secret_access_key], "
"name of used format and [compression_method].",
key);
}
if (!filename.empty())
configuration.url = std::filesystem::path(configuration.url) / filename;
}
StorageS3Configuration StorageS3::getConfiguration(ASTs & engine_args, ContextPtr local_context)
{
StorageS3Configuration configuration;
if (auto named_collection = getURLBasedDataSourceConfiguration(engine_args, local_context))
if (auto named_collection = tryGetNamedCollectionWithOverrides(engine_args))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
processNamedCollectionResult(configuration, storage_specific_args);
processNamedCollectionResult(configuration, *named_collection);
}
else
{

View File

@ -30,6 +30,8 @@ namespace DB
class PullingPipelineExecutor;
class StorageS3SequentialSource;
class NamedCollection;
class StorageS3Source : public ISource, WithContext
{
public:
@ -216,7 +218,7 @@ public:
ContextPtr ctx,
std::unordered_map<String, S3::ObjectInfo> * object_infos = nullptr);
static void processNamedCollectionResult(StorageS3Configuration & configuration, const std::vector<std::pair<String, ASTPtr>> & key_value_args);
static void processNamedCollectionResult(StorageS3Configuration & configuration, const NamedCollection & collection);
struct S3Configuration
{

View File

@ -8,7 +8,7 @@
#include <Access/Common/AccessType.h>
#include <Access/Common/AccessFlags.h>
#include <Columns/ColumnMap.h>
#include <Storages/NamedCollections.h>
#include <Storages/NamedCollections/NamedCollections.h>
namespace DB

View File

@ -1,6 +1,6 @@
#include <Common/tests/gtest_global_context.h>
#include <Storages/NamedCollections.h>
#include <Storages/NamedCollectionUtils.h>
#include <Storages/NamedCollections/NamedCollections.h>
#include <Storages/NamedCollections/NamedCollectionUtils.h>
#include <Poco/Util/XMLConfiguration.h>
#include <Poco/DOM/DOMParser.h>
#include <gtest/gtest.h>

View File

@ -13,6 +13,7 @@
#include <Storages/checkAndGetLiteralArgument.h>
#include <Storages/StorageS3.h>
#include <Storages/StorageURL.h>
#include <Storages/NamedCollections/NamedCollectionsHelpers.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <filesystem>
@ -30,11 +31,9 @@ namespace ErrorCodes
/// This is needed to avoid copy-paste. Because s3Cluster arguments only differ in additional argument (first) - cluster name
void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & s3_configuration)
{
if (auto named_collection = getURLBasedDataSourceConfiguration(args, context))
if (auto named_collection = tryGetNamedCollectionWithOverrides(args))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
s3_configuration.set(common_configuration);
StorageS3::processNamedCollectionResult(s3_configuration, storage_specific_args);
StorageS3::processNamedCollectionResult(s3_configuration, *named_collection);
}
else
{