Added support for input() function.

This commit is contained in:
Vitaly Baranov 2020-11-03 07:37:59 +03:00
parent 0e3a8840b5
commit 23842e7ac6
2 changed files with 36 additions and 4 deletions

View File

@ -497,6 +497,7 @@ namespace
String output_format; String output_format;
uint64_t interactive_delay = 100000; uint64_t interactive_delay = 100000;
bool send_exception_with_stacktrace = true; bool send_exception_with_stacktrace = true;
bool input_function_is_used = false;
BlockIO io; BlockIO io;
Progress progress; Progress progress;
@ -704,6 +705,26 @@ namespace
if (output_format.empty()) if (output_format.empty())
output_format = query_context->getDefaultFormat(); output_format = query_context->getDefaultFormat();
/// Set callbacks to execute function input().
query_context->setInputInitializer([this] (Context & context, const StoragePtr & input_storage)
{
if (&context != &query_context.value())
throw Exception("Unexpected context in Input initializer", ErrorCodes::LOGICAL_ERROR);
input_function_is_used = true;
initializeBlockInputStream(input_storage->getInMemoryMetadataPtr()->getSampleBlock());
block_input_stream->readPrefix();
});
query_context->setInputBlocksReaderCallback([this](Context & context) -> Block
{
if (&context != &query_context.value())
throw Exception("Unexpected context in InputBlocksReader", ErrorCodes::LOGICAL_ERROR);
auto block = block_input_stream->read();
if (!block)
block_input_stream->readSuffix();
return block;
});
/// Start executing the query. /// Start executing the query.
const auto * query_end = end; const auto * query_end = end;
if (insert_query && insert_query->data) if (insert_query && insert_query->data)
@ -837,8 +858,9 @@ namespace
block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, async_in.getHeader()); block_output_stream = query_context->getOutputFormat(output_format, *write_buffer, async_in.getHeader());
Stopwatch after_send_progress; Stopwatch after_send_progress;
/// We are not going to receive input data anymore. /// Unless the input() function is used we are not going to receive input data anymore.
check_query_info_contains_cancel_only = true; if (!input_function_is_used)
check_query_info_contains_cancel_only = true;
auto check_for_cancel = [&] auto check_for_cancel = [&]
{ {
@ -909,8 +931,9 @@ namespace
block_output_stream->writePrefix(); block_output_stream->writePrefix();
Stopwatch after_send_progress; Stopwatch after_send_progress;
/// We are not going to receive input data anymore. /// Unless the input() function is used we are not going to receive input data anymore.
check_query_info_contains_cancel_only = true; if (!input_function_is_used)
check_query_info_contains_cancel_only = true;
auto check_for_cancel = [&] auto check_for_cancel = [&]
{ {

View File

@ -254,6 +254,15 @@ def test_no_session():
e = query_and_get_error("SET custom_x=1") e = query_and_get_error("SET custom_x=1")
assert "There is no session" in e.display_text assert "There is no session" in e.display_text
def test_input_function():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV", input_data=["5,4\n", "8,11\n", "10,12\n"])
assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n"
query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 11,13")
assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n143\n"
query("INSERT INTO t SELECT col1 * col2 FROM input('col1 UInt8, col2 UInt8') FORMAT CSV 20,10\n", input_data="15,15\n")
assert query("SELECT a FROM t ORDER BY a") == "20\n88\n120\n143\n200\n225\n"
def test_simultaneous_queries_same_channel(): def test_simultaneous_queries_same_channel():
threads=[] threads=[]
try: try: