diff --git a/docs/en/interfaces/formats.md b/docs/en/interfaces/formats.md index 9b8354f23a2..58e986cc2f3 100644 --- a/docs/en/interfaces/formats.md +++ b/docs/en/interfaces/formats.md @@ -1020,6 +1020,62 @@ Example: } ``` +To use object name as column value you can use special setting [format_json_object_each_row_column_for_object_name](../operations/settings/settings.md#format_json_object_each_row_column_for_object_name). Value of this setting is set to the name of a column, that is used as JSON key for a row in resulting object. +Examples: + +For output: + +Let's say we have table `test` with two columns: +``` +┌─object_name─┬─number─┐ +│ first_obj │ 1 │ +│ second_obj │ 2 │ +│ third_obj │ 3 │ +└─────────────┴────────┘ +``` +Let's output it in `JSONObjectEachRow` format and use `format_json_object_each_row_column_for_object_name` setting: + +```sql +select * from test settings format_json_object_each_row_column_for_object_name='object_name' +``` + +The output: +```json +{ + "first_obj": {"number": 1}, + "second_obj": {"number": 2}, + "third_obj": {"number": 3} +} +``` + +For input: + +Let's say we stored output from previous example in a file with name `data.json`: +```sql +select * from file('data.json', JSONObjectEachRow, 'object_name String, number UInt64') settings format_json_object_each_row_column_for_object_name='object_name' +``` + +``` +┌─object_name─┬─number─┐ +│ first_obj │ 1 │ +│ second_obj │ 2 │ +│ third_obj │ 3 │ +└─────────────┴────────┘ +``` + +It also works in schema inference: + +```sql +desc file('data.json', JSONObjectEachRow) settings format_json_object_each_row_column_for_object_name='object_name' +``` + +``` +┌─name────────┬─type────────────┐ +│ object_name │ String │ +│ number │ Nullable(Int64) │ +└─────────────┴─────────────────┘ +``` + ### Inserting Data {#json-inserting-data} diff --git a/docs/en/interfaces/third-party/client-libraries.md b/docs/en/interfaces/third-party/client-libraries.md index e085566aa7e..c26532c98cb 100644 --- a/docs/en/interfaces/third-party/client-libraries.md +++ b/docs/en/interfaces/third-party/client-libraries.md @@ -41,6 +41,7 @@ ClickHouse Inc does **not** maintain the libraries listed below and hasn’t don - [node-clickhouse](https://github.com/apla/node-clickhouse) - [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse) - [clickhouse-client](https://github.com/depyronick/clickhouse-client) + - [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm) - Perl - [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse) - [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index efdce2d4a88..dfcef4ae200 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -3902,6 +3902,13 @@ Controls validation of UTF-8 sequences in JSON output formats, doesn't impact fo Disabled by default. +### format_json_object_each_row_column_for_object_name {#format_json_object_each_row_column_for_object_name} + +The name of column that will be used for storing/writing object names in [JSONObjectEachRow](../../interfaces/formats.md#jsonobjecteachrow) format. +Column type should be String. If value is empty, default names `row_{i}`will be used for object names. + +Default value: ''. + ## TSV format settings {#tsv-format-settings} ### input_format_tsv_empty_as_default {#input_format_tsv_empty_as_default} diff --git a/docs/ru/interfaces/third-party/client-libraries.md b/docs/ru/interfaces/third-party/client-libraries.md index ce9f94d5d74..b000208b53b 100644 --- a/docs/ru/interfaces/third-party/client-libraries.md +++ b/docs/ru/interfaces/third-party/client-libraries.md @@ -34,6 +34,7 @@ sidebar_label: "Клиентские библиотеки от сторонни - [node-clickhouse](https://github.com/apla/node-clickhouse) - [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse) - [clickhouse-client](https://github.com/depyronick/clickhouse-client) + - [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm) - Perl - [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse) - [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse) diff --git a/docs/zh/interfaces/third-party/client-libraries.md b/docs/zh/interfaces/third-party/client-libraries.md index d4959e37668..1b7bff02b1a 100644 --- a/docs/zh/interfaces/third-party/client-libraries.md +++ b/docs/zh/interfaces/third-party/client-libraries.md @@ -35,6 +35,9 @@ Yandex**没有**维护下面列出的库,也没有做过任何广泛的测试 - NodeJs - [clickhouse (NodeJs)](https://github.com/TimonKK/clickhouse) - [node-clickhouse](https://github.com/apla/node-clickhouse) + - [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse) + - [clickhouse-client](https://github.com/depyronick/clickhouse-client) + - [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm) - Perl - [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse) - [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse) diff --git a/programs/disks/DisksApp.cpp b/programs/disks/DisksApp.cpp index b662921a3b1..749ccb3e503 100644 --- a/programs/disks/DisksApp.cpp +++ b/programs/disks/DisksApp.cpp @@ -58,7 +58,7 @@ void DisksApp::addOptions( ("disk", po::value(), "Set disk name") ("command_name", po::value(), "Name for command to do") ("send-logs", "Send logs") - ("log-level", "Logging level") + ("log-level", po::value(), "Logging level") ; positional_options_description.add("command_name", 1); diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp index ffec435239e..e7bc019f597 100644 --- a/programs/local/LocalServer.cpp +++ b/programs/local/LocalServer.cpp @@ -13,7 +13,6 @@ #include #include #include -#include #include #include #include @@ -32,6 +31,7 @@ #include #include #include +#include #include #include #include @@ -602,8 +602,6 @@ void LocalServer::processConfig() global_context->setCurrentDatabase(default_database); applyCmdOptions(global_context); - bool enable_objects_loader = false; - if (config().has("path")) { String path = global_context->getPath(); @@ -611,12 +609,6 @@ void LocalServer::processConfig() /// Lock path directory before read status.emplace(fs::path(path) / "status", StatusFile::write_full_info); - LOG_DEBUG(log, "Loading user defined objects from {}", path); - Poco::File(path + "user_defined/").createDirectories(); - UserDefinedSQLObjectsLoader::instance().loadObjects(global_context); - enable_objects_loader = true; - LOG_DEBUG(log, "Loaded user defined objects."); - LOG_DEBUG(log, "Loading metadata from {}", path); loadMetadataSystem(global_context); attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE)); @@ -630,6 +622,9 @@ void LocalServer::processConfig() DatabaseCatalog::instance().loadDatabases(); } + /// For ClickHouse local if path is not set the loader will be disabled. + global_context->getUserDefinedSQLObjectsLoader().loadObjects(); + LOG_DEBUG(log, "Loaded metadata."); } else if (!config().has("no-system-tables")) @@ -639,9 +634,6 @@ void LocalServer::processConfig() attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE)); } - /// Persist SQL user defined objects only if user_defined folder was created - UserDefinedSQLObjectsLoader::instance().enable(enable_objects_loader); - server_display_name = config().getString("display_name", getFQDNOrHostName()); prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) "); std::map prompt_substitutions{{"display_name", server_display_name}}; diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp index aed586a86f6..a296f87b084 100644 --- a/programs/server/Server.cpp +++ b/programs/server/Server.cpp @@ -53,7 +53,6 @@ #include #include #include -#include #include #include #include @@ -62,6 +61,7 @@ #include #include #include +#include #include #include #include @@ -1010,12 +1010,6 @@ int Server::main(const std::vector & /*args*/) fs::create_directories(user_scripts_path); } - { - std::string user_defined_path = config().getString("user_defined_path", path / "user_defined/"); - global_context->setUserDefinedPath(user_defined_path); - fs::create_directories(user_defined_path); - } - /// top_level_domains_lists { const std::string & top_level_domains_path = config().getString("top_level_domains_path", path / "top_level_domains/"); @@ -1559,18 +1553,6 @@ int Server::main(const std::vector & /*args*/) /// system logs may copy global context. global_context->setCurrentDatabaseNameInGlobalContext(default_database); - LOG_INFO(log, "Loading user defined objects from {}", path_str); - try - { - UserDefinedSQLObjectsLoader::instance().loadObjects(global_context); - } - catch (...) - { - tryLogCurrentException(log, "Caught exception while loading user defined objects"); - throw; - } - LOG_DEBUG(log, "Loaded user defined objects"); - LOG_INFO(log, "Loading metadata from {}", path_str); try @@ -1598,6 +1580,8 @@ int Server::main(const std::vector & /*args*/) database_catalog.loadDatabases(); /// After loading validate that default database exists database_catalog.assertDatabaseExists(default_database); + /// Load user-defined SQL functions. + global_context->getUserDefinedSQLObjectsLoader().loadObjects(); } catch (...) { diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index f33744e918d..419a34fc2ab 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -284,6 +284,7 @@ add_object_library(clickhouse_processors_ttl Processors/TTL) add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms) add_object_library(clickhouse_processors_queryplan Processors/QueryPlan) add_object_library(clickhouse_processors_queryplan_optimizations Processors/QueryPlan/Optimizations) +add_object_library(clickhouse_user_defined_functions Functions/UserDefined) if (TARGET ch_contrib::nuraft) add_object_library(clickhouse_coordination Coordination) diff --git a/src/Client/QueryFuzzer.cpp b/src/Client/QueryFuzzer.cpp index e6905d8dc3c..f0c4313e8a8 100644 --- a/src/Client/QueryFuzzer.cpp +++ b/src/Client/QueryFuzzer.cpp @@ -236,6 +236,21 @@ ASTPtr QueryFuzzer::getRandomColumnLike() return new_ast; } +ASTPtr QueryFuzzer::getRandomExpressionList() +{ + if (column_like.empty()) + { + return nullptr; + } + + ASTPtr new_ast = std::make_shared(); + for (size_t i = 0; i < fuzz_rand() % 5 + 1; ++i) + { + new_ast->children.push_back(getRandomColumnLike()); + } + return new_ast; +} + void QueryFuzzer::replaceWithColumnLike(ASTPtr & ast) { if (column_like.empty()) @@ -841,7 +856,52 @@ void QueryFuzzer::fuzz(ASTPtr & ast) else if (auto * select = typeid_cast(ast.get())) { fuzzColumnLikeExpressionList(select->select().get()); - fuzzColumnLikeExpressionList(select->groupBy().get()); + + if (select->groupBy().get()) + { + if (fuzz_rand() % 50 == 0) + { + select->groupBy()->children.clear(); + select->setExpression(ASTSelectQuery::Expression::GROUP_BY, {}); + select->group_by_with_grouping_sets = false; + select->group_by_with_rollup = false; + select->group_by_with_cube = false; + select->group_by_with_totals = true; + } + else if (fuzz_rand() % 100 == 0) + { + select->group_by_with_grouping_sets = !select->group_by_with_grouping_sets; + } + else if (fuzz_rand() % 100 == 0) + { + select->group_by_with_rollup = !select->group_by_with_rollup; + } + else if (fuzz_rand() % 100 == 0) + { + select->group_by_with_cube = !select->group_by_with_cube; + } + else if (fuzz_rand() % 100 == 0) + { + select->group_by_with_totals = !select->group_by_with_totals; + } + } + else if (fuzz_rand() % 50 == 0) + { + select->setExpression(ASTSelectQuery::Expression::GROUP_BY, getRandomExpressionList()); + } + + if (select->where().get()) + { + if (fuzz_rand() % 50 == 0) + { + select->where()->children.clear(); + select->setExpression(ASTSelectQuery::Expression::WHERE, {}); + } + } + else if (fuzz_rand() % 50 == 0) + { + select->setExpression(ASTSelectQuery::Expression::WHERE, getRandomColumnLike()); + } fuzzOrderByList(select->orderBy().get()); fuzz(select->children); diff --git a/src/Client/QueryFuzzer.h b/src/Client/QueryFuzzer.h index 9afe7867dd2..bdfdeb67663 100644 --- a/src/Client/QueryFuzzer.h +++ b/src/Client/QueryFuzzer.h @@ -8,6 +8,7 @@ #include #include +#include "Parsers/IAST_fwd.h" #include #include @@ -72,6 +73,7 @@ struct QueryFuzzer Field getRandomField(int type); Field fuzzField(Field field); ASTPtr getRandomColumnLike(); + ASTPtr getRandomExpressionList(); DataTypePtr fuzzDataType(DataTypePtr type); DataTypePtr getRandomType(); ASTs getInsertQueriesForFuzzedTables(const String & full_query); diff --git a/src/Common/ZooKeeper/ZooKeeper.cpp b/src/Common/ZooKeeper/ZooKeeper.cpp index 78124c7891a..406d8b27c39 100644 --- a/src/Common/ZooKeeper/ZooKeeper.cpp +++ b/src/Common/ZooKeeper/ZooKeeper.cpp @@ -99,7 +99,7 @@ void ZooKeeper::init(ZooKeeperArgs args_) if (dns_error) throw KeeperException("Cannot resolve any of provided ZooKeeper hosts due to DNS error", Coordination::Error::ZCONNECTIONLOSS); else - throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS); + throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZCONNECTIONLOSS); } impl = std::make_unique(nodes, args, zk_log); diff --git a/src/Core/Settings.h b/src/Core/Settings.h index 7f649826893..1f784497b34 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -776,6 +776,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value) M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \ M(Bool, output_format_json_validate_utf8, false, "Validate UTF-8 sequences in JSON output formats, doesn't impact formats JSON/JSONCompact/JSONColumnsWithMetadata, they always validate utf8", 0) \ \ + M(String, format_json_object_each_row_column_for_object_name, "", "The name of column that will be used as object names in JSONObjectEachRow format. Column type should be String", 0) \ + \ M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \ M(UInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \ M(UInt64, output_format_pretty_max_value_width, 10000, "Maximum width of value to display in Pretty formats. If greater - it will be cut.", 0) \ diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp index 8afeef3b315..bfe651dd1af 100644 --- a/src/Formats/FormatFactory.cpp +++ b/src/Formats/FormatFactory.cpp @@ -100,6 +100,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings) format_settings.json.try_infer_numbers_from_strings = settings.input_format_json_try_infer_numbers_from_strings; format_settings.json.validate_types_from_metadata = settings.input_format_json_validate_types_from_metadata; format_settings.json.validate_utf8 = settings.output_format_json_validate_utf8; + format_settings.json_object_each_row.column_for_object_name = settings.format_json_object_each_row_column_for_object_name; format_settings.json.try_infer_objects = context->getSettingsRef().allow_experimental_object_type; format_settings.null_as_default = settings.input_format_null_as_default; format_settings.decimal_trailing_zeros = settings.output_format_decimal_trailing_zeros; diff --git a/src/Formats/FormatSettings.h b/src/Formats/FormatSettings.h index 66888df7e43..3ff227c5b56 100644 --- a/src/Formats/FormatSettings.h +++ b/src/Formats/FormatSettings.h @@ -156,6 +156,11 @@ struct FormatSettings bool try_infer_objects = false; } json; + struct + { + String column_for_object_name; + } json_object_each_row; + struct { UInt64 row_group_size = 1000000; diff --git a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp similarity index 98% rename from src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp rename to src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp index 8c7220a85da..d4ecbf66987 100644 --- a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp +++ b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.cpp @@ -5,8 +5,8 @@ #include -#include -#include +#include +#include #include #include diff --git a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h similarity index 94% rename from src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h rename to src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h index 4d4843e8677..1a62175eb0c 100644 --- a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h +++ b/src/Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h @@ -4,7 +4,7 @@ #include #include -#include +#include namespace DB { diff --git a/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h b/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h new file mode 100644 index 00000000000..4c7850951b5 --- /dev/null +++ b/src/Functions/UserDefined/IUserDefinedSQLObjectsLoader.h @@ -0,0 +1,47 @@ +#pragma once + +#include + + +namespace DB +{ +class IAST; +struct Settings; +enum class UserDefinedSQLObjectType; + +/// Interface for a loader of user-defined SQL objects. +/// Implementations: UserDefinedSQLLoaderFromDisk, UserDefinedSQLLoaderFromZooKeeper +class IUserDefinedSQLObjectsLoader +{ +public: + virtual ~IUserDefinedSQLObjectsLoader() = default; + + /// Whether this loader can replicate SQL objects to another node. + virtual bool isReplicated() const { return false; } + virtual String getReplicationID() const { return ""; } + + /// Loads all objects. Can be called once - if objects are already loaded the function does nothing. + virtual void loadObjects() = 0; + + /// Stops watching. + virtual void stopWatching() {} + + /// Immediately reloads all objects, throws an exception if failed. + virtual void reloadObjects() = 0; + + /// Immediately reloads a specified object only. + virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0; + + /// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction). + virtual bool storeObject( + UserDefinedSQLObjectType object_type, + const String & object_name, + const IAST & create_object_query, + bool throw_if_exists, + bool replace_if_exists, + const Settings & settings) = 0; + + /// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction). + virtual bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) = 0; +}; +} diff --git a/src/Interpreters/UserDefinedExecutableFunction.cpp b/src/Functions/UserDefined/UserDefinedExecutableFunction.cpp similarity index 100% rename from src/Interpreters/UserDefinedExecutableFunction.cpp rename to src/Functions/UserDefined/UserDefinedExecutableFunction.cpp diff --git a/src/Interpreters/UserDefinedExecutableFunction.h b/src/Functions/UserDefined/UserDefinedExecutableFunction.h similarity index 100% rename from src/Interpreters/UserDefinedExecutableFunction.h rename to src/Functions/UserDefined/UserDefinedExecutableFunction.h diff --git a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp b/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp similarity index 99% rename from src/Interpreters/UserDefinedExecutableFunctionFactory.cpp rename to src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp index 18784609397..3f3cfc4c8e3 100644 --- a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp +++ b/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.cpp @@ -12,9 +12,9 @@ #include #include +#include #include #include -#include #include #include diff --git a/src/Interpreters/UserDefinedExecutableFunctionFactory.h b/src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.h similarity index 100% rename from src/Interpreters/UserDefinedExecutableFunctionFactory.h rename to src/Functions/UserDefined/UserDefinedExecutableFunctionFactory.h diff --git a/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp b/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp new file mode 100644 index 00000000000..622854b3508 --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.cpp @@ -0,0 +1,301 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int FUNCTION_ALREADY_EXISTS; + extern const int UNKNOWN_FUNCTION; + extern const int CANNOT_DROP_FUNCTION; + extern const int CANNOT_CREATE_RECURSIVE_FUNCTION; + extern const int UNSUPPORTED_METHOD; +} + + +namespace +{ + void validateFunctionRecursiveness(const IAST & node, const String & function_to_create) + { + for (const auto & child : node.children) + { + auto function_name_opt = tryGetFunctionName(child); + if (function_name_opt && function_name_opt.value() == function_to_create) + throw Exception(ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION, "You cannot create recursive function"); + + validateFunctionRecursiveness(*child, function_to_create); + } + } + + void validateFunction(ASTPtr function, const String & name) + { + ASTFunction * lambda_function = function->as(); + + if (!lambda_function) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected function, got: {}", function->formatForErrorMessage()); + + auto & lambda_function_expression_list = lambda_function->arguments->children; + + if (lambda_function_expression_list.size() != 2) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have arguments and body"); + + const ASTFunction * tuple_function_arguments = lambda_function_expression_list[0]->as(); + + if (!tuple_function_arguments || !tuple_function_arguments->arguments) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid arguments"); + + std::unordered_set arguments; + + for (const auto & argument : tuple_function_arguments->arguments->children) + { + const auto * argument_identifier = argument->as(); + + if (!argument_identifier) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda argument must be identifier"); + + const auto & argument_name = argument_identifier->name(); + auto [_, inserted] = arguments.insert(argument_name); + if (!inserted) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Identifier {} already used as function parameter", argument_name); + } + + ASTPtr function_body = lambda_function_expression_list[1]; + if (!function_body) + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid function body"); + + validateFunctionRecursiveness(*function_body, name); + } + + ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query) + { + auto ptr = create_function_query.clone(); + auto & res = typeid_cast(*ptr); + res.if_not_exists = false; + res.or_replace = false; + FunctionNameNormalizer().visit(res.function_core.get()); + return ptr; + } +} + + +UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance() +{ + static UserDefinedSQLFunctionFactory result; + return result; +} + +void UserDefinedSQLFunctionFactory::checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query) +{ + if (FunctionFactory::instance().hasNameOrAlias(function_name)) + throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name); + + if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name)) + throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name); + + if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) + throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name); + + validateFunction(assert_cast(create_function_query).function_core, function_name); +} + +void UserDefinedSQLFunctionFactory::checkCanBeUnregistered(const ContextPtr & context, const String & function_name) +{ + if (FunctionFactory::instance().hasNameOrAlias(function_name) || + AggregateFunctionFactory::instance().hasNameOrAlias(function_name)) + throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name); + + if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) + throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name); +} + +bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists) +{ + checkCanBeRegistered(context, function_name, *create_function_query); + create_function_query = normalizeCreateFunctionQuery(*create_function_query); + + std::lock_guard lock{mutex}; + auto it = function_name_to_create_query_map.find(function_name); + if (it != function_name_to_create_query_map.end()) + { + if (throw_if_exists) + throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", function_name); + else if (!replace_if_exists) + return false; + } + + try + { + auto & loader = context->getUserDefinedSQLObjectsLoader(); + bool stored = loader.storeObject(UserDefinedSQLObjectType::Function, function_name, *create_function_query, throw_if_exists, replace_if_exists, context->getSettingsRef()); + if (!stored) + return false; + } + catch (Exception & exception) + { + exception.addMessage(fmt::format("while storing user defined function {}", backQuote(function_name))); + throw; + } + + function_name_to_create_query_map[function_name] = create_function_query; + return true; +} + +bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr & context, const String & function_name, bool throw_if_not_exists) +{ + checkCanBeUnregistered(context, function_name); + + std::lock_guard lock(mutex); + auto it = function_name_to_create_query_map.find(function_name); + if (it == function_name_to_create_query_map.end()) + { + if (throw_if_not_exists) + throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", function_name); + else + return false; + } + + try + { + auto & loader = context->getUserDefinedSQLObjectsLoader(); + bool removed = loader.removeObject(UserDefinedSQLObjectType::Function, function_name, throw_if_not_exists); + if (!removed) + return false; + } + catch (Exception & exception) + { + exception.addMessage(fmt::format("while removing user defined function {}", backQuote(function_name))); + throw; + } + + function_name_to_create_query_map.erase(function_name); + return true; +} + +ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const +{ + std::lock_guard lock(mutex); + + auto it = function_name_to_create_query_map.find(function_name); + if (it == function_name_to_create_query_map.end()) + throw Exception(ErrorCodes::UNKNOWN_FUNCTION, + "The function name '{}' is not registered", + function_name); + + return it->second; +} + +ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const +{ + std::lock_guard lock(mutex); + + auto it = function_name_to_create_query_map.find(function_name); + if (it == function_name_to_create_query_map.end()) + return nullptr; + + return it->second; +} + +bool UserDefinedSQLFunctionFactory::has(const String & function_name) const +{ + return tryGet(function_name) != nullptr; +} + +std::vector UserDefinedSQLFunctionFactory::getAllRegisteredNames() const +{ + std::vector registered_names; + + std::lock_guard lock(mutex); + registered_names.reserve(function_name_to_create_query_map.size()); + + for (const auto & [name, _] : function_name_to_create_query_map) + registered_names.emplace_back(name); + + return registered_names; +} + +bool UserDefinedSQLFunctionFactory::empty() const +{ + std::lock_guard lock(mutex); + return function_name_to_create_query_map.empty(); +} + +void UserDefinedSQLFunctionFactory::backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const +{ + backupUserDefinedSQLObjects(backup_entries_collector, data_path_in_backup, UserDefinedSQLObjectType::Function, getAllFunctions()); +} + +void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const String & data_path_in_backup) +{ + auto restored_functions = restoreUserDefinedSQLObjects(restorer, data_path_in_backup, UserDefinedSQLObjectType::Function); + const auto & restore_settings = restorer.getRestoreSettings(); + bool throw_if_exists = (restore_settings.create_function == RestoreUDFCreationMode::kCreate); + bool replace_if_exists = (restore_settings.create_function == RestoreUDFCreationMode::kReplace); + auto context = restorer.getContext(); + for (const auto & [function_name, create_function_query] : restored_functions) + registerFunction(context, function_name, create_function_query, throw_if_exists, replace_if_exists); +} + +void UserDefinedSQLFunctionFactory::setAllFunctions(const std::vector> & new_functions) +{ + std::unordered_map normalized_functions; + for (const auto & [function_name, create_query] : new_functions) + normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query); + + std::lock_guard lock(mutex); + function_name_to_create_query_map = std::move(normalized_functions); +} + +std::vector> UserDefinedSQLFunctionFactory::getAllFunctions() const +{ + std::lock_guard lock{mutex}; + std::vector> all_functions; + all_functions.reserve(function_name_to_create_query_map.size()); + std::copy(function_name_to_create_query_map.begin(), function_name_to_create_query_map.end(), std::back_inserter(all_functions)); + return all_functions; +} + +void UserDefinedSQLFunctionFactory::setFunction(const String & function_name, const IAST & create_function_query) +{ + std::lock_guard lock(mutex); + function_name_to_create_query_map[function_name] = normalizeCreateFunctionQuery(create_function_query); +} + +void UserDefinedSQLFunctionFactory::removeFunction(const String & function_name) +{ + std::lock_guard lock(mutex); + function_name_to_create_query_map.erase(function_name); +} + +void UserDefinedSQLFunctionFactory::removeAllFunctionsExcept(const Strings & function_names_to_keep) +{ + boost::container::flat_set names_set_to_keep{function_names_to_keep.begin(), function_names_to_keep.end()}; + std::lock_guard lock(mutex); + for (auto it = function_name_to_create_query_map.begin(); it != function_name_to_create_query_map.end();) + { + auto current = it++; + if (!names_set_to_keep.contains(current->first)) + function_name_to_create_query_map.erase(current); + } +} + +std::unique_lock UserDefinedSQLFunctionFactory::getLock() const +{ + return std::unique_lock{mutex}; +} + +} diff --git a/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h b/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h new file mode 100644 index 00000000000..45196759d3b --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLFunctionFactory.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include + +#include + +#include +#include + + +namespace DB +{ +class BackupEntriesCollector; +class RestorerFromBackup; + +/// Factory for SQLUserDefinedFunctions +class UserDefinedSQLFunctionFactory : public IHints<1, UserDefinedSQLFunctionFactory> +{ +public: + static UserDefinedSQLFunctionFactory & instance(); + + /// Register function for function_name in factory for specified create_function_query. + bool registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists); + + /// Unregister function for function_name. + bool unregisterFunction(const ContextMutablePtr & context, const String & function_name, bool throw_if_not_exists); + + /// Get function create query for function_name. If no function registered with function_name throws exception. + ASTPtr get(const String & function_name) const; + + /// Get function create query for function_name. If no function registered with function_name return nullptr. + ASTPtr tryGet(const String & function_name) const; + + /// Check if function with function_name registered. + bool has(const String & function_name) const; + + /// Get all user defined functions registered names. + std::vector getAllRegisteredNames() const override; + + /// Check whether any UDFs have been registered + bool empty() const; + + /// Makes backup entries for all user-defined SQL functions. + void backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const; + + /// Restores user-defined SQL functions from the backup. + void restore(RestorerFromBackup & restorer, const String & data_path_in_backup); + +private: + friend class UserDefinedSQLObjectsLoaderFromDisk; + friend class UserDefinedSQLObjectsLoaderFromZooKeeper; + + /// Checks that a specified function can be registered, throws an exception if not. + static void checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query); + static void checkCanBeUnregistered(const ContextPtr & context, const String & function_name); + + /// The following functions must be called only by the loader. + void setAllFunctions(const std::vector> & new_functions); + std::vector> getAllFunctions() const; + void setFunction(const String & function_name, const IAST & create_function_query); + void removeFunction(const String & function_name); + void removeAllFunctionsExcept(const Strings & function_names_to_keep); + std::unique_lock getLock() const; + + std::unordered_map function_name_to_create_query_map; + mutable std::recursive_mutex mutex; +}; + +} diff --git a/src/Interpreters/UserDefinedSQLFunctionVisitor.cpp b/src/Functions/UserDefined/UserDefinedSQLFunctionVisitor.cpp similarity index 98% rename from src/Interpreters/UserDefinedSQLFunctionVisitor.cpp rename to src/Functions/UserDefined/UserDefinedSQLFunctionVisitor.cpp index 1adb3d5819a..9bb0abc6369 100644 --- a/src/Interpreters/UserDefinedSQLFunctionVisitor.cpp +++ b/src/Functions/UserDefined/UserDefinedSQLFunctionVisitor.cpp @@ -8,7 +8,7 @@ #include #include #include -#include +#include namespace DB diff --git a/src/Interpreters/UserDefinedSQLFunctionVisitor.h b/src/Functions/UserDefined/UserDefinedSQLFunctionVisitor.h similarity index 100% rename from src/Interpreters/UserDefinedSQLFunctionVisitor.h rename to src/Functions/UserDefined/UserDefinedSQLFunctionVisitor.h diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectType.h b/src/Functions/UserDefined/UserDefinedSQLObjectType.h new file mode 100644 index 00000000000..f7e6fff5cad --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLObjectType.h @@ -0,0 +1,12 @@ +#pragma once + + +namespace DB +{ + +enum class UserDefinedSQLObjectType +{ + Function +}; + +} diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp new file mode 100644 index 00000000000..60f0219e92d --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.cpp @@ -0,0 +1,103 @@ +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int CANNOT_RESTORE_TABLE; +} + +void backupUserDefinedSQLObjects( + BackupEntriesCollector & backup_entries_collector, + const String & data_path_in_backup, + UserDefinedSQLObjectType /* object_type */, + const std::vector> & objects) +{ + std::vector> backup_entries; + backup_entries.reserve(objects.size()); + for (const auto & [function_name, create_function_query] : objects) + backup_entries.emplace_back( + escapeForFileName(function_name) + ".sql", std::make_shared(queryToString(create_function_query))); + + fs::path data_path_in_backup_fs{data_path_in_backup}; + for (const auto & entry : backup_entries) + backup_entries_collector.addBackupEntry(data_path_in_backup_fs / entry.first, entry.second); +} + + +std::vector> +restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type) +{ + auto context = restorer.getContext(); + auto backup = restorer.getBackup(); + fs::path data_path_in_backup_fs{data_path_in_backup}; + + Strings filenames = backup->listFiles(data_path_in_backup); + if (filenames.empty()) + return {}; /// Nothing to restore. + + for (const auto & filename : filenames) + { + if (!filename.ends_with(".sql")) + { + throw Exception( + ErrorCodes::CANNOT_RESTORE_TABLE, + "Cannot restore user-defined SQL objects: File name {} doesn't have the extension .sql", + String{data_path_in_backup_fs / filename}); + } + } + + std::vector> res; + + for (const auto & filename : filenames) + { + String escaped_function_name = filename.substr(0, filename.length() - strlen(".sql")); + String function_name = unescapeForFileName(escaped_function_name); + + String filepath = data_path_in_backup_fs / filename; + auto backup_entry = backup->readFile(filepath); + auto in = backup_entry->getReadBuffer(); + String statement_def; + readStringUntilEOF(statement_def, *in); + + ASTPtr ast; + + switch (object_type) + { + case UserDefinedSQLObjectType::Function: + { + ParserCreateFunctionQuery parser; + ast = parseQuery( + parser, + statement_def.data(), + statement_def.data() + statement_def.size(), + "in file " + filepath + " from backup " + backup->getName(), + 0, + context->getSettingsRef().max_parser_depth); + break; + } + } + + res.emplace_back(std::move(function_name), ast); + } + + return res; +} + +} diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.h b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.h new file mode 100644 index 00000000000..a1e970d8af5 --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLObjectsBackup.h @@ -0,0 +1,25 @@ +#pragma once + +#include +#include + + +namespace DB +{ +class BackupEntriesCollector; +class RestorerFromBackup; +enum class UserDefinedSQLObjectType; +class IBackupEntry; +using BackupEntryPtr = std::shared_ptr; + +/// Makes backup entries to backup user-defined SQL objects. +void backupUserDefinedSQLObjects( + BackupEntriesCollector & backup_entries_collector, + const String & data_path_in_backup, + UserDefinedSQLObjectType object_type, + const std::vector> & objects); + +/// Restores user-defined SQL objects from the backup. +std::vector> +restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type); +} diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp b/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp new file mode 100644 index 00000000000..93466be54fb --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.cpp @@ -0,0 +1,265 @@ +#include "Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h" + +#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h" +#include "Functions/UserDefined/UserDefinedSQLObjectType.h" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include + +#include +#include +#include + +#include +#include + +#include + +namespace fs = std::filesystem; + + +namespace DB +{ + +namespace ErrorCodes +{ + extern const int DIRECTORY_DOESNT_EXIST; + extern const int FUNCTION_ALREADY_EXISTS; + extern const int UNKNOWN_FUNCTION; +} + + +namespace +{ + /// Converts a path to an absolute path and append it with a separator. + String makeDirectoryPathCanonical(const String & directory_path) + { + auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path); + if (canonical_directory_path.has_filename()) + canonical_directory_path += std::filesystem::path::preferred_separator; + return canonical_directory_path; + } +} + +UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_) + : global_context(global_context_) + , dir_path{makeDirectoryPathCanonical(dir_path_)} + , log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")} +{ + createDirectory(); +} + + +ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name) +{ + return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true); +} + + +ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists) +{ + LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path); + + try + { + if (check_file_exists && !fs::exists(path)) + return nullptr; + + /// There is .sql file with user defined object creation statement. + ReadBufferFromFile in(path); + + String object_create_query; + readStringUntilEOF(object_create_query, in); + + switch (object_type) + { + case UserDefinedSQLObjectType::Function: + { + ParserCreateFunctionQuery parser; + ASTPtr ast = parseQuery( + parser, + object_create_query.data(), + object_create_query.data() + object_create_query.size(), + "", + 0, + global_context->getSettingsRef().max_parser_depth); + UserDefinedSQLFunctionFactory::checkCanBeRegistered(global_context, object_name, *ast); + return ast; + } + } + } + catch (...) + { + tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path)); + return nullptr; /// Failed to load this sql object, will ignore it + } +} + + +void UserDefinedSQLObjectsLoaderFromDisk::loadObjects() +{ + if (!objects_loaded) + loadObjectsImpl(); +} + + +void UserDefinedSQLObjectsLoaderFromDisk::reloadObjects() +{ + loadObjectsImpl(); +} + + +void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl() +{ + LOG_INFO(log, "Loading user defined objects from {}", dir_path); + createDirectory(); + + std::vector> function_names_and_queries; + + Poco::DirectoryIterator dir_end; + for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it) + { + if (it->isDirectory()) + continue; + + const String & file_name = it.name(); + if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql")) + continue; + + size_t prefix_length = strlen("function_"); + size_t suffix_length = strlen(".sql"); + String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length)); + + if (function_name.empty()) + continue; + + ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false); + if (ast) + function_names_and_queries.emplace_back(function_name, ast); + } + + UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_queries); + objects_loaded = true; + + LOG_DEBUG(log, "User defined objects loaded"); +} + + +void UserDefinedSQLObjectsLoaderFromDisk::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) +{ + createDirectory(); + auto ast = tryLoadObject(object_type, object_name); + auto & factory = UserDefinedSQLFunctionFactory::instance(); + if (ast) + factory.setFunction(object_name, *ast); + else + factory.removeFunction(object_name); +} + + +void UserDefinedSQLObjectsLoaderFromDisk::createDirectory() +{ + std::error_code create_dir_error_code; + fs::create_directories(dir_path, create_dir_error_code); + if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code) + throw Exception("Couldn't create directory " + dir_path + " reason: '" + create_dir_error_code.message() + "'", ErrorCodes::DIRECTORY_DOESNT_EXIST); +} + + +bool UserDefinedSQLObjectsLoaderFromDisk::storeObject( + UserDefinedSQLObjectType object_type, + const String & object_name, + const IAST & create_object_query, + bool throw_if_exists, + bool replace_if_exists, + const Settings & settings) +{ + String file_path = getFilePath(object_type, object_name); + LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path); + + if (fs::exists(file_path)) + { + if (throw_if_exists) + throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name); + else if (!replace_if_exists) + return false; + } + + WriteBufferFromOwnString create_statement_buf; + formatAST(create_object_query, create_statement_buf, false); + writeChar('\n', create_statement_buf); + String create_statement = create_statement_buf.str(); + + String temp_file_path = file_path + ".tmp"; + + try + { + WriteBufferFromFile out(temp_file_path, create_statement.size()); + writeString(create_statement, out); + out.next(); + if (settings.fsync_metadata) + out.sync(); + out.close(); + + if (replace_if_exists) + fs::rename(temp_file_path, file_path); + else + renameNoReplace(temp_file_path, file_path); + } + catch (...) + { + fs::remove(temp_file_path); + throw; + } + + LOG_TRACE(log, "Object {} stored", backQuote(object_name)); + return true; +} + + +bool UserDefinedSQLObjectsLoaderFromDisk::removeObject( + UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) +{ + String file_path = getFilePath(object_type, object_name); + LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path); + + bool existed = fs::remove(file_path); + + if (!existed) + { + if (throw_if_not_exists) + throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name); + else + return false; + } + + LOG_TRACE(log, "Object {} removed", backQuote(object_name)); + return true; +} + + +String UserDefinedSQLObjectsLoaderFromDisk::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const +{ + String file_path; + switch (object_type) + { + case UserDefinedSQLObjectType::Function: + { + file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql"; + break; + } + } + return file_path; +} + +} diff --git a/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h b/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h new file mode 100644 index 00000000000..7b0bb291f42 --- /dev/null +++ b/src/Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h @@ -0,0 +1,46 @@ +#pragma once + +#include +#include +#include + + +namespace DB +{ + +/// Loads user-defined sql objects from a specified folder. +class UserDefinedSQLObjectsLoaderFromDisk : public IUserDefinedSQLObjectsLoader +{ +public: + UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_); + + void loadObjects() override; + + void reloadObjects() override; + + void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override; + + bool storeObject( + UserDefinedSQLObjectType object_type, + const String & object_name, + const IAST & create_object_query, + bool throw_if_exists, + bool replace_if_exists, + const Settings & settings) override; + + bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override; + +private: + void createDirectory(); + void loadObjectsImpl(); + ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name); + ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & file_path, bool check_file_exists); + String getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const; + + ContextPtr global_context; + String dir_path; + Poco::Logger * log; + std::atomic objects_loaded = false; +}; + +} diff --git a/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp b/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp new file mode 100644 index 00000000000..9d0137328d1 --- /dev/null +++ b/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.cpp @@ -0,0 +1,21 @@ +#include +#include +#include +#include +#include + +namespace fs = std::filesystem; + + +namespace DB +{ + +std::unique_ptr createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context) +{ + const auto & config = global_context->getConfigRef(); + String default_path = fs::path{global_context->getPath()} / "user_defined/"; + String path = config.getString("user_defined_path", default_path); + return std::make_unique(global_context, path); +} + +} diff --git a/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h b/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h new file mode 100644 index 00000000000..b3a4623dba3 --- /dev/null +++ b/src/Functions/UserDefined/createUserDefinedSQLObjectsLoader.h @@ -0,0 +1,12 @@ +#pragma once + +#include + + +namespace DB +{ +class IUserDefinedSQLObjectsLoader; + +std::unique_ptr createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context); + +} diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 980207c7f9c..9a0d33b19fc 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -52,7 +52,7 @@ #include #include #include -#include +#include namespace DB diff --git a/src/Interpreters/Context.cpp b/src/Interpreters/Context.cpp index b08c2bab81c..1de56e950c6 100644 --- a/src/Interpreters/Context.cpp +++ b/src/Interpreters/Context.cpp @@ -57,7 +57,9 @@ #include #include #include -#include +#include +#include +#include #include #include #include @@ -186,7 +188,6 @@ struct ContextSharedPart : boost::noncopyable String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function. String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries. String user_scripts_path; /// Path to the directory with user provided scripts. - String user_defined_path; /// Path to the directory with user defined objects. ConfigurationPtr config; /// Global configuration settings. String tmp_path; /// Path to the temporary files that occur when processing the request. @@ -194,16 +195,18 @@ struct ContextSharedPart : boost::noncopyable mutable std::unique_ptr embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization. mutable std::unique_ptr external_dictionaries_loader; - mutable std::unique_ptr external_user_defined_executable_functions_loader; scope_guard models_repository_guard; ExternalLoaderXMLConfigRepository * external_dictionaries_config_repository = nullptr; scope_guard dictionaries_xmls; + mutable std::unique_ptr external_user_defined_executable_functions_loader; ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository = nullptr; scope_guard user_defined_executable_functions_xmls; + mutable std::unique_ptr user_defined_sql_objects_loader; + #if USE_NLP mutable std::optional synonyms_extensions; mutable std::optional lemmatizers; @@ -420,6 +423,8 @@ struct ContextSharedPart : boost::noncopyable external_dictionaries_loader->enablePeriodicUpdates(false); if (external_user_defined_executable_functions_loader) external_user_defined_executable_functions_loader->enablePeriodicUpdates(false); + if (user_defined_sql_objects_loader) + user_defined_sql_objects_loader->stopWatching(); Session::shutdownNamedSessions(); @@ -450,6 +455,7 @@ struct ContextSharedPart : boost::noncopyable std::unique_ptr delete_embedded_dictionaries; std::unique_ptr delete_external_dictionaries_loader; std::unique_ptr delete_external_user_defined_executable_functions_loader; + std::unique_ptr delete_user_defined_sql_objects_loader; std::unique_ptr delete_buffer_flush_schedule_pool; std::unique_ptr delete_schedule_pool; std::unique_ptr delete_distributed_schedule_pool; @@ -488,6 +494,7 @@ struct ContextSharedPart : boost::noncopyable delete_embedded_dictionaries = std::move(embedded_dictionaries); delete_external_dictionaries_loader = std::move(external_dictionaries_loader); delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader); + delete_user_defined_sql_objects_loader = std::move(user_defined_sql_objects_loader); delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool); delete_schedule_pool = std::move(schedule_pool); delete_distributed_schedule_pool = std::move(distributed_schedule_pool); @@ -515,6 +522,7 @@ struct ContextSharedPart : boost::noncopyable delete_embedded_dictionaries.reset(); delete_external_dictionaries_loader.reset(); delete_external_user_defined_executable_functions_loader.reset(); + delete_user_defined_sql_objects_loader.reset(); delete_ddl_worker.reset(); delete_buffer_flush_schedule_pool.reset(); delete_schedule_pool.reset(); @@ -658,12 +666,6 @@ String Context::getUserScriptsPath() const return shared->user_scripts_path; } -String Context::getUserDefinedPath() const -{ - auto lock = getLock(); - return shared->user_defined_path; -} - Strings Context::getWarnings() const { Strings common_warnings; @@ -726,9 +728,6 @@ void Context::setPath(const String & path) if (shared->user_scripts_path.empty()) shared->user_scripts_path = shared->path + "user_scripts/"; - - if (shared->user_defined_path.empty()) - shared->user_defined_path = shared->path + "user_defined/"; } VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size) @@ -804,12 +803,6 @@ void Context::setUserScriptsPath(const String & path) shared->user_scripts_path = path; } -void Context::setUserDefinedPath(const String & path) -{ - auto lock = getLock(); - shared->user_defined_path = path; -} - void Context::addWarningMessage(const String & msg) const { auto lock = getLock(); @@ -1652,6 +1645,22 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository)); } +const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const +{ + auto lock = getLock(); + if (!shared->user_defined_sql_objects_loader) + shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext()); + return *shared->user_defined_sql_objects_loader; +} + +IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() +{ + auto lock = getLock(); + if (!shared->user_defined_sql_objects_loader) + shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext()); + return *shared->user_defined_sql_objects_loader; +} + #if USE_NLP SynonymsExtensions & Context::getSynonymsExtensions() const @@ -3410,7 +3419,7 @@ void Context::initializeBackgroundExecutorsIfNeeded() size_t background_merges_mutations_concurrency_ratio = 2; if (config.has("background_merges_mutations_concurrency_ratio")) background_merges_mutations_concurrency_ratio = config.getUInt64("background_merges_mutations_concurrency_ratio"); - else if (config.has("profiles.default.background_pool_size")) + else if (config.has("profiles.default.background_merges_mutations_concurrency_ratio")) background_merges_mutations_concurrency_ratio = config.getUInt64("profiles.default.background_merges_mutations_concurrency_ratio"); size_t background_move_pool_size = 8; diff --git a/src/Interpreters/Context.h b/src/Interpreters/Context.h index 7711ea34dc7..233f4011ce3 100644 --- a/src/Interpreters/Context.h +++ b/src/Interpreters/Context.h @@ -54,6 +54,7 @@ enum class RowPolicyFilterType; class EmbeddedDictionaries; class ExternalDictionariesLoader; class ExternalUserDefinedExecutableFunctionsLoader; +class IUserDefinedSQLObjectsLoader; class InterserverCredentials; using InterserverCredentialsPtr = std::shared_ptr; class InterserverIOHandler; @@ -435,7 +436,6 @@ public: String getUserFilesPath() const; String getDictionariesLibPath() const; String getUserScriptsPath() const; - String getUserDefinedPath() const; /// A list of warnings about server configuration to place in `system.warnings` table. Strings getWarnings() const; @@ -450,7 +450,6 @@ public: void setUserFilesPath(const String & path); void setDictionariesLibPath(const String & path); void setUserScriptsPath(const String & path); - void setUserDefinedPath(const String & path); void addWarningMessage(const String & msg) const; @@ -653,16 +652,19 @@ public: /// Returns the current constraints (can return null). std::shared_ptr getSettingsConstraintsAndCurrentProfiles() const; - const EmbeddedDictionaries & getEmbeddedDictionaries() const; const ExternalDictionariesLoader & getExternalDictionariesLoader() const; - const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const; - EmbeddedDictionaries & getEmbeddedDictionaries(); ExternalDictionariesLoader & getExternalDictionariesLoader(); ExternalDictionariesLoader & getExternalDictionariesLoaderUnlocked(); - ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader(); - ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked(); + const EmbeddedDictionaries & getEmbeddedDictionaries() const; + EmbeddedDictionaries & getEmbeddedDictionaries(); void tryCreateEmbeddedDictionaries(const Poco::Util::AbstractConfiguration & config) const; void loadOrReloadDictionaries(const Poco::Util::AbstractConfiguration & config); + + const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const; + ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader(); + ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked(); + const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const; + IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader(); void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config); #if USE_NLP diff --git a/src/Interpreters/InterpreterCreateFunctionQuery.cpp b/src/Interpreters/InterpreterCreateFunctionQuery.cpp index dfd18ad28de..d56b5029e41 100644 --- a/src/Interpreters/InterpreterCreateFunctionQuery.cpp +++ b/src/Interpreters/InterpreterCreateFunctionQuery.cpp @@ -1,16 +1,11 @@ #include #include +#include +#include #include -#include -#include -#include -#include -#include #include #include -#include -#include namespace DB @@ -18,13 +13,11 @@ namespace DB namespace ErrorCodes { - extern const int CANNOT_CREATE_RECURSIVE_FUNCTION; - extern const int UNSUPPORTED_METHOD; + extern const int INCORRECT_QUERY; } BlockIO InterpreterCreateFunctionQuery::execute() { - FunctionNameNormalizer().visit(query_ptr.get()); ASTCreateFunctionQuery & create_function_query = query_ptr->as(); AccessRightsElements access_rights_elements; @@ -33,80 +26,27 @@ BlockIO InterpreterCreateFunctionQuery::execute() if (create_function_query.or_replace) access_rights_elements.emplace_back(AccessType::DROP_FUNCTION); + auto current_context = getContext(); + if (!create_function_query.cluster.empty()) { + if (current_context->getUserDefinedSQLObjectsLoader().isReplicated()) + throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically"); + DDLQueryOnClusterParams params; params.access_to_check = std::move(access_rights_elements); - return executeDDLQueryOnCluster(query_ptr, getContext(), params); + return executeDDLQueryOnCluster(query_ptr, current_context, params); } - auto current_context = getContext(); current_context->checkAccess(access_rights_elements); - auto & user_defined_function_factory = UserDefinedSQLFunctionFactory::instance(); - auto function_name = create_function_query.getFunctionName(); + bool throw_if_exists = !create_function_query.if_not_exists && !create_function_query.or_replace; + bool replace_if_exists = create_function_query.or_replace; - bool if_not_exists = create_function_query.if_not_exists; - bool replace = create_function_query.or_replace; - - create_function_query.if_not_exists = false; - create_function_query.or_replace = false; - - validateFunction(create_function_query.function_core, function_name); - user_defined_function_factory.registerFunction(current_context, function_name, query_ptr, replace, if_not_exists, persist_function); + UserDefinedSQLFunctionFactory::instance().registerFunction(current_context, function_name, query_ptr, throw_if_exists, replace_if_exists); return {}; } -void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const String & name) -{ - ASTFunction * lambda_function = function->as(); - - if (!lambda_function) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected function, got: {}", function->formatForErrorMessage()); - - auto & lambda_function_expression_list = lambda_function->arguments->children; - - if (lambda_function_expression_list.size() != 2) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have arguments and body"); - - const ASTFunction * tuple_function_arguments = lambda_function_expression_list[0]->as(); - - if (!tuple_function_arguments || !tuple_function_arguments->arguments) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid arguments"); - - std::unordered_set arguments; - - for (const auto & argument : tuple_function_arguments->arguments->children) - { - const auto * argument_identifier = argument->as(); - - if (!argument_identifier) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda argument must be identifier"); - - const auto & argument_name = argument_identifier->name(); - auto [_, inserted] = arguments.insert(argument_name); - if (!inserted) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Identifier {} already used as function parameter", argument_name); - } - - ASTPtr function_body = lambda_function_expression_list[1]; - if (!function_body) - throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid function body"); - - validateFunctionRecursiveness(function_body, name); -} - -void InterpreterCreateFunctionQuery::validateFunctionRecursiveness(ASTPtr node, const String & function_to_create) -{ - for (const auto & child : node->children) - { - auto function_name_opt = tryGetFunctionName(child); - if (function_name_opt && function_name_opt.value() == function_to_create) - throw Exception(ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION, "You cannot create recursive function"); - - validateFunctionRecursiveness(child, function_to_create); - } -} } diff --git a/src/Interpreters/InterpreterCreateFunctionQuery.h b/src/Interpreters/InterpreterCreateFunctionQuery.h index a67fdb9605d..d5fedd5ca6b 100644 --- a/src/Interpreters/InterpreterCreateFunctionQuery.h +++ b/src/Interpreters/InterpreterCreateFunctionQuery.h @@ -8,24 +8,18 @@ namespace DB class Context; -class InterpreterCreateFunctionQuery : public IInterpreter, WithContext +class InterpreterCreateFunctionQuery : public IInterpreter, WithMutableContext { public: - InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextPtr context_, bool persist_function_) - : WithContext(context_) - , query_ptr(query_ptr_) - , persist_function(persist_function_) {} + InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_) + : WithMutableContext(context_), query_ptr(query_ptr_) + { + } BlockIO execute() override; - void setInternal(bool internal_); - private: - static void validateFunction(ASTPtr function, const String & name); - static void validateFunctionRecursiveness(ASTPtr node, const String & function_to_create); - ASTPtr query_ptr; - bool persist_function; }; } diff --git a/src/Interpreters/InterpreterDropFunctionQuery.cpp b/src/Interpreters/InterpreterDropFunctionQuery.cpp index bb2032f355a..df81ae661c7 100644 --- a/src/Interpreters/InterpreterDropFunctionQuery.cpp +++ b/src/Interpreters/InterpreterDropFunctionQuery.cpp @@ -1,17 +1,22 @@ #include #include +#include +#include #include #include #include -#include -#include #include namespace DB { +namespace ErrorCodes +{ + extern const int INCORRECT_QUERY; +} + BlockIO InterpreterDropFunctionQuery::execute() { FunctionNameNormalizer().visit(query_ptr.get()); @@ -20,17 +25,23 @@ BlockIO InterpreterDropFunctionQuery::execute() AccessRightsElements access_rights_elements; access_rights_elements.emplace_back(AccessType::DROP_FUNCTION); + auto current_context = getContext(); + if (!drop_function_query.cluster.empty()) { + if (current_context->getUserDefinedSQLObjectsLoader().isReplicated()) + throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically"); + DDLQueryOnClusterParams params; params.access_to_check = std::move(access_rights_elements); - return executeDDLQueryOnCluster(query_ptr, getContext(), params); + return executeDDLQueryOnCluster(query_ptr, current_context, params); } - auto current_context = getContext(); current_context->checkAccess(access_rights_elements); - UserDefinedSQLFunctionFactory::instance().unregisterFunction(current_context, drop_function_query.function_name, drop_function_query.if_exists); + bool throw_if_not_exists = !drop_function_query.if_exists; + + UserDefinedSQLFunctionFactory::instance().unregisterFunction(current_context, drop_function_query.function_name, throw_if_not_exists); return {}; } diff --git a/src/Interpreters/InterpreterFactory.cpp b/src/Interpreters/InterpreterFactory.cpp index 170f3c463b4..ca0a59c0c1a 100644 --- a/src/Interpreters/InterpreterFactory.cpp +++ b/src/Interpreters/InterpreterFactory.cpp @@ -296,7 +296,7 @@ std::unique_ptr InterpreterFactory::get(ASTPtr & query, ContextMut } else if (query->as()) { - return std::make_unique(query, context, true /*persist_function*/); + return std::make_unique(query, context); } else if (query->as()) { diff --git a/src/Interpreters/InterpreterSystemQuery.cpp b/src/Interpreters/InterpreterSystemQuery.cpp index dcb8f0eaadb..d05fd70e074 100644 --- a/src/Interpreters/InterpreterSystemQuery.cpp +++ b/src/Interpreters/InterpreterSystemQuery.cpp @@ -12,7 +12,7 @@ #include #include #include -#include +#include #include #include #include diff --git a/src/Interpreters/TreeOptimizer.cpp b/src/Interpreters/TreeOptimizer.cpp index 74f084df40b..e4301bad1e8 100644 --- a/src/Interpreters/TreeOptimizer.cpp +++ b/src/Interpreters/TreeOptimizer.cpp @@ -23,7 +23,6 @@ #include #include #include -#include #include #include @@ -35,6 +34,7 @@ #include #include +#include #include #include diff --git a/src/Interpreters/TreeRewriter.cpp b/src/Interpreters/TreeRewriter.cpp index 2f5bfd00938..ac49d79c6ba 100644 --- a/src/Interpreters/TreeRewriter.cpp +++ b/src/Interpreters/TreeRewriter.cpp @@ -24,13 +24,14 @@ #include #include #include -#include -#include #include #include #include #include +#include +#include + #include #include #include diff --git a/src/Interpreters/UserDefinedSQLFunctionFactory.cpp b/src/Interpreters/UserDefinedSQLFunctionFactory.cpp deleted file mode 100644 index 2f876f00cc3..00000000000 --- a/src/Interpreters/UserDefinedSQLFunctionFactory.cpp +++ /dev/null @@ -1,168 +0,0 @@ -#include "UserDefinedSQLFunctionFactory.h" - -#include - -#include -#include -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int FUNCTION_ALREADY_EXISTS; - extern const int UNKNOWN_FUNCTION; - extern const int CANNOT_DROP_FUNCTION; -} - -UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance() -{ - static UserDefinedSQLFunctionFactory result; - return result; -} - -void UserDefinedSQLFunctionFactory::registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist) -{ - if (FunctionFactory::instance().hasNameOrAlias(function_name)) - { - if (if_not_exists) - return; - - throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name); - } - - if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name)) - { - if (if_not_exists) - return; - - throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name); - } - - if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) - { - if (if_not_exists) - return; - - throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name); - } - - std::lock_guard lock(mutex); - - auto [it, inserted] = function_name_to_create_query.emplace(function_name, create_function_query); - - if (!inserted) - { - if (if_not_exists) - return; - - if (replace) - it->second = create_function_query; - else - throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, - "The function name '{}' is not unique", - function_name); - } - - if (persist) - { - try - { - UserDefinedSQLObjectsLoader::instance().storeObject(context, UserDefinedSQLObjectType::Function, function_name, *create_function_query, replace); - } - catch (Exception & exception) - { - function_name_to_create_query.erase(it); - exception.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name))); - throw; - } - } -} - -void UserDefinedSQLFunctionFactory::unregisterFunction(ContextPtr context, const String & function_name, bool if_exists) -{ - if (FunctionFactory::instance().hasNameOrAlias(function_name) || - AggregateFunctionFactory::instance().hasNameOrAlias(function_name)) - throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name); - - if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context)) - throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name); - - std::lock_guard lock(mutex); - - auto it = function_name_to_create_query.find(function_name); - if (it == function_name_to_create_query.end()) - { - if (if_exists) - return; - - throw Exception(ErrorCodes::UNKNOWN_FUNCTION, - "The function name '{}' is not registered", - function_name); - } - - try - { - UserDefinedSQLObjectsLoader::instance().removeObject(context, UserDefinedSQLObjectType::Function, function_name); - } - catch (Exception & exception) - { - exception.addMessage(fmt::format("while removing user defined function {} from disk", backQuote(function_name))); - throw; - } - - function_name_to_create_query.erase(it); -} - -ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const -{ - std::lock_guard lock(mutex); - - auto it = function_name_to_create_query.find(function_name); - if (it == function_name_to_create_query.end()) - throw Exception(ErrorCodes::UNKNOWN_FUNCTION, - "The function name '{}' is not registered", - function_name); - - return it->second; -} - -ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const -{ - std::lock_guard lock(mutex); - - auto it = function_name_to_create_query.find(function_name); - if (it == function_name_to_create_query.end()) - return nullptr; - - return it->second; -} - -bool UserDefinedSQLFunctionFactory::has(const String & function_name) const -{ - return tryGet(function_name) != nullptr; -} - -std::vector UserDefinedSQLFunctionFactory::getAllRegisteredNames() const -{ - std::vector registered_names; - - std::lock_guard lock(mutex); - registered_names.reserve(function_name_to_create_query.size()); - - for (const auto & [name, _] : function_name_to_create_query) - registered_names.emplace_back(name); - - return registered_names; -} - -bool UserDefinedSQLFunctionFactory::empty() const -{ - std::lock_guard lock(mutex); - return function_name_to_create_query.empty(); -} -} diff --git a/src/Interpreters/UserDefinedSQLFunctionFactory.h b/src/Interpreters/UserDefinedSQLFunctionFactory.h deleted file mode 100644 index db43bb7298e..00000000000 --- a/src/Interpreters/UserDefinedSQLFunctionFactory.h +++ /dev/null @@ -1,54 +0,0 @@ -#pragma once - -#include -#include - -#include - -#include -#include - - -namespace DB -{ - -/// Factory for SQLUserDefinedFunctions -class UserDefinedSQLFunctionFactory : public IHints<1, UserDefinedSQLFunctionFactory> -{ -public: - static UserDefinedSQLFunctionFactory & instance(); - - /** Register function for function_name in factory for specified create_function_query. - * If function exists and if_not_exists = false and replace = false throws exception. - * If replace = true and sql user defined function with function_name already exists replace it with create_function_query. - * If persist = true persist function on disk. - */ - void registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist); - - /** Unregister function for function_name. - * If if_exists = true then do not throw exception if function is not registered. - * If if_exists = false then throw exception if function is not registered. - */ - void unregisterFunction(ContextPtr context, const String & function_name, bool if_exists); - - /// Get function create query for function_name. If no function registered with function_name throws exception. - ASTPtr get(const String & function_name) const; - - /// Get function create query for function_name. If no function registered with function_name return nullptr. - ASTPtr tryGet(const String & function_name) const; - - /// Check if function with function_name registered. - bool has(const String & function_name) const; - - /// Get all user defined functions registered names. - std::vector getAllRegisteredNames() const override; - - /// Check whether any UDFs have been registered - bool empty() const; - -private: - std::unordered_map function_name_to_create_query; - mutable std::mutex mutex; -}; - -} diff --git a/src/Interpreters/UserDefinedSQLObjectsLoader.cpp b/src/Interpreters/UserDefinedSQLObjectsLoader.cpp deleted file mode 100644 index c6f50fc4a0a..00000000000 --- a/src/Interpreters/UserDefinedSQLObjectsLoader.cpp +++ /dev/null @@ -1,184 +0,0 @@ -#include "UserDefinedSQLObjectsLoader.h" - -#include - -#include -#include -#include - -#include -#include -#include -#include - -#include -#include - -#include -#include -#include -#include - -#include -#include -#include - - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int OBJECT_ALREADY_STORED_ON_DISK; - extern const int OBJECT_WAS_NOT_STORED_ON_DISK; -} - -UserDefinedSQLObjectsLoader & UserDefinedSQLObjectsLoader::instance() -{ - static UserDefinedSQLObjectsLoader ret; - return ret; -} - -UserDefinedSQLObjectsLoader::UserDefinedSQLObjectsLoader() - : log(&Poco::Logger::get("UserDefinedSQLObjectsLoader")) -{} - -void UserDefinedSQLObjectsLoader::loadUserDefinedObject(ContextPtr context, UserDefinedSQLObjectType object_type, std::string_view name, const String & path) -{ - auto name_ref = StringRef(name.data(), name.size()); - LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(name_ref), path); - - /// There is .sql file with user defined object creation statement. - ReadBufferFromFile in(path); - - String object_create_query; - readStringUntilEOF(object_create_query, in); - - try - { - switch (object_type) - { - case UserDefinedSQLObjectType::Function: - { - ParserCreateFunctionQuery parser; - ASTPtr ast = parseQuery( - parser, - object_create_query.data(), - object_create_query.data() + object_create_query.size(), - "in file " + path, - 0, - context->getSettingsRef().max_parser_depth); - - InterpreterCreateFunctionQuery interpreter(ast, context, false /*persist_function*/); - interpreter.execute(); - } - } - } - catch (Exception & e) - { - e.addMessage(fmt::format("while loading user defined objects {} from path {}", backQuote(name_ref), path)); - throw; - } -} - -void UserDefinedSQLObjectsLoader::loadObjects(ContextPtr context) -{ - if (unlikely(!enable_persistence)) - return; - - LOG_DEBUG(log, "Loading user defined objects"); - - String dir_path = context->getUserDefinedPath(); - Poco::DirectoryIterator dir_end; - for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it) - { - if (it->isDirectory()) - continue; - - const std::string & file_name = it.name(); - - /// For '.svn', '.gitignore' directory and similar. - if (file_name.at(0) == '.') - continue; - - if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql")) - continue; - - std::string_view object_name = file_name; - - object_name.remove_prefix(strlen("function_")); - object_name.remove_suffix(strlen(".sql")); - - if (object_name.empty()) - continue; - - loadUserDefinedObject(context, UserDefinedSQLObjectType::Function, object_name, dir_path + it.name()); - } -} - -void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace) -{ - if (unlikely(!enable_persistence)) - return; - - String dir_path = context->getUserDefinedPath(); - String file_path; - - switch (object_type) - { - case UserDefinedSQLObjectType::Function: - { - file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql"; - } - } - - if (!replace && std::filesystem::exists(file_path)) - throw Exception(ErrorCodes::OBJECT_ALREADY_STORED_ON_DISK, "User defined object {} already stored on disk", backQuote(file_path)); - - LOG_DEBUG(log, "Storing object {} to file {}", backQuote(object_name), file_path); - - WriteBufferFromOwnString create_statement_buf; - formatAST(ast, create_statement_buf, false); - writeChar('\n', create_statement_buf); - String create_statement = create_statement_buf.str(); - - WriteBufferFromFile out(file_path, create_statement.size()); - writeString(create_statement, out); - out.next(); - if (context->getSettingsRef().fsync_metadata) - out.sync(); - out.close(); - - LOG_DEBUG(log, "Stored object {}", backQuote(object_name)); -} - -void UserDefinedSQLObjectsLoader::removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name) -{ - if (unlikely(!enable_persistence)) - return; - - String dir_path = context->getUserDefinedPath(); - LOG_DEBUG(log, "Removing file for user defined object {} from {}", backQuote(object_name), dir_path); - - std::filesystem::path file_path; - - switch (object_type) - { - case UserDefinedSQLObjectType::Function: - { - file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql"; - } - } - - if (!std::filesystem::exists(file_path)) - throw Exception(ErrorCodes::OBJECT_WAS_NOT_STORED_ON_DISK, "User defined object {} was not stored on disk", backQuote(file_path.string())); - - std::filesystem::remove(file_path); -} - -void UserDefinedSQLObjectsLoader::enable(bool enable_persistence_) -{ - enable_persistence = enable_persistence_; -} - -} diff --git a/src/Interpreters/UserDefinedSQLObjectsLoader.h b/src/Interpreters/UserDefinedSQLObjectsLoader.h deleted file mode 100644 index 9dfba1181c1..00000000000 --- a/src/Interpreters/UserDefinedSQLObjectsLoader.h +++ /dev/null @@ -1,37 +0,0 @@ -#pragma once - -#include -#include - -#include - - -namespace DB -{ - -enum class UserDefinedSQLObjectType -{ - Function -}; - -class UserDefinedSQLObjectsLoader : private boost::noncopyable -{ -public: - static UserDefinedSQLObjectsLoader & instance(); - UserDefinedSQLObjectsLoader(); - - void loadObjects(ContextPtr context); - void storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace); - void removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name); - - /// For ClickHouse local if path is not set we can disable loader. - void enable(bool enable_persistence); - -private: - - void loadUserDefinedObject(ContextPtr context, UserDefinedSQLObjectType object_type, std::string_view object_name, const String & file_path); - Poco::Logger * log; - bool enable_persistence = true; -}; - -} diff --git a/src/Interpreters/executeQuery.cpp b/src/Interpreters/executeQuery.cpp index 63641d4bdcb..86686b3eb13 100644 --- a/src/Interpreters/executeQuery.cpp +++ b/src/Interpreters/executeQuery.cpp @@ -549,15 +549,9 @@ static std::tuple executeQueryImpl( if (insert_query) { if (insert_query->table_id) - { insert_query->table_id = context->resolveStorageID(insert_query->table_id); - LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName()); - } else if (auto table = insert_query->getTable(); !table.empty()) - { insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table}); - LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName()); - } } if (insert_query && insert_query->select) diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 0c150750e09..db5a027844b 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -214,7 +214,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi seen_columns.assign(num_columns, false); nested_prefix_length = 0; - readRowStart(); + readRowStart(columns); readJSONObject(columns); const auto & header = getPort().getHeader(); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h index 59447180f77..4e2946cfea6 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.h @@ -48,7 +48,7 @@ private: void readJSONObject(MutableColumns & columns); void readNestedData(const String & name, MutableColumns & columns); - virtual void readRowStart() {} + virtual void readRowStart(MutableColumns &) {} virtual bool checkEndOfData(bool is_first_row); const FormatSettings format_settings; @@ -66,10 +66,6 @@ private: /// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.' size_t nested_prefix_length = 0; - /// Set of columns for which the values were read. The rest will be filled with default values. - std::vector read_columns; - /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name. - std::vector seen_columns; /// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true /// for row like {..., "non-nullable column name" : null, ...} @@ -85,6 +81,12 @@ private: bool yield_strings; protected: + + /// Set of columns for which the values were read. The rest will be filled with default values. + std::vector read_columns; + /// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name. + std::vector seen_columns; + /// This flag is needed to know if data is in square brackets. bool data_in_square_brackets = false; }; diff --git a/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.cpp index 5ca1ba33c27..6e6d6287840 100644 --- a/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.cpp @@ -2,12 +2,39 @@ #include #include #include +#include namespace DB { +namespace ErrorCodes +{ + extern const int BAD_ARGUMENTS; +} + +std::optional getColumnIndexForJSONObjectEachRowObjectName(const Block & header, const FormatSettings & format_settings) +{ + if (format_settings.json_object_each_row.column_for_object_name.empty()) + return std::nullopt; + + if (!header.has(format_settings.json_object_each_row.column_for_object_name)) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Column name '{}' from setting format_json_object_each_row_column_for_object_name doesn't exists in header", + format_settings.json_object_each_row.column_for_object_name); + + size_t index = header.getPositionByName(format_settings.json_object_each_row.column_for_object_name); + if (!isStringOrFixedString(header.getDataTypes()[index])) + throw Exception( + ErrorCodes::BAD_ARGUMENTS, + "Column '{}' from setting json_object_each_row_column_for_object_name must have String type", + format_settings.json_object_each_row.column_for_object_name); + + return index; +} + JSONObjectEachRowInputFormat::JSONObjectEachRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_) - : JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false) + : JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, format_settings_)) { } @@ -16,9 +43,15 @@ void JSONObjectEachRowInputFormat::readPrefix() JSONUtils::skipObjectStart(*in); } -void JSONObjectEachRowInputFormat::readRowStart() +void JSONObjectEachRowInputFormat::readRowStart(MutableColumns & columns) { - JSONUtils::readFieldName(*in); + auto object_name = JSONUtils::readFieldName(*in); + if (field_index_for_object_name) + { + columns[*field_index_for_object_name]->insertData(object_name.data(), object_name.size()); + seen_columns[*field_index_for_object_name] = true; + read_columns[*field_index_for_object_name] = true; + } } bool JSONObjectEachRowInputFormat::checkEndOfData(bool is_first_row) @@ -30,7 +63,6 @@ bool JSONObjectEachRowInputFormat::checkEndOfData(bool is_first_row) return false; } - JSONObjectEachRowSchemaReader::JSONObjectEachRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_) : IRowWithNamesSchemaReader(in_, format_settings_) { @@ -53,7 +85,10 @@ NamesAndTypesList JSONObjectEachRowSchemaReader::readRowAndGetNamesAndDataTypes( JSONUtils::skipComma(in); JSONUtils::readFieldName(in); - return JSONUtils::readRowAndGetNamesAndDataTypesForJSONEachRow(in, format_settings, false); + auto names_and_types = JSONUtils::readRowAndGetNamesAndDataTypesForJSONEachRow(in, format_settings, false); + if (!format_settings.json_object_each_row.column_for_object_name.empty()) + names_and_types.emplace_front(format_settings.json_object_each_row.column_for_object_name, std::make_shared()); + return names_and_types; } void JSONObjectEachRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type) @@ -83,7 +118,8 @@ void registerJSONObjectEachRowSchemaReader(FormatFactory & factory) }); factory.registerAdditionalInfoForSchemaCacheGetter("JSONObjectEachRow", [](const FormatSettings & settings) { - return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON); + return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON) + + fmt::format(", format_json_object_each_row_column_for_object_name={}", settings.json_object_each_row.column_for_object_name); }); } diff --git a/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h b/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h index fd98f43649f..466c0111a03 100644 --- a/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h +++ b/src/Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h @@ -27,8 +27,10 @@ public: private: void readPrefix() override; void readSuffix() override {} - void readRowStart() override; + void readRowStart(MutableColumns & columns) override; bool checkEndOfData(bool is_first_row) override; + + std::optional field_index_for_object_name; }; @@ -44,4 +46,6 @@ private: bool first_row = true; }; +std::optional getColumnIndexForJSONObjectEachRowObjectName(const Block & header, const FormatSettings & settings); + } diff --git a/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp b/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp index 10c1e9beda5..6155efd4b63 100644 --- a/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.cpp @@ -1,4 +1,5 @@ #include +#include #include #include @@ -6,10 +7,38 @@ namespace DB { JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_) - : JSONEachRowRowOutputFormat(out_, header_, params_, settings_) + : JSONEachRowRowOutputFormat(out_, header_, params_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_)) { } +void JSONObjectEachRowRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row) +{ + if (field_number == field_index_for_object_name) + { + ++field_number; + return; + } + JSONEachRowRowOutputFormat::writeField(column, serialization, row); +} + +void JSONObjectEachRowRowOutputFormat::write(const Columns & columns, size_t row) +{ + if (field_index_for_object_name) + object_name = columns[*field_index_for_object_name]->getDataAt(row).toString(); + else + object_name = "row_" + std::to_string(row + 1); + + IRowOutputFormat::write(columns, row); +} + +void JSONObjectEachRowRowOutputFormat::writeFieldDelimiter() +{ + /// We should not write comma before column that is used for + /// object name and also after it if it's in the first place + if (field_number != field_index_for_object_name && !(field_index_for_object_name == 0 && field_number == 1)) + JSONEachRowRowOutputFormat::writeFieldDelimiter(); +} + void JSONObjectEachRowRowOutputFormat::writePrefix() { JSONUtils::writeObjectStart(*ostr); @@ -17,9 +46,7 @@ void JSONObjectEachRowRowOutputFormat::writePrefix() void JSONObjectEachRowRowOutputFormat::writeRowStartDelimiter() { - ++row_num; - String title = "row_" + std::to_string(row_num); - JSONUtils::writeCompactObjectStart(*ostr, 1, title.c_str()); + JSONUtils::writeCompactObjectStart(*ostr, 1, object_name.c_str()); } void JSONObjectEachRowRowOutputFormat::writeRowEndDelimiter() @@ -52,6 +79,7 @@ void registerOutputFormatJSONObjectEachRow(FormatFactory & factory) return std::make_shared(buf, sample, params, settings); }); factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow"); + factory.markFormatHasNoAppendSupport("JSONObjectEachRow"); } } diff --git a/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h b/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h index 51db22fb606..19d9fe1aa53 100644 --- a/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h +++ b/src/Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h @@ -29,6 +29,9 @@ public: String getName() const override { return "JSONObjectEachRowRowOutputFormat"; } private: + void write(const Columns & columns, size_t row) override; + void writeField(const IColumn & column, const ISerialization & serialization, size_t row) override; + void writeFieldDelimiter() override; void writeRowStartDelimiter() override; void writeRowEndDelimiter() override; void writeRowBetweenDelimiter() override; @@ -36,7 +39,8 @@ private: void writePrefix() override; void writeSuffix() override; - size_t row_num = 0; + std::optional field_index_for_object_name; + String object_name; }; } diff --git a/src/Storages/System/StorageSystemFunctions.cpp b/src/Storages/System/StorageSystemFunctions.cpp index db6b51cb4f1..a0a406a974c 100644 --- a/src/Storages/System/StorageSystemFunctions.cpp +++ b/src/Storages/System/StorageSystemFunctions.cpp @@ -6,18 +6,9 @@ #include #include #include -#include -#include +#include +#include #include -#include -#include -#include -#include -#include -#include -#include - -namespace fs = std::filesystem; namespace DB @@ -30,11 +21,6 @@ enum class FunctionOrigin : Int8 EXECUTABLE_USER_DEFINED = 2 }; -namespace ErrorCodes -{ - extern const int CANNOT_RESTORE_TABLE; -} - namespace { template @@ -134,63 +120,12 @@ void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr c void StorageSystemFunctions::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional & /* partitions */) { - const auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance(); - const auto & user_defined_sql_functions_names = user_defined_sql_functions_factory.getAllRegisteredNames(); - fs::path data_path_in_backup_fs{data_path_in_backup}; - for (const auto & function_name : user_defined_sql_functions_names) - { - auto ast = user_defined_sql_functions_factory.tryGet(function_name); - if (!ast) - continue; - backup_entries_collector.addBackupEntry( - data_path_in_backup_fs / (escapeForFileName(function_name) + ".sql"), - std::make_shared(queryToString(ast))); - } + UserDefinedSQLFunctionFactory::instance().backup(backup_entries_collector, data_path_in_backup); } void StorageSystemFunctions::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional & /* partitions */) { - auto backup = restorer.getBackup(); - fs::path data_path_in_backup_fs{data_path_in_backup}; - - Strings filenames = backup->listFiles(data_path_in_backup); - for (const auto & filename : filenames) - { - if (!filename.ends_with(".sql")) - { - throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File name {} doesn't have the extension .sql", - getStorageID().getFullTableName(), String{data_path_in_backup_fs / filename}); - } - } - - auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance(); - const auto & restore_settings = restorer.getRestoreSettings(); - auto context = restorer.getContext(); - - for (const auto & filename : filenames) - { - String escaped_function_name = filename.substr(0, filename.length() - strlen(".sql")); - String function_name = unescapeForFileName(escaped_function_name); - - String filepath = data_path_in_backup_fs / filename; - auto function_def_entry = backup->readFile(filepath); - auto function_def_in = function_def_entry->getReadBuffer(); - String function_def; - readStringUntilEOF(function_def, *function_def_in); - - ParserCreateFunctionQuery parser; - ASTPtr ast = parseQuery( - parser, - function_def.data(), - function_def.data() + function_def.size(), - "in file " + filepath + " from backup " + backup->getName(), - 0, - context->getSettingsRef().max_parser_depth); - - bool replace = (restore_settings.create_function == RestoreUDFCreationMode::kReplace); - bool if_not_exists = (restore_settings.create_function == RestoreUDFCreationMode::kCreateIfNotExists); - user_defined_sql_functions_factory.registerFunction(context, function_name, ast, replace, if_not_exists, true); - } + UserDefinedSQLFunctionFactory::instance().restore(restorer, data_path_in_backup); } } diff --git a/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.reference b/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.reference new file mode 100644 index 00000000000..8925084f2ed --- /dev/null +++ b/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.reference @@ -0,0 +1,20 @@ +{ + "name_0": {"number":"0"}, + "name_1": {"number":"1"}, + "name_2": {"number":"2"} +} +{ + "name_0": {"number":"0","x":"1"}, + "name_1": {"number":"1","x":"2"}, + "name_2": {"number":"2","x":"3"} +} +{ + "name_0": {"number":"0"}, + "name_1": {"number":"1"}, + "name_2": {"number":"2"} +} +name String +number Nullable(Int64) +name_0 0 +name_1 1 +name_2 2 diff --git a/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.sql b/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.sql new file mode 100644 index 00000000000..df0f75f68f2 --- /dev/null +++ b/tests/queries/0_stateless/02454_json_object_each_row_column_for_object_name.sql @@ -0,0 +1,11 @@ +-- Tags: no-fasttest, no-parallel +set format_json_object_each_row_column_for_object_name='name'; + +select number, concat('name_', toString(number)) as name from numbers(3) format JSONObjectEachRow; +select number, concat('name_', toString(number)) as name, number + 1 as x from numbers(3) format JSONObjectEachRow; +select concat('name_', toString(number)) as name, number from numbers(3) format JSONObjectEachRow; + +insert into function file(02454_data.jsonobjecteachrow) select number, concat('name_', toString(number)) as name from numbers(3) settings engine_file_truncate_on_insert=1; +desc file(02454_data.jsonobjecteachrow); +select * from file(02454_data.jsonobjecteachrow); +