From d3e507b9ef935370b8bb5c8f9571abb9510824d1 Mon Sep 17 00:00:00 2001 From: Maksim Kita Date: Thu, 23 Dec 2021 13:01:44 +0300 Subject: [PATCH] Added executable function integrations tests --- .../ExecutableDictionarySource.cpp | 7 ++- .../ExecutablePoolDictionarySource.cpp | 3 +- src/Processors/Sources/ShellCommandSource.cpp | 3 +- tests/config/executable_pool_dictionary.xml | 5 +- .../__init__.py | 0 ...ecutable_user_defined_functions_config.xml | 2 + .../functions/test_function_config.xml | 59 ++++++++++++++++++ .../functions/test_function_config2.xml | 14 +++++ .../test.py | 61 +++++++++++++++++++ .../user_scripts/test_input.py | 7 +++ .../user_scripts/test_input.sh | 5 ++ .../user_scripts/test_input_sum.py | 9 +++ .../functions/test_function_config.xml | 3 +- .../functions/test_function_config2.xml | 3 +- .../test.py | 2 + .../user_scripts/test_input_1.sh | 5 ++ .../user_scripts/test_input_2.sh | 5 ++ 17 files changed, 184 insertions(+), 9 deletions(-) create mode 100644 tests/integration/test_executable_user_defined_function/__init__.py create mode 100644 tests/integration/test_executable_user_defined_function/config/executable_user_defined_functions_config.xml create mode 100644 tests/integration/test_executable_user_defined_function/functions/test_function_config.xml create mode 100644 tests/integration/test_executable_user_defined_function/functions/test_function_config2.xml create mode 100644 tests/integration/test_executable_user_defined_function/test.py create mode 100755 tests/integration/test_executable_user_defined_function/user_scripts/test_input.py create mode 100755 tests/integration/test_executable_user_defined_function/user_scripts/test_input.sh create mode 100755 tests/integration/test_executable_user_defined_function/user_scripts/test_input_sum.py create mode 100755 tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_1.sh create mode 100755 tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_2.sh diff --git a/src/Dictionaries/ExecutableDictionarySource.cpp b/src/Dictionaries/ExecutableDictionarySource.cpp index a4837027508..4f210d2e040 100644 --- a/src/Dictionaries/ExecutableDictionarySource.cpp +++ b/src/Dictionaries/ExecutableDictionarySource.cpp @@ -116,7 +116,12 @@ Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block) Pipes shell_input_pipes; shell_input_pipes.emplace_back(std::move(shell_input_pipe)); - return coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context); + auto pipe = coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context); + + if (configuration.implicit_key) + pipe.addTransform(std::make_shared(block, pipe.getHeader())); + + return pipe; } bool ExecutableDictionarySource::isModified() const diff --git a/src/Dictionaries/ExecutablePoolDictionarySource.cpp b/src/Dictionaries/ExecutablePoolDictionarySource.cpp index 0208791a325..568a11fec8b 100644 --- a/src/Dictionaries/ExecutablePoolDictionarySource.cpp +++ b/src/Dictionaries/ExecutablePoolDictionarySource.cpp @@ -103,7 +103,8 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block) Pipes shell_input_pipes; shell_input_pipes.emplace_back(std::move(shell_input_pipe)); - return coordinator->createPipe(configuration.command, std::move(shell_input_pipes), std::move(sample_block), context, command_configuration); + auto pipe = coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context, command_configuration); + return pipe; } bool ExecutablePoolDictionarySource::isModified() const diff --git a/src/Processors/Sources/ShellCommandSource.cpp b/src/Processors/Sources/ShellCommandSource.cpp index 21d07ecf126..111ea367bea 100644 --- a/src/Processors/Sources/ShellCommandSource.cpp +++ b/src/Processors/Sources/ShellCommandSource.cpp @@ -104,8 +104,9 @@ namespace { rethrowExceptionDuringSendDataIfNeeded(); - if (configuration.read_fixed_number_of_rows && configuration.number_of_rows_to_read >= current_read_rows) + if (configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read) { return {}; + } Chunk chunk; diff --git a/tests/config/executable_pool_dictionary.xml b/tests/config/executable_pool_dictionary.xml index 13f34f0048e..212552a6776 100644 --- a/tests/config/executable_pool_dictionary.xml +++ b/tests/config/executable_pool_dictionary.xml @@ -61,10 +61,11 @@ - + TabSeparated while read read_data; do printf "$read_data\tvalue a\tvalue b\n"; done - + 5 + diff --git a/tests/integration/test_executable_user_defined_function/__init__.py b/tests/integration/test_executable_user_defined_function/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/tests/integration/test_executable_user_defined_function/config/executable_user_defined_functions_config.xml b/tests/integration/test_executable_user_defined_function/config/executable_user_defined_functions_config.xml new file mode 100644 index 00000000000..3cbf717bb67 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/config/executable_user_defined_functions_config.xml @@ -0,0 +1,2 @@ + + diff --git a/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml new file mode 100644 index 00000000000..18a1c055238 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/functions/test_function_config.xml @@ -0,0 +1,59 @@ + + + executable + test_function_bash + String + + UInt64 + + TabSeparated + test_input.sh + + + + executable_pool + test_function_pool_bash + String + + UInt64 + + TabSeparated + test_input.sh + + + + executable + test_function_python + String + + UInt64 + + TabSeparated + test_input.py + + + + executable + test_function_sum_python + String + + UInt64 + + + UInt64 + + TabSeparated + test_input_sum.py + + + + executable_pool + test_function_pool_python + String + + UInt64 + + TabSeparated + test_input.py + + diff --git a/tests/integration/test_executable_user_defined_function/functions/test_function_config2.xml b/tests/integration/test_executable_user_defined_function/functions/test_function_config2.xml new file mode 100644 index 00000000000..fe02146a6b8 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/functions/test_function_config2.xml @@ -0,0 +1,14 @@ + + + executable + test_function_2 + String + + UInt64 + + TabSeparated + while read read_data; do printf "Key_2 $read_data\n"; done + 0 + + + diff --git a/tests/integration/test_executable_user_defined_function/test.py b/tests/integration/test_executable_user_defined_function/test.py new file mode 100644 index 00000000000..9872668a901 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/test.py @@ -0,0 +1,61 @@ +import os +import sys +import time + +import pytest + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) + +from helpers.cluster import ClickHouseCluster + +cluster = ClickHouseCluster(__file__) +node = cluster.add_instance('node', stay_alive=True, main_configs=[]) + + +def skip_test_msan(instance): + if instance.is_built_with_memory_sanitizer(): + pytest.skip("Memory Sanitizer cannot work with vfork") + +def copy_file_to_container(local_path, dist_path, container_id): + os.system("docker cp {local} {cont_id}:{dist}".format(local=local_path, cont_id=container_id, dist=dist_path)) + +config = ''' + /etc/clickhouse-server/functions/test_function_config.xml +''' + +@pytest.fixture(scope="module") +def started_cluster(): + try: + cluster.start() + + node.replace_config("/etc/clickhouse-server/config.d/executable_user_defined_functions_config.xml", config) + + copy_file_to_container(os.path.join(SCRIPT_DIR, 'functions/.'), '/etc/clickhouse-server/functions', node.docker_id) + copy_file_to_container(os.path.join(SCRIPT_DIR, 'user_scripts/.'), '/var/lib/clickhouse/user_scripts', node.docker_id) + + node.restart_clickhouse() + + yield cluster + + finally: + cluster.shutdown() + +def test_executable_function_bash(started_cluster): + skip_test_msan(node) + assert node.query("SELECT test_function_bash(toUInt64(1))") == 'Key 1\n' + assert node.query("SELECT test_function_bash(1)") == 'Key 1\n' + + assert node.query("SELECT test_function_pool_bash(toUInt64(1))") == 'Key 1\n' + assert node.query("SELECT test_function_pool_bash(1)") == 'Key 1\n' + +def test_executable_function_python(started_cluster): + skip_test_msan(node) + assert node.query("SELECT test_function_python(toUInt64(1))") == 'Key 1\n' + assert node.query("SELECT test_function_python(1)") == 'Key 1\n' + + assert node.query("SELECT test_function_sum_python(toUInt64(1), toUInt64(1))") == '2\n' + assert node.query("SELECT test_function_sum_python(1, 1)") == '2\n' + + # assert node.query("SELECT test_function_pool_python(toUInt64(1))") == 'Key 1\n' + # assert node.query("SELECT test_function_pool_python(1)") == 'Key 1\n' diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/test_input.py b/tests/integration/test_executable_user_defined_function/user_scripts/test_input.py new file mode 100755 index 00000000000..bdf95a9d3c4 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/user_scripts/test_input.py @@ -0,0 +1,7 @@ +#!/usr/bin/python3 + +import sys + +if __name__ == '__main__': + for line in sys.stdin: + print("Key " + line, end='') diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/test_input.sh b/tests/integration/test_executable_user_defined_function/user_scripts/test_input.sh new file mode 100755 index 00000000000..aea51b82b1f --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/user_scripts/test_input.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +while read read_data; + do printf "Key $read_data\n"; +done diff --git a/tests/integration/test_executable_user_defined_function/user_scripts/test_input_sum.py b/tests/integration/test_executable_user_defined_function/user_scripts/test_input_sum.py new file mode 100755 index 00000000000..a3580850f57 --- /dev/null +++ b/tests/integration/test_executable_user_defined_function/user_scripts/test_input_sum.py @@ -0,0 +1,9 @@ +#!/usr/bin/python3 + +import sys +import re + +if __name__ == '__main__': + for line in sys.stdin: + line_split = re.split(r'\t+', line) + print(int(line_split[0]) + int(line_split[1]), end='') diff --git a/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config.xml b/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config.xml index f2a7d6e67b1..d0bd6e5ab88 100644 --- a/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config.xml +++ b/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config.xml @@ -7,8 +7,7 @@ UInt64 TabSeparated - while read read_data; do printf "Key_1 $read_data\n"; done - 0 + test_input_1.sh diff --git a/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config2.xml b/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config2.xml index fe02146a6b8..80ae21a086d 100644 --- a/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config2.xml +++ b/tests/integration/test_executable_user_defined_functions_config_reload/functions/test_function_config2.xml @@ -7,8 +7,7 @@ UInt64 TabSeparated - while read read_data; do printf "Key_2 $read_data\n"; done - 0 + test_input_2.sh diff --git a/tests/integration/test_executable_user_defined_functions_config_reload/test.py b/tests/integration/test_executable_user_defined_functions_config_reload/test.py index 3117b3e72b1..629c426a28c 100644 --- a/tests/integration/test_executable_user_defined_functions_config_reload/test.py +++ b/tests/integration/test_executable_user_defined_functions_config_reload/test.py @@ -28,6 +28,8 @@ def started_cluster(): cluster.start() copy_file_to_container(os.path.join(SCRIPT_DIR, 'functions/.'), '/etc/clickhouse-server/functions', node.docker_id) + copy_file_to_container(os.path.join(SCRIPT_DIR, 'user_scripts/.'), '/var/lib/clickhouse/user_scripts', node.docker_id) + node.restart_clickhouse() yield cluster diff --git a/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_1.sh b/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_1.sh new file mode 100755 index 00000000000..a6cffe83bba --- /dev/null +++ b/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_1.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +while read read_data; + do printf "Key_1 $read_data\n"; +done diff --git a/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_2.sh b/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_2.sh new file mode 100755 index 00000000000..a673cfd18fb --- /dev/null +++ b/tests/integration/test_executable_user_defined_functions_config_reload/user_scripts/test_input_2.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +while read read_data; + do printf "Key_2 $read_data\n"; +done