Use bytes instead of strings in the GRPC protocol.

Vitaly Baranov 2021-08-06 13:55:49 +03:00
parent 40a3c8281a
commit 9a40ce87e9
4 changed files with 65 additions and 53 deletions

clickhouse_grpc.proto (gRPC protocol definition)

@@ -32,7 +32,7 @@ message ExternalTable {
// Data to insert to the external table.
// If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used,
// then data for insertion to the same external table can be split between multiple QueryInfos.
-string data = 3;
+bytes data = 3;
// Format of the data to insert to the external table.
string format = 4;
@@ -53,10 +53,10 @@ message QueryInfo {
string database = 4;
// Input data, used both as data for INSERT query and as data for the input() function.
-string input_data = 5;
+bytes input_data = 5;
// Delimiter for input_data, inserted between input_data from adjacent QueryInfos.
-string input_data_delimiter = 6;
+bytes input_data_delimiter = 6;
// Default output format. If not specified, 'TabSeparated' is used.
string output_format = 7;
@@ -128,9 +128,9 @@ message Exception {
// Result of execution of a query which is sent back by the ClickHouse server to the client.
message Result {
// Output of the query, represented in the `output_format` or in a format specified in `query`.
-string output = 1;
-string totals = 2;
-string extremes = 3;
+bytes output = 1;
+bytes totals = 2;
+bytes extremes = 3;
repeated LogEntry logs = 4;
Progress progress = 5;

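With output, totals, extremes, data, input_data and input_data_delimiter switched from string to bytes, a client now sends raw bytes and decodes Result.output itself. A minimal sketch of a plain (non-streaming) call from Python, assuming the clickhouse_grpc_pb2 / clickhouse_grpc_pb2_grpc modules generated from this proto and a server listening on localhost:9100 (the port used by the tests below):

    # Sketch only: ExecuteQuery over an insecure channel; payloads are now bytes.
    import grpc
    import clickhouse_grpc_pb2
    import clickhouse_grpc_pb2_grpc

    channel = grpc.insecure_channel('localhost:9100')
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
    info = clickhouse_grpc_pb2.QueryInfo(query="SELECT 'hello'", output_format='TabSeparated')
    result = stub.ExecuteQuery(info)
    # Result.output is bytes now, so decode before treating it as text.
    print(result.output.decode('utf-8'), end='')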
Integration test for the gRPC protocol (plain channel)

@@ -6,7 +6,9 @@ import grpc
from helpers.cluster import ClickHouseCluster, run_and_check
from threading import Thread
+GRPC_PORT = 9100
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_ENCODING = 'utf-8'
# Use grpcio-tools to generate *pb2.py files from *.proto.
@@ -28,11 +30,10 @@ import clickhouse_grpc_pb2_grpc
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/grpc_config.xml'])
-grpc_port = 9100
main_channel = None
def create_channel():
-node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(grpc_port)
+node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(GRPC_PORT)
channel = grpc.insecure_channel(node_ip_with_grpc_port)
grpc.channel_ready_future(channel).result(timeout=10)
global main_channel
@@ -42,20 +43,27 @@ def create_channel():
def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', external_tables=[],
user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None):
-if type(input_data) == str:
+if type(input_data) is not list:
input_data = [input_data]
+if type(input_data_delimiter) is str:
+input_data_delimiter=input_data_delimiter.encode(DEFAULT_ENCODING)
if not channel:
channel = main_channel
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
def query_info():
-input_data_part = input_data.pop(0) if input_data else ''
-return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, input_data_delimiter=input_data_delimiter,
-output_format=output_format, external_tables=external_tables, user_name=user_name, password=password,
-query_id=query_id, session_id=session_id, next_query_info=bool(input_data))
+input_data_part = input_data.pop(0) if input_data else b''
+if type(input_data_part) is str:
+input_data_part = input_data_part.encode(DEFAULT_ENCODING)
+return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part,
+input_data_delimiter=input_data_delimiter, output_format=output_format,
+external_tables=external_tables, user_name=user_name, password=password, query_id=query_id,
+session_id=session_id, next_query_info=bool(input_data))
def send_query_info():
yield query_info()
while input_data:
input_data_part = input_data.pop(0)
+if type(input_data_part) is str:
+input_data_part = input_data_part.encode(DEFAULT_ENCODING)
yield clickhouse_grpc_pb2.QueryInfo(input_data=input_data_part, next_query_info=bool(input_data))
stream_input = len(input_data) > 1
if stream_input and stream_output:
@@ -74,10 +82,10 @@ def query_no_errors(*args, **kwargs):
return results
def query(*args, **kwargs):
output = ""
output = b''
for result in query_no_errors(*args, **kwargs):
output += result.output
-return output
+return output.decode(DEFAULT_ENCODING)
def query_and_get_error(*args, **kwargs):
results = query_common(*args, **kwargs)
@@ -86,16 +94,16 @@ def query_and_get_error(*args, **kwargs):
return results[-1].exception
def query_and_get_totals(*args, **kwargs):
totals = ""
totals = b''
for result in query_no_errors(*args, **kwargs):
totals += result.totals
-return totals
+return totals.decode(DEFAULT_ENCODING)
def query_and_get_extremes(*args, **kwargs):
extremes = ""
extremes = b''
for result in query_no_errors(*args, **kwargs):
extremes += result.extremes
-return extremes
+return extremes.decode(DEFAULT_ENCODING)
def query_and_get_logs(*args, **kwargs):
logs = ""
@@ -135,6 +143,7 @@ def reset_after_test():
yield
node.query_with_retry("DROP TABLE IF EXISTS t")
# Actual tests
def test_select_one():
@@ -270,18 +279,18 @@ def test_input_function():
def test_external_table():
columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
-ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
+ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
assert query("SELECT * FROM ext1 ORDER BY UserID", external_tables=[ext1]) == "1\tAlex\n"\
"2\tBen\n"\
"3\tCarl\n"
-ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data='4,Daniel\n5,Ethan\n', format='CSV')
+ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data=b'4,Daniel\n5,Ethan\n', format='CSV')
assert query("SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID", external_tables=[ext1, ext2]) == "1\tAlex\n"\
"2\tBen\n"\
"3\tCarl\n"\
"4\tDaniel\n"\
"5\tEthan\n"
unnamed_columns = [clickhouse_grpc_pb2.NameAndType(type='UInt64'), clickhouse_grpc_pb2.NameAndType(type='String')]
-unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data='6\tGeorge\n7\tFred\n')
+unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data=b'6\tGeorge\n7\tFred\n')
assert query("SELECT * FROM _data ORDER BY _2", external_tables=[unnamed_table]) == "7\tFred\n"\
"6\tGeorge\n"
@@ -289,16 +298,16 @@ def test_external_table_streaming():
columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
def send_query_info():
yield clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM exts ORDER BY UserID",
-external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n')],
+external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n')],
next_query_info=True)
-yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data='4\tDaniel\n5\tEthan\n')])
+yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data=b'4\tDaniel\n5\tEthan\n')])
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQueryWithStreamInput(send_query_info())
assert result.output == "1\tAlex\n"\
"2\tBen\n"\
"3\tCarl\n"\
"4\tDaniel\n"\
"5\tEthan\n"
assert result.output == b'1\tAlex\n'\
b'2\tBen\n'\
b'3\tCarl\n'\
b'4\tDaniel\n'\
b'5\tEthan\n'
def test_simultaneous_queries_same_channel():
threads=[]
@@ -325,8 +334,8 @@ def test_simultaneous_queries_multiple_channels():
def test_cancel_while_processing_input():
query("CREATE TABLE t (a UInt8) ENGINE = Memory")
def send_query_info():
yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data="1\n2\n3\n", next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(input_data="4\n5\n6\n", next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data=b'1\n2\n3\n', next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(input_data=b'4\n5\n6\n', next_query_info=True)
yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
result = stub.ExecuteQueryWithStreamInput(send_query_info())
@@ -343,7 +352,7 @@ def test_cancel_while_generating_output():
results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
assert len(results) >= 1
assert results[-1].cancelled == True
-output = ''
+output = b''
for result in results:
output += result.output
-assert output == '0\t0\n1\t0\n2\t0\n3\t0\n'
+assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n'

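The streaming helpers above follow the same convention: every chunk that goes into input_data is bytes, and input_data_delimiter is bytes as well. A rough sketch of splitting one INSERT across several QueryInfo messages, assuming the same generated modules, a server on localhost:9100 and a table t (a UInt8) like the one the tests create:

    # Sketch only: stream INSERT data as bytes chunks joined by a bytes delimiter.
    import grpc
    import clickhouse_grpc_pb2
    import clickhouse_grpc_pb2_grpc

    def send_query_info():
        # The first message carries the query; next_query_info=True promises more data.
        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated",
                                            input_data=b'1\n2', input_data_delimiter=b'\n',
                                            next_query_info=True)
        # The last message carries only data and leaves next_query_info unset.
        yield clickhouse_grpc_pb2.QueryInfo(input_data=b'3\n4\n')

    channel = grpc.insecure_channel('localhost:9100')
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
    result = stub.ExecuteQueryWithStreamInput(send_query_info())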
Integration test for the gRPC protocol over TLS

@@ -4,7 +4,10 @@ import sys
import grpc
from helpers.cluster import ClickHouseCluster, run_and_check
+GRPC_PORT = 9100
+NODE_IP = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_ENCODING = 'utf-8'
# Use grpcio-tools to generate *pb2.py files from *.proto.
@@ -23,12 +26,10 @@ import clickhouse_grpc_pb2_grpc
# Utilities
-node_ip = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
-grpc_port = 9100
-node_ip_with_grpc_port = node_ip + ':' + str(grpc_port)
+node_ip_with_grpc_port = NODE_IP + ':' + str(GRPC_PORT)
config_dir = os.path.join(SCRIPT_DIR, './configs')
cluster = ClickHouseCluster(__file__)
-node = cluster.add_instance('node', ipv4_address=node_ip, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem'])
+node = cluster.add_instance('node', ipv4_address=NODE_IP, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem'])
def create_secure_channel():
ca_cert = open(os.path.join(config_dir, 'ca-cert.pem'), 'rb').read()
@@ -59,7 +60,7 @@ def query(query_text, channel):
result = stub.ExecuteQuery(query_info)
if result and result.HasField('exception'):
raise Exception(result.exception.display_text)
-return result.output
+return result.output.decode(DEFAULT_ENCODING)
@pytest.fixture(scope="module", autouse=True)
def start_cluster():

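The TLS variant of the test changes in the same way: the transport is different, but Result.output is still bytes and has to be decoded on the client side. A rough sketch of a server-authenticated secure channel, assuming the CA certificate path and the NODE_IP:GRPC_PORT address used in this test:

    # Sketch only: TLS channel built from the test's CA certificate; output is still bytes.
    import grpc
    import clickhouse_grpc_pb2
    import clickhouse_grpc_pb2_grpc

    with open('configs/ca-cert.pem', 'rb') as f:
        credentials = grpc.ssl_channel_credentials(root_certificates=f.read())
    channel = grpc.secure_channel('10.5.172.77:9100', credentials)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
    result = stub.ExecuteQuery(clickhouse_grpc_pb2.QueryInfo(query='SELECT 1'))
    print(result.output.decode('utf-8'), end='')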
Command-line gRPC client utility

@@ -14,13 +14,14 @@ import grpc # pip3 install grpcio
import grpc_tools # pip3 install grpcio-tools
import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid
-default_host = 'localhost'
-default_port = 9100
-default_user_name = 'default'
-default_output_format_for_interactive_mode = 'PrettyCompact'
-history_filename = '~/.clickhouse_grpc_history'
-history_size = 1000
-stdin_bufsize = 1048576
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 9100
+DEFAULT_USER_NAME = 'default'
+DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE = 'PrettyCompact'
+HISTORY_FILENAME = '~/.clickhouse_grpc_history'
+HISTORY_SIZE = 1000
+STDIN_BUFFER_SIZE = 1048576
+DEFAULT_ENCODING = 'utf-8'
class ClickHouseGRPCError(Exception):
@@ -52,7 +53,7 @@ def error_print(*args, **kwargs):
class ClickHouseGRPCClient(cmd.Cmd):
prompt="grpc :) "
-def __init__(self, host=default_host, port=default_port, user_name=default_user_name, password='',
+def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, user_name=DEFAULT_USER_NAME, password='',
database='', output_format='', settings='', verbatim=False, show_debug_info=False):
super(ClickHouseGRPCClient, self).__init__(completekey=None)
self.host = host
@@ -85,7 +86,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
if self.show_debug_info:
print('\nresult={}'.format(result))
ClickHouseGRPCClient.__check_no_errors(result)
-return result.output
+return result.output.decode(DEFAULT_ENCODING)
# Executes a query using the stream IO and with ability to cancel it by pressing Ctrl+C.
def run_query(self, query_text, raise_exceptions=True, allow_cancel=False):
@@ -117,7 +118,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
# send input data
if not sys.stdin.isatty():
while True:
-info.input_data = sys.stdin.buffer.read(stdin_bufsize)
+info.input_data = sys.stdin.buffer.read(STDIN_BUFFER_SIZE)
if not info.input_data:
break
info.next_query_info = True
@@ -134,7 +135,8 @@ class ClickHouseGRPCClient(cmd.Cmd):
if self.show_debug_info:
print('\nresult={}'.format(result))
ClickHouseGRPCClient.__check_no_errors(result)
-print(result.output, end='')
+sys.stdout.buffer.write(result.output)
+sys.stdout.flush()
if result.cancelled:
cancelled = True
self.verbatim_print("Query was cancelled.")
@@ -244,7 +246,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
import readline
except ImportError:
readline = None
-histfile = os.path.expanduser(history_filename)
+histfile = os.path.expanduser(HISTORY_FILENAME)
if readline and os.path.exists(histfile):
readline.read_history_file(histfile)
@@ -252,8 +254,8 @@ class ClickHouseGRPCClient(cmd.Cmd):
def __write_history():
global readline
if readline:
-readline.set_history_length(history_size)
-histfile = os.path.expanduser(history_filename)
+readline.set_history_length(HISTORY_SIZE)
+histfile = os.path.expanduser(HISTORY_FILENAME)
readline.write_history_file(histfile)
@@ -281,7 +283,7 @@ def main(args):
output_format = args.output_format
if not output_format and interactive_mode:
-output_format = default_output_format_for_interactive_mode
+output_format = DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE
try:
with ClickHouseGRPCClient(host=args.host, port=args.port, user_name=args.user_name, password=args.password,