Merge branch 'master' into revert-37534-revert-37036-keeper-preprocess-operations

This commit is contained in:
Antonio Andelic 2022-06-07 09:11:41 +00:00
commit f25098e48d
23 changed files with 364 additions and 561 deletions

View File

@ -953,13 +953,34 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
if (AggregateFunctionFactory::instance().isAggregateFunctionName(node.name))
return;
FunctionOverloadResolverPtr function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, data.getContext());
FunctionOverloadResolverPtr function_builder;
auto current_context = data.getContext();
if (UserDefinedExecutableFunctionFactory::instance().has(node.name, current_context))
{
Array parameters;
if (node.parameters)
{
auto & node_parameters = node.parameters->children;
size_t parameters_size = node_parameters.size();
parameters.resize(parameters_size);
for (size_t i = 0; i < parameters_size; ++i)
{
ASTPtr literal = evaluateConstantExpressionAsLiteral(node_parameters[i], current_context);
parameters[i] = literal->as<ASTLiteral>()->value;
}
}
function_builder = UserDefinedExecutableFunctionFactory::instance().tryGet(node.name, current_context, parameters);
}
if (!function_builder)
{
try
{
function_builder = FunctionFactory::instance().get(node.name, data.getContext());
function_builder = FunctionFactory::instance().get(node.name, current_context);
}
catch (Exception & e)
{

View File

@ -1,6 +1,7 @@
#include "ExternalUserDefinedExecutableFunctionsLoader.h"
#include <boost/algorithm/string/split.hpp>
#include <Common/StringUtils/StringUtils.h>
#include <DataTypes/DataTypeFactory.h>
@ -17,6 +18,80 @@ namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int FUNCTION_ALREADY_EXISTS;
extern const int UNSUPPORTED_METHOD;
extern const int TYPE_MISMATCH;
}
namespace
{
/** Extract typed parameters from command and replace them with parameter name placeholders.
  * Example: test_script.py {parameter_name: UInt64}
  * After run function: test_script.py {parameter_name}
  *
  * A brace group is treated as a parameter declaration only when it has the form
  * `{identifier : type}` with a non-empty identifier made of word characters and
  * a non-empty type name. Any other brace group is left in the command untouched.
  * The same parameter may be declared several times, but every declaration must use
  * the same type, otherwise TYPE_MISMATCH is thrown.
  * The type name is resolved with DataTypeFactory::instance().get.
  *
  * Returns distinct parameters in order of first appearance. command_value is modified in place.
  */
std::vector<UserDefinedExecutableFunctionParameter> extractParametersFromCommand(String & command_value)
{
    std::vector<UserDefinedExecutableFunctionParameter> parameters;

    /// Keys own their storage on purpose: views into names stored in `parameters`
    /// can dangle when the vector reallocates and moves its elements.
    std::unordered_map<std::string, DataTypePtr> parameter_name_to_type;

    size_t previous_parameter_match_position = 0;
    while (true)
    {
        auto start_parameter_pos = command_value.find('{', previous_parameter_match_position);
        if (start_parameter_pos == std::string::npos)
            break;

        auto end_parameter_pos = command_value.find('}', start_parameter_pos);
        if (end_parameter_pos == std::string::npos)
            break;

        /// Unless a declaration is rewritten below, continue the search right after this opening brace.
        previous_parameter_match_position = start_parameter_pos + 1;

        auto colon_pos = command_value.find(':', start_parameter_pos);
        if (colon_pos == std::string::npos)
            break; /// No colon in the rest of the command, so there are no more declarations.
        else if (colon_pos > end_parameter_pos)
            continue; /// This brace group contains no colon, so it is not a declaration.

        std::string parameter_name(command_value.data() + start_parameter_pos + 1, command_value.data() + colon_pos);
        trim(parameter_name);

        bool is_identifier = std::all_of(parameter_name.begin(), parameter_name.end(), [](char character)
        {
            return isWordCharASCII(character);
        });

        /// std::all_of returns true for an empty range, so emptiness has to be rejected separately.
        if (parameter_name.empty() || !is_identifier)
            continue;

        std::string data_type_name(command_value.data() + colon_pos + 1, command_value.data() + end_parameter_pos);
        trim(data_type_name);

        if (data_type_name.empty())
            continue;

        DataTypePtr parameter_data_type = DataTypeFactory::instance().get(data_type_name);

        auto parameter_name_to_type_it = parameter_name_to_type.find(parameter_name);
        if (parameter_name_to_type_it != parameter_name_to_type.end() && !parameter_data_type->equals(*parameter_name_to_type_it->second))
            throw Exception(ErrorCodes::TYPE_MISMATCH,
                "Multiple parameters with same name {} does not have same type. Expected {}. Actual {}",
                parameter_name,
                parameter_name_to_type_it->second->getName(),
                parameter_data_type->getName());

        /// Replace everything between the braces with the bare parameter name.
        size_t replace_size = end_parameter_pos - start_parameter_pos - 1;
        command_value.replace(start_parameter_pos + 1, replace_size, parameter_name);

        /// Resume scanning at the tail of the rewritten placeholder.
        previous_parameter_match_position = start_parameter_pos + parameter_name.size();

        if (parameter_name_to_type_it == parameter_name_to_type.end())
        {
            /// Register the name before it is moved into the result vector.
            parameter_name_to_type.emplace(parameter_name, parameter_data_type);
            parameters.emplace_back(UserDefinedExecutableFunctionParameter{std::move(parameter_name), std::move(parameter_data_type)});
        }
    }

    return parameters;
}
}
ExternalUserDefinedExecutableFunctionsLoader::ExternalUserDefinedExecutableFunctionsLoader(ContextPtr global_context_)
@ -71,6 +146,8 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
bool execute_direct = config.getBool(key_in_config + ".execute_direct", true);
String command_value = config.getString(key_in_config + ".command");
std::vector<UserDefinedExecutableFunctionParameter> parameters = extractParametersFromCommand(command_value);
std::vector<String> command_arguments;
if (execute_direct)
@ -137,12 +214,17 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
arguments.emplace_back(std::move(argument));
}
if (is_executable_pool && !parameters.empty())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Executable user defined functions with `executable_pool` type does not support parameters");
UserDefinedExecutableFunctionConfiguration function_configuration
{
.name = name,
.command = std::move(command_value),
.command_arguments = std::move(command_arguments),
.arguments = std::move(arguments),
.parameters = std::move(parameters),
.result_type = std::move(result_type),
.result_name = std::move(result_name),
};

