Merge pull request #34653 from kitaisreal/executable-udf-support-specify-argument-names

ExecutableUserDefinedFunctions allow to specify argument names
This commit is contained in:
Maksim Kita 2022-02-22 00:02:14 +01:00 committed by GitHub
commit 174257dad0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 376 additions and 25 deletions

View File

@ -74,9 +74,10 @@ A function configuration contains the following settings:
- `name` - a function name.
- `command` - script name to execute or command if `execute_direct` is false.
- `argument` - argument description with the `type` of an argument. Each argument is described in a separate setting.
- `argument` - argument description with the `type`, and optional `name` of an argument. Each argument is described in a separate setting. Specifying name is necessary if argument names are part of serialization for user defined function format like [Native](../../interfaces/formats.md#native) or [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Default argument name value is `c` + argument_number.
- `format` - a [format](../../interfaces/formats.md) in which arguments are passed to the command.
- `return_type` - the type of a returned value.
- `return_name` - name of retuned value. Specifying return name is necessary if return name is part of serialization for user defined function format like [Native](../../interfaces/formats.md#native) or [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Optional. Default value is `result`.
- `type` - an executable type. If `type` is set to `executable` then single command is started. If it is set to `executable_pool` then a pool of commands is created.
- `max_command_execution_time` - maximum execution time in seconds for processing block of data. This setting is valid for `executable_pool` commands only. Optional. Default value is `10`.
- `command_termination_timeout` - time in seconds during which a command should finish after its pipe is closed. After that time `SIGTERM` is sent to the process executing the command. Optional. Default value is `10`.
@ -100,6 +101,7 @@ File test_function.xml.
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>value</name>
</argument>
<format>TabSeparated</format>
<command>test_function.py</command>
@ -144,9 +146,11 @@ File test_function.xml.
<return_type>UInt64</return_type>
<argument>
<type>UInt64</type>
<name>lhs</name>
</argument>
<argument>
<type>UInt64</type>
<name>rhs</name>
</argument>
<format>TabSeparated</format>
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
@ -169,6 +173,58 @@ Result:
└─────────────────────────┘
```
Creating `test_function_sum_json` with named arguments and format [JSONEachRow](../../interfaces/formats.md#jsoneachrow) using XML configuration.
File test_function.xml.
```xml
<function>
<type>executable</type>
<name>test_function_sum_json</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>test_function_sum_json.py</command>
</function>
```
Script file inside `user_scripts` folder `test_function_sum_json.py`.
```python
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()
```
Query:
``` sql
SELECT test_function_sum_json(2, 2);
```
Result:
``` text
┌─test_function_sum_json(2, 2)─┐
│ 4 │
└──────────────────────────────┘
```
## Error Handling {#error-handling}

View File

@ -72,37 +72,89 @@ ClickHouse может вызывать внешнюю программу или
Конфигурация функции содержит следующие настройки:
- `name` - имя функции.
- `command` - исполняемая команда или скрипт.
- `argument` - описание аргумента, содержащее его тип во вложенной настройке `type`. Каждый аргумент описывается отдельно.
- `command` - имя скрипта для выполнения или команды, если `execute_direct` равно false.
- `argument` - описание аргумента, содержащее его тип во вложенной настройке `type`, и опционально его имя во вложенной настройке `name`. Каждый аргумент описывается отдельно. Указание имени для аргумента необходимо, если имена аргументов являются частью сериализации для пользовательского формата функции, например [Native](../../interfaces/formats.md#native) или [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Значение имени аргумента по умолчанию `c` + номер аргумента.
- `format` - [формат](../../interfaces/formats.md) передачи аргументов.
- `return_type` - тип возвращаемого значения.
- `return_name` - имя возвращаемого значения. Указание имени возвращаемого значения необходимо, если имя возвращаемого значения является частью сериализации для пользовательского формата функции, например [Native](../../interfaces/formats.md#native) или [JSONEachRow](../../interfaces/formats.md#jsoneachrow). Необязательный. Значение по умолчанию — `result`.
- `type` - вариант запуска команды. Если задан вариант `executable`, то запускается одна команда. При указании `executable_pool` создается пул команд.
- `max_command_execution_time` - максимальное время в секундах, которое отводится на обработку блока данных. Эта настройка применима только для команд с вариантом запуска `executable_pool`. Необязательная настройка. Значение по умолчанию `10`.
- `command_termination_timeout` - максимальное время завершения команды в секундах после закрытия конвейера. Если команда не завершается, то процессу отправляется сигнал `SIGTERM`. Эта настройка применима только для команд с вариантом запуска `executable_pool`. Необязательная настройка. Значение по умолчанию `10`.
- `command_read_timeout` - время ожидания чтения данных из команды stdout в миллисекундах. Значение по умолчанию 10000. Необязательная настройка.
- `command_write_timeout` - время ожидания записи данных в команду stdin в миллисекундах. Значение по умолчанию 10000. Необязательная настройка.
- `pool_size` - размер пула команд. Необязательная настройка. Значение по умолчанию `16`.
- `lifetime` - интервал перезагрузки функций в секундах. Если задан `0`, то функция не перезагружается.
- `send_chunk_header` - управляет отправкой количества строк перед отправкой блока данных для обработки. Необязательная настройка. Значение по умолчанию `false`.
- `execute_direct` - Если `execute_direct` = `1`, то будет произведен поиск `command` в папке user_scripts. Дополнительные аргументы скрипта можно указать с помощью разделителя пробелов. Пример: `script_name arg1 arg2`. Если `execute_direct` = `0`, `command` передается как аргумент для `bin/sh -c`. Значение по умолчанию `1`. Необязательный параметр.
- `lifetime` - интервал перезагрузки функций в секундах. Если задан `0`, то функция не перезагружается.
Команда должна читать аргументы из `STDIN` и выводить результат в `STDOUT`. Обработка должна выполняться в цикле. То есть после обработки группы аргументов команда должна ожидать следующую группу.
**Пример**
XML конфигурация, описывающая функцию `test_function`:
```
Создание `test_function` с использованием конфигурации XML.
Файл test_function.xml.
```xml
<functions>
<function>
<type>executable</type>
<name>test_function</name>
<name>test_function_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
<name>value</name>
</argument>
<format>TabSeparated</format>
<command>test_function.py</command>
</function>
</functions>
```
Файл скрипта внутри папки `user_scripts` `test_function.py`.
```python
#!/usr/bin/python3
import sys
if __name__ == '__main__':
for line in sys.stdin:
print("Value " + line, end='')
sys.stdout.flush()
```
Запрос:
``` sql
SELECT test_function_python(toUInt64(2));
```
Результат:
``` text
┌─test_function_python(2)─┐
│ Value 2 │
└─────────────────────────┘
```
Создание `test_function_sum`, указав для `execute_direct` значение `0`, используя конфигурацию XML.
File test_function.xml.
```xml
<functions>
<function>
<type>executable</type>
<name>test_function_sum</name>
<return_type>UInt64</return_type>
<argument>
<type>UInt64</type>
<name>lhs</name>
</argument>
<argument>
<type>UInt64</type>
<name>rhs</name>
</argument>
<format>TabSeparated</format>
<command>cd /; clickhouse-local --input-format TabSeparated --output-format TabSeparated --structure 'x UInt64, y UInt64' --query "SELECT x + y FROM table"</command>
<lifetime>0</lifetime>
<execute_direct>0</execute_direct>
</function>
</functions>
```
@ -110,15 +162,68 @@ XML конфигурация, описывающая функцию `test_functi
Запрос:
``` sql
SELECT test_function(toUInt64(2), toUInt64(2));
SELECT test_function_sum(2, 2);
```
Результат:
``` text
┌─test_function(toUInt64(2), toUInt64(2))─┐
│ 4 │
└─────────────────────────────────────────┘
┌─test_function_sum(2, 2)─┐
│ 4 │
└─────────────────────────┘
```
Создание `test_function_sum_json` с именноваными аргументами и форматом [JSONEachRow](../../interfaces/formats.md#jsoneachrow) с использованием конфигурации XML.
Файл test_function.xml.
```xml
<function>
<type>executable</type>
<name>test_function_sum_json</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>test_function_sum_json.py</command>
</function>
```
Файл скрипта внутри папки `user_scripts` `test_function_sum_json.py`.
```python
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()
```
Запрос:
``` sql
SELECT test_function_sum_json(2, 2);
```
Результат:
``` text
┌─test_function_sum_json(2, 2)─┐
│ 4 │
└──────────────────────────────┘
```
## Обработка ошибок {#obrabotka-oshibok}

View File

@ -83,6 +83,10 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
String format = config.getString(key_in_config + ".format");
DataTypePtr result_type = DataTypeFactory::instance().get(config.getString(key_in_config + ".return_type"));
String result_name = "result";
if (config.has(key_in_config + ".return_name"))
result_name = config.getString(key_in_config + ".return_name");
bool send_chunk_header = config.getBool(key_in_config + ".send_chunk_header", false);
size_t command_termination_timeout_seconds = config.getUInt64(key_in_config + ".command_termination_timeout", 10);
size_t command_read_timeout_milliseconds = config.getUInt64(key_in_config + ".command_read_timeout", 10000);
@ -106,33 +110,46 @@ ExternalLoader::LoadablePtr ExternalUserDefinedExecutableFunctionsLoader::create
if (config.has(key_in_config + ".lifetime"))
lifetime = ExternalLoadableLifetime(config, key_in_config + ".lifetime");
std::vector<DataTypePtr> argument_types;
std::vector<UserDefinedExecutableFunctionArgument> arguments;
Poco::Util::AbstractConfiguration::Keys config_elems;
config.keys(key_in_config, config_elems);
size_t argument_number = 1;
for (const auto & config_elem : config_elems)
{
if (!startsWith(config_elem, "argument"))
continue;
UserDefinedExecutableFunctionArgument argument;
const auto argument_prefix = key_in_config + '.' + config_elem + '.';
auto argument_type = DataTypeFactory::instance().get(config.getString(argument_prefix + "type"));
argument_types.emplace_back(std::move(argument_type));
argument.type = DataTypeFactory::instance().get(config.getString(argument_prefix + "type"));
if (config.has(argument_prefix + "name"))
argument.name = config.getString(argument_prefix + "name");
else
argument.name = "c" + std::to_string(argument_number);
++argument_number;
arguments.emplace_back(std::move(argument));
}
UserDefinedExecutableFunctionConfiguration function_configuration
{
.name = std::move(name), //-V1030
.command = std::move(command_value), //-V1030
.command_arguments = std::move(command_arguments), //-V1030
.argument_types = std::move(argument_types), //-V1030
.result_type = std::move(result_type), //-V1030
.name = std::move(name),
.command = std::move(command_value),
.command_arguments = std::move(command_arguments),
.arguments = std::move(arguments),
.result_type = std::move(result_type),
.result_name = std::move(result_name),
};
ShellCommandSourceCoordinator::Configuration shell_command_coordinator_configration
{
.format = std::move(format), //-V1030
.format = std::move(format),
.command_termination_timeout_seconds = command_termination_timeout_seconds,
.command_read_timeout_milliseconds = command_read_timeout_milliseconds,
.command_write_timeout_milliseconds = command_write_timeout_milliseconds,

View File

@ -10,13 +10,20 @@
namespace DB
{
struct UserDefinedExecutableFunctionArgument
{
DataTypePtr type;
String name;
};
struct UserDefinedExecutableFunctionConfiguration
{
std::string name;
std::string command;
std::vector<std::string> command_arguments;
std::vector<DataTypePtr> argument_types;
std::vector<UserDefinedExecutableFunctionArgument> arguments;
DataTypePtr result_type;
String result_name;
};
class UserDefinedExecutableFunction final : public IExternalLoadable

View File

@ -42,7 +42,7 @@ public:
bool isVariadic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
size_t getNumberOfArguments() const override { return executable_function->getConfiguration().argument_types.size(); }
size_t getNumberOfArguments() const override { return executable_function->getConfiguration().arguments.size(); }
bool useDefaultImplementationForConstants() const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
@ -90,7 +90,11 @@ public:
auto & column_with_type = arguments_copy[i];
column_with_type.column = column_with_type.column->convertToFullColumnIfConst();
const auto & argument_type = configuration.argument_types[i];
const auto & argument = configuration.arguments[i];
column_with_type.name = argument.name;
const auto & argument_type = argument.type;
if (areTypesEqual(arguments_copy[i].type, argument_type))
continue;
@ -101,7 +105,7 @@ public:
column_with_type = std::move(column_to_cast);
}
ColumnWithTypeAndName result(result_type, "result");
ColumnWithTypeAndName result(result_type, configuration.result_name);
Block result_block({result});
Block arguments_block(arguments_copy);

View File

@ -193,4 +193,100 @@
<execute_direct>0</execute_direct>
</function>
<function>
<type>executable</type>
<name>test_function_sum_json_unnamed_args_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_unnamed_args.py</command>
</function>
<function>
<type>executable_pool</type>
<name>test_function_sum_json_unnamed_args_pool_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_unnamed_args.py</command>
</function>
<function>
<type>executable</type>
<name>test_function_sum_json_partially_named_args_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_partially_named_args.py</command>
</function>
<function>
<type>executable_pool</type>
<name>test_function_sum_json_partially_named_args_pool_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_partially_named_args.py</command>
</function>
<function>
<type>executable</type>
<name>test_function_sum_json_named_args_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_named_args.py</command>
</function>
<function>
<type>executable_pool</type>
<name>test_function_sum_json_named_args_pool_python</name>
<return_type>UInt64</return_type>
<return_name>result_name</return_name>
<argument>
<type>UInt64</type>
<name>argument_1</name>
</argument>
<argument>
<type>UInt64</type>
<name>argument_2</name>
</argument>
<format>JSONEachRow</format>
<command>input_sum_json_named_args.py</command>
</function>
</functions>

View File

@ -104,3 +104,30 @@ def test_executable_function_non_direct_bash(started_cluster):
assert node.query("SELECT test_function_non_direct_pool_bash(toUInt64(1))") == 'Key 1\n'
assert node.query("SELECT test_function_non_direct_pool_bash(1)") == 'Key 1\n'
def test_executable_function_sum_json_python(started_cluster):
skip_test_msan(node)
node.query("CREATE TABLE test_table (lhs UInt64, rhs UInt64) ENGINE=TinyLog;")
node.query("INSERT INTO test_table VALUES (0, 0), (1, 1), (2, 2);")
assert node.query("SELECT test_function_sum_json_unnamed_args_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_unnamed_args_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
assert node.query("SELECT test_function_sum_json_partially_named_args_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_partially_named_args_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
assert node.query("SELECT test_function_sum_json_named_args_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_named_args_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
assert node.query("SELECT test_function_sum_json_unnamed_args_pool_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_unnamed_args_pool_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
assert node.query("SELECT test_function_sum_json_partially_named_args_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_partially_named_args_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
assert node.query("SELECT test_function_sum_json_named_args_pool_python(1, 2);") == '3\n'
assert node.query("SELECT test_function_sum_json_named_args_pool_python(lhs, rhs) FROM test_table;") == '0\n2\n4\n'
node.query("DROP TABLE test_table;")

View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['argument_2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()

View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['argument_1'])
second_arg = int(value['c2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()

View File

@ -0,0 +1,13 @@
#!/usr/bin/python3
import sys
import json
if __name__ == '__main__':
for line in sys.stdin:
value = json.loads(line)
first_arg = int(value['c1'])
second_arg = int(value['c2'])
result = {'result_name': first_arg + second_arg}
print(json.dumps(result), end='\n')
sys.stdout.flush()