From 030fe6e2728cd93f26d912376860222b7f7b7978 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Mon, 30 May 2022 16:53:55 +0800 Subject: [PATCH 01/12] Less flaky jbod rebalancer test --- tests/integration/test_jbod_balancer/test.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/integration/test_jbod_balancer/test.py b/tests/integration/test_jbod_balancer/test.py index 3807d6e1cea..0c6a0709971 100644 --- a/tests/integration/test_jbod_balancer/test.py +++ b/tests/integration/test_jbod_balancer/test.py @@ -161,11 +161,13 @@ def test_replicated_balanced_merge_fetch(start_cluster): p = Pool(20) def task(i): - print("Processing insert {}/{}".format(i, 200)) + print("Processing insert {}/{}".format(i, 80)) # around 1k per block node1.query( "insert into tbl select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" ) + + # Fill jbod disks with garbage data node1.query( "insert into tmp1 select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" ) @@ -179,7 +181,7 @@ def test_replicated_balanced_merge_fetch(start_cluster): "insert into tmp2 select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" ) - p.map(task, range(200)) + p.map(task, range(80)) node2.query("SYSTEM SYNC REPLICA tbl", timeout=10) From cae2478b3fa87c7fdce1352b3fb3f3d025ccb794 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Wed, 1 Jun 2022 10:57:20 +0000 Subject: [PATCH 02/12] Revert "Merge pull request #37355 from ClickHouse/revert-37266-fix-mutations-with-object" This reverts commit a53cfa9fcad121830db211a5831c3eafc36fb1ac, reversing changes made to 9acb42fcdbdcaf0f9e03859140d26b3bc1506cbf. --- src/Interpreters/MutationsInterpreter.cpp | 6 ++++-- src/Storages/MergeTree/MergeTask.cpp | 1 - src/Storages/MergeTree/MutateTask.cpp | 15 ++++++++----- .../MergeTree/StorageFromMergeTreeDataPart.h | 17 +++++++++++++++ .../01825_type_json_mutations.reference | 7 +++++++ .../0_stateless/01825_type_json_mutations.sql | 21 +++++++++++++++++++ 6 files changed, 59 insertions(+), 8 deletions(-) create mode 100644 tests/queries/0_stateless/01825_type_json_mutations.reference create mode 100644 tests/queries/0_stateless/01825_type_json_mutations.sql diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 99032dd9f10..a55de34efbc 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -758,7 +758,9 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & prepared_stages, bool dry_run) { - NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAllPhysical(); + auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context); + auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects(); + auto all_columns = storage_snapshot->getColumns(options); /// Next, for each stage calculate columns changed by this and previous stages. for (size_t i = 0; i < prepared_stages.size(); ++i) @@ -802,7 +804,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & /// e.g. ALTER referencing the same table in scalar subquery bool execute_scalar_subqueries = !dry_run; auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot, context), + all_asts, all_columns, storage, storage_snapshot, false, true, execute_scalar_subqueries); if (execute_scalar_subqueries && context->hasQueryContext()) diff --git a/src/Storages/MergeTree/MergeTask.cpp b/src/Storages/MergeTree/MergeTask.cpp index 58bffaab34b..8732a3ed3e5 100644 --- a/src/Storages/MergeTree/MergeTask.cpp +++ b/src/Storages/MergeTree/MergeTask.cpp @@ -149,7 +149,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare() global_ctx->merging_columns, global_ctx->merging_column_names); - auto local_single_disk_volume = std::make_shared("volume_" + global_ctx->future_part->name, ctx->disk, 0); global_ctx->new_data_part = global_ctx->data->createPart( global_ctx->future_part->name, diff --git a/src/Storages/MergeTree/MutateTask.cpp b/src/Storages/MergeTree/MutateTask.cpp index c41a2d3d52c..fb8f4ba0518 100644 --- a/src/Storages/MergeTree/MutateTask.cpp +++ b/src/Storages/MergeTree/MutateTask.cpp @@ -624,7 +624,9 @@ struct MutationContext FutureMergedMutatedPartPtr future_part; MergeTreeData::DataPartPtr source_part; + StoragePtr storage_from_source_part; StorageMetadataPtr metadata_snapshot; + MutationCommandsConstPtr commands; time_t time_of_mutation; ContextPtr context; @@ -1367,6 +1369,11 @@ MutateTask::MutateTask( ctx->space_reservation = space_reservation_; ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical(); ctx->txn = txn; + ctx->source_part = ctx->future_part->parts[0]; + ctx->storage_from_source_part = std::make_shared(ctx->source_part); + + auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_); + extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false); } @@ -1405,8 +1412,6 @@ bool MutateTask::prepare() "This is a bug.", toString(ctx->future_part->parts.size())); ctx->num_mutations = std::make_unique(CurrentMetrics::PartMutation); - ctx->source_part = ctx->future_part->parts[0]; - auto storage_from_source_part = std::make_shared(ctx->source_part); auto context_for_reading = Context::createCopy(ctx->context); context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1); @@ -1417,13 +1422,13 @@ bool MutateTask::prepare() for (const auto & command : *ctx->commands) { - if (command.partition == nullptr || ctx->future_part->parts[0]->info.partition_id == ctx->data->getPartitionIDFromQuery( + if (command.partition == nullptr || ctx->source_part->info.partition_id == ctx->data->getPartitionIDFromQuery( command.partition, context_for_reading)) ctx->commands_for_part.emplace_back(command); } if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations( - storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) + ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading))) { LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation); promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false)); @@ -1441,7 +1446,7 @@ bool MutateTask::prepare() if (!ctx->for_interpreter.empty()) { ctx->interpreter = std::make_unique( - storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); + ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true); ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices(); ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections(); ctx->mutation_kind = ctx->interpreter->getMutationKind(); diff --git a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h index 5be20c9a2d5..a9bcb353b84 100644 --- a/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h +++ b/src/Storages/MergeTree/StorageFromMergeTreeDataPart.h @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -36,6 +37,20 @@ public: String getName() const override { return "FromMergeTreeDataPart"; } + StorageSnapshotPtr getStorageSnapshot( + const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const override + { + const auto & storage_columns = metadata_snapshot->getColumns(); + if (!hasObjectColumns(storage_columns)) + return std::make_shared(*this, metadata_snapshot); + + auto object_columns = getObjectColumns( + parts.begin(), parts.end(), + storage_columns, [](const auto & part) -> const auto & { return part->getColumns(); }); + + return std::make_shared(*this, metadata_snapshot, object_columns); + } + Pipe read( const Names & column_names, const StorageSnapshotPtr & storage_snapshot, @@ -65,6 +80,8 @@ public: bool supportsIndexForIn() const override { return true; } + bool supportsDynamicSubcolumns() const override { return true; } + bool mayBenefitFromIndexForIn( const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override { diff --git a/tests/queries/0_stateless/01825_type_json_mutations.reference b/tests/queries/0_stateless/01825_type_json_mutations.reference new file mode 100644 index 00000000000..da7c278497e --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_mutations.reference @@ -0,0 +1,7 @@ +1 q (1,2,[('aaa'),('bbb')]) +2 w (3,4,[('ccc')]) +3 e (5,6,[]) +1 q (1,2,[('aaa'),('bbb')]) +3 e (5,6,[]) +1 foo +3 foo diff --git a/tests/queries/0_stateless/01825_type_json_mutations.sql b/tests/queries/0_stateless/01825_type_json_mutations.sql new file mode 100644 index 00000000000..a16710bdbf7 --- /dev/null +++ b/tests/queries/0_stateless/01825_type_json_mutations.sql @@ -0,0 +1,21 @@ +-- Tags: no-fasttest + +DROP TABLE IF EXISTS t_json_mutations; + +SET allow_experimental_object_type = 1; +SET output_format_json_named_tuples_as_objects = 1; +SET mutations_sync = 2; + +CREATE TABLE t_json_mutations(id UInt32, s String, obj JSON) ENGINE = MergeTree ORDER BY id; + +INSERT INTO t_json_mutations VALUES (1, 'q', '{"k1": 1, "k2": 2, "k3": [{"k4": "aaa"}, {"k4": "bbb"}]}'); +INSERT INTO t_json_mutations VALUES (2, 'w', '{"k1": 3, "k2": 4, "k3": [{"k4": "ccc"}]}'); +INSERT INTO t_json_mutations VALUES (3, 'e', '{"k1": 5, "k2": 6}'); + +SELECT * FROM t_json_mutations ORDER BY id; +ALTER TABLE t_json_mutations DELETE WHERE id = 2; +SELECT * FROM t_json_mutations ORDER BY id; +ALTER TABLE t_json_mutations DROP COLUMN s, DROP COLUMN obj, ADD COLUMN t String DEFAULT 'foo'; +SELECT * FROM t_json_mutations ORDER BY id; + +DROP TABLE t_json_mutations; From 139264d881b18e3428f927ffaa4ef2be51d83754 Mon Sep 17 00:00:00 2001 From: Amos Bird Date: Wed, 1 Jun 2022 23:02:31 +0800 Subject: [PATCH 03/12] Less parallelism instead --- tests/integration/test_jbod_balancer/test.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_jbod_balancer/test.py b/tests/integration/test_jbod_balancer/test.py index 0c6a0709971..e746698611a 100644 --- a/tests/integration/test_jbod_balancer/test.py +++ b/tests/integration/test_jbod_balancer/test.py @@ -158,10 +158,10 @@ def test_replicated_balanced_merge_fetch(start_cluster): node.query("create table tmp2 as tmp1") node2.query("alter table tbl modify setting always_fetch_merged_part = 1") - p = Pool(20) + p = Pool(5) def task(i): - print("Processing insert {}/{}".format(i, 80)) + print("Processing insert {}/{}".format(i, 200)) # around 1k per block node1.query( "insert into tbl select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" @@ -181,7 +181,7 @@ def test_replicated_balanced_merge_fetch(start_cluster): "insert into tmp2 select randConstant() % 2, randomPrintableASCII(16) from numbers(50)" ) - p.map(task, range(80)) + p.map(task, range(200)) node2.query("SYSTEM SYNC REPLICA tbl", timeout=10) From 7e5ef97206e6406283f0f2533f626f18cc32f3c3 Mon Sep 17 00:00:00 2001 From: Ilya Yatsishin <2159081+qoega@users.noreply.github.com> Date: Fri, 3 Jun 2022 15:16:42 +0200 Subject: [PATCH 04/12] Space in {} shows it on final page --- docs/en/operations/settings/settings.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md index 0928f5c4ee7..2f6ddc35fd5 100644 --- a/docs/en/operations/settings/settings.md +++ b/docs/en/operations/settings/settings.md @@ -241,7 +241,7 @@ Possible values: Default value: `1000`. -## glob_expansion_max_elements {#glob_expansion_max_elements } +## glob_expansion_max_elements {#glob_expansion_max_elements} Sets the maximum number of addresses generated from patterns for external storages and table functions (like [url](../../sql-reference/table-functions/url.md)) except the `remote` function. From 6c5ec68a7c78a0618965071b57da239727bd3103 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 31 May 2022 22:27:49 +0200 Subject: [PATCH 05/12] Executable user defined functions support parameters --- src/Interpreters/ActionsVisitor.cpp | 24 ++++++- ...alUserDefinedExecutableFunctionsLoader.cpp | 23 ++++++ .../UserDefinedExecutableFunction.h | 7 ++ .../UserDefinedExecutableFunctionFactory.cpp | 71 ++++++++++++++++--- .../UserDefinedExecutableFunctionFactory.h | 8 +-- 5 files changed, 119 insertions(+), 14 deletions(-) diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index eacdd221d6e..2b2a29f832b 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -953,13 +953,33 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data & if (AggregateFunctionFactory::instance().isAggregateFunctionName(node.name)) return; - FunctionOverloadResolverPtr function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, data.getContext()); + FunctionOverloadResolverPtr function_builder; + + auto current_context = data.getContext(); + + if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context)) + { + Array parameters; + if (node.parameters) { + auto & node_parameters = node.parameters->children; + size_t parameters_size = node_parameters.size(); + parameters.resize(parameters_size); + + for (size_t i = 0; i < parameters_size; ++i) + { + ASTPtr literal = evaluateConstantExpressionAsLiteral(node_parameters[i], current_context); + parameters[i] = literal->as()->value; + } + } + + function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, current_context, parameters); + } if (!function_builder) { try { - function_builder = FunctionFactory::instance().get(node.name, data.getContext()); + function_builder = FunctionFactory::instance().get(node.name, current_context); } catch (Exception & e) { diff --git a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp b/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp index e3d40033cff..641073f7164 100644 --- a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp +++ b/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp @@ -17,6 +17,7 @@ namespace ErrorCodes { extern const int BAD_ARGUMENTS; extern const int FUNCTION_ALREADY_EXISTS; + extern const int UNSUPPORTED_METHOD; } ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_) @@ -111,6 +112,7 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime"); std::vector arguments; + std::vector parameters; Poco::Util::AbstractConfiguration::Keys config_elems; config.keys(key_in_config, config_elems); @@ -137,12 +139,33 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create arguments.emplace_back(std::move(argument)); } + for (const auto & config_elem : config_elems) + { + if (!startsWith(config_elem, "parameter")) + continue; + + UserDefinedExecutableFunctionParameter parameter; + + const auto parameter_prefix = key_in_config + '.' + config_elem + '.'; + + parameter.type = DataTypeFactory::instance().get(config.getString(parameter_prefix + "type")); + parameter.name = config.getString(parameter_prefix + "name"); + + parameters.emplace_back(std::move(parameter)); + } + + if (is_executable_pool && !parameters.empty()) { + throw Exception(ErrorCodes::UNSUPPORTED_METHOD, + "Executable user defined functions with `executable_pool` type does not support parameters"); + } + UserDefinedExecutableFunctionConfiguration function_configuration { .name = name, .command = std::move(command_value), .command_arguments = std::move(command_arguments), .arguments = std::move(arguments), + .parameters = std::move(parameters), .result_type = std::move(result_type), .result_name = std::move(result_name), }; diff --git a/src/Interpreters/UserDefinedExecutableFunction.h b/src/Interpreters/UserDefinedExecutableFunction.h index 434c77e9236..989f9dfe895 100644 --- a/src/Interpreters/UserDefinedExecutableFunction.h +++ b/src/Interpreters/UserDefinedExecutableFunction.h @@ -16,12 +16,19 @@ struct UserDefinedExecutableFunctionArgument String name; }; +struct UserDefinedExecutableFunctionParameter +{ + String name; + DataTypePtr type; +}; + struct UserDefinedExecutableFunctionConfiguration { std::string name; std::string command; std::vector command_arguments; std::vector arguments; + std::vector parameters; DataTypePtr result_type; String result_name; }; diff --git a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp b/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp index b67e9c16ed5..a385233510b 100644 --- a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp +++ b/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp @@ -3,6 +3,8 @@ #include #include +#include +#include #include #include @@ -11,6 +13,7 @@ #include #include #include +#include #include #include #include @@ -22,6 +25,8 @@ namespace DB namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; + extern const int BAD_ARGUMENTS; + extern const int TYPE_MISMATCH; } class UserDefinedFunction final : public IFunction @@ -30,10 +35,59 @@ public: explicit UserDefinedFunction( ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function_, - ContextPtr context_) + ContextPtr context_, + Array parameters_) : executable_function(std::move(executable_function_)) , context(context_) { + const auto & configuration = executable_function->getConfiguration(); + size_t command_parameters_size = configuration.parameters.size(); + if (command_parameters_size != parameters_.size()) { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Executable user defined function {} number of parameters does not match. Expected {}. Actual {}", + configuration.name, + command_parameters_size, + parameters_.size()); + } + + command_with_parameters = configuration.command; + command_arguments_with_parameters = configuration.command_arguments; + + for (size_t i = 0; i < command_parameters_size; ++i) + { + const auto & command_parameter = configuration.parameters[i]; + const auto & parameter_value = parameters_[i]; + auto converted_parameter = convertFieldToTypeOrThrow(parameter_value, *command_parameter.type); + auto parameter_placeholder = "{" + command_parameter.name + "}"; + size_t parameter_placeholder_size = parameter_placeholder.size(); + + auto parameter_value_string = applyVisitor(FieldVisitorToString(), converted_parameter); + bool find_placedholder = false; + + for (auto & command_argument : command_arguments_with_parameters) { + auto parameter_placeholder_position = command_argument.find(parameter_placeholder); + if (parameter_placeholder_position == std::string::npos) + continue; + + command_argument.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); + find_placedholder = true; + } + + auto parameter_placeholder_position = command_with_parameters.find(parameter_placeholder); + + if (parameter_placeholder_position != std::string::npos) { + command_with_parameters.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); + find_placedholder = true; + } + + if (!find_placedholder) + { + throw Exception(ErrorCodes::BAD_ARGUMENTS, + "Executable user defined function {} no placeholder for parameter {}", + configuration.name, + command_parameter.name); + } + } } String getName() const override { return executable_function->getConfiguration().name; } @@ -63,7 +117,7 @@ public: const auto & coordinator_configuration = coordinator->getConfiguration(); const auto & configuration = executable_function->getConfiguration(); - String command = configuration.command; + String command = command_with_parameters; if (coordinator_configuration.execute_direct) { @@ -134,7 +188,7 @@ public: Pipe pipe = coordinator->createPipe( command, - configuration.command_arguments, + command_arguments_with_parameters, std::move(shell_input_pipes), result_block, context, @@ -165,9 +219,10 @@ public: } private: - ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function; ContextPtr context; + String command_with_parameters; + std::vector command_arguments_with_parameters; }; UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::instance() @@ -176,15 +231,15 @@ UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::ins return result; } -FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context) +FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context, Array parameters) { const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); auto executable_function = std::static_pointer_cast(loader.load(function_name)); - auto function = std::make_shared(std::move(executable_function), std::move(context)); + auto function = std::make_shared(std::move(executable_function), std::move(context), std::move(parameters)); return std::make_unique(std::move(function)); } -FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context) +FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context, Array parameters) { const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader(); auto load_result = loader.getLoadResult(function_name); @@ -192,7 +247,7 @@ FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const S if (load_result.object) { auto executable_function = std::static_pointer_cast(load_result.object); - auto function = std::make_shared(std::move(executable_function), std::move(context)); + auto function = std::make_shared(std::move(executable_function), std::move(context), std::move(parameters)); return std::make_unique(std::move(function)); } diff --git a/src/Interpreters/UserDefinedExecutableFunctionFactory.h b/src/Interpreters/UserDefinedExecutableFunctionFactory.h index 989db4c481b..ad10cb3c10f 100644 --- a/src/Interpreters/UserDefinedExecutableFunctionFactory.h +++ b/src/Interpreters/UserDefinedExecutableFunctionFactory.h @@ -5,9 +5,9 @@ #include #include -#include -#include +#include #include +#include namespace DB @@ -20,9 +20,9 @@ public: static UserDefinedExecutableFunctionFactory & instance(); - static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context); + static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context, Array parameters = {}); - static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context); + static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context, Array parameters = {}); static bool has(const String & function_name, ContextPtr context); From 549a55e4a268694141ff4299550b76d82f6780b0 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 31 May 2022 22:54:46 +0200 Subject: [PATCH 06/12] Added functional tests --- tests/config/test_function.xml | 18 ++++++++++++++++++ ...e_user_defined_function_parameter.reference | 2 ++ ...cutable_user_defined_function_parameter.sql | 6 ++++++ 3 files changed, 26 insertions(+) create mode 100644 tests/queries/0_stateless/02315_executable_user_defined_function_parameter.reference create mode 100644 tests/queries/0_stateless/02315_executable_user_defined_function_parameter.sql diff --git a/tests/config/test_function.xml b/tests/config/test_function.xml index 928cbd75c78..d412879f4b6 100644 --- a/tests/config/test_function.xml +++ b/tests/config/test_function.xml @@ -13,4 +13,22 @@ cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table" 0 + + executable + test_function_with_parameter + UInt64 + + UInt64 + + + UInt64 + + + test_parameter + UInt64 + + TabSeparated + cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y + {test_parameter} FROM table" + 0 + diff --git a/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.reference b/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.reference new file mode 100644 index 00000000000..fd3c81a4d76 --- /dev/null +++ b/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.reference @@ -0,0 +1,2 @@ +5 +5 diff --git a/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.sql b/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.sql new file mode 100644 index 00000000000..f6e5678e612 --- /dev/null +++ b/tests/queries/0_stateless/02315_executable_user_defined_function_parameter.sql @@ -0,0 +1,6 @@ +SELECT test_function_with_parameter('test')(1, 2); --{serverError 53} +SELECT test_function_with_parameter(2, 2)(1, 2); --{serverError 36} +SELECT test_function_with_parameter(1, 2); --{serverError 36} + +SELECT test_function_with_parameter(2)(1, 2); +SELECT test_function_with_parameter('2')(1, 2); From c4da2540e9d315919ee24a8074851ea4c061b01b Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Tue, 31 May 2022 23:16:24 +0200 Subject: [PATCH 07/12] Added integration tests --- .../functions/test_function_config.xml | 11 +++++++++++ .../test_executable_user_defined_function/test.py | 12 ++++++++++++ .../user_scripts/input_parameter.py | 8 ++++++++ 3 files changed, 31 insertions(+) create mode 100755 tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py diff --git a/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml index 5da2e854da8..d35cb173dd0 100644 --- a/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml +++ b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml @@ -311,4 +311,15 @@ input_nullable.py + + executable + test_function_parameter_python + String + + UInt64 + + TabSeparated + input_parameter.py {test_parameter:UInt64} + + diff --git a/tests/integration/test_executable_user_defined_function/test.py b/tests/integration/test_executable_user_defined_function/test.py index f48547a1437..2fe8eaf5398 100644 --- a/tests/integration/test_executable_user_defined_function/test.py +++ b/tests/integration/test_executable_user_defined_function/test.py @@ -263,3 +263,15 @@ def test_executable_function_input_nullable_python(started_cluster): ) == "Key 0\nKey Nullable\nKey 2\n" ) + + +def test_executable_function_parameter_python(started_cluster): + skip_test_msan(node) + + assert node.query_and_get_error("SELECT test_function_parameter_python(2,2)(toUInt64(1))") + assert node.query_and_get_error("SELECT test_function_parameter_python(2,2)(1)") + assert node.query_and_get_error("SELECT test_function_parameter_python(1)") + assert node.query_and_get_error("SELECT test_function_parameter_python('test')(toUInt64(1))") + + assert node.query("SELECT test_function_parameter_python('2')(toUInt64(1))") == "Parameter 2 key 1\n" + assert node.query("SELECT test_function_parameter_python(2)(toUInt64(1))") == "Parameter 2 key 1\n" diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py b/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py new file mode 100755 index 00000000000..0148ebcaa54 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py @@ -0,0 +1,8 @@ +#!/usr/bin/python3 + +import sys + +if __name__ == '__main__': + for line in sys.stdin: + print("Parameter " + str(sys.argv[1]) + " key " + str(line), end='') + sys.stdout.flush() \ No newline at end of file From d14193b3adf45b5c1197c3fb1b61264a63ec4820 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 1 Jun 2022 12:52:54 +0200 Subject: [PATCH 08/12] Executable user defined functions extract parameters name and type from command value --- src/Interpreters/ActionsVisitor.cpp | 3 +- ...alUserDefinedExecutableFunctionsLoader.cpp | 95 +++++++++++++++---- .../UserDefinedExecutableFunctionFactory.cpp | 35 ++++--- tests/config/test_function.xml | 6 +- 4 files changed, 100 insertions(+), 39 deletions(-) diff --git a/src/Interpreters/ActionsVisitor.cpp b/src/Interpreters/ActionsVisitor.cpp index 2b2a29f832b..f9d18cda777 100644 --- a/src/Interpreters/ActionsVisitor.cpp +++ b/src/Interpreters/ActionsVisitor.cpp @@ -960,7 +960,8 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data & if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context)) { Array parameters; - if (node.parameters) { + if (node.parameters) + { auto & node_parameters = node.parameters->children; size_t parameters_size = node_parameters.size(); parameters.resize(parameters_size); diff --git a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp b/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp index 641073f7164..33829aceb31 100644 --- a/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp +++ b/src/Interpreters/ExternalUserDefinedExecutableFunctionsLoader.cpp @@ -1,6 +1,7 @@ #include "ExternalUserDefinedExecutableFunctionsLoader.h" #include +#include #include @@ -18,6 +19,79 @@ namespace ErrorCodes extern const int BAD_ARGUMENTS; extern const int FUNCTION_ALREADY_EXISTS; extern const int UNSUPPORTED_METHOD; + extern const int TYPE_MISMATCH; +} + +namespace +{ + /** Extract parameters from command and replace them with parameter names placeholders. + * Example: test_script.py {parameter_name: UInt64} + * After run function: test_script.py {parameter_name} + */ + std::vector extractParametersFromCommand(String & command_value) + { + std::vector parameters; + std::unordered_map parameter_name_to_type; + + size_t previous_parameter_match_position = 0; + while (true) + { + auto start_parameter_pos = command_value.find('{', previous_parameter_match_position); + if (start_parameter_pos == std::string::npos) + break; + + auto end_parameter_pos = command_value.find('}', start_parameter_pos); + if (end_parameter_pos == std::string::npos) + break; + + previous_parameter_match_position = start_parameter_pos + 1; + + auto semicolon_pos = command_value.find(':', start_parameter_pos); + if (semicolon_pos == std::string::npos) + break; + else if (semicolon_pos > end_parameter_pos) + continue; + + std::string parameter_name(command_value.data() + start_parameter_pos + 1, command_value.data() + semicolon_pos); + trim(parameter_name); + + bool is_identifier = std::all_of(parameter_name.begin(), parameter_name.end(), [](char character) + { + return isWordCharASCII(character); + }); + + if (parameter_name.empty() && !is_identifier) + continue; + + std::string data_type_name(command_value.data() + semicolon_pos + 1, command_value.data() + end_parameter_pos); + trim(data_type_name); + + if (data_type_name.empty()) + continue; + + DataTypePtr parameter_data_type = DataTypeFactory::instance().get(data_type_name); + auto parameter_name_to_type_it = parameter_name_to_type.find(parameter_name); + if (parameter_name_to_type_it != parameter_name_to_type.end() && !parameter_data_type->equals(*parameter_name_to_type_it->second)) + throw Exception(ErrorCodes::TYPE_MISMATCH, + "Multiple parameters with same name {} does not have same type. Expected {}. Actual {}", + parameter_name, + parameter_name_to_type_it->second->getName(), + parameter_data_type->getName()); + + size_t replace_size = end_parameter_pos - start_parameter_pos - 1; + command_value.replace(start_parameter_pos + 1, replace_size, parameter_name); + previous_parameter_match_position = start_parameter_pos + parameter_name.size(); + + if (parameter_name_to_type_it == parameter_name_to_type.end()) + { + parameters.emplace_back(UserDefinedExecutableFunctionParameter{std::move(parameter_name), std::move(parameter_data_type)}); + auto & last_parameter = parameters.back(); + parameter_name_to_type.emplace(last_parameter.name, last_parameter.type); + } + } + + return parameters; + } } ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_) @@ -72,6 +146,8 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create bool execute_direct = config.getBool(key_in_config + ".execute_direct", true); String command_value = config.getString(key_in_config + ".command"); + std::vector parameters = extractParametersFromCommand(command_value); + std::vector command_arguments; if (execute_direct) @@ -112,7 +188,6 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime"); std::vector arguments; - std::vector parameters; Poco::Util::AbstractConfiguration::Keys config_elems; config.keys(key_in_config, config_elems); @@ -139,25 +214,9 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create arguments.emplace_back(std::move(argument)); } - for (const auto & config_elem : config_elems) - { - if (!startsWith(config_elem, "parameter")) - continue; - - UserDefinedExecutableFunctionParameter parameter; - - const auto parameter_prefix = key_in_config + '.' + config_elem + '.'; - - parameter.type = DataTypeFactory::instance().get(config.getString(parameter_prefix + "type")); - parameter.name = config.getString(parameter_prefix + "name"); - - parameters.emplace_back(std::move(parameter)); - } - - if (is_executable_pool && !parameters.empty()) { + if (is_executable_pool && !parameters.empty()) throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "Executable user defined functions with `executable_pool` type does not support parameters"); - } UserDefinedExecutableFunctionConfiguration function_configuration { diff --git a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp b/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp index a385233510b..8e4b66ef893 100644 --- a/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp +++ b/src/Interpreters/UserDefinedExecutableFunctionFactory.cpp @@ -26,7 +26,6 @@ namespace ErrorCodes { extern const int UNSUPPORTED_METHOD; extern const int BAD_ARGUMENTS; - extern const int TYPE_MISMATCH; } class UserDefinedFunction final : public IFunction @@ -42,13 +41,12 @@ public: { const auto & configuration = executable_function->getConfiguration(); size_t command_parameters_size = configuration.parameters.size(); - if (command_parameters_size != parameters_.size()) { + if (command_parameters_size != parameters_.size()) throw Exception(ErrorCodes::BAD_ARGUMENTS, "Executable user defined function {} number of parameters does not match. Expected {}. Actual {}", configuration.name, command_parameters_size, parameters_.size()); - } command_with_parameters = configuration.command; command_arguments_with_parameters = configuration.command_arguments; @@ -59,26 +57,33 @@ public: const auto & parameter_value = parameters_[i]; auto converted_parameter = convertFieldToTypeOrThrow(parameter_value, *command_parameter.type); auto parameter_placeholder = "{" + command_parameter.name + "}"; - size_t parameter_placeholder_size = parameter_placeholder.size(); auto parameter_value_string = applyVisitor(FieldVisitorToString(), converted_parameter); bool find_placedholder = false; - for (auto & command_argument : command_arguments_with_parameters) { - auto parameter_placeholder_position = command_argument.find(parameter_placeholder); - if (parameter_placeholder_position == std::string::npos) - continue; + auto try_replace_parameter_placeholder_with_value = [&](std::string & command_part) + { + size_t previous_parameter_placeholder_position = 0; + + while (true) + { + auto parameter_placeholder_position = command_part.find(parameter_placeholder, previous_parameter_placeholder_position); + if (parameter_placeholder_position == std::string::npos) + break; + + size_t parameter_placeholder_size = parameter_placeholder.size(); + command_part.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); + previous_parameter_placeholder_position = parameter_placeholder_position + parameter_value_string.size(); + find_placedholder = true; + } - command_argument.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); find_placedholder = true; - } + }; - auto parameter_placeholder_position = command_with_parameters.find(parameter_placeholder); + for (auto & command_argument : command_arguments_with_parameters) + try_replace_parameter_placeholder_with_value(command_argument); - if (parameter_placeholder_position != std::string::npos) { - command_with_parameters.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string); - find_placedholder = true; - } + try_replace_parameter_placeholder_with_value(command_with_parameters); if (!find_placedholder) { diff --git a/tests/config/test_function.xml b/tests/config/test_function.xml index d412879f4b6..a50ab69422a 100644 --- a/tests/config/test_function.xml +++ b/tests/config/test_function.xml @@ -23,12 +23,8 @@ UInt64 - - test_parameter - UInt64 - TabSeparated - cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y + {test_parameter} FROM table" + cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y + {test_parameter : UInt64} FROM table" 0 From 23b92cbe10a666e7a56abb329609d6f0aced2966 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 1 Jun 2022 12:55:08 +0200 Subject: [PATCH 09/12] Fixed style check --- .../test.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/tests/integration/test_executable_user_defined_function/test.py b/tests/integration/test_executable_user_defined_function/test.py index 2fe8eaf5398..1f4e14470c3 100644 --- a/tests/integration/test_executable_user_defined_function/test.py +++ b/tests/integration/test_executable_user_defined_function/test.py @@ -268,10 +268,20 @@ def test_executable_function_input_nullable_python(started_cluster): def test_executable_function_parameter_python(started_cluster): skip_test_msan(node) - assert node.query_and_get_error("SELECT test_function_parameter_python(2,2)(toUInt64(1))") + assert node.query_and_get_error( + "SELECT test_function_parameter_python(2,2)(toUInt64(1))" + ) assert node.query_and_get_error("SELECT test_function_parameter_python(2,2)(1)") assert node.query_and_get_error("SELECT test_function_parameter_python(1)") - assert node.query_and_get_error("SELECT test_function_parameter_python('test')(toUInt64(1))") + assert node.query_and_get_error( + "SELECT test_function_parameter_python('test')(toUInt64(1))" + ) - assert node.query("SELECT test_function_parameter_python('2')(toUInt64(1))") == "Parameter 2 key 1\n" - assert node.query("SELECT test_function_parameter_python(2)(toUInt64(1))") == "Parameter 2 key 1\n" + assert ( + node.query("SELECT test_function_parameter_python('2')(toUInt64(1))") + == "Parameter 2 key 1\n" + ) + assert ( + node.query("SELECT test_function_parameter_python(2)(toUInt64(1))") + == "Parameter 2 key 1\n" + ) From b4cc8ce22b6f89c1ed5ddba6addf77e7a9d46d1d Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Wed, 1 Jun 2022 14:29:22 +0200 Subject: [PATCH 10/12] Fixed style check --- .../user_scripts/input_parameter.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py b/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py index 0148ebcaa54..c9d6ff9f9d5 100755 --- a/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py +++ b/tests/integration/test_executable_user_defined_function/user_scripts/input_parameter.py @@ -2,7 +2,7 @@ import sys -if __name__ == '__main__': +if __name__ == "__main__": for line in sys.stdin: - print("Parameter " + str(sys.argv[1]) + " key " + str(line), end='') - sys.stdout.flush() \ No newline at end of file + print("Parameter " + str(sys.argv[1]) + " key " + str(line), end="") + sys.stdout.flush() From 90f023d65f0079c8342c1b420ffe7dc2fedd9e42 Mon Sep 17 00:00:00 2001 From: Nickita Taranov Date: Mon, 6 Jun 2022 12:55:07 +0200 Subject: [PATCH 11/12] impl --- .../test_storage_kafka/kafka_pb2.py | 91 +++--------------- .../test_storage_rabbitmq/rabbitmq_pb2.py | 92 +++---------------- 2 files changed, 25 insertions(+), 158 deletions(-) diff --git a/tests/integration/test_storage_kafka/kafka_pb2.py b/tests/integration/test_storage_kafka/kafka_pb2.py index d29bc7e8541..7de1363bbf1 100644 --- a/tests/integration/test_storage_kafka/kafka_pb2.py +++ b/tests/integration/test_storage_kafka/kafka_pb2.py @@ -1,93 +1,28 @@ +# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: clickhouse_path/format_schemas/kafka.proto - -import sys - -_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode("latin1")) +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection +from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor.FileDescriptor( - name="clickhouse_path/format_schemas/kafka.proto", - package="", - syntax="proto3", - serialized_pb=_b( - '\n*clickhouse_path/format_schemas/kafka.proto"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3' - ), -) -_sym_db.RegisterFileDescriptor(DESCRIPTOR) -_KEYVALUEPAIR = _descriptor.Descriptor( - name="KeyValuePair", - full_name="KeyValuePair", - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name="key", - full_name="KeyValuePair.key", - index=0, - number=1, - type=4, - cpp_type=4, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None, - ), - _descriptor.FieldDescriptor( - name="value", - full_name="KeyValuePair.value", - index=1, - number=2, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - options=None, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=46, - serialized_end=88, +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n*clickhouse_path/format_schemas/kafka.proto"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3' ) -DESCRIPTOR.message_types_by_name["KeyValuePair"] = _KEYVALUEPAIR - -KeyValuePair = _reflection.GeneratedProtocolMessageType( - "KeyValuePair", - (_message.Message,), - dict( - DESCRIPTOR=_KEYVALUEPAIR, - __module__="clickhouse_path.format_schemas.kafka_pb2" - # @@protoc_insertion_point(class_scope:KeyValuePair) - ), +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "clickhouse_path.format_schemas.kafka_pb2", globals() ) -_sym_db.RegisterMessage(KeyValuePair) +if _descriptor._USE_C_DESCRIPTORS == False: + DESCRIPTOR._options = None + _KEYVALUEPAIR._serialized_start = 46 + _KEYVALUEPAIR._serialized_end = 88 # @@protoc_insertion_point(module_scope) diff --git a/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py b/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py index df5c29adc6d..e017b4e66c2 100644 --- a/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py +++ b/tests/integration/test_storage_rabbitmq/rabbitmq_pb2.py @@ -2,95 +2,27 @@ # Generated by the protocol buffer compiler. DO NOT EDIT! # source: clickhouse_path/format_schemas/rabbitmq.proto """Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection +from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor.FileDescriptor( - name="clickhouse_path/format_schemas/rabbitmq.proto", - package="", - syntax="proto3", - serialized_options=None, - create_key=_descriptor._internal_create_key, - serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3', + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n-clickhouse_path/format_schemas/rabbitmq.proto"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3' ) -_KEYVALUEPROTO = _descriptor.Descriptor( - name="KeyValueProto", - full_name="KeyValueProto", - filename=None, - file=DESCRIPTOR, - containing_type=None, - create_key=_descriptor._internal_create_key, - fields=[ - _descriptor.FieldDescriptor( - name="key", - full_name="KeyValueProto.key", - index=0, - number=1, - type=4, - cpp_type=4, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - _descriptor.FieldDescriptor( - name="value", - full_name="KeyValueProto.value", - index=1, - number=2, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - create_key=_descriptor._internal_create_key, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=49, - serialized_end=92, +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "clickhouse_path.format_schemas.rabbitmq_pb2", globals() ) +if _descriptor._USE_C_DESCRIPTORS == False: -DESCRIPTOR.message_types_by_name["KeyValueProto"] = _KEYVALUEPROTO -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -KeyValueProto = _reflection.GeneratedProtocolMessageType( - "KeyValueProto", - (_message.Message,), - { - "DESCRIPTOR": _KEYVALUEPROTO, - "__module__": "clickhouse_path.format_schemas.rabbitmq_pb2" - # @@protoc_insertion_point(class_scope:KeyValueProto) - }, -) -_sym_db.RegisterMessage(KeyValueProto) - + DESCRIPTOR._options = None + _KEYVALUEPROTO._serialized_start = 49 + _KEYVALUEPROTO._serialized_end = 92 # @@protoc_insertion_point(module_scope) From 9fd9836237607e547967932258d19e9a15562d9f Mon Sep 17 00:00:00 2001 From: Nickita Taranov Date: Mon, 6 Jun 2022 15:31:20 +0200 Subject: [PATCH 12/12] update other files --- .../message_with_repeated_pb2.py | 315 +----------------- .../test_storage_kafka/social_pb2.py | 93 +----- 2 files changed, 29 insertions(+), 379 deletions(-) diff --git a/tests/integration/test_storage_kafka/message_with_repeated_pb2.py b/tests/integration/test_storage_kafka/message_with_repeated_pb2.py index b0755a121ae..4d1a23c0b43 100644 --- a/tests/integration/test_storage_kafka/message_with_repeated_pb2.py +++ b/tests/integration/test_storage_kafka/message_with_repeated_pb2.py @@ -1,12 +1,10 @@ +# -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! # source: clickhouse_path/format_schemas/message_with_repeated.proto - -import sys - -_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode("latin1")) +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection +from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) @@ -14,301 +12,20 @@ from google.protobuf import symbol_database as _symbol_database _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor.FileDescriptor( - name="clickhouse_path/format_schemas/message_with_repeated.proto", - package="", - syntax="proto3", - serialized_options=_b("H\001"), - serialized_pb=_b( - '\n:clickhouse_path/format_schemas/message_with_repeated.proto"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3' - ), +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n:clickhouse_path/format_schemas/message_with_repeated.proto"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3' ) - -_MESSAGE = _descriptor.Descriptor( - name="Message", - full_name="Message", - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name="tnow", - full_name="Message.tnow", - index=0, - number=1, - type=13, - cpp_type=3, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="server", - full_name="Message.server", - index=1, - number=2, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="clien", - full_name="Message.clien", - index=2, - number=3, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="sPort", - full_name="Message.sPort", - index=3, - number=4, - type=13, - cpp_type=3, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="cPort", - full_name="Message.cPort", - index=4, - number=5, - type=13, - cpp_type=3, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="r", - full_name="Message.r", - index=5, - number=6, - type=11, - cpp_type=10, - label=3, - has_default_value=False, - default_value=[], - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="method", - full_name="Message.method", - index=6, - number=7, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=62, - serialized_end=178, +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "clickhouse_path.format_schemas.message_with_repeated_pb2", globals() ) +if _descriptor._USE_C_DESCRIPTORS == False: - -_DD = _descriptor.Descriptor( - name="dd", - full_name="dd", - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name="name", - full_name="dd.name", - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b("").decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="class", - full_name="dd.class", - index=1, - number=2, - type=13, - cpp_type=3, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="type", - full_name="dd.type", - index=2, - number=3, - type=13, - cpp_type=3, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="ttl", - full_name="dd.ttl", - index=3, - number=4, - type=4, - cpp_type=4, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="data", - full_name="dd.data", - index=4, - number=5, - type=12, - cpp_type=9, - label=1, - has_default_value=False, - default_value=_b(""), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=180, - serialized_end=254, -) - -_MESSAGE.fields_by_name["r"].message_type = _DD -DESCRIPTOR.message_types_by_name["Message"] = _MESSAGE -DESCRIPTOR.message_types_by_name["dd"] = _DD -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -Message = _reflection.GeneratedProtocolMessageType( - "Message", - (_message.Message,), - dict( - DESCRIPTOR=_MESSAGE, - __module__="clickhouse_path.format_schemas.message_with_repeated_pb2" - # @@protoc_insertion_point(class_scope:Message) - ), -) -_sym_db.RegisterMessage(Message) - -dd = _reflection.GeneratedProtocolMessageType( - "dd", - (_message.Message,), - dict( - DESCRIPTOR=_DD, - __module__="clickhouse_path.format_schemas.message_with_repeated_pb2" - # @@protoc_insertion_point(class_scope:dd) - ), -) -_sym_db.RegisterMessage(dd) - - -DESCRIPTOR._options = None + DESCRIPTOR._options = None + DESCRIPTOR._serialized_options = b"H\001" + _MESSAGE._serialized_start = 62 + _MESSAGE._serialized_end = 178 + _DD._serialized_start = 180 + _DD._serialized_end = 254 # @@protoc_insertion_point(module_scope) diff --git a/tests/integration/test_storage_kafka/social_pb2.py b/tests/integration/test_storage_kafka/social_pb2.py index 429572a0b45..830ade81d33 100644 --- a/tests/integration/test_storage_kafka/social_pb2.py +++ b/tests/integration/test_storage_kafka/social_pb2.py @@ -1,10 +1,10 @@ # -*- coding: utf-8 -*- # Generated by the protocol buffer compiler. DO NOT EDIT! -# source: social.proto - +# source: clickhouse_path/format_schemas/social.proto +"""Generated protocol buffer code.""" +from google.protobuf.internal import builder as _builder from google.protobuf import descriptor as _descriptor -from google.protobuf import message as _message -from google.protobuf import reflection as _reflection +from google.protobuf import descriptor_pool as _descriptor_pool from google.protobuf import symbol_database as _symbol_database # @@protoc_insertion_point(imports) @@ -12,84 +12,17 @@ from google.protobuf import symbol_database as _symbol_database _sym_db = _symbol_database.Default() -DESCRIPTOR = _descriptor.FileDescriptor( - name="social.proto", - package="", - syntax="proto3", - serialized_options=None, - serialized_pb=b'\n\x0csocial.proto"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3', +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n+clickhouse_path/format_schemas/social.proto"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3' ) - -_USER = _descriptor.Descriptor( - name="User", - full_name="User", - filename=None, - file=DESCRIPTOR, - containing_type=None, - fields=[ - _descriptor.FieldDescriptor( - name="username", - full_name="User.username", - index=0, - number=1, - type=9, - cpp_type=9, - label=1, - has_default_value=False, - default_value=b"".decode("utf-8"), - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - _descriptor.FieldDescriptor( - name="timestamp", - full_name="User.timestamp", - index=1, - number=2, - type=5, - cpp_type=1, - label=1, - has_default_value=False, - default_value=0, - message_type=None, - enum_type=None, - containing_type=None, - is_extension=False, - extension_scope=None, - serialized_options=None, - file=DESCRIPTOR, - ), - ], - extensions=[], - nested_types=[], - enum_types=[], - serialized_options=None, - is_extendable=False, - syntax="proto3", - extension_ranges=[], - oneofs=[], - serialized_start=16, - serialized_end=59, +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) +_builder.BuildTopDescriptorsAndMessages( + DESCRIPTOR, "clickhouse_path.format_schemas.social_pb2", globals() ) +if _descriptor._USE_C_DESCRIPTORS == False: -DESCRIPTOR.message_types_by_name["User"] = _USER -_sym_db.RegisterFileDescriptor(DESCRIPTOR) - -User = _reflection.GeneratedProtocolMessageType( - "User", - (_message.Message,), - { - "DESCRIPTOR": _USER, - "__module__": "social_pb2" - # @@protoc_insertion_point(class_scope:User) - }, -) -_sym_db.RegisterMessage(User) - - + DESCRIPTOR._options = None + _USER._serialized_start = 47 + _USER._serialized_end = 90 # @@protoc_insertion_point(module_scope)