View File

@ -758,7 +758,9 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> & prepared_stages, bool dry_run)
{
NamesAndTypesList all_columns = metadata_snapshot->getColumns().getAllPhysical();
auto storage_snapshot = storage->getStorageSnapshot(metadata_snapshot, context);
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto all_columns = storage_snapshot->getColumns(options);
/// Next, for each stage calculate columns changed by this and previous stages.
for (size_t i = 0; i < prepared_stages.size(); ++i)
@ -802,7 +804,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector<Stage> &
/// e.g. ALTER referencing the same table in scalar subquery
bool execute_scalar_subqueries = !dry_run;
auto syntax_result = TreeRewriter(context).analyze(
all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot, context),
all_asts, all_columns, storage, storage_snapshot,
false, true, execute_scalar_subqueries);
if (execute_scalar_subqueries && context->hasQueryContext())

View File

@ -16,12 +16,19 @@ struct UserDefinedExecutableFunctionArgument
String name;
};
/// A named, typed parameter of an executable user defined function.
/// Parameters are declared inside the function command as `{name : type}`.
struct UserDefinedExecutableFunctionParameter
{
/// Parameter name; `{name}` is the placeholder that gets replaced with the parameter value.
String name;
/// Type the supplied parameter value is converted to before substitution.
DataTypePtr type;
};
/// Description of an executable user defined function, filled in by the loader from the function config.
struct UserDefinedExecutableFunctionConfiguration
{
/// Function name (reported by getName() and used in error messages).
std::string name;
/// Command to execute. Parameter declarations `{name : type}` are already reduced to `{name}` placeholders.
std::string command;
/// Arguments of the command; `{name}` placeholders inside them are substituted as well.
std::vector<std::string> command_arguments;
/// Declared arguments of the function.
std::vector<UserDefinedExecutableFunctionArgument> arguments;
/// Parameters extracted from the command, one entry per distinct parameter name.
std::vector<UserDefinedExecutableFunctionParameter> parameters;
/// Type of the function result.
DataTypePtr result_type;
/// NOTE(review): presumably the name given to the result column — confirm against the loader.
String result_name;
};

View File

