Updated test_executable_table_function tests

This commit is contained in:
Maksim Kita 2021-12-28 00:19:28 +03:00
parent 61b36f22b8
commit 3386378050
4 changed files with 131 additions and 105 deletions

View File

@ -568,8 +568,18 @@ Pipe ShellCommandCoordinator::createPipe(
tasks.emplace_back(std::move(task)); tasks.emplace_back(std::move(task));
} }
Pipe pipe(std::make_unique<ShellCommandSource>( auto source = std::make_unique<ShellCommandSource>(
context, configuration.format, configuration.command_read_timeout_milliseconds, std::move(sample_block), std::move(process), std::move(tasks), source_configuration, std::move(process_holder), process_pool)); context,
configuration.format,
configuration.command_read_timeout_milliseconds,
std::move(sample_block),
std::move(process),
std::move(tasks),
source_configuration,
std::move(process_holder),
process_pool);
auto pipe = Pipe(std::move(source));
return pipe; return pipe;
} }

View File

@ -18,6 +18,7 @@
#include <Processors/ISimpleTransform.h> #include <Processors/ISimpleTransform.h>
#include <Processors/Executors/CompletedPipelineExecutor.h> #include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IOutputFormat.h> #include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Interpreters/Context.h> #include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h> #include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/evaluateConstantExpression.h> #include <Interpreters/evaluateConstantExpression.h>
@ -103,6 +104,38 @@ Pipe StorageExecutable::read(
inputs.emplace_back(QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline())); inputs.emplace_back(QueryPipelineBuilder::getPipe(interpreter.buildQueryPipeline()));
} }
if (settings.is_executable_pool)
{
    /// For executable pool every input stream is read to the end right here
    /// and replaced by a source that is built from one accumulated block.
    for (auto & input_pipe : inputs)
    {
        QueryPipeline drained_pipeline(std::move(input_pipe));
        PullingPipelineExecutor puller(drained_pipeline);

        /// Empty block with the structure of the input header; pulled rows are appended to it.
        auto merged_block = puller.getHeader().cloneEmpty();
        const size_t merged_columns_size = merged_block.columns();

        Block pulled_block;
        while (puller.pull(pulled_block))
        {
            for (size_t column_index = 0; column_index != merged_columns_size; ++column_index)
            {
                const auto & pulled_column = pulled_block.safeGetByPosition(column_index).column;
                auto & merged_column = merged_block.safeGetByPosition(column_index).column;

                /// Append all rows of the pulled column to the accumulated column at the same position.
                merged_column->assumeMutable()->insertRangeFrom(*pulled_column, 0, pulled_column->size());
            }
        }

        /// The slot of the drained pipe is reused for the single chunk source.
        auto single_chunk_source = std::make_shared<SourceFromSingleChunk>(std::move(merged_block));
        input_pipe = Pipe(std::move(single_chunk_source));
    }
}
auto sample_block = metadata_snapshot->getSampleBlock(); auto sample_block = metadata_snapshot->getSampleBlock();
ShellCommandSourceConfiguration configuration; ShellCommandSourceConfiguration configuration;

View File

@ -1,6 +1,5 @@
import os import os
import sys import sys
import time
import pytest import pytest
@ -211,104 +210,104 @@ def test_executable_storage_input_slow_python(started_cluster):
assert node.query_and_get_error("SELECT * FROM test_table") assert node.query_and_get_error("SELECT * FROM test_table")
node.query("DROP TABLE test_table") node.query("DROP TABLE test_table")
def test_executable_function_input_multiple_pipes_python(started_cluster):
    """Executable table over test_input_multiple_pipes.py with three input subqueries.

    For each set of subqueries the table is created, read once, compared with
    the expected 'Key from <N> fd <value>' rows, and dropped again.
    """
    skip_test_msan(node)

    create_query = "CREATE TABLE test_table (value String) ENGINE=Executable('test_input_multiple_pipes.py', 'TabSeparated', {source})"

    # (input subqueries, expected SELECT output)
    cases = (
        (
            '(SELECT 1), (SELECT 2), (SELECT 3)',
            'Key from 4 fd 3\nKey from 3 fd 2\nKey from 0 fd 1\n',
        ),
        (
            '(SELECT id FROM test_data_table), (SELECT 2), (SELECT 3)',
            'Key from 4 fd 3\nKey from 3 fd 2\nKey from 0 fd 0\nKey from 0 fd 1\nKey from 0 fd 2\n',
        ),
    )

    node.query("DROP TABLE IF EXISTS test_table")

    for source, expected in cases:
        node.query(create_query.format(source=source))
        assert node.query("SELECT * FROM test_table") == expected
        node.query("DROP TABLE test_table")
