Added input_data_delimiter to protocol.

This commit is contained in:
Vitaly Baranov 2020-11-04 17:59:31 +03:00
parent 797c84889f
commit 218d9ea3e8
3 changed files with 31 additions and 8 deletions

View File

@ -216,6 +216,7 @@ namespace
ASTPtr ast;
ASTInsertQuery * insert_query = nullptr;
String input_format;
String input_data_delimiter;
String output_format;
uint64_t interactive_delay = 100000;
bool send_exception_with_stacktrace = true;
@ -238,6 +239,7 @@ namespace
BlockOutputStreamPtr block_output_stream;
bool need_input_data_from_insert_query = true;
bool need_input_data_from_query_info = true;
bool need_input_data_delimiter = false;
Stopwatch query_time;
UInt64 waited_for_client_reading = 0;
@ -385,6 +387,8 @@ namespace
input_format = "Values";
}
input_data_delimiter = query_info.input_data_delimiter();
/// Choose output format.
query_context->setDefaultFormat(query_info.output_format());
if (const auto * ast_query_with_output = dynamic_cast<const ASTQueryWithOutput *>(ast.get());
@ -442,6 +446,7 @@ namespace
need_input_data_from_insert_query = false;
if (insert_query && insert_query->data && (insert_query->data != insert_query->end))
{
need_input_data_delimiter = !input_data_delimiter.empty();
return {insert_query->data, insert_query->end - insert_query->data};
}
}
@ -450,9 +455,17 @@ namespace
{
if (need_input_data_from_query_info)
{
if (need_input_data_delimiter && !query_info.input_data().empty())
{
need_input_data_delimiter = false;
return {input_data_delimiter.data(), input_data_delimiter.size()};
}
need_input_data_from_query_info = false;
if (!query_info.input_data().empty())
{
need_input_data_delimiter = !input_data_delimiter.empty();
return {query_info.input_data().data(), query_info.input_data().size()};
}
}
if (!query_info.next_query_info())

View File

@ -8,11 +8,12 @@ message QueryInfo {
map<string, string> settings = 3;
string database = 4;
string input_data = 5;
string output_format = 6;
string user_name = 7;
string password = 8;
string quota = 9;
bool next_query_info = 10;
string input_data_delimiter = 6;
string output_format = 7;
string user_name = 8;
string password = 9;
string quota = 10;
bool next_query_info = 11;
}
message Progress {

View File

@ -39,7 +39,7 @@ def create_channel():
main_channel = channel
return channel
def query_common(query_text, settings={}, input_data=[], output_format='TabSeparated', query_id='123', channel=None):
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', query_id='123', channel=None):
if type(input_data) == str:
input_data = [input_data]
if not channel:
@ -47,8 +47,8 @@ def query_common(query_text, settings={}, input_data=[], output_format='TabSepar
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
def send_query_info():
input_data_part = input_data.pop(0) if input_data else ''
yield clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, output_format=output_format,
query_id=query_id, next_query_info=bool(input_data))
yield clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, input_data_delimiter=input_data_delimiter,
output_format=output_format, query_id=query_id, next_query_info=bool(input_data))
while input_data:
input_data_part = input_data.pop(0)
yield clickhouse_grpc_pb2.QueryInfo(input_data=input_data_part, next_query_info=bool(input_data))
@ -120,6 +120,15 @@ def test_insert_query_streaming():
query("INSERT INTO t VALUES", input_data=["(1),(2),(3),", "(5),(4),(6),", "(7),(8),(9)"])
assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n7\n8\n9\n"
def test_insert_query_delimiter():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
query("INSERT INTO t FORMAT CSV 1\n2", input_data=["3", "4\n5"], input_data_delimiter='\n')
assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n"
query("DROP TABLE t")
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
query("INSERT INTO t FORMAT CSV 1\n2", input_data=["3", "4\n5"])
assert query("SELECT a FROM t ORDER BY a") == "1\n5\n234\n"
def test_insert_default_column():
query("CREATE TABLE t (a UInt8, b Int32 DEFAULT 100, c String DEFAULT 'c') ENGINE = Memory")
query("INSERT INTO t (c, a) VALUES ('x',1),('y',2)")