@ -3,6 +3,8 @@
#include <filesystem>
#include <Common/filesystemHelpers.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/FieldToDataType.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
@ -11,6 +13,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExternalUserDefinedExecutableFunctionsLoader.h>
#include <Interpreters/Context.h>
#include <Interpreters/castColumn.h>
@ -22,6 +25,7 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int BAD_ARGUMENTS;
}
class UserDefinedFunction final : public IFunction
@ -30,10 +34,65 @@ public:
explicit UserDefinedFunction(
ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function_,
ContextPtr context_)
ContextPtr context_,
Array parameters_)
: executable_function(std::move(executable_function_))
, context(context_)
{
const auto & configuration = executable_function->getConfiguration();
size_t command_parameters_size = configuration.parameters.size();
if (command_parameters_size != parameters_.size())
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Executable user defined function {} number of parameters does not match. Expected {}. Actual {}",
configuration.name,
command_parameters_size,
parameters_.size());
command_with_parameters = configuration.command;
command_arguments_with_parameters = configuration.command_arguments;
for (size_t i = 0; i < command_parameters_size; ++i)
{
const auto & command_parameter = configuration.parameters[i];
const auto & parameter_value = parameters_[i];
auto converted_parameter = convertFieldToTypeOrThrow(parameter_value, *command_parameter.type);
auto parameter_placeholder = "{" + command_parameter.name + "}";
auto parameter_value_string = applyVisitor(FieldVisitorToString(), converted_parameter);
bool find_placedholder = false;
/// Replaces every occurrence of the parameter placeholder in command_part with the parameter value
/// and records in find_placedholder whether at least one occurrence was found.
auto try_replace_parameter_placeholder_with_value = [&](std::string & command_part)
{
    size_t previous_parameter_placeholder_position = 0;

    while (true)
    {
        auto parameter_placeholder_position = command_part.find(parameter_placeholder, previous_parameter_placeholder_position);
        if (parameter_placeholder_position == std::string::npos)
            break;

        size_t parameter_placeholder_size = parameter_placeholder.size();
        command_part.replace(parameter_placeholder_position, parameter_placeholder_size, parameter_value_string);

        /// Continue after the inserted value, so that the value itself is never scanned for placeholders.
        previous_parameter_placeholder_position = parameter_placeholder_position + parameter_value_string.size();
        find_placedholder = true;
    }

    /// The flag is set only when a replacement happened. Setting it unconditionally here
    /// would make the "no placeholder for parameter" check after the calls unreachable.
};
for (auto & command_argument : command_arguments_with_parameters)
try_replace_parameter_placeholder_with_value(command_argument);
try_replace_parameter_placeholder_with_value(command_with_parameters);
if (!find_placedholder)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Executable user defined function {} no placeholder for parameter {}",
configuration.name,
command_parameter.name);
}
}
}
String getName() const override { return executable_function->getConfiguration().name; }
@ -63,7 +122,7 @@ public:
const auto & coordinator_configuration = coordinator->getConfiguration();
const auto & configuration = executable_function->getConfiguration();
String command = configuration.command;
String command = command_with_parameters;
if (coordinator_configuration.execute_direct)
{
@ -134,7 +193,7 @@ public:
Pipe pipe = coordinator->createPipe(
command,
configuration.command_arguments,
command_arguments_with_parameters,
std::move(shell_input_pipes),
result_block,
context,
@ -165,9 +224,10 @@ public:
}
private:
ExternalUserDefinedExecutableFunctionsLoader::UserDefinedExecutableFunctionPtr executable_function;
ContextPtr context;
String command_with_parameters;
std::vector<std::string> command_arguments_with_parameters;
};
UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::instance()
@ -176,15 +236,15 @@ UserDefinedExecutableFunctionFactory & UserDefinedExecutableFunctionFactory::ins
return result;
}
/// Loads the executable user defined function `function_name` through the loader of `context`
/// and wraps it, together with the supplied `parameters`, into an overload resolver.
/// NOTE(review): unlike tryGet, this goes through `loader.load`, which presumably throws
/// when the function cannot be loaded — confirm against ExternalLoader.
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context)
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::get(const String & function_name, ContextPtr context, Array parameters)
{
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();
auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(loader.load(function_name));
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context));
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters));
return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function));
}
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context)
FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const String & function_name, ContextPtr context, Array parameters)
{
const auto & loader = context->getExternalUserDefinedExecutableFunctionsLoader();
auto load_result = loader.getLoadResult(function_name);
@ -192,7 +252,7 @@ FunctionOverloadResolverPtr UserDefinedExecutableFunctionFactory::tryGet(const S
if (load_result.object)
{
auto executable_function = std::static_pointer_cast<const UserDefinedExecutableFunction>(load_result.object);
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context));
auto function = std::make_shared<UserDefinedFunction>(std::move(executable_function), std::move(context), std::move(parameters));
return std::make_unique<FunctionToOverloadResolverAdaptor>(std::move(function));
}

