Added executable function integrations tests

This commit is contained in:
Maksim Kita 2021-12-23 13:01:44 +03:00
parent 2ffd83ee83
commit d3e507b9ef
17 changed files with 184 additions and 9 deletions

View File

@ -116,7 +116,12 @@ Pipe ExecutableDictionarySource::getStreamForBlock(const Block & block)
Pipes shell_input_pipes; Pipes shell_input_pipes;
shell_input_pipes.emplace_back(std::move(shell_input_pipe)); shell_input_pipes.emplace_back(std::move(shell_input_pipe));
return coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context); auto pipe = coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context);
if (configuration.implicit_key)
pipe.addTransform(std::make_shared<TransformWithAdditionalColumns>(block, pipe.getHeader()));
return pipe;
} }
bool ExecutableDictionarySource::isModified() const bool ExecutableDictionarySource::isModified() const

View File

@ -103,7 +103,8 @@ Pipe ExecutablePoolDictionarySource::getStreamForBlock(const Block & block)
Pipes shell_input_pipes; Pipes shell_input_pipes;
shell_input_pipes.emplace_back(std::move(shell_input_pipe)); shell_input_pipes.emplace_back(std::move(shell_input_pipe));
return coordinator->createPipe(configuration.command, std::move(shell_input_pipes), std::move(sample_block), context, command_configuration); auto pipe = coordinator->createPipe(configuration.command, std::move(shell_input_pipes), sample_block, context, command_configuration);
return pipe;
} }
bool ExecutablePoolDictionarySource::isModified() const bool ExecutablePoolDictionarySource::isModified() const

View File