def test_executable_pool_storage_input_python(started_cluster):
    """ExecutablePool table over test_input_pool.py with a single input subquery.

    The table is created with send_chunk_header=1 and pool_size=1; every
    SELECT is issued three times and must return the same rows each time.
    """
    skip_test_msan(node)

    create_query = "CREATE TABLE test_table (value String) ENGINE=ExecutablePool('test_input_pool.py', 'TabSeparated', {source}) SETTINGS send_chunk_header=1, pool_size=1"

    # (input subquery, expected SELECT output)
    cases = (
        ('(SELECT 1)', 'Key 1\n'),
        ('(SELECT id FROM test_data_table)', 'Key 0\nKey 1\nKey 2\n'),
    )

    node.query("DROP TABLE IF EXISTS test_table")

    for source, expected in cases:
        node.query(create_query.format(source=source))

        # Same query three times in a row against the same table.
        for _ in range(3):
            assert node.query("SELECT * FROM test_table") == expected

        node.query("DROP TABLE test_table")
# assert node.query("SELECT * FROM test_table") == '2\n'
# assert node.query("SELECT * FROM test_table") == '2\n'
def test_executable_pool_storage_input_sum_python(started_cluster):
    """ExecutablePool table over test_input_sum_pool.py fed with two-column input.

    The table has a single UInt64 column; every SELECT is issued three times
    and must return the same rows each time.
    """
    skip_test_msan(node)

    create_query = "CREATE TABLE test_table (value UInt64) ENGINE=ExecutablePool('test_input_sum_pool.py', 'TabSeparated', {source}) SETTINGS send_chunk_header=1, pool_size=1"

    # (input subquery, expected SELECT output)
    cases = (
        ('(SELECT 1, 1)', '2\n'),
        ('(SELECT id, id FROM test_data_table)', '0\n2\n4\n'),
    )

    node.query("DROP TABLE IF EXISTS test_table")

    for source, expected in cases:
        node.query(create_query.format(source=source))

        # Same query three times in a row against the same table.
        for _ in range(3):
            assert node.query("SELECT * FROM test_table") == expected

        node.query("DROP TABLE test_table")
# assert node.query("SELECT * FROM test_table") == 'Key 1 1\n'
# assert node.query("SELECT * FROM test_table") == 'Key 1 1\n'
def test_executable_pool_storage_input_argument_python(started_cluster):
    """ExecutablePool table whose command line is 'test_input_argument_pool.py 1'.

    Every SELECT is issued three times and must return the same
    'Key 1 <value>' rows each time.
    """
    skip_test_msan(node)

    create_query = "CREATE TABLE test_table (value String) ENGINE=ExecutablePool('test_input_argument_pool.py 1', 'TabSeparated', {source}) SETTINGS send_chunk_header=1, pool_size=1"

    # (input subquery, expected SELECT output)
    cases = (
        ('(SELECT 1)', 'Key 1 1\n'),
        ('(SELECT id FROM test_data_table)', 'Key 1 0\nKey 1 1\nKey 1 2\n'),
    )

    node.query("DROP TABLE IF EXISTS test_table")

    for source, expected in cases:
        node.query(create_query.format(source=source))

        # Same query three times in a row against the same table.
        for _ in range(3):
            assert node.query("SELECT * FROM test_table") == expected

        node.query("DROP TABLE test_table")
