Merge pull request #34499 from vitlibar/grpc-send-output-format-back-to-client

gRPC: Send output format back to client
This commit is contained in:
Maksim Kita 2022-02-13 15:34:32 +01:00 committed by GitHub
commit 91bc9cd4cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 4 deletions

View File

@ -642,6 +642,9 @@ namespace
void throwIfFailedToReadQueryInfo();
bool isQueryCancelled();
void addQueryDetailsToResult();
void addOutputFormatToResult();
void addOutputColumnsNamesAndTypesToResult(const Block & headers);
void addProgressToResult();
void addTotalsToResult(const Block & totals);
void addExtremesToResult(const Block & extremes);
@ -667,6 +670,7 @@ namespace
CompressionMethod input_compression_method = CompressionMethod::None;
PODArray<char> output;
String output_format;
bool send_output_columns_names_and_types = false;
CompressionMethod output_compression_method = CompressionMethod::None;
int output_compression_level = 0;
@ -888,6 +892,8 @@ namespace
if (output_format.empty())
output_format = query_context->getDefaultFormat();
send_output_columns_names_and_types = query_info.send_output_columns();
/// Choose compression.
String input_compression_method_str = query_info.input_compression_type();
if (input_compression_method_str.empty())
@ -1150,6 +1156,9 @@ namespace
void Call::generateOutput()
{
/// We add query_id and time_zone to the first result anyway.
addQueryDetailsToResult();
if (!io.pipeline.initialized() || io.pipeline.pushing())
return;
@ -1189,6 +1198,9 @@ namespace
return true;
};
addOutputFormatToResult();
addOutputColumnsNamesAndTypesToResult(header);
Block block;
while (check_for_cancel())
{
@ -1439,6 +1451,29 @@ namespace
return false;
}
void Call::addQueryDetailsToResult()
{
*result.mutable_query_id() = query_context->getClientInfo().current_query_id;
*result.mutable_time_zone() = DateLUT::instance().getTimeZone();
}
void Call::addOutputFormatToResult()
{
*result.mutable_output_format() = output_format;
}
void Call::addOutputColumnsNamesAndTypesToResult(const Block & header)
{
if (!send_output_columns_names_and_types)
return;
for (const auto & column : header)
{
auto & name_and_type = *result.add_output_columns();
*name_and_type.mutable_name() = column.name;
*name_and_type.mutable_type() = column.type->getName();
}
}
void Call::addProgressToResult()
{
auto values = progress.fetchAndResetPiecewiseAtomically();

View File

@ -82,6 +82,9 @@ message QueryInfo {
// Default output format. If not specified, 'TabSeparated' is used.
string output_format = 7;
// Set it if you want the names and the types of output columns to be sent to the client.
bool send_output_columns = 24;
repeated ExternalTable external_tables = 8;
string user_name = 9;
@ -187,7 +190,17 @@ message Exception {
// Result of execution of a query which is sent back by the ClickHouse server to the client.
message Result {
// Output of the query, represented in the `output_format` or in a format specified in `query`.
string query_id = 9;
string time_zone = 10;
// The format in which `output`, `totals` and `extremes` are written.
// It's either the same as `output_format` specified in `QueryInfo` or the format specified in the query itself.
string output_format = 11;
// The names and types of columns of the result written in `output`.
repeated NameAndType output_columns = 12;
// Output of the query, represented in the `output_format`.
bytes output = 1;
bytes totals = 2;
bytes extremes = 3;

View File

@ -2,6 +2,8 @@ import os
import pytest
import sys
import time
import pytz
import uuid
import grpc
from helpers.cluster import ClickHouseCluster, run_and_check
from threading import Thread
@ -43,8 +45,8 @@ def create_channel():
main_channel = channel
return channel
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', external_tables=[],
user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None):
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', send_output_columns=False,
external_tables=[], user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None):
if type(input_data) is not list:
input_data = [input_data]
if type(input_data_delimiter) is str:
@ -58,7 +60,8 @@ def query_common(query_text, settings={}, input_data=[], input_data_delimiter=''
input_data_part = input_data_part.encode(DEFAULT_ENCODING)
return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part,
input_data_delimiter=input_data_delimiter, output_format=output_format,
external_tables=external_tables, user_name=user_name, password=password, query_id=query_id,
send_output_columns=send_output_columns, external_tables=external_tables,
user_name=user_name, password=password, query_id=query_id,
session_id=session_id, next_query_info=bool(input_data))
def send_query_info():
yield query_info()
@ -204,6 +207,28 @@ def test_totals_and_extremes():
assert query("SELECT x, y FROM t") == "1\t2\n2\t4\n3\t2\n3\t3\n3\t4\n"
assert query_and_get_extremes("SELECT x, y FROM t", settings={"extremes": "1"}) == "1\t2\n3\t4\n"
def test_get_query_details():
result = list(query_no_errors("CREATE TABLE t (a UInt8) ENGINE = Memory", query_id = '123'))[0]
assert result.query_id == '123'
pytz.timezone(result.time_zone)
assert result.output_format == ''
assert len(result.output_columns) == 0
assert result.output == b''
#
result = list(query_no_errors("SELECT 'a', 1", query_id = '', output_format = 'TabSeparated'))[0]
uuid.UUID(result.query_id)
pytz.timezone(result.time_zone)
assert result.output_format == 'TabSeparated'
assert len(result.output_columns) == 0
assert result.output == b'a\t1\n'
#
result = list(query_no_errors("SELECT 'a' AS x, 1 FORMAT JSONEachRow", query_id = '', send_output_columns=True))[0]
uuid.UUID(result.query_id)
pytz.timezone(result.time_zone)
assert result.output_format == 'JSONEachRow'
assert ([(col.name, col.type) for col in result.output_columns]) == [('x', 'String'), ('1', 'UInt8')]
assert result.output == b'{"x":"a","1":1}\n'
def test_errors_handling():
e = query_and_get_error("")
#print(e)
@ -225,6 +250,9 @@ def test_logs():
def test_progress():
results = query_no_errors("SELECT number, sleep(0.31) FROM numbers(8) SETTINGS max_block_size=2, interactive_delay=100000", stream_output=True)
for result in results:
result.time_zone = ''
result.query_id = ''
#print(results)
assert str(results) ==\
"""[progress {
@ -232,6 +260,7 @@ def test_progress():
read_bytes: 16
total_rows_to_read: 8
}
output_format: "TabSeparated"
, output: "0\\t0\\n1\\t0\\n"
, progress {
read_rows: 2