@ -104,8 +104,9 @@ namespace
{ {
rethrowExceptionDuringSendDataIfNeeded(); rethrowExceptionDuringSendDataIfNeeded();
if (configuration.read_fixed_number_of_rows && configuration.number_of_rows_to_read >= current_read_rows) if (configuration.read_fixed_number_of_rows && current_read_rows >= configuration.number_of_rows_to_read) {
return {}; return {};
}
Chunk chunk; Chunk chunk;

View File

@ -61,10 +61,11 @@
</structure> </structure>
<source> <source>
<executable> <executable_pool>
<format>TabSeparated</format> <format>TabSeparated</format>
<command>while read read_data; do printf "$read_data\tvalue a\tvalue b\n"; done</command> <command>while read read_data; do printf "$read_data\tvalue a\tvalue b\n"; done</command>
</executable> <size>5</size>
</executable_pool>
</source> </source>
<layout> <layout>

View File

@ -0,0 +1,59 @@
<functions>
<function>
<type>executable</type>
<name>test_function_bash</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_input.sh</command>
</function>
<function>
<type>executable_pool</type>
<name>test_function_pool_bash</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_input.sh</command>
</function>
<function>
<type>executable</type>
<name>test_function_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_input.py</command>
</function>
<function>
<type>executable</type>
<name>test_function_sum_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_input_sum.py</command>
</function>
<function>
<type>executable_pool</type>
<name>test_function_pool_python</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>test_input.py</command>
</function>
</functions>

View File

@ -0,0 +1,14 @@
<functions>
<function>
<type>executable</type>
<name>test_function_2</name>
<return_type>String</return_type>
<argument>
<type>UInt64</type>
</argument>
<format>TabSeparated</format>
<command>while read read_data; do printf "Key_2 $read_data\n"; done</command>
<lifetime>0</lifetime>
</function>
</functions>

View File

@ -0,0 +1,61 @@
import os
import sys
import time
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', stay_alive=True, main_configs=[])
def skip_test_msan(instance):
if instance.is_built_with_memory_sanitizer():
pytest.skip("Memory Sanitizer cannot work with vfork")
def copy_file_to_container(local_path, dist_path, container_id):
os.system("docker cp {local} {cont_id}:{dist}".format(local=local_path, cont_id=container_id, dist=dist_path))
config = '''<clickhouse>
<user_defined_executable_functions_config>/etc/clickhouse-server/functions/test_function_config.xml</user_defined_executable_functions_config>
</clickhouse>'''
@pytest.fixture(scope="module")
def started_cluster():
try:
cluster.start()
node.replace_config("/etc/clickhouse-server/config.d/executable_user_defined_functions_config.xml", config)
copy_file_to_container(os.path.join(SCRIPT_DIR, 'functions/.'), '/etc/clickhouse-server/functions', node.docker_id)
copy_file_to_container(os.path.join(SCRIPT_DIR, 'user_scripts/.'), '/var/lib/clickhouse/user_scripts', node.docker_id)
node.restart_clickhouse()
yield cluster
finally:
cluster.shutdown()
def test_executable_function_bash(started_cluster):
skip_test_msan(node)
assert node.query("SELECT test_function_bash(toUInt64(1))") == 'Key 1\n'
assert node.query("SELECT test_function_bash(1)") == 'Key 1\n'
assert node.query("SELECT test_function_pool_bash(toUInt64(1))") == 'Key 1\n'
assert node.query("SELECT test_function_pool_bash(1)") == 'Key 1\n'
def test_executable_function_python(started_cluster):
skip_test_msan(node)
assert node.query("SELECT test_function_python(toUInt64(1))") == 'Key 1\n'
assert node.query("SELECT test_function_python(1)") == 'Key 1\n'
assert node.query("SELECT test_function_sum_python(toUInt64(1), toUInt64(1))") == '2\n'
assert node.query("SELECT test_function_sum_python(1, 1)") == '2\n'
# assert node.query("SELECT test_function_pool_python(toUInt64(1))") == 'Key 1\n'
# assert node.query("SELECT test_function_pool_python(1)") == 'Key 1\n'

View File

@ -0,0 +1,7 @@
#!/usr/bin/python3
import sys
if __name__ == '__main__':
for line in sys.stdin:
print("Key " + line, end='')

View File

@ -0,0 +1,5 @@
#!/bin/bash
while read read_data;
do printf "Key $read_data\n";
done

View File

@ -0,0 +1,9 @@
#!/usr/bin/python3
import sys
import re
if __name__ == '__main__':
for line in sys.stdin:
line_split = re.split(r'\t+', line)
print(int(line_split[0]) + int(line_split[1]), end='')

View File

@ -7,8 +7,7 @@
<type>UInt64</type> <type>UInt64</type>
</argument> </argument>
<format>TabSeparated</format> <format>TabSeparated</format>
<command>while read read_data; do printf "Key_1 $read_data\n"; done</command> <command>test_input_1.sh</command>
<lifetime>0</lifetime>
</function> </function>
</functions> </functions>

View File

@ -7,8 +7,7 @@
<type>UInt64</type> <type>UInt64</type>
</argument> </argument>
<format>TabSeparated</format> <format>TabSeparated</format>
<command>while read read_data; do printf "Key_2 $read_data\n"; done</command> <command>test_input_2.sh</command>
<lifetime>0</lifetime>
</function> </function>
</functions> </functions>

View File

@ -28,6 +28,8 @@ def started_cluster():
cluster.start() cluster.start()
copy_file_to_container(os.path.join(SCRIPT_DIR, 'functions/.'), '/etc/clickhouse-server/functions', node.docker_id) copy_file_to_container(os.path.join(SCRIPT_DIR, 'functions/.'), '/etc/clickhouse-server/functions', node.docker_id)
copy_file_to_container(os.path.join(SCRIPT_DIR, 'user_scripts/.'), '/var/lib/clickhouse/user_scripts', node.docker_id)
node.restart_clickhouse() node.restart_clickhouse()
yield cluster yield cluster

View File

@ -0,0 +1,5 @@
#!/bin/bash
while read read_data;
do printf "Key_1 $read_data\n";
done

View File

@ -0,0 +1,5 @@
#!/bin/bash
while read read_data;
do printf "Key_2 $read_data\n";
done