Merge branch 'master' into system_table_add_column

This commit is contained in:
chen 2022-10-16 13:17:45 +08:00 committed by GitHub
commit 0e86976e2b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
57 changed files with 1260 additions and 695 deletions

View File

@ -1020,6 +1020,62 @@ Example:
}
```
To use object name as column value you can use special setting [format_json_object_each_row_column_for_object_name](../operations/settings/settings.md#format_json_object_each_row_column_for_object_name). Value of this setting is set to the name of a column, that is used as JSON key for a row in resulting object.
Examples:
For output:
Let's say we have table `test` with two columns:
```
┌─object_name─┬─number─┐
│ first_obj │ 1 │
│ second_obj │ 2 │
│ third_obj │ 3 │
└─────────────┴────────┘
```
Let's output it in `JSONObjectEachRow` format and use `format_json_object_each_row_column_for_object_name` setting:
```sql
select * from test settings format_json_object_each_row_column_for_object_name='object_name'
```
The output:
```json
{
"first_obj": {"number": 1},
"second_obj": {"number": 2},
"third_obj": {"number": 3}
}
```
For input:
Let's say we stored output from previous example in a file with name `data.json`:
```sql
select * from file('data.json', JSONObjectEachRow, 'object_name String, number UInt64') settings format_json_object_each_row_column_for_object_name='object_name'
```
```
┌─object_name─┬─number─┐
│ first_obj │ 1 │
│ second_obj │ 2 │
│ third_obj │ 3 │
└─────────────┴────────┘
```
It also works in schema inference:
```sql
desc file('data.json', JSONObjectEachRow) settings format_json_object_each_row_column_for_object_name='object_name'
```
```
┌─name────────┬─type────────────┐
│ object_name │ String │
│ number │ Nullable(Int64) │
└─────────────┴─────────────────┘
```
### Inserting Data {#json-inserting-data}

View File

@ -41,6 +41,7 @@ ClickHouse Inc does **not** maintain the libraries listed below and hasnt don
- [node-clickhouse](https://github.com/apla/node-clickhouse)
- [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse)
- [clickhouse-client](https://github.com/depyronick/clickhouse-client)
- [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm)
- Perl
- [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse)
- [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse)

View File

@ -3902,6 +3902,13 @@ Controls validation of UTF-8 sequences in JSON output formats, doesn't impact fo
Disabled by default.
### format_json_object_each_row_column_for_object_name {#format_json_object_each_row_column_for_object_name}
The name of column that will be used for storing/writing object names in [JSONObjectEachRow](../../interfaces/formats.md#jsonobjecteachrow) format.
Column type should be String. If value is empty, default names `row_{i}`will be used for object names.
Default value: ''.
## TSV format settings {#tsv-format-settings}
### input_format_tsv_empty_as_default {#input_format_tsv_empty_as_default}

View File

@ -34,6 +34,7 @@ sidebar_label: "Клиентские библиотеки от сторонни
- [node-clickhouse](https://github.com/apla/node-clickhouse)
- [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse)
- [clickhouse-client](https://github.com/depyronick/clickhouse-client)
- [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm)
- Perl
- [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse)
- [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse)

View File

@ -35,6 +35,9 @@ Yandex**没有**维护下面列出的库,也没有做过任何广泛的测试
- NodeJs
- [clickhouse (NodeJs)](https://github.com/TimonKK/clickhouse)
- [node-clickhouse](https://github.com/apla/node-clickhouse)
- [nestjs-clickhouse](https://github.com/depyronick/nestjs-clickhouse)
- [clickhouse-client](https://github.com/depyronick/clickhouse-client)
- [node-clickhouse-orm](https://github.com/zimv/node-clickhouse-orm)
- Perl
- [perl-DBD-ClickHouse](https://github.com/elcamlost/perl-DBD-ClickHouse)
- [HTTP-ClickHouse](https://metacpan.org/release/HTTP-ClickHouse)

View File

@ -58,7 +58,7 @@ void DisksApp::addOptions(
("disk", po::value<String>(), "Set disk name")
("command_name", po::value<String>(), "Name for command to do")
("send-logs", "Send logs")
("log-level", "Logging level")
("log-level", po::value<String>(), "Logging level")
;
positional_options_description.add("command_name", 1);

View File

@ -13,7 +13,6 @@
#include <Interpreters/DatabaseCatalog.h>
#include <base/getFQDNOrHostName.h>
#include <Common/scope_guard_safe.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/Session.h>
#include <Access/AccessControl.h>
#include <Common/Exception.h>
@ -32,6 +31,7 @@
#include <Parsers/IAST.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/ErrorHandlers.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -602,8 +602,6 @@ void LocalServer::processConfig()
global_context->setCurrentDatabase(default_database);
applyCmdOptions(global_context);
bool enable_objects_loader = false;
if (config().has("path"))
{
String path = global_context->getPath();
@ -611,12 +609,6 @@ void LocalServer::processConfig()
/// Lock path directory before read
status.emplace(fs::path(path) / "status", StatusFile::write_full_info);
LOG_DEBUG(log, "Loading user defined objects from {}", path);
Poco::File(path + "user_defined/").createDirectories();
UserDefinedSQLObjectsLoader::instance().loadObjects(global_context);
enable_objects_loader = true;
LOG_DEBUG(log, "Loaded user defined objects.");
LOG_DEBUG(log, "Loading metadata from {}", path);
loadMetadataSystem(global_context);
attachSystemTablesLocal(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::SYSTEM_DATABASE));
@ -630,6 +622,9 @@ void LocalServer::processConfig()
DatabaseCatalog::instance().loadDatabases();
}
/// For ClickHouse local if path is not set the loader will be disabled.
global_context->getUserDefinedSQLObjectsLoader().loadObjects();
LOG_DEBUG(log, "Loaded metadata.");
}
else if (!config().has("no-system-tables"))
@ -639,9 +634,6 @@ void LocalServer::processConfig()
attachInformationSchema(global_context, *createMemoryDatabaseIfNotExists(global_context, DatabaseCatalog::INFORMATION_SCHEMA_UPPERCASE));
}
/// Persist SQL user defined objects only if user_defined folder was created
UserDefinedSQLObjectsLoader::instance().enable(enable_objects_loader);
server_display_name = config().getString("display_name", getFQDNOrHostName());
prompt_by_server_display_name = config().getRawString("prompt_by_server_display_name.default", "{display_name} :) ");
std::map<String, String> prompt_substitutions{{"display_name", server_display_name}};

View File

@ -53,7 +53,6 @@
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/loadMetadata.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Access/AccessControl.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -62,6 +61,7 @@
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Storages/Cache/registerRemoteFileMetadatas.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/registerFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
@ -1010,12 +1010,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
fs::create_directories(user_scripts_path);
}
{
std::string user_defined_path = config().getString("user_defined_path", path / "user_defined/");
global_context->setUserDefinedPath(user_defined_path);
fs::create_directories(user_defined_path);
}
/// top_level_domains_lists
{
const std::string & top_level_domains_path = config().getString("top_level_domains_path", path / "top_level_domains/");
@ -1559,18 +1553,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// system logs may copy global context.
global_context->setCurrentDatabaseNameInGlobalContext(default_database);
LOG_INFO(log, "Loading user defined objects from {}", path_str);
try
{
UserDefinedSQLObjectsLoader::instance().loadObjects(global_context);
}
catch (...)
{
tryLogCurrentException(log, "Caught exception while loading user defined objects");
throw;
}
LOG_DEBUG(log, "Loaded user defined objects");
LOG_INFO(log, "Loading metadata from {}", path_str);
try
@ -1598,6 +1580,8 @@ int Server::main(const std::vector<std::string> & /*args*/)
database_catalog.loadDatabases();
/// After loading validate that default database exists
database_catalog.assertDatabaseExists(default_database);
/// Load user-defined SQL functions.
global_context->getUserDefinedSQLObjectsLoader().loadObjects();
}
catch (...)
{

View File

@ -284,6 +284,7 @@ add_object_library(clickhouse_processors_ttl Processors/TTL)
add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms)
add_object_library(clickhouse_processors_queryplan Processors/QueryPlan)
add_object_library(clickhouse_processors_queryplan_optimizations Processors/QueryPlan/Optimizations)
add_object_library(clickhouse_user_defined_functions Functions/UserDefined)
if (TARGET ch_contrib::nuraft)
add_object_library(clickhouse_coordination Coordination)

View File

@ -236,6 +236,21 @@ ASTPtr QueryFuzzer::getRandomColumnLike()
return new_ast;
}
ASTPtr QueryFuzzer::getRandomExpressionList()
{
if (column_like.empty())
{
return nullptr;
}
ASTPtr new_ast = std::make_shared<ASTExpressionList>();
for (size_t i = 0; i < fuzz_rand() % 5 + 1; ++i)
{
new_ast->children.push_back(getRandomColumnLike());
}
return new_ast;
}
void QueryFuzzer::replaceWithColumnLike(ASTPtr & ast)
{
if (column_like.empty())
@ -841,7 +856,52 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
else if (auto * select = typeid_cast<ASTSelectQuery *>(ast.get()))
{
fuzzColumnLikeExpressionList(select->select().get());
fuzzColumnLikeExpressionList(select->groupBy().get());
if (select->groupBy().get())
{
if (fuzz_rand() % 50 == 0)
{
select->groupBy()->children.clear();
select->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
select->group_by_with_grouping_sets = false;
select->group_by_with_rollup = false;
select->group_by_with_cube = false;
select->group_by_with_totals = true;
}
else if (fuzz_rand() % 100 == 0)
{
select->group_by_with_grouping_sets = !select->group_by_with_grouping_sets;
}
else if (fuzz_rand() % 100 == 0)
{
select->group_by_with_rollup = !select->group_by_with_rollup;
}
else if (fuzz_rand() % 100 == 0)
{
select->group_by_with_cube = !select->group_by_with_cube;
}
else if (fuzz_rand() % 100 == 0)
{
select->group_by_with_totals = !select->group_by_with_totals;
}
}
else if (fuzz_rand() % 50 == 0)
{
select->setExpression(ASTSelectQuery::Expression::GROUP_BY, getRandomExpressionList());
}
if (select->where().get())
{
if (fuzz_rand() % 50 == 0)
{
select->where()->children.clear();
select->setExpression(ASTSelectQuery::Expression::WHERE, {});
}
}
else if (fuzz_rand() % 50 == 0)
{
select->setExpression(ASTSelectQuery::Expression::WHERE, getRandomColumnLike());
}
fuzzOrderByList(select->orderBy().get());
fuzz(select->children);

View File

@ -8,6 +8,7 @@
#include <pcg-random/pcg_random.hpp>
#include <Common/randomSeed.h>
#include "Parsers/IAST_fwd.h"
#include <Core/Field.h>
#include <Parsers/IAST.h>
@ -72,6 +73,7 @@ struct QueryFuzzer
Field getRandomField(int type);
Field fuzzField(Field field);
ASTPtr getRandomColumnLike();
ASTPtr getRandomExpressionList();
DataTypePtr fuzzDataType(DataTypePtr type);
DataTypePtr getRandomType();
ASTs getInsertQueriesForFuzzedTables(const String & full_query);

View File

@ -99,7 +99,7 @@ void ZooKeeper::init(ZooKeeperArgs args_)
if (dns_error)
throw KeeperException("Cannot resolve any of provided ZooKeeper hosts due to DNS error", Coordination::Error::ZCONNECTIONLOSS);
else
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZBADARGUMENTS);
throw KeeperException("Cannot use any of provided ZooKeeper nodes", Coordination::Error::ZCONNECTIONLOSS);
}
impl = std::make_unique<Coordination::ZooKeeper>(nodes, args, zk_log);

View File

@ -776,6 +776,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, output_format_json_array_of_rows, false, "Output a JSON array of all rows in JSONEachRow(Compact) format.", 0) \
M(Bool, output_format_json_validate_utf8, false, "Validate UTF-8 sequences in JSON output formats, doesn't impact formats JSON/JSONCompact/JSONColumnsWithMetadata, they always validate utf8", 0) \
\
M(String, format_json_object_each_row_column_for_object_name, "", "The name of column that will be used as object names in JSONObjectEachRow format. Column type should be String", 0) \
\
M(UInt64, output_format_pretty_max_rows, 10000, "Rows limit for Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_column_pad_width, 250, "Maximum width to pad all values in a column in Pretty formats.", 0) \
M(UInt64, output_format_pretty_max_value_width, 10000, "Maximum width of value to display in Pretty formats. If greater - it will be cut.", 0) \

View File

@ -100,6 +100,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.json.try_infer_numbers_from_strings = settings.input_format_json_try_infer_numbers_from_strings;
format_settings.json.validate_types_from_metadata = settings.input_format_json_validate_types_from_metadata;
format_settings.json.validate_utf8 = settings.output_format_json_validate_utf8;
format_settings.json_object_each_row.column_for_object_name = settings.format_json_object_each_row_column_for_object_name;
format_settings.json.try_infer_objects = context->getSettingsRef().allow_experimental_object_type;
format_settings.null_as_default = settings.input_format_null_as_default;
format_settings.decimal_trailing_zeros = settings.output_format_decimal_trailing_zeros;

View File

@ -156,6 +156,11 @@ struct FormatSettings
bool try_infer_objects = false;
} json;
struct
{
String column_for_object_name;
} json_object_each_row;
struct
{
UInt64 row_group_size = 1000000;

View File

@ -5,8 +5,8 @@
#include <DataTypes/DataTypeFactory.h>
#include <Interpreters/UserDefinedExecutableFunction.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedExecutableFunction.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Functions/FunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>

View File

@ -4,7 +4,7 @@
#include <Interpreters/Context_fwd.h>
#include <Interpreters/ExternalLoader.h>
#include <Interpreters/UserDefinedExecutableFunction.h>
#include <Functions/UserDefined/UserDefinedExecutableFunction.h>
namespace DB
{

View File

@ -0,0 +1,47 @@
#pragma once
#include <base/types.h>
namespace DB
{
class IAST;
struct Settings;
enum class UserDefinedSQLObjectType;
/// Interface for a loader of user-defined SQL objects.
/// Implementations: UserDefinedSQLLoaderFromDisk, UserDefinedSQLLoaderFromZooKeeper
class IUserDefinedSQLObjectsLoader
{
public:
virtual ~IUserDefinedSQLObjectsLoader() = default;
/// Whether this loader can replicate SQL objects to another node.
virtual bool isReplicated() const { return false; }
virtual String getReplicationID() const { return ""; }
/// Loads all objects. Can be called once - if objects are already loaded the function does nothing.
virtual void loadObjects() = 0;
/// Stops watching.
virtual void stopWatching() {}
/// Immediately reloads all objects, throws an exception if failed.
virtual void reloadObjects() = 0;
/// Immediately reloads a specified object only.
virtual void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) = 0;
/// Stores an object (must be called only by UserDefinedSQLFunctionFactory::registerFunction).
virtual bool storeObject(
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) = 0;
/// Removes an object (must be called only by UserDefinedSQLFunctionFactory::unregisterFunction).
virtual bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) = 0;
};
}

View File

@ -12,9 +12,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>

View File

@ -0,0 +1,301 @@
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Backups/RestorerFromBackup.h>
#include <Functions/FunctionFactory.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/quoteString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
extern const int CANNOT_DROP_FUNCTION;
extern const int CANNOT_CREATE_RECURSIVE_FUNCTION;
extern const int UNSUPPORTED_METHOD;
}
namespace
{
void validateFunctionRecursiveness(const IAST & node, const String & function_to_create)
{
for (const auto & child : node.children)
{
auto function_name_opt = tryGetFunctionName(child);
if (function_name_opt && function_name_opt.value() == function_to_create)
throw Exception(ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION, "You cannot create recursive function");
validateFunctionRecursiveness(*child, function_to_create);
}
}
void validateFunction(ASTPtr function, const String & name)
{
ASTFunction * lambda_function = function->as<ASTFunction>();
if (!lambda_function)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected function, got: {}", function->formatForErrorMessage());
auto & lambda_function_expression_list = lambda_function->arguments->children;
if (lambda_function_expression_list.size() != 2)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have arguments and body");
const ASTFunction * tuple_function_arguments = lambda_function_expression_list[0]->as<ASTFunction>();
if (!tuple_function_arguments || !tuple_function_arguments->arguments)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid arguments");
std::unordered_set<String> arguments;
for (const auto & argument : tuple_function_arguments->arguments->children)
{
const auto * argument_identifier = argument->as<ASTIdentifier>();
if (!argument_identifier)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda argument must be identifier");
const auto & argument_name = argument_identifier->name();
auto [_, inserted] = arguments.insert(argument_name);
if (!inserted)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Identifier {} already used as function parameter", argument_name);
}
ASTPtr function_body = lambda_function_expression_list[1];
if (!function_body)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid function body");
validateFunctionRecursiveness(*function_body, name);
}
ASTPtr normalizeCreateFunctionQuery(const IAST & create_function_query)
{
auto ptr = create_function_query.clone();
auto & res = typeid_cast<ASTCreateFunctionQuery &>(*ptr);
res.if_not_exists = false;
res.or_replace = false;
FunctionNameNormalizer().visit(res.function_core.get());
return ptr;
}
}
UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance()
{
static UserDefinedSQLFunctionFactory result;
return result;
}
void UserDefinedSQLFunctionFactory::checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name);
if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name);
validateFunction(assert_cast<const ASTCreateFunctionQuery &>(create_function_query).function_core, function_name);
}
void UserDefinedSQLFunctionFactory::checkCanBeUnregistered(const ContextPtr & context, const String & function_name)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name) ||
AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name);
}
bool UserDefinedSQLFunctionFactory::registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists)
{
checkCanBeRegistered(context, function_name, *create_function_query);
create_function_query = normalizeCreateFunctionQuery(*create_function_query);
std::lock_guard lock{mutex};
auto it = function_name_to_create_query_map.find(function_name);
if (it != function_name_to_create_query_map.end())
{
if (throw_if_exists)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", function_name);
else if (!replace_if_exists)
return false;
}
try
{
auto & loader = context->getUserDefinedSQLObjectsLoader();
bool stored = loader.storeObject(UserDefinedSQLObjectType::Function, function_name, *create_function_query, throw_if_exists, replace_if_exists, context->getSettingsRef());
if (!stored)
return false;
}
catch (Exception & exception)
{
exception.addMessage(fmt::format("while storing user defined function {}", backQuote(function_name)));
throw;
}
function_name_to_create_query_map[function_name] = create_function_query;
return true;
}
bool UserDefinedSQLFunctionFactory::unregisterFunction(const ContextMutablePtr & context, const String & function_name, bool throw_if_not_exists)
{
checkCanBeUnregistered(context, function_name);
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", function_name);
else
return false;
}
try
{
auto & loader = context->getUserDefinedSQLObjectsLoader();
bool removed = loader.removeObject(UserDefinedSQLObjectType::Function, function_name, throw_if_not_exists);
if (!removed)
return false;
}
catch (Exception & exception)
{
exception.addMessage(fmt::format("while removing user defined function {}", backQuote(function_name)));
throw;
}
function_name_to_create_query_map.erase(function_name);
return true;
}
ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The function name '{}' is not registered",
function_name);
return it->second;
}
ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query_map.find(function_name);
if (it == function_name_to_create_query_map.end())
return nullptr;
return it->second;
}
bool UserDefinedSQLFunctionFactory::has(const String & function_name) const
{
return tryGet(function_name) != nullptr;
}
std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const
{
std::vector<std::string> registered_names;
std::lock_guard lock(mutex);
registered_names.reserve(function_name_to_create_query_map.size());
for (const auto & [name, _] : function_name_to_create_query_map)
registered_names.emplace_back(name);
return registered_names;
}
bool UserDefinedSQLFunctionFactory::empty() const
{
std::lock_guard lock(mutex);
return function_name_to_create_query_map.empty();
}
void UserDefinedSQLFunctionFactory::backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const
{
backupUserDefinedSQLObjects(backup_entries_collector, data_path_in_backup, UserDefinedSQLObjectType::Function, getAllFunctions());
}
void UserDefinedSQLFunctionFactory::restore(RestorerFromBackup & restorer, const String & data_path_in_backup)
{
auto restored_functions = restoreUserDefinedSQLObjects(restorer, data_path_in_backup, UserDefinedSQLObjectType::Function);
const auto & restore_settings = restorer.getRestoreSettings();
bool throw_if_exists = (restore_settings.create_function == RestoreUDFCreationMode::kCreate);
bool replace_if_exists = (restore_settings.create_function == RestoreUDFCreationMode::kReplace);
auto context = restorer.getContext();
for (const auto & [function_name, create_function_query] : restored_functions)
registerFunction(context, function_name, create_function_query, throw_if_exists, replace_if_exists);
}
void UserDefinedSQLFunctionFactory::setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions)
{
std::unordered_map<String, ASTPtr> normalized_functions;
for (const auto & [function_name, create_query] : new_functions)
normalized_functions[function_name] = normalizeCreateFunctionQuery(*create_query);
std::lock_guard lock(mutex);
function_name_to_create_query_map = std::move(normalized_functions);
}
std::vector<std::pair<String, ASTPtr>> UserDefinedSQLFunctionFactory::getAllFunctions() const
{
std::lock_guard lock{mutex};
std::vector<std::pair<String, ASTPtr>> all_functions;
all_functions.reserve(function_name_to_create_query_map.size());
std::copy(function_name_to_create_query_map.begin(), function_name_to_create_query_map.end(), std::back_inserter(all_functions));
return all_functions;
}
void UserDefinedSQLFunctionFactory::setFunction(const String & function_name, const IAST & create_function_query)
{
std::lock_guard lock(mutex);
function_name_to_create_query_map[function_name] = normalizeCreateFunctionQuery(create_function_query);
}
void UserDefinedSQLFunctionFactory::removeFunction(const String & function_name)
{
std::lock_guard lock(mutex);
function_name_to_create_query_map.erase(function_name);
}
void UserDefinedSQLFunctionFactory::removeAllFunctionsExcept(const Strings & function_names_to_keep)
{
boost::container::flat_set<std::string_view> names_set_to_keep{function_names_to_keep.begin(), function_names_to_keep.end()};
std::lock_guard lock(mutex);
for (auto it = function_name_to_create_query_map.begin(); it != function_name_to_create_query_map.end();)
{
auto current = it++;
if (!names_set_to_keep.contains(current->first))
function_name_to_create_query_map.erase(current);
}
}
std::unique_lock<std::recursive_mutex> UserDefinedSQLFunctionFactory::getLock() const
{
return std::unique_lock{mutex};
}
}

View File

@ -0,0 +1,70 @@
#pragma once
#include <unordered_map>
#include <mutex>
#include <Common/NamePrompter.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
class BackupEntriesCollector;
class RestorerFromBackup;
/// Factory for SQLUserDefinedFunctions
class UserDefinedSQLFunctionFactory : public IHints<1, UserDefinedSQLFunctionFactory>
{
public:
static UserDefinedSQLFunctionFactory & instance();
/// Register function for function_name in factory for specified create_function_query.
bool registerFunction(const ContextMutablePtr & context, const String & function_name, ASTPtr create_function_query, bool throw_if_exists, bool replace_if_exists);
/// Unregister function for function_name.
bool unregisterFunction(const ContextMutablePtr & context, const String & function_name, bool throw_if_not_exists);
/// Get function create query for function_name. If no function registered with function_name throws exception.
ASTPtr get(const String & function_name) const;
/// Get function create query for function_name. If no function registered with function_name return nullptr.
ASTPtr tryGet(const String & function_name) const;
/// Check if function with function_name registered.
bool has(const String & function_name) const;
/// Get all user defined functions registered names.
std::vector<String> getAllRegisteredNames() const override;
/// Check whether any UDFs have been registered
bool empty() const;
/// Makes backup entries for all user-defined SQL functions.
void backup(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup) const;
/// Restores user-defined SQL functions from the backup.
void restore(RestorerFromBackup & restorer, const String & data_path_in_backup);
private:
friend class UserDefinedSQLObjectsLoaderFromDisk;
friend class UserDefinedSQLObjectsLoaderFromZooKeeper;
/// Checks that a specified function can be registered, throws an exception if not.
static void checkCanBeRegistered(const ContextPtr & context, const String & function_name, const IAST & create_function_query);
static void checkCanBeUnregistered(const ContextPtr & context, const String & function_name);
/// The following functions must be called only by the loader.
void setAllFunctions(const std::vector<std::pair<String, ASTPtr>> & new_functions);
std::vector<std::pair<String, ASTPtr>> getAllFunctions() const;
void setFunction(const String & function_name, const IAST & create_function_query);
void removeFunction(const String & function_name);
void removeAllFunctionsExcept(const Strings & function_names_to_keep);
std::unique_lock<std::recursive_mutex> getLock() const;
std::unordered_map<String, ASTPtr> function_name_to_create_query_map;
mutable std::recursive_mutex mutex;
};
}

View File

@ -8,7 +8,7 @@
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
namespace DB

View File

@ -0,0 +1,12 @@
#pragma once
namespace DB
{
enum class UserDefinedSQLObjectType
{
Function
};
}

View File

@ -0,0 +1,103 @@
#include <Functions/UserDefined/UserDefinedSQLObjectsBackup.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/IBackup.h>
#include <Backups/IBackupCoordination.h>
#include <Backups/IRestoreCoordination.h>
#include <Backups/RestorerFromBackup.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLObjectType.h>
#include <Interpreters/Context.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Common/escapeForFileName.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_RESTORE_TABLE;
}
void backupUserDefinedSQLObjects(
BackupEntriesCollector & backup_entries_collector,
const String & data_path_in_backup,
UserDefinedSQLObjectType /* object_type */,
const std::vector<std::pair<String, ASTPtr>> & objects)
{
std::vector<std::pair<String, BackupEntryPtr>> backup_entries;
backup_entries.reserve(objects.size());
for (const auto & [function_name, create_function_query] : objects)
backup_entries.emplace_back(
escapeForFileName(function_name) + ".sql", std::make_shared<BackupEntryFromMemory>(queryToString(create_function_query)));
fs::path data_path_in_backup_fs{data_path_in_backup};
for (const auto & entry : backup_entries)
backup_entries_collector.addBackupEntry(data_path_in_backup_fs / entry.first, entry.second);
}
std::vector<std::pair<String, ASTPtr>>
restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type)
{
auto context = restorer.getContext();
auto backup = restorer.getBackup();
fs::path data_path_in_backup_fs{data_path_in_backup};
Strings filenames = backup->listFiles(data_path_in_backup);
if (filenames.empty())
return {}; /// Nothing to restore.
for (const auto & filename : filenames)
{
if (!filename.ends_with(".sql"))
{
throw Exception(
ErrorCodes::CANNOT_RESTORE_TABLE,
"Cannot restore user-defined SQL objects: File name {} doesn't have the extension .sql",
String{data_path_in_backup_fs / filename});
}
}
std::vector<std::pair<String, ASTPtr>> res;
for (const auto & filename : filenames)
{
String escaped_function_name = filename.substr(0, filename.length() - strlen(".sql"));
String function_name = unescapeForFileName(escaped_function_name);
String filepath = data_path_in_backup_fs / filename;
auto backup_entry = backup->readFile(filepath);
auto in = backup_entry->getReadBuffer();
String statement_def;
readStringUntilEOF(statement_def, *in);
ASTPtr ast;
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
ParserCreateFunctionQuery parser;
ast = parseQuery(
parser,
statement_def.data(),
statement_def.data() + statement_def.size(),
"in file " + filepath + " from backup " + backup->getName(),
0,
context->getSettingsRef().max_parser_depth);
break;
}
}
res.emplace_back(std::move(function_name), ast);
}
return res;
}
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <Parsers/IAST_fwd.h>
#include <base/types.h>
namespace DB
{
class BackupEntriesCollector;
class RestorerFromBackup;
enum class UserDefinedSQLObjectType;
class IBackupEntry;
using BackupEntryPtr = std::shared_ptr<const IBackupEntry>;
/// Makes backup entries to backup user-defined SQL objects.
void backupUserDefinedSQLObjects(
BackupEntriesCollector & backup_entries_collector,
const String & data_path_in_backup,
UserDefinedSQLObjectType object_type,
const std::vector<std::pair<String, ASTPtr>> & objects);
/// Restores user-defined SQL objects from the backup.
std::vector<std::pair<String, ASTPtr>>
restoreUserDefinedSQLObjects(RestorerFromBackup & restorer, const String & data_path_in_backup, UserDefinedSQLObjectType object_type);
}

View File

@ -0,0 +1,265 @@
#include "Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h"
#include "Functions/UserDefined/UserDefinedSQLFunctionFactory.h"
#include "Functions/UserDefined/UserDefinedSQLObjectType.h"
#include <Common/StringUtils/StringUtils.h>
#include <Common/atomicRename.h>
#include <Common/escapeForFileName.h>
#include <Common/logger_useful.h>
#include <Common/quoteString.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Logger.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_DOESNT_EXIST;
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
}
namespace
{
/// Converts a path to an absolute path and append it with a separator.
String makeDirectoryPathCanonical(const String & directory_path)
{
auto canonical_directory_path = std::filesystem::weakly_canonical(directory_path);
if (canonical_directory_path.has_filename())
canonical_directory_path += std::filesystem::path::preferred_separator;
return canonical_directory_path;
}
}
UserDefinedSQLObjectsLoaderFromDisk::UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_)
: global_context(global_context_)
, dir_path{makeDirectoryPathCanonical(dir_path_)}
, log{&Poco::Logger::get("UserDefinedSQLObjectsLoaderFromDisk")}
{
createDirectory();
}
ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
return tryLoadObject(object_type, object_name, getFilePath(object_type, object_name), /* check_file_exists= */ true);
}
ASTPtr UserDefinedSQLObjectsLoaderFromDisk::tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & path, bool check_file_exists)
{
LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(object_name), path);
try
{
if (check_file_exists && !fs::exists(path))
return nullptr;
/// There is .sql file with user defined object creation statement.
ReadBufferFromFile in(path);
String object_create_query;
readStringUntilEOF(object_create_query, in);
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
ParserCreateFunctionQuery parser;
ASTPtr ast = parseQuery(
parser,
object_create_query.data(),
object_create_query.data() + object_create_query.size(),
"",
0,
global_context->getSettingsRef().max_parser_depth);
UserDefinedSQLFunctionFactory::checkCanBeRegistered(global_context, object_name, *ast);
return ast;
}
}
}
catch (...)
{
tryLogCurrentException(log, fmt::format("while loading user defined SQL object {} from path {}", backQuote(object_name), path));
return nullptr; /// Failed to load this sql object, will ignore it
}
}
void UserDefinedSQLObjectsLoaderFromDisk::loadObjects()
{
if (!objects_loaded)
loadObjectsImpl();
}
void UserDefinedSQLObjectsLoaderFromDisk::reloadObjects()
{
loadObjectsImpl();
}
void UserDefinedSQLObjectsLoaderFromDisk::loadObjectsImpl()
{
LOG_INFO(log, "Loading user defined objects from {}", dir_path);
createDirectory();
std::vector<std::pair<String, ASTPtr>> function_names_and_queries;
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
{
if (it->isDirectory())
continue;
const String & file_name = it.name();
if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql"))
continue;
size_t prefix_length = strlen("function_");
size_t suffix_length = strlen(".sql");
String function_name = unescapeForFileName(file_name.substr(prefix_length, file_name.length() - prefix_length - suffix_length));
if (function_name.empty())
continue;
ASTPtr ast = tryLoadObject(UserDefinedSQLObjectType::Function, function_name, dir_path + it.name(), /* check_file_exists= */ false);
if (ast)
function_names_and_queries.emplace_back(function_name, ast);
}
UserDefinedSQLFunctionFactory::instance().setAllFunctions(function_names_and_queries);
objects_loaded = true;
LOG_DEBUG(log, "User defined objects loaded");
}
void UserDefinedSQLObjectsLoaderFromDisk::reloadObject(UserDefinedSQLObjectType object_type, const String & object_name)
{
createDirectory();
auto ast = tryLoadObject(object_type, object_name);
auto & factory = UserDefinedSQLFunctionFactory::instance();
if (ast)
factory.setFunction(object_name, *ast);
else
factory.removeFunction(object_name);
}
void UserDefinedSQLObjectsLoaderFromDisk::createDirectory()
{
std::error_code create_dir_error_code;
fs::create_directories(dir_path, create_dir_error_code);
if (!fs::exists(dir_path) || !fs::is_directory(dir_path) || create_dir_error_code)
throw Exception("Couldn't create directory " + dir_path + " reason: '" + create_dir_error_code.message() + "'", ErrorCodes::DIRECTORY_DOESNT_EXIST);
}
bool UserDefinedSQLObjectsLoaderFromDisk::storeObject(
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings)
{
String file_path = getFilePath(object_type, object_name);
LOG_DEBUG(log, "Storing user-defined object {} to file {}", backQuote(object_name), file_path);
if (fs::exists(file_path))
{
if (throw_if_exists)
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User-defined function '{}' already exists", object_name);
else if (!replace_if_exists)
return false;
}
WriteBufferFromOwnString create_statement_buf;
formatAST(create_object_query, create_statement_buf, false);
writeChar('\n', create_statement_buf);
String create_statement = create_statement_buf.str();
String temp_file_path = file_path + ".tmp";
try
{
WriteBufferFromFile out(temp_file_path, create_statement.size());
writeString(create_statement, out);
out.next();
if (settings.fsync_metadata)
out.sync();
out.close();
if (replace_if_exists)
fs::rename(temp_file_path, file_path);
else
renameNoReplace(temp_file_path, file_path);
}
catch (...)
{
fs::remove(temp_file_path);
throw;
}
LOG_TRACE(log, "Object {} stored", backQuote(object_name));
return true;
}
bool UserDefinedSQLObjectsLoaderFromDisk::removeObject(
UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists)
{
String file_path = getFilePath(object_type, object_name);
LOG_DEBUG(log, "Removing user defined object {} stored in file {}", backQuote(object_name), file_path);
bool existed = fs::remove(file_path);
if (!existed)
{
if (throw_if_not_exists)
throw Exception(ErrorCodes::UNKNOWN_FUNCTION, "User-defined function '{}' doesn't exist", object_name);
else
return false;
}
LOG_TRACE(log, "Object {} removed", backQuote(object_name));
return true;
}
String UserDefinedSQLObjectsLoaderFromDisk::getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const
{
String file_path;
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
break;
}
}
return file_path;
}
}

View File

@ -0,0 +1,46 @@
#pragma once
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Loads user-defined sql objects from a specified folder.
class UserDefinedSQLObjectsLoaderFromDisk : public IUserDefinedSQLObjectsLoader
{
public:
UserDefinedSQLObjectsLoaderFromDisk(const ContextPtr & global_context_, const String & dir_path_);
void loadObjects() override;
void reloadObjects() override;
void reloadObject(UserDefinedSQLObjectType object_type, const String & object_name) override;
bool storeObject(
UserDefinedSQLObjectType object_type,
const String & object_name,
const IAST & create_object_query,
bool throw_if_exists,
bool replace_if_exists,
const Settings & settings) override;
bool removeObject(UserDefinedSQLObjectType object_type, const String & object_name, bool throw_if_not_exists) override;
private:
void createDirectory();
void loadObjectsImpl();
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name);
ASTPtr tryLoadObject(UserDefinedSQLObjectType object_type, const String & object_name, const String & file_path, bool check_file_exists);
String getFilePath(UserDefinedSQLObjectType object_type, const String & object_name) const;
ContextPtr global_context;
String dir_path;
Poco::Logger * log;
std::atomic<bool> objects_loaded = false;
};
}

View File

@ -0,0 +1,21 @@
#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLObjectsLoaderFromDisk.h>
#include <Interpreters/Context.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB
{
std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context)
{
const auto & config = global_context->getConfigRef();
String default_path = fs::path{global_context->getPath()} / "user_defined/";
String path = config.getString("user_defined_path", default_path);
return std::make_unique<UserDefinedSQLObjectsLoaderFromDisk>(global_context, path);
}
}

View File

@ -0,0 +1,12 @@
#pragma once
#include <Interpreters/Context_fwd.h>
namespace DB
{
class IUserDefinedSQLObjectsLoader;
std::unique_ptr<IUserDefinedSQLObjectsLoader> createUserDefinedSQLObjectsLoader(const ContextMutablePtr & global_context);
}

View File

@ -52,7 +52,7 @@
#include <Interpreters/interpretSubquery.h>
#include <Interpreters/DatabaseAndTableWithAlias.h>
#include <Interpreters/IdentifierSemantic.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
namespace DB

View File

@ -57,7 +57,9 @@
#include <Dictionaries/Embedded/GeoDictionariesLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/createUserDefinedSQLObjectsLoader.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ProcessList.h>
#include <Interpreters/InterserverCredentials.h>
@ -186,7 +188,6 @@ struct ContextSharedPart : boost::noncopyable
String user_files_path; /// Path to the directory with user provided files, usable by 'file' table function.
String dictionaries_lib_path; /// Path to the directory with user provided binaries and libraries for external dictionaries.
String user_scripts_path; /// Path to the directory with user provided scripts.
String user_defined_path; /// Path to the directory with user defined objects.
ConfigurationPtr config; /// Global configuration settings.
String tmp_path; /// Path to the temporary files that occur when processing the request.
@ -194,16 +195,18 @@ struct ContextSharedPart : boost::noncopyable
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
scope_guard models_repository_guard;
ExternalLoaderXMLConfigRepository * external_dictionaries_config_repository = nullptr;
scope_guard dictionaries_xmls;
mutable std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
ExternalLoaderXMLConfigRepository * user_defined_executable_functions_config_repository = nullptr;
scope_guard user_defined_executable_functions_xmls;
mutable std::unique_ptr<IUserDefinedSQLObjectsLoader> user_defined_sql_objects_loader;
#if USE_NLP
mutable std::optional<SynonymsExtensions> synonyms_extensions;
mutable std::optional<Lemmatizers> lemmatizers;
@ -420,6 +423,8 @@ struct ContextSharedPart : boost::noncopyable
external_dictionaries_loader->enablePeriodicUpdates(false);
if (external_user_defined_executable_functions_loader)
external_user_defined_executable_functions_loader->enablePeriodicUpdates(false);
if (user_defined_sql_objects_loader)
user_defined_sql_objects_loader->stopWatching();
Session::shutdownNamedSessions();
@ -450,6 +455,7 @@ struct ContextSharedPart : boost::noncopyable
std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<IUserDefinedSQLObjectsLoader> delete_user_defined_sql_objects_loader;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
@ -488,6 +494,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries = std::move(embedded_dictionaries);
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_user_defined_sql_objects_loader = std::move(user_defined_sql_objects_loader);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
@ -515,6 +522,7 @@ struct ContextSharedPart : boost::noncopyable
delete_embedded_dictionaries.reset();
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_user_defined_sql_objects_loader.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
@ -658,12 +666,6 @@ String Context::getUserScriptsPath() const
return shared->user_scripts_path;
}
String Context::getUserDefinedPath() const
{
auto lock = getLock();
return shared->user_defined_path;
}
Strings Context::getWarnings() const
{
Strings common_warnings;
@ -726,9 +728,6 @@ void Context::setPath(const String & path)
if (shared->user_scripts_path.empty())
shared->user_scripts_path = shared->path + "user_scripts/";
if (shared->user_defined_path.empty())
shared->user_defined_path = shared->path + "user_defined/";
}
VolumePtr Context::setTemporaryStorage(const String & path, const String & policy_name, size_t max_size)
@ -804,12 +803,6 @@ void Context::setUserScriptsPath(const String & path)
shared->user_scripts_path = path;
}
void Context::setUserDefinedPath(const String & path)
{
auto lock = getLock();
shared->user_defined_path = path;
}
void Context::addWarningMessage(const String & msg) const
{
auto lock = getLock();
@ -1652,6 +1645,22 @@ void Context::loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::Abstr
shared->user_defined_executable_functions_xmls = external_user_defined_executable_functions_loader.addConfigRepository(std::move(repository));
}
const IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader() const
{
auto lock = getLock();
if (!shared->user_defined_sql_objects_loader)
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
return *shared->user_defined_sql_objects_loader;
}
IUserDefinedSQLObjectsLoader & Context::getUserDefinedSQLObjectsLoader()
{
auto lock = getLock();
if (!shared->user_defined_sql_objects_loader)
shared->user_defined_sql_objects_loader = createUserDefinedSQLObjectsLoader(getGlobalContext());
return *shared->user_defined_sql_objects_loader;
}
#if USE_NLP
SynonymsExtensions & Context::getSynonymsExtensions() const
@ -3410,7 +3419,7 @@ void Context::initializeBackgroundExecutorsIfNeeded()
size_t background_merges_mutations_concurrency_ratio = 2;
if (config.has("background_merges_mutations_concurrency_ratio"))
background_merges_mutations_concurrency_ratio = config.getUInt64("background_merges_mutations_concurrency_ratio");
else if (config.has("profiles.default.background_pool_size"))
else if (config.has("profiles.default.background_merges_mutations_concurrency_ratio"))
background_merges_mutations_concurrency_ratio = config.getUInt64("profiles.default.background_merges_mutations_concurrency_ratio");
size_t background_move_pool_size = 8;

View File

@ -54,6 +54,7 @@ enum class RowPolicyFilterType;
class EmbeddedDictionaries;
class ExternalDictionariesLoader;
class ExternalUserDefinedExecutableFunctionsLoader;
class IUserDefinedSQLObjectsLoader;
class InterserverCredentials;
using InterserverCredentialsPtr = std::shared_ptr<const InterserverCredentials>;
class InterserverIOHandler;
@ -435,7 +436,6 @@ public:
String getUserFilesPath() const;
String getDictionariesLibPath() const;
String getUserScriptsPath() const;
String getUserDefinedPath() const;
/// A list of warnings about server configuration to place in `system.warnings` table.
Strings getWarnings() const;
@ -450,7 +450,6 @@ public:
void setUserFilesPath(const String & path);
void setDictionariesLibPath(const String & path);
void setUserScriptsPath(const String & path);
void setUserDefinedPath(const String & path);
void addWarningMessage(const String & msg) const;
@ -653,16 +652,19 @@ public:
/// Returns the current constraints (can return null).
std::shared_ptr<const SettingsConstraintsAndProfileIDs> getSettingsConstraintsAndCurrentProfiles() const;
const EmbeddedDictionaries & getEmbeddedDictionaries() const;
const ExternalDictionariesLoader & getExternalDictionariesLoader() const;
const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;
EmbeddedDictionaries & getEmbeddedDictionaries();
ExternalDictionariesLoader & getExternalDictionariesLoader();
ExternalDictionariesLoader & getExternalDictionariesLoaderUnlocked();
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
const EmbeddedDictionaries & getEmbeddedDictionaries() const;
EmbeddedDictionaries & getEmbeddedDictionaries();
void tryCreateEmbeddedDictionaries(const Poco::Util::AbstractConfiguration & config) const;
void loadOrReloadDictionaries(const Poco::Util::AbstractConfiguration & config);
const ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader() const;
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoader();
ExternalUserDefinedExecutableFunctionsLoader & getExternalUserDefinedExecutableFunctionsLoaderUnlocked();
const IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader() const;
IUserDefinedSQLObjectsLoader & getUserDefinedSQLObjectsLoader();
void loadOrReloadUserDefinedExecutableFunctions(const Poco::Util::AbstractConfiguration & config);
#if USE_NLP

View File

@ -1,16 +1,11 @@
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
@ -18,13 +13,11 @@ namespace DB
namespace ErrorCodes
{
extern const int CANNOT_CREATE_RECURSIVE_FUNCTION;
extern const int UNSUPPORTED_METHOD;
extern const int INCORRECT_QUERY;
}
BlockIO InterpreterCreateFunctionQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
ASTCreateFunctionQuery & create_function_query = query_ptr->as<ASTCreateFunctionQuery &>();
AccessRightsElements access_rights_elements;
@ -33,80 +26,27 @@ BlockIO InterpreterCreateFunctionQuery::execute()
if (create_function_query.or_replace)
access_rights_elements.emplace_back(AccessType::DROP_FUNCTION);
auto current_context = getContext();
if (!create_function_query.cluster.empty())
{
if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;
params.access_to_check = std::move(access_rights_elements);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
auto current_context = getContext();
current_context->checkAccess(access_rights_elements);
auto & user_defined_function_factory = UserDefinedSQLFunctionFactory::instance();
auto function_name = create_function_query.getFunctionName();
bool throw_if_exists = !create_function_query.if_not_exists && !create_function_query.or_replace;
bool replace_if_exists = create_function_query.or_replace;
bool if_not_exists = create_function_query.if_not_exists;
bool replace = create_function_query.or_replace;
create_function_query.if_not_exists = false;
create_function_query.or_replace = false;
validateFunction(create_function_query.function_core, function_name);
user_defined_function_factory.registerFunction(current_context, function_name, query_ptr, replace, if_not_exists, persist_function);
UserDefinedSQLFunctionFactory::instance().registerFunction(current_context, function_name, query_ptr, throw_if_exists, replace_if_exists);
return {};
}
void InterpreterCreateFunctionQuery::validateFunction(ASTPtr function, const String & name)
{
ASTFunction * lambda_function = function->as<ASTFunction>();
if (!lambda_function)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Expected function, got: {}", function->formatForErrorMessage());
auto & lambda_function_expression_list = lambda_function->arguments->children;
if (lambda_function_expression_list.size() != 2)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have arguments and body");
const ASTFunction * tuple_function_arguments = lambda_function_expression_list[0]->as<ASTFunction>();
if (!tuple_function_arguments || !tuple_function_arguments->arguments)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid arguments");
std::unordered_set<String> arguments;
for (const auto & argument : tuple_function_arguments->arguments->children)
{
const auto * argument_identifier = argument->as<ASTIdentifier>();
if (!argument_identifier)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda argument must be identifier");
const auto & argument_name = argument_identifier->name();
auto [_, inserted] = arguments.insert(argument_name);
if (!inserted)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Identifier {} already used as function parameter", argument_name);
}
ASTPtr function_body = lambda_function_expression_list[1];
if (!function_body)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Lambda must have valid function body");
validateFunctionRecursiveness(function_body, name);
}
void InterpreterCreateFunctionQuery::validateFunctionRecursiveness(ASTPtr node, const String & function_to_create)
{
for (const auto & child : node->children)
{
auto function_name_opt = tryGetFunctionName(child);
if (function_name_opt && function_name_opt.value() == function_to_create)
throw Exception(ErrorCodes::CANNOT_CREATE_RECURSIVE_FUNCTION, "You cannot create recursive function");
validateFunctionRecursiveness(child, function_to_create);
}
}
}

View File

@ -8,24 +8,18 @@ namespace DB
class Context;
class InterpreterCreateFunctionQuery : public IInterpreter, WithContext
class InterpreterCreateFunctionQuery : public IInterpreter, WithMutableContext
{
public:
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextPtr context_, bool persist_function_)
: WithContext(context_)
, query_ptr(query_ptr_)
, persist_function(persist_function_) {}
InterpreterCreateFunctionQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_)
: WithMutableContext(context_), query_ptr(query_ptr_)
{
}
BlockIO execute() override;
void setInternal(bool internal_);
private:
static void validateFunction(ASTPtr function, const String & name);
static void validateFunctionRecursiveness(ASTPtr node, const String & function_to_create);
ASTPtr query_ptr;
bool persist_function;
};
}

View File

@ -1,17 +1,22 @@
#include <Parsers/ASTDropFunctionQuery.h>
#include <Access/ContextAccess.h>
#include <Functions/UserDefined/IUserDefinedSQLObjectsLoader.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/Context.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/InterpreterDropFunctionQuery.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/executeDDLQueryOnCluster.h>
namespace DB
{
namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
}
BlockIO InterpreterDropFunctionQuery::execute()
{
FunctionNameNormalizer().visit(query_ptr.get());
@ -20,17 +25,23 @@ BlockIO InterpreterDropFunctionQuery::execute()
AccessRightsElements access_rights_elements;
access_rights_elements.emplace_back(AccessType::DROP_FUNCTION);
auto current_context = getContext();
if (!drop_function_query.cluster.empty())
{
if (current_context->getUserDefinedSQLObjectsLoader().isReplicated())
throw Exception(ErrorCodes::INCORRECT_QUERY, "ON CLUSTER is not allowed because used-defined functions are replicated automatically");
DDLQueryOnClusterParams params;
params.access_to_check = std::move(access_rights_elements);
return executeDDLQueryOnCluster(query_ptr, getContext(), params);
return executeDDLQueryOnCluster(query_ptr, current_context, params);
}
auto current_context = getContext();
current_context->checkAccess(access_rights_elements);
UserDefinedSQLFunctionFactory::instance().unregisterFunction(current_context, drop_function_query.function_name, drop_function_query.if_exists);
bool throw_if_not_exists = !drop_function_query.if_exists;
UserDefinedSQLFunctionFactory::instance().unregisterFunction(current_context, drop_function_query.function_name, throw_if_not_exists);
return {};
}

View File

@ -296,7 +296,7 @@ std::unique_ptr<IInterpreter> InterpreterFactory::get(ASTPtr & query, ContextMut
}
else if (query->as<ASTCreateFunctionQuery>())
{
return std::make_unique<InterpreterCreateFunctionQuery>(query, context, true /*persist_function*/);
return std::make_unique<InterpreterCreateFunctionQuery>(query, context);
}
else if (query->as<ASTDropFunctionQuery>())
{

View File

@ -12,7 +12,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Functions/UserDefined/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Interpreters/EmbeddedDictionaries.h>
#include <Interpreters/ActionLocksManager.h>
#include <Interpreters/InterpreterDropQuery.h>

View File

@ -23,7 +23,6 @@
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/GatherFunctionQuantileVisitor.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
@ -35,6 +34,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Functions/FunctionFactory.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Storages/IStorage.h>
#include <Interpreters/RewriteSumIfFunctionVisitor.h>

View File

@ -24,13 +24,14 @@
#include <Interpreters/TranslateQualifiedNamesVisitor.h>
#include <Interpreters/TreeOptimizer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/UserDefinedSQLFunctionVisitor.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/getTableExpressions.h>
#include <Interpreters/replaceAliasColumnsInQuery.h>
#include <Interpreters/replaceForPositionalArguments.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionVisitor.h>
#include <Parsers/IAST_fwd.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>

View File

@ -1,168 +0,0 @@
#include "UserDefinedSQLFunctionFactory.h"
#include <Common/quoteString.h>
#include <Functions/FunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/UserDefinedSQLObjectsLoader.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNKNOWN_FUNCTION;
extern const int CANNOT_DROP_FUNCTION;
}
UserDefinedSQLFunctionFactory & UserDefinedSQLFunctionFactory::instance()
{
static UserDefinedSQLFunctionFactory result;
return result;
}
void UserDefinedSQLFunctionFactory::registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The function '{}' already exists", function_name);
}
if (AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "The aggregate function '{}' already exists", function_name);
}
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
{
if (if_not_exists)
return;
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS, "User defined executable function '{}' already exists", function_name);
}
std::lock_guard lock(mutex);
auto [it, inserted] = function_name_to_create_query.emplace(function_name, create_function_query);
if (!inserted)
{
if (if_not_exists)
return;
if (replace)
it->second = create_function_query;
else
throw Exception(ErrorCodes::FUNCTION_ALREADY_EXISTS,
"The function name '{}' is not unique",
function_name);
}
if (persist)
{
try
{
UserDefinedSQLObjectsLoader::instance().storeObject(context, UserDefinedSQLObjectType::Function, function_name, *create_function_query, replace);
}
catch (Exception & exception)
{
function_name_to_create_query.erase(it);
exception.addMessage(fmt::format("while storing user defined function {} on disk", backQuote(function_name)));
throw;
}
}
}
void UserDefinedSQLFunctionFactory::unregisterFunction(ContextPtr context, const String & function_name, bool if_exists)
{
if (FunctionFactory::instance().hasNameOrAlias(function_name) ||
AggregateFunctionFactory::instance().hasNameOrAlias(function_name))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop system function '{}'", function_name);
if (UserDefinedExecutableFunctionFactory::instance().has(function_name, context))
throw Exception(ErrorCodes::CANNOT_DROP_FUNCTION, "Cannot drop user defined executable function '{}'", function_name);
std::lock_guard lock(mutex);
auto it = function_name_to_create_query.find(function_name);
if (it == function_name_to_create_query.end())
{
if (if_exists)
return;
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The function name '{}' is not registered",
function_name);
}
try
{
UserDefinedSQLObjectsLoader::instance().removeObject(context, UserDefinedSQLObjectType::Function, function_name);
}
catch (Exception & exception)
{
exception.addMessage(fmt::format("while removing user defined function {} from disk", backQuote(function_name)));
throw;
}
function_name_to_create_query.erase(it);
}
ASTPtr UserDefinedSQLFunctionFactory::get(const String & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query.find(function_name);
if (it == function_name_to_create_query.end())
throw Exception(ErrorCodes::UNKNOWN_FUNCTION,
"The function name '{}' is not registered",
function_name);
return it->second;
}
ASTPtr UserDefinedSQLFunctionFactory::tryGet(const std::string & function_name) const
{
std::lock_guard lock(mutex);
auto it = function_name_to_create_query.find(function_name);
if (it == function_name_to_create_query.end())
return nullptr;
return it->second;
}
bool UserDefinedSQLFunctionFactory::has(const String & function_name) const
{
return tryGet(function_name) != nullptr;
}
std::vector<std::string> UserDefinedSQLFunctionFactory::getAllRegisteredNames() const
{
std::vector<std::string> registered_names;
std::lock_guard lock(mutex);
registered_names.reserve(function_name_to_create_query.size());
for (const auto & [name, _] : function_name_to_create_query)
registered_names.emplace_back(name);
return registered_names;
}
bool UserDefinedSQLFunctionFactory::empty() const
{
std::lock_guard lock(mutex);
return function_name_to_create_query.empty();
}
}

View File

@ -1,54 +0,0 @@
#pragma once
#include <unordered_map>
#include <mutex>
#include <Common/NamePrompter.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Interpreters/Context_fwd.h>
namespace DB
{
/// Factory for SQLUserDefinedFunctions
class UserDefinedSQLFunctionFactory : public IHints<1, UserDefinedSQLFunctionFactory>
{
public:
static UserDefinedSQLFunctionFactory & instance();
/** Register function for function_name in factory for specified create_function_query.
* If function exists and if_not_exists = false and replace = false throws exception.
* If replace = true and sql user defined function with function_name already exists replace it with create_function_query.
* If persist = true persist function on disk.
*/
void registerFunction(ContextPtr context, const String & function_name, ASTPtr create_function_query, bool replace, bool if_not_exists, bool persist);
/** Unregister function for function_name.
* If if_exists = true then do not throw exception if function is not registered.
* If if_exists = false then throw exception if function is not registered.
*/
void unregisterFunction(ContextPtr context, const String & function_name, bool if_exists);
/// Get function create query for function_name. If no function registered with function_name throws exception.
ASTPtr get(const String & function_name) const;
/// Get function create query for function_name. If no function registered with function_name return nullptr.
ASTPtr tryGet(const String & function_name) const;
/// Check if function with function_name registered.
bool has(const String & function_name) const;
/// Get all user defined functions registered names.
std::vector<String> getAllRegisteredNames() const override;
/// Check whether any UDFs have been registered
bool empty() const;
private:
std::unordered_map<String, ASTPtr> function_name_to_create_query;
mutable std::mutex mutex;
};
}

View File

@ -1,184 +0,0 @@
#include "UserDefinedSQLObjectsLoader.h"
#include <filesystem>
#include <Common/escapeForFileName.h>
#include <Common/quoteString.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateFunctionQuery.h>
#include <Parsers/parseQuery.h>
#include <Parsers/ASTCreateFunctionQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int OBJECT_ALREADY_STORED_ON_DISK;
extern const int OBJECT_WAS_NOT_STORED_ON_DISK;
}
UserDefinedSQLObjectsLoader & UserDefinedSQLObjectsLoader::instance()
{
static UserDefinedSQLObjectsLoader ret;
return ret;
}
UserDefinedSQLObjectsLoader::UserDefinedSQLObjectsLoader()
: log(&Poco::Logger::get("UserDefinedSQLObjectsLoader"))
{}
void UserDefinedSQLObjectsLoader::loadUserDefinedObject(ContextPtr context, UserDefinedSQLObjectType object_type, std::string_view name, const String & path)
{
auto name_ref = StringRef(name.data(), name.size());
LOG_DEBUG(log, "Loading user defined object {} from file {}", backQuote(name_ref), path);
/// There is .sql file with user defined object creation statement.
ReadBufferFromFile in(path);
String object_create_query;
readStringUntilEOF(object_create_query, in);
try
{
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
ParserCreateFunctionQuery parser;
ASTPtr ast = parseQuery(
parser,
object_create_query.data(),
object_create_query.data() + object_create_query.size(),
"in file " + path,
0,
context->getSettingsRef().max_parser_depth);
InterpreterCreateFunctionQuery interpreter(ast, context, false /*persist_function*/);
interpreter.execute();
}
}
}
catch (Exception & e)
{
e.addMessage(fmt::format("while loading user defined objects {} from path {}", backQuote(name_ref), path));
throw;
}
}
void UserDefinedSQLObjectsLoader::loadObjects(ContextPtr context)
{
if (unlikely(!enable_persistence))
return;
LOG_DEBUG(log, "Loading user defined objects");
String dir_path = context->getUserDefinedPath();
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator it(dir_path); it != dir_end; ++it)
{
if (it->isDirectory())
continue;
const std::string & file_name = it.name();
/// For '.svn', '.gitignore' directory and similar.
if (file_name.at(0) == '.')
continue;
if (!startsWith(file_name, "function_") || !endsWith(file_name, ".sql"))
continue;
std::string_view object_name = file_name;
object_name.remove_prefix(strlen("function_"));
object_name.remove_suffix(strlen(".sql"));
if (object_name.empty())
continue;
loadUserDefinedObject(context, UserDefinedSQLObjectType::Function, object_name, dir_path + it.name());
}
}
void UserDefinedSQLObjectsLoader::storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace)
{
if (unlikely(!enable_persistence))
return;
String dir_path = context->getUserDefinedPath();
String file_path;
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
}
}
if (!replace && std::filesystem::exists(file_path))
throw Exception(ErrorCodes::OBJECT_ALREADY_STORED_ON_DISK, "User defined object {} already stored on disk", backQuote(file_path));
LOG_DEBUG(log, "Storing object {} to file {}", backQuote(object_name), file_path);
WriteBufferFromOwnString create_statement_buf;
formatAST(ast, create_statement_buf, false);
writeChar('\n', create_statement_buf);
String create_statement = create_statement_buf.str();
WriteBufferFromFile out(file_path, create_statement.size());
writeString(create_statement, out);
out.next();
if (context->getSettingsRef().fsync_metadata)
out.sync();
out.close();
LOG_DEBUG(log, "Stored object {}", backQuote(object_name));
}
void UserDefinedSQLObjectsLoader::removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name)
{
if (unlikely(!enable_persistence))
return;
String dir_path = context->getUserDefinedPath();
LOG_DEBUG(log, "Removing file for user defined object {} from {}", backQuote(object_name), dir_path);
std::filesystem::path file_path;
switch (object_type)
{
case UserDefinedSQLObjectType::Function:
{
file_path = dir_path + "function_" + escapeForFileName(object_name) + ".sql";
}
}
if (!std::filesystem::exists(file_path))
throw Exception(ErrorCodes::OBJECT_WAS_NOT_STORED_ON_DISK, "User defined object {} was not stored on disk", backQuote(file_path.string()));
std::filesystem::remove(file_path);
}
void UserDefinedSQLObjectsLoader::enable(bool enable_persistence_)
{
enable_persistence = enable_persistence_;
}
}

View File

@ -1,37 +0,0 @@
#pragma once
#include <Interpreters/Context_fwd.h>
#include <Parsers/IAST.h>
#include <boost/noncopyable.hpp>
namespace DB
{
enum class UserDefinedSQLObjectType
{
Function
};
class UserDefinedSQLObjectsLoader : private boost::noncopyable
{
public:
static UserDefinedSQLObjectsLoader & instance();
UserDefinedSQLObjectsLoader();
void loadObjects(ContextPtr context);
void storeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name, const IAST & ast, bool replace);
void removeObject(ContextPtr context, UserDefinedSQLObjectType object_type, const String & object_name);
/// For ClickHouse local if path is not set we can disable loader.
void enable(bool enable_persistence);
private:
void loadUserDefinedObject(ContextPtr context, UserDefinedSQLObjectType object_type, std::string_view object_name, const String & file_path);
Poco::Logger * log;
bool enable_persistence = true;
};
}

View File

@ -549,15 +549,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (insert_query)
{
if (insert_query->table_id)
{
insert_query->table_id = context->resolveStorageID(insert_query->table_id);
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName());
}
else if (auto table = insert_query->getTable(); !table.empty())
{
insert_query->table_id = context->resolveStorageID(StorageID{insert_query->getDatabase(), table});
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "2) database: {}", insert_query->table_id.getDatabaseName());
}
}
if (insert_query && insert_query->select)

View File

@ -214,7 +214,7 @@ bool JSONEachRowRowInputFormat::readRow(MutableColumns & columns, RowReadExtensi
seen_columns.assign(num_columns, false);
nested_prefix_length = 0;
readRowStart();
readRowStart(columns);
readJSONObject(columns);
const auto & header = getPort().getHeader();

View File

@ -48,7 +48,7 @@ private:
void readJSONObject(MutableColumns & columns);
void readNestedData(const String & name, MutableColumns & columns);
virtual void readRowStart() {}
virtual void readRowStart(MutableColumns &) {}
virtual bool checkEndOfData(bool is_first_row);
const FormatSettings format_settings;
@ -66,10 +66,6 @@ private:
/// the nested column names are 'n.i' and 'n.s' and the nested prefix is 'n.'
size_t nested_prefix_length = 0;
/// Set of columns for which the values were read. The rest will be filled with default values.
std::vector<UInt8> read_columns;
/// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
std::vector<UInt8> seen_columns;
/// These sets may be different, because if null_as_default=1 read_columns[i] will be false and seen_columns[i] will be true
/// for row like {..., "non-nullable column name" : null, ...}
@ -85,6 +81,12 @@ private:
bool yield_strings;
protected:
/// Set of columns for which the values were read. The rest will be filled with default values.
std::vector<UInt8> read_columns;
/// Set of columns which already met in row. Exception is thrown if there are more than one column with the same name.
std::vector<UInt8> seen_columns;
/// This flag is needed to know if data is in square brackets.
bool data_in_square_brackets = false;
};

View File

@ -2,12 +2,39 @@
#include <Formats/JSONUtils.h>
#include <Formats/FormatFactory.h>
#include <Formats/EscapingRuleUtils.h>
#include <DataTypes/DataTypeString.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
std::optional<size_t> getColumnIndexForJSONObjectEachRowObjectName(const Block & header, const FormatSettings & format_settings)
{
if (format_settings.json_object_each_row.column_for_object_name.empty())
return std::nullopt;
if (!header.has(format_settings.json_object_each_row.column_for_object_name))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Column name '{}' from setting format_json_object_each_row_column_for_object_name doesn't exists in header",
format_settings.json_object_each_row.column_for_object_name);
size_t index = header.getPositionByName(format_settings.json_object_each_row.column_for_object_name);
if (!isStringOrFixedString(header.getDataTypes()[index]))
throw Exception(
ErrorCodes::BAD_ARGUMENTS,
"Column '{}' from setting json_object_each_row_column_for_object_name must have String type",
format_settings.json_object_each_row.column_for_object_name);
return index;
}
JSONObjectEachRowInputFormat::JSONObjectEachRowInputFormat(ReadBuffer & in_, const Block & header_, Params params_, const FormatSettings & format_settings_)
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false)
: JSONEachRowRowInputFormat(in_, header_, params_, format_settings_, false), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, format_settings_))
{
}
@ -16,9 +43,15 @@ void JSONObjectEachRowInputFormat::readPrefix()
JSONUtils::skipObjectStart(*in);
}
void JSONObjectEachRowInputFormat::readRowStart()
void JSONObjectEachRowInputFormat::readRowStart(MutableColumns & columns)
{
JSONUtils::readFieldName(*in);
auto object_name = JSONUtils::readFieldName(*in);
if (field_index_for_object_name)
{
columns[*field_index_for_object_name]->insertData(object_name.data(), object_name.size());
seen_columns[*field_index_for_object_name] = true;
read_columns[*field_index_for_object_name] = true;
}
}
bool JSONObjectEachRowInputFormat::checkEndOfData(bool is_first_row)
@ -30,7 +63,6 @@ bool JSONObjectEachRowInputFormat::checkEndOfData(bool is_first_row)
return false;
}
JSONObjectEachRowSchemaReader::JSONObjectEachRowSchemaReader(ReadBuffer & in_, const FormatSettings & format_settings_)
: IRowWithNamesSchemaReader(in_, format_settings_)
{
@ -53,7 +85,10 @@ NamesAndTypesList JSONObjectEachRowSchemaReader::readRowAndGetNamesAndDataTypes(
JSONUtils::skipComma(in);
JSONUtils::readFieldName(in);
return JSONUtils::readRowAndGetNamesAndDataTypesForJSONEachRow(in, format_settings, false);
auto names_and_types = JSONUtils::readRowAndGetNamesAndDataTypesForJSONEachRow(in, format_settings, false);
if (!format_settings.json_object_each_row.column_for_object_name.empty())
names_and_types.emplace_front(format_settings.json_object_each_row.column_for_object_name, std::make_shared<DataTypeString>());
return names_and_types;
}
void JSONObjectEachRowSchemaReader::transformTypesIfNeeded(DataTypePtr & type, DataTypePtr & new_type)
@ -83,7 +118,8 @@ void registerJSONObjectEachRowSchemaReader(FormatFactory & factory)
});
factory.registerAdditionalInfoForSchemaCacheGetter("JSONObjectEachRow", [](const FormatSettings & settings)
{
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON);
return getAdditionalFormatInfoByEscapingRule(settings, FormatSettings::EscapingRule::JSON)
+ fmt::format(", format_json_object_each_row_column_for_object_name={}", settings.json_object_each_row.column_for_object_name);
});
}

View File

@ -27,8 +27,10 @@ public:
private:
void readPrefix() override;
void readSuffix() override {}
void readRowStart() override;
void readRowStart(MutableColumns & columns) override;
bool checkEndOfData(bool is_first_row) override;
std::optional<size_t> field_index_for_object_name;
};
@ -44,4 +46,6 @@ private:
bool first_row = true;
};
std::optional<size_t> getColumnIndexForJSONObjectEachRowObjectName(const Block & header, const FormatSettings & settings);
}

View File

@ -1,4 +1,5 @@
#include <Processors/Formats/Impl/JSONObjectEachRowRowOutputFormat.h>
#include <Processors/Formats/Impl/JSONObjectEachRowRowInputFormat.h>
#include <Formats/JSONUtils.h>
#include <IO/WriteHelpers.h>
@ -6,10 +7,38 @@ namespace DB
{
JSONObjectEachRowRowOutputFormat::JSONObjectEachRowRowOutputFormat(WriteBuffer & out_, const Block & header_, const RowOutputFormatParams & params_, const FormatSettings & settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_)
: JSONEachRowRowOutputFormat(out_, header_, params_, settings_), field_index_for_object_name(getColumnIndexForJSONObjectEachRowObjectName(header_, settings_))
{
}
void JSONObjectEachRowRowOutputFormat::writeField(const IColumn & column, const ISerialization & serialization, size_t row)
{
if (field_number == field_index_for_object_name)
{
++field_number;
return;
}
JSONEachRowRowOutputFormat::writeField(column, serialization, row);
}
void JSONObjectEachRowRowOutputFormat::write(const Columns & columns, size_t row)
{
if (field_index_for_object_name)
object_name = columns[*field_index_for_object_name]->getDataAt(row).toString();
else
object_name = "row_" + std::to_string(row + 1);
IRowOutputFormat::write(columns, row);
}
void JSONObjectEachRowRowOutputFormat::writeFieldDelimiter()
{
/// We should not write comma before column that is used for
/// object name and also after it if it's in the first place
if (field_number != field_index_for_object_name && !(field_index_for_object_name == 0 && field_number == 1))
JSONEachRowRowOutputFormat::writeFieldDelimiter();
}
void JSONObjectEachRowRowOutputFormat::writePrefix()
{
JSONUtils::writeObjectStart(*ostr);
@ -17,9 +46,7 @@ void JSONObjectEachRowRowOutputFormat::writePrefix()
void JSONObjectEachRowRowOutputFormat::writeRowStartDelimiter()
{
++row_num;
String title = "row_" + std::to_string(row_num);
JSONUtils::writeCompactObjectStart(*ostr, 1, title.c_str());
JSONUtils::writeCompactObjectStart(*ostr, 1, object_name.c_str());
}
void JSONObjectEachRowRowOutputFormat::writeRowEndDelimiter()
@ -52,6 +79,7 @@ void registerOutputFormatJSONObjectEachRow(FormatFactory & factory)
return std::make_shared<JSONObjectEachRowRowOutputFormat>(buf, sample, params, settings);
});
factory.markOutputFormatSupportsParallelFormatting("JSONObjectEachRow");
factory.markFormatHasNoAppendSupport("JSONObjectEachRow");
}
}

View File

@ -29,6 +29,9 @@ public:
String getName() const override { return "JSONObjectEachRowRowOutputFormat"; }
private:
void write(const Columns & columns, size_t row) override;
void writeField(const IColumn & column, const ISerialization & serialization, size_t row) override;
void writeFieldDelimiter() override;
void writeRowStartDelimiter() override;
void writeRowEndDelimiter() override;
void writeRowBetweenDelimiter() override;
@ -36,7 +39,8 @@ private:
void writePrefix() override;
void writeSuffix() override;
size_t row_num = 0;
std::optional<size_t> field_index_for_object_name;
String object_name;
};
}

View File

@ -6,18 +6,9 @@
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#include <Interpreters/UserDefinedSQLFunctionFactory.h>
#include <Interpreters/UserDefinedExecutableFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedSQLFunctionFactory.h>
#include <Functions/UserDefined/UserDefinedExecutableFunctionFactory.h>
#include <Storages/System/StorageSystemFunctions.h>
#include <Common/escapeForFileName.h>
#include <Backups/BackupEntriesCollector.h>
#include <Backups/BackupEntryFromMemory.h>
#include <Backups/RestorerFromBackup.h>
#include <Backups/IBackup.h>
#include <Parsers/ParserCreateFunctionQuery.h>
#include <Parsers/parseQuery.h>
namespace fs = std::filesystem;
namespace DB
@ -30,11 +21,6 @@ enum class FunctionOrigin : Int8
EXECUTABLE_USER_DEFINED = 2
};
namespace ErrorCodes
{
extern const int CANNOT_RESTORE_TABLE;
}
namespace
{
template <typename Factory>
@ -134,63 +120,12 @@ void StorageSystemFunctions::fillData(MutableColumns & res_columns, ContextPtr c
void StorageSystemFunctions::backupData(BackupEntriesCollector & backup_entries_collector, const String & data_path_in_backup, const std::optional<ASTs> & /* partitions */)
{
const auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance();
const auto & user_defined_sql_functions_names = user_defined_sql_functions_factory.getAllRegisteredNames();
fs::path data_path_in_backup_fs{data_path_in_backup};
for (const auto & function_name : user_defined_sql_functions_names)
{
auto ast = user_defined_sql_functions_factory.tryGet(function_name);
if (!ast)
continue;
backup_entries_collector.addBackupEntry(
data_path_in_backup_fs / (escapeForFileName(function_name) + ".sql"),
std::make_shared<BackupEntryFromMemory>(queryToString(ast)));
}
UserDefinedSQLFunctionFactory::instance().backup(backup_entries_collector, data_path_in_backup);
}
void StorageSystemFunctions::restoreDataFromBackup(RestorerFromBackup & restorer, const String & data_path_in_backup, const std::optional<ASTs> & /* partitions */)
{
auto backup = restorer.getBackup();
fs::path data_path_in_backup_fs{data_path_in_backup};
Strings filenames = backup->listFiles(data_path_in_backup);
for (const auto & filename : filenames)
{
if (!filename.ends_with(".sql"))
{
throw Exception(ErrorCodes::CANNOT_RESTORE_TABLE, "Cannot restore table {}: File name {} doesn't have the extension .sql",
getStorageID().getFullTableName(), String{data_path_in_backup_fs / filename});
}
}
auto & user_defined_sql_functions_factory = UserDefinedSQLFunctionFactory::instance();
const auto & restore_settings = restorer.getRestoreSettings();
auto context = restorer.getContext();
for (const auto & filename : filenames)
{
String escaped_function_name = filename.substr(0, filename.length() - strlen(".sql"));
String function_name = unescapeForFileName(escaped_function_name);
String filepath = data_path_in_backup_fs / filename;
auto function_def_entry = backup->readFile(filepath);
auto function_def_in = function_def_entry->getReadBuffer();
String function_def;
readStringUntilEOF(function_def, *function_def_in);
ParserCreateFunctionQuery parser;
ASTPtr ast = parseQuery(
parser,
function_def.data(),
function_def.data() + function_def.size(),
"in file " + filepath + " from backup " + backup->getName(),
0,
context->getSettingsRef().max_parser_depth);
bool replace = (restore_settings.create_function == RestoreUDFCreationMode::kReplace);
bool if_not_exists = (restore_settings.create_function == RestoreUDFCreationMode::kCreateIfNotExists);
user_defined_sql_functions_factory.registerFunction(context, function_name, ast, replace, if_not_exists, true);
}
UserDefinedSQLFunctionFactory::instance().restore(restorer, data_path_in_backup);
}
}

View File

@ -0,0 +1,20 @@
{
"name_0": {"number":"0"},
"name_1": {"number":"1"},
"name_2": {"number":"2"}
}
{
"name_0": {"number":"0","x":"1"},
"name_1": {"number":"1","x":"2"},
"name_2": {"number":"2","x":"3"}
}
{
"name_0": {"number":"0"},
"name_1": {"number":"1"},
"name_2": {"number":"2"}
}
name String
number Nullable(Int64)
name_0 0
name_1 1
name_2 2

View File

@ -0,0 +1,11 @@
-- Tags: no-fasttest, no-parallel
set format_json_object_each_row_column_for_object_name='name';
select number, concat('name_', toString(number)) as name from numbers(3) format JSONObjectEachRow;
select number, concat('name_', toString(number)) as name, number + 1 as x from numbers(3) format JSONObjectEachRow;
select concat('name_', toString(number)) as name, number from numbers(3) format JSONObjectEachRow;
insert into function file(02454_data.jsonobjecteachrow) select number, concat('name_', toString(number)) as name from numbers(3) settings engine_file_truncate_on_insert=1;
desc file(02454_data.jsonobjecteachrow);
select * from file(02454_data.jsonobjecteachrow);