View File

@ -5,9 +5,9 @@
#include <string>
#include <unordered_map>
#include <Common/NamePrompter.h>
#include <Interpreters/Context_fwd.h>
#include <Core/Field.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
namespace DB
@ -20,9 +20,9 @@ public:
static UserDefinedExecutableFunctionFactory & instance();
static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context);
static FunctionOverloadResolverPtr get(const String & function_name, ContextPtr context, Array parameters = {});
static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context);
static FunctionOverloadResolverPtr tryGet(const String & function_name, ContextPtr context, Array parameters = {});
static bool has(const String & function_name, ContextPtr context);

View File

@ -149,7 +149,6 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->merging_columns,
global_ctx->merging_column_names);
auto local_single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + global_ctx->future_part->name, ctx->disk, 0);
global_ctx->new_data_part = global_ctx->data->createPart(
global_ctx->future_part->name,

View File

@ -621,7 +621,9 @@ struct MutationContext
FutureMergedMutatedPartPtr future_part;
MergeTreeData::DataPartPtr source_part;
StoragePtr storage_from_source_part;
StorageMetadataPtr metadata_snapshot;
MutationCommandsConstPtr commands;
time_t time_of_mutation;
ContextPtr context;
@ -1370,6 +1372,11 @@ MutateTask::MutateTask(
ctx->space_reservation = space_reservation_;
ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical();
ctx->txn = txn;
ctx->source_part = ctx->future_part->parts[0];
ctx->storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);
auto storage_snapshot = ctx->storage_from_source_part->getStorageSnapshot(ctx->metadata_snapshot, context_);
extendObjectColumns(ctx->storage_columns, storage_snapshot->object_columns, /*with_subcolumns=*/ false);
}
@ -1408,8 +1415,6 @@ bool MutateTask::prepare()
"This is a bug.", toString(ctx->future_part->parts.size()));
ctx->num_mutations = std::make_unique<CurrentMetrics::Increment>(CurrentMetrics::PartMutation);
ctx->source_part = ctx->future_part->parts[0];
auto storage_from_source_part = std::make_shared<StorageFromMergeTreeDataPart>(ctx->source_part);
auto context_for_reading = Context::createCopy(ctx->context);
context_for_reading->setSetting("max_streams_to_max_threads_ratio", 1);
@ -1420,13 +1425,13 @@ bool MutateTask::prepare()
for (const auto & command : *ctx->commands)
{
if (command.partition == nullptr || ctx->future_part->parts[0]->info.partition_id == ctx->data->getPartitionIDFromQuery(
if (command.partition == nullptr || ctx->source_part->info.partition_id == ctx->data->getPartitionIDFromQuery(
command.partition, context_for_reading))
ctx->commands_for_part.emplace_back(command);
}
if (ctx->source_part->isStoredOnDisk() && !isStorageTouchedByMutations(
storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
{
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn, &ctx->hardlinked_files, false));
@ -1444,7 +1449,7 @@ bool MutateTask::prepare()
if (!ctx->for_interpreter.empty())
{
ctx->interpreter = std::make_unique<MutationsInterpreter>(
storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->storage_from_source_part, ctx->metadata_snapshot, ctx->for_interpreter, context_for_reading, true);
ctx->materialized_indices = ctx->interpreter->grabMaterializedIndices();
ctx->materialized_projections = ctx->interpreter->grabMaterializedProjections();
ctx->mutation_kind = ctx->interpreter->getMutationKind();

View File

@ -3,6 +3,7 @@
#include <Storages/IStorage.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <DataTypes/ObjectUtils.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
@ -36,6 +37,20 @@ public:
String getName() const override { return "FromMergeTreeDataPart"; }
/// Creates a storage snapshot for the wrapped data parts.
/// When the metadata has object columns, their description is collected from the
/// columns of `parts` and attached to the snapshot; otherwise a plain snapshot is returned.
/// The query context is not used.
StorageSnapshotPtr getStorageSnapshot(
    const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const override
{
    const auto & columns_description = metadata_snapshot->getColumns();

    if (hasObjectColumns(columns_description))
    {
        auto object_columns = getObjectColumns(
            parts.begin(),
            parts.end(),
            columns_description,
            [](const auto & part) -> const auto & { return part->getColumns(); });

        return std::make_shared<StorageSnapshot>(*this, metadata_snapshot, object_columns);
    }

    return std::make_shared<StorageSnapshot>(*this, metadata_snapshot);
}
void read(
QueryPlan & query_plan,
const Names & column_names,
@ -63,6 +78,8 @@ public:
/// Unconditionally answers true for this storage.
bool supportsIndexForIn() const override { return true; }
/// Unconditionally answers true for this storage.
/// NOTE(review): presumably paired with getStorageSnapshot, which derives object columns from the parts — confirm against IStorage.
bool supportsDynamicSubcolumns() const override { return true; }
bool mayBenefitFromIndexForIn(
const ASTPtr & left_in_operand, ContextPtr query_context, const StorageMetadataPtr & metadata_snapshot) const override
{

View File

@ -13,4 +13,18 @@
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
<execute_direct>0</execute_direct>
</function>
<function>
<type>executable</type>
<name>test_function_with_parameter</name>
<return_type>UInt64</return_type>
<argument>
<type>UInt64</type>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y + {test_parameter : UInt64} FROM table"</command>
<execute_direct>0</execute_direct>
</function>
</functions>

View File

@ -311,4 +311,15 @@
<command>input_nullable.py</command>
</function>
<function>
<type>executable</type>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>input_parameter.py {test_parameter:UInt64}</command>
</function>
</functions>

View File

@ -263,3 +263,25 @@ def test_executable_function_input_nullable_python(started_cluster):
)
== "Key 0\nKey Nullable\nKey 2\n"
)
def test_executable_function_parameter_python(started_cluster):
    """Exercise the parametrized executable UDF `test_function_parameter_python`.

    The first group of queries must each produce an error; the second group
    must each return the line produced by the script for parameter 2 and key 1.
    """
    skip_test_msan(node)

    # Queries that are expected to be rejected.
    rejected_queries = (
        "SELECT test_function_parameter_python(2,2)(toUInt64(1))",
        "SELECT test_function_parameter_python(2,2)(1)",
        "SELECT test_function_parameter_python(1)",
        "SELECT test_function_parameter_python('test')(toUInt64(1))",
    )
    for rejected_query in rejected_queries:
        assert node.query_and_get_error(rejected_query)

    # The parameter value may be given either as the string '2' or as the number 2.
    expected_output = "Parameter 2 key 1\n"
    accepted_queries = (
        "SELECT test_function_parameter_python('2')(toUInt64(1))",
        "SELECT test_function_parameter_python(2)(toUInt64(1))",
    )
    for accepted_query in accepted_queries:
        assert node.query(accepted_query) == expected_output

View File

@ -0,0 +1,8 @@
#!/usr/bin/python3
# Executable UDF helper: echoes every stdin line prefixed with the first
# command-line argument, as "Parameter <argv[1]> key <line>".
import sys

if __name__ == "__main__":
    for key_line in sys.stdin:
        # The input line keeps its own trailing newline, hence end="".
        # flush=True pushes each answer to stdout as soon as it is written.
        print(f"Parameter {sys.argv[1]} key {key_line}", end="", flush=True)

View File

@ -158,7 +158,7 @@ def test_replicated_balanced_merge_fetch(start_cluster):
node.query("create table tmp2 as tmp1")
node2.query("alter table tbl modify setting always_fetch_merged_part = 1")
p = Pool(20)
p = Pool(5)
def task(i):
print("Processing insert {}/{}".format(i, 200))
@ -166,6 +166,8 @@ def test_replicated_balanced_merge_fetch(start_cluster):
node1.query(
"insert into tbl select randConstant() % 2, randomPrintableASCII(16) from numbers(50)"
)
# Fill jbod disks with garbage data
node1.query(
"insert into tmp1 select randConstant() % 2, randomPrintableASCII(16) from numbers(50)"
)

View File

@ -1,93 +1,28 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/kafka.proto
import sys
_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode("latin1"))
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name="clickhouse_path/format_schemas/kafka.proto",
package="",
syntax="proto3",
serialized_pb=_b(
'\n*clickhouse_path/format_schemas/kafka.proto"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
),
)
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
_KEYVALUEPAIR = _descriptor.Descriptor(
name="KeyValuePair",
full_name="KeyValuePair",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="key",
full_name="KeyValuePair.key",
index=0,
number=1,
type=4,
cpp_type=4,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
options=None,
),
_descriptor.FieldDescriptor(
name="value",
full_name="KeyValuePair.value",
index=1,
number=2,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b("").decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
options=None,
),
],
extensions=[],
nested_types=[],
enum_types=[],
options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=46,
serialized_end=88,
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n*clickhouse_path/format_schemas/kafka.proto"*\n\x0cKeyValuePair\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
)
DESCRIPTOR.message_types_by_name["KeyValuePair"] = _KEYVALUEPAIR
KeyValuePair = _reflection.GeneratedProtocolMessageType(
"KeyValuePair",
(_message.Message,),
dict(
DESCRIPTOR=_KEYVALUEPAIR,
__module__="clickhouse_path.format_schemas.kafka_pb2"
# @@protoc_insertion_point(class_scope:KeyValuePair)
),
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, "clickhouse_path.format_schemas.kafka_pb2", globals()
)
_sym_db.RegisterMessage(KeyValuePair)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
_KEYVALUEPAIR._serialized_start = 46
_KEYVALUEPAIR._serialized_end = 88
# @@protoc_insertion_point(module_scope)

View File

@ -1,12 +1,10 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/message_with_repeated.proto
import sys
_b = sys.version_info[0] < 3 and (lambda x: x) or (lambda x: x.encode("latin1"))
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@ -14,301 +12,20 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name="clickhouse_path/format_schemas/message_with_repeated.proto",
package="",
syntax="proto3",
serialized_options=_b("H\001"),
serialized_pb=_b(
'\n:clickhouse_path/format_schemas/message_with_repeated.proto"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3'
),
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n:clickhouse_path/format_schemas/message_with_repeated.proto"t\n\x07Message\x12\x0c\n\x04tnow\x18\x01 \x01(\r\x12\x0e\n\x06server\x18\x02 \x01(\t\x12\r\n\x05\x63lien\x18\x03 \x01(\t\x12\r\n\x05sPort\x18\x04 \x01(\r\x12\r\n\x05\x63Port\x18\x05 \x01(\r\x12\x0e\n\x01r\x18\x06 \x03(\x0b\x32\x03.dd\x12\x0e\n\x06method\x18\x07 \x01(\t"J\n\x02\x64\x64\x12\x0c\n\x04name\x18\x01 \x01(\t\x12\r\n\x05\x63lass\x18\x02 \x01(\r\x12\x0c\n\x04type\x18\x03 \x01(\r\x12\x0b\n\x03ttl\x18\x04 \x01(\x04\x12\x0c\n\x04\x64\x61ta\x18\x05 \x01(\x0c\x42\x02H\x01\x62\x06proto3'
)
_MESSAGE = _descriptor.Descriptor(
name="Message",
full_name="Message",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="tnow",
full_name="Message.tnow",
index=0,
number=1,
type=13,
cpp_type=3,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="server",
full_name="Message.server",
index=1,
number=2,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b("").decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="clien",
full_name="Message.clien",
index=2,
number=3,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b("").decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="sPort",
full_name="Message.sPort",
index=3,
number=4,
type=13,
cpp_type=3,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="cPort",
full_name="Message.cPort",
index=4,
number=5,
type=13,
cpp_type=3,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="r",
full_name="Message.r",
index=5,
number=6,
type=11,
cpp_type=10,
label=3,
has_default_value=False,
default_value=[],
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="method",
full_name="Message.method",
index=6,
number=7,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b("").decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=62,
serialized_end=178,
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, "clickhouse_path.format_schemas.message_with_repeated_pb2", globals()
)
_DD = _descriptor.Descriptor(
name="dd",
full_name="dd",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="name",
full_name="dd.name",
index=0,
number=1,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b("").decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="class",
full_name="dd.class",
index=1,
number=2,
type=13,
cpp_type=3,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="type",
full_name="dd.type",
index=2,
number=3,
type=13,
cpp_type=3,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="ttl",
full_name="dd.ttl",
index=3,
number=4,
type=4,
cpp_type=4,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="data",
full_name="dd.data",
index=4,
number=5,
type=12,
cpp_type=9,
label=1,
has_default_value=False,
default_value=_b(""),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=180,
serialized_end=254,
)
_MESSAGE.fields_by_name["r"].message_type = _DD
DESCRIPTOR.message_types_by_name["Message"] = _MESSAGE
DESCRIPTOR.message_types_by_name["dd"] = _DD
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
Message = _reflection.GeneratedProtocolMessageType(
"Message",
(_message.Message,),
dict(
DESCRIPTOR=_MESSAGE,
__module__="clickhouse_path.format_schemas.message_with_repeated_pb2"
# @@protoc_insertion_point(class_scope:Message)
),
)
_sym_db.RegisterMessage(Message)
dd = _reflection.GeneratedProtocolMessageType(
"dd",
(_message.Message,),
dict(
DESCRIPTOR=_DD,
__module__="clickhouse_path.format_schemas.message_with_repeated_pb2"
# @@protoc_insertion_point(class_scope:dd)
),
)
_sym_db.RegisterMessage(dd)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR._options = None
DESCRIPTOR._serialized_options = b"H\001"
_MESSAGE._serialized_start = 62
_MESSAGE._serialized_end = 178
_DD._serialized_start = 180
_DD._serialized_end = 254
# @@protoc_insertion_point(module_scope)

View File

@ -1,10 +1,10 @@
# -*- coding: utf-8 -*-
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: social.proto
# source: clickhouse_path/format_schemas/social.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
@ -12,84 +12,17 @@ from google.protobuf import symbol_database as _symbol_database
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name="social.proto",
package="",
syntax="proto3",
serialized_options=None,
serialized_pb=b'\n\x0csocial.proto"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3',
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n+clickhouse_path/format_schemas/social.proto"+\n\x04User\x12\x10\n\x08username\x18\x01 \x01(\t\x12\x11\n\ttimestamp\x18\x02 \x01(\x05\x62\x06proto3'
)
_USER = _descriptor.Descriptor(
name="User",
full_name="User",
filename=None,
file=DESCRIPTOR,
containing_type=None,
fields=[
_descriptor.FieldDescriptor(
name="username",
full_name="User.username",
index=0,
number=1,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=b"".decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
_descriptor.FieldDescriptor(
name="timestamp",
full_name="User.timestamp",
index=1,
number=2,
type=5,
cpp_type=1,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=16,
serialized_end=59,
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, "clickhouse_path.format_schemas.social_pb2", globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR.message_types_by_name["User"] = _USER
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
User = _reflection.GeneratedProtocolMessageType(
"User",
(_message.Message,),
{
"DESCRIPTOR": _USER,
"__module__": "social_pb2"
# @@protoc_insertion_point(class_scope:User)
},
)
_sym_db.RegisterMessage(User)
DESCRIPTOR._options = None
_USER._serialized_start = 47
_USER._serialized_end = 90
# @@protoc_insertion_point(module_scope)

View File

@ -2,95 +2,27 @@
# Generated by the protocol buffer compiler. DO NOT EDIT!
# source: clickhouse_path/format_schemas/rabbitmq.proto
"""Generated protocol buffer code."""
from google.protobuf.internal import builder as _builder
from google.protobuf import descriptor as _descriptor
from google.protobuf import message as _message
from google.protobuf import reflection as _reflection
from google.protobuf import descriptor_pool as _descriptor_pool
from google.protobuf import symbol_database as _symbol_database
# @@protoc_insertion_point(imports)
_sym_db = _symbol_database.Default()
DESCRIPTOR = _descriptor.FileDescriptor(
name="clickhouse_path/format_schemas/rabbitmq.proto",
package="",
syntax="proto3",
serialized_options=None,
create_key=_descriptor._internal_create_key,
serialized_pb=b'\n-clickhouse_path/format_schemas/rabbitmq.proto"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3',
DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(
b'\n-clickhouse_path/format_schemas/rabbitmq.proto"+\n\rKeyValueProto\x12\x0b\n\x03key\x18\x01 \x01(\x04\x12\r\n\x05value\x18\x02 \x01(\tb\x06proto3'
)
_KEYVALUEPROTO = _descriptor.Descriptor(
name="KeyValueProto",
full_name="KeyValueProto",
filename=None,
file=DESCRIPTOR,
containing_type=None,
create_key=_descriptor._internal_create_key,
fields=[
_descriptor.FieldDescriptor(
name="key",
full_name="KeyValueProto.key",
index=0,
number=1,
type=4,
cpp_type=4,
label=1,
has_default_value=False,
default_value=0,
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
create_key=_descriptor._internal_create_key,
),
_descriptor.FieldDescriptor(
name="value",
full_name="KeyValueProto.value",
index=1,
number=2,
type=9,
cpp_type=9,
label=1,
has_default_value=False,
default_value=b"".decode("utf-8"),
message_type=None,
enum_type=None,
containing_type=None,
is_extension=False,
extension_scope=None,
serialized_options=None,
file=DESCRIPTOR,
create_key=_descriptor._internal_create_key,
),
],
extensions=[],
nested_types=[],
enum_types=[],
serialized_options=None,
is_extendable=False,
syntax="proto3",
extension_ranges=[],
oneofs=[],
serialized_start=49,
serialized_end=92,
_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals())
_builder.BuildTopDescriptorsAndMessages(
DESCRIPTOR, "clickhouse_path.format_schemas.rabbitmq_pb2", globals()
)
if _descriptor._USE_C_DESCRIPTORS == False:
DESCRIPTOR.message_types_by_name["KeyValueProto"] = _KEYVALUEPROTO
_sym_db.RegisterFileDescriptor(DESCRIPTOR)
KeyValueProto = _reflection.GeneratedProtocolMessageType(
"KeyValueProto",
(_message.Message,),
{
"DESCRIPTOR": _KEYVALUEPROTO,
"__module__": "clickhouse_path.format_schemas.rabbitmq_pb2"
# @@protoc_insertion_point(class_scope:KeyValueProto)
},
)
_sym_db.RegisterMessage(KeyValueProto)
DESCRIPTOR._options = None
_KEYVALUEPROTO._serialized_start = 49
_KEYVALUEPROTO._serialized_end = 92
# @@protoc_insertion_point(module_scope)

View File

@ -0,0 +1,7 @@
1 q (1,2,[('aaa'),('bbb')])
2 w (3,4,[('ccc')])
3 e (5,6,[])
1 q (1,2,[('aaa'),('bbb')])
3 e (5,6,[])
1 foo
3 foo

View File

@ -0,0 +1,21 @@
-- Tags: no-fasttest
-- Runs mutations (ALTER ... DELETE, then ALTER ... DROP COLUMN / ADD COLUMN) against a
-- MergeTree table that has a JSON column, selecting the whole table after each step.
DROP TABLE IF EXISTS t_json_mutations;
-- NOTE(review): presumably required to be able to declare the `obj JSON` column below -- confirm.
SET allow_experimental_object_type = 1;
SET output_format_json_named_tuples_as_objects = 1;
-- NOTE(review): presumably makes each ALTER wait until its mutation has finished, so the
-- SELECT that follows it observes the mutated data -- confirm against the settings reference.
SET mutations_sync = 2;
CREATE TABLE t_json_mutations(id UInt32, s String, obj JSON) ENGINE = MergeTree ORDER BY id;
-- Three separate inserts; the objects differ in shape: k3 holds two elements, one element,
-- and is absent altogether in the third row.
INSERT INTO t_json_mutations VALUES (1, 'q', '{"k1": 1, "k2": 2, "k3": [{"k4": "aaa"}, {"k4": "bbb"}]}');
INSERT INTO t_json_mutations VALUES (2, 'w', '{"k1": 3, "k2": 4, "k3": [{"k4": "ccc"}]}');
INSERT INTO t_json_mutations VALUES (3, 'e', '{"k1": 5, "k2": 6}');
-- Baseline: all three rows before any mutation.
SELECT * FROM t_json_mutations ORDER BY id;
-- Mutation 1: delete the row with id = 2, then re-read the table.
ALTER TABLE t_json_mutations DELETE WHERE id = 2;
SELECT * FROM t_json_mutations ORDER BY id;
-- Mutation 2: drop both the String column and the JSON column and add a String column `t`
-- with default 'foo' in a single ALTER, then re-read the table.
ALTER TABLE t_json_mutations DROP COLUMN s, DROP COLUMN obj, ADD COLUMN t String DEFAULT 'foo';
SELECT * FROM t_json_mutations ORDER BY id;
DROP TABLE t_json_mutations;

View File

@ -0,0 +1,6 @@
-- Calls test_function_with_parameter with different parameter lists; the trailing
-- `--{serverError N}` annotation marks the calls that are expected to fail with error code N.
-- Non-numeric string parameter: expected to fail with code 53
-- (NOTE(review): presumably TYPE_MISMATCH -- confirm).
SELECT test_function_with_parameter('test')(1, 2); --{serverError 53}
-- Two parameters supplied: expected to fail with code 36
-- (NOTE(review): presumably BAD_ARGUMENTS, i.e. the function takes a single parameter -- confirm).
SELECT test_function_with_parameter(2, 2)(1, 2); --{serverError 36}
-- No parameter list at all, only arguments: expected to fail with code 36.
SELECT test_function_with_parameter(1, 2); --{serverError 36}
-- Single numeric parameter: no error annotation, so the call is expected to succeed.
SELECT test_function_with_parameter(2)(1, 2);
-- Single string parameter '2': no error annotation, so the call is expected to succeed
-- (NOTE(review): presumably the string is converted to the declared parameter type -- confirm).
SELECT test_function_with_parameter('2')(1, 2);