# assert node.query("SELECT * FROM test_table") == 'Key 1\n'
# assert node.query("SELECT * FROM test_table") == 'Key 1\n'
def test_executable_pool_storage_input_multiple_pipes_python(started_cluster):
    """ExecutablePool table over test_input_multiple_pipes_pool.py with three input subqueries.

    The table is created with send_chunk_header=1 and pool_size=1; every
    SELECT is issued three times and must return the same
    'Key from <N> fd <value>' rows each time.

    This test previously shared the name test_executable_pool_storage_input_python
    with the single-input pool test. A second module-level ``def`` with the same
    name rebinds it, so only one of the two tests was collected. The name now
    reflects the multiple-pipes script this test actually drives.
    """
    skip_test_msan(node)

    query = "CREATE TABLE test_table (value String) ENGINE=ExecutablePool('test_input_multiple_pipes_pool.py', 'TabSeparated', {source}) SETTINGS send_chunk_header=1, pool_size=1"

    # (input subqueries, expected SELECT output)
    cases = (
        (
            '(SELECT 1), (SELECT 2), (SELECT 3)',
            'Key from 4 fd 3\nKey from 3 fd 2\nKey from 0 fd 1\n',
        ),
        (
            '(SELECT id FROM test_data_table), (SELECT 2), (SELECT 3)',
            'Key from 4 fd 3\nKey from 3 fd 2\nKey from 0 fd 0\nKey from 0 fd 1\nKey from 0 fd 2\n',
        ),
    )

    node.query("DROP TABLE IF EXISTS test_table")

    for source, expected in cases:
        node.query(query.format(source=source))

        # Same query three times in a row against the same table.
        for _ in range(3):
            assert node.query("SELECT * FROM test_table") == expected

        node.query("DROP TABLE test_table")

View File

@ -7,55 +7,39 @@ if __name__ == '__main__':
fd3 = os.fdopen(3) fd3 = os.fdopen(3)
fd4 = os.fdopen(4) fd4 = os.fdopen(4)
for chunk_header in fd4: lines = []
fd4_chunk_length = int(chunk_header)
print(str(fd4_chunk_length), end='\n') for chunk_header_fd4 in fd4:
fd4_chunk_length = int(chunk_header_fd4)
while fd4_chunk_length != 0: while fd4_chunk_length != 0:
line = sys.stdin.readline() line = fd4.readline()
fd4_chunk_length -= 1 fd4_chunk_length -= 1
print("Key from fd 4 " + line, end='') lines.append("Key from 4 fd " + line)
sys.stdout.flush() for chunk_header_fd3 in fd3:
fd3_chunk_length = int(chunk_header_fd3)
for chunk_header in fd3:
fd3_chunk_length = int(chunk_header)
print(str(fd3_chunk_length), end='\n')
while fd3_chunk_length != 0: while fd3_chunk_length != 0:
line = sys.stdin.readline() line = fd3.readline()
fd3_chunk_length -= 1 fd3_chunk_length -= 1
print("Key from fd 3 " + line, end='') lines.append("Key from 3 fd " + line)
sys.stdout.flush()
for chunk_header in sys.stdin: for chunk_header in sys.stdin:
chunk_length = int(chunk_header) chunk_length = int(chunk_header)
print(str(chunk_length), end='\n')
while chunk_length != 0: while chunk_length != 0:
line = sys.stdin.readline() line = sys.stdin.readline()
chunk_length -= 1 chunk_length -= 1
print("Key " + line, end='') lines.append("Key from 0 fd " + line)
sys.stdout.flush() break
break
#!/usr/bin/python3 print(str(len(lines)), end='\n')
import sys for line in lines:
import os print(line, end='')
lines.clear()
if __name__ == '__main__': sys.stdout.flush()
fd3 = os.fdopen(3)
fd4 = os.fdopen(4)
for line in fd4:
print("Key from 4 fd " + line, end='')
for line in fd3:
print("Key from 3 fd " + line, end='')
for line in sys.stdin:
print("Key from 0 fd " + line, end='')
sys.stdout.flush()