diff --git a/src/Server/grpc_protos/clickhouse_grpc.proto b/src/Server/grpc_protos/clickhouse_grpc.proto index 2f25973297c..da01432d76c 100644 --- a/src/Server/grpc_protos/clickhouse_grpc.proto +++ b/src/Server/grpc_protos/clickhouse_grpc.proto @@ -32,7 +32,7 @@ message ExternalTable { // Data to insert to the external table. // If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used, // then data for insertion to the same external table can be split between multiple QueryInfos. - string data = 3; + bytes data = 3; // Format of the data to insert to the external table. string format = 4; @@ -53,10 +53,10 @@ message QueryInfo { string database = 4; // Input data, used both as data for INSERT query and as data for the input() function. - string input_data = 5; + bytes input_data = 5; // Delimiter for input_data, inserted between input_data from adjacent QueryInfos. - string input_data_delimiter = 6; + bytes input_data_delimiter = 6; // Default output format. If not specified, 'TabSeparated' is used. string output_format = 7; @@ -128,9 +128,9 @@ message Exception { // Result of execution of a query which is sent back by the ClickHouse server to the client. message Result { // Output of the query, represented in the `output_format` or in a format specified in `query`. - string output = 1; - string totals = 2; - string extremes = 3; + bytes output = 1; + bytes totals = 2; + bytes extremes = 3; repeated LogEntry logs = 4; Progress progress = 5; diff --git a/tests/integration/test_grpc_protocol/test.py b/tests/integration/test_grpc_protocol/test.py index ee7e94bad1d..b0c1f8067b6 100644 --- a/tests/integration/test_grpc_protocol/test.py +++ b/tests/integration/test_grpc_protocol/test.py @@ -6,7 +6,9 @@ import grpc from helpers.cluster import ClickHouseCluster, run_and_check from threading import Thread +GRPC_PORT = 9100 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +DEFAULT_ENCODING = 'utf-8' # Use grpcio-tools to generate *pb2.py files from *.proto. @@ -28,11 +30,10 @@ import clickhouse_grpc_pb2_grpc config_dir = os.path.join(SCRIPT_DIR, './configs') cluster = ClickHouseCluster(__file__) node = cluster.add_instance('node', main_configs=['configs/grpc_config.xml']) -grpc_port = 9100 main_channel = None def create_channel(): - node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(grpc_port) + node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(GRPC_PORT) channel = grpc.insecure_channel(node_ip_with_grpc_port) grpc.channel_ready_future(channel).result(timeout=10) global main_channel @@ -42,20 +43,27 @@ def create_channel(): def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', external_tables=[], user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None): - if type(input_data) == str: + if type(input_data) is not list: input_data = [input_data] + if type(input_data_delimiter) is str: + input_data_delimiter=input_data_delimiter.encode(DEFAULT_ENCODING) if not channel: channel = main_channel stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel) def query_info(): - input_data_part = input_data.pop(0) if input_data else '' - return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, input_data_delimiter=input_data_delimiter, - output_format=output_format, external_tables=external_tables, user_name=user_name, password=password, - query_id=query_id, session_id=session_id, next_query_info=bool(input_data)) + input_data_part = input_data.pop(0) if input_data else b'' + if type(input_data_part) is str: + input_data_part = input_data_part.encode(DEFAULT_ENCODING) + return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, + input_data_delimiter=input_data_delimiter, output_format=output_format, + external_tables=external_tables, user_name=user_name, password=password, query_id=query_id, + session_id=session_id, next_query_info=bool(input_data)) def send_query_info(): yield query_info() while input_data: input_data_part = input_data.pop(0) + if type(input_data_part) is str: + input_data_part = input_data_part.encode(DEFAULT_ENCODING) yield clickhouse_grpc_pb2.QueryInfo(input_data=input_data_part, next_query_info=bool(input_data)) stream_input = len(input_data) > 1 if stream_input and stream_output: @@ -74,10 +82,10 @@ def query_no_errors(*args, **kwargs): return results def query(*args, **kwargs): - output = "" + output = b'' for result in query_no_errors(*args, **kwargs): output += result.output - return output + return output.decode(DEFAULT_ENCODING) def query_and_get_error(*args, **kwargs): results = query_common(*args, **kwargs) @@ -86,16 +94,16 @@ def query_and_get_error(*args, **kwargs): return results[-1].exception def query_and_get_totals(*args, **kwargs): - totals = "" + totals = b'' for result in query_no_errors(*args, **kwargs): totals += result.totals - return totals + return totals.decode(DEFAULT_ENCODING) def query_and_get_extremes(*args, **kwargs): - extremes = "" + extremes = b'' for result in query_no_errors(*args, **kwargs): extremes += result.extremes - return extremes + return extremes.decode(DEFAULT_ENCODING) def query_and_get_logs(*args, **kwargs): logs = "" @@ -135,6 +143,7 @@ def reset_after_test(): yield node.query_with_retry("DROP TABLE IF EXISTS t") + # Actual tests def test_select_one(): @@ -270,18 +279,18 @@ def test_input_function(): def test_external_table(): columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')] - ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated') + ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated') assert query("SELECT * FROM ext1 ORDER BY UserID", external_tables=[ext1]) == "1\tAlex\n"\ "2\tBen\n"\ "3\tCarl\n" - ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data='4,Daniel\n5,Ethan\n', format='CSV') + ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data=b'4,Daniel\n5,Ethan\n', format='CSV') assert query("SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID", external_tables=[ext1, ext2]) == "1\tAlex\n"\ "2\tBen\n"\ "3\tCarl\n"\ "4\tDaniel\n"\ "5\tEthan\n" unnamed_columns = [clickhouse_grpc_pb2.NameAndType(type='UInt64'), clickhouse_grpc_pb2.NameAndType(type='String')] - unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data='6\tGeorge\n7\tFred\n') + unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data=b'6\tGeorge\n7\tFred\n') assert query("SELECT * FROM _data ORDER BY _2", external_tables=[unnamed_table]) == "7\tFred\n"\ "6\tGeorge\n" @@ -289,16 +298,16 @@ def test_external_table_streaming(): columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')] def send_query_info(): yield clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM exts ORDER BY UserID", - external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n')], + external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n')], next_query_info=True) - yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data='4\tDaniel\n5\tEthan\n')]) + yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data=b'4\tDaniel\n5\tEthan\n')]) stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel) result = stub.ExecuteQueryWithStreamInput(send_query_info()) - assert result.output == "1\tAlex\n"\ - "2\tBen\n"\ - "3\tCarl\n"\ - "4\tDaniel\n"\ - "5\tEthan\n" + assert result.output == b'1\tAlex\n'\ + b'2\tBen\n'\ + b'3\tCarl\n'\ + b'4\tDaniel\n'\ + b'5\tEthan\n' def test_simultaneous_queries_same_channel(): threads=[] @@ -325,8 +334,8 @@ def test_simultaneous_queries_multiple_channels(): def test_cancel_while_processing_input(): query("CREATE TABLE t (a UInt8) ENGINE = Memory") def send_query_info(): - yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data="1\n2\n3\n", next_query_info=True) - yield clickhouse_grpc_pb2.QueryInfo(input_data="4\n5\n6\n", next_query_info=True) + yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data=b'1\n2\n3\n', next_query_info=True) + yield clickhouse_grpc_pb2.QueryInfo(input_data=b'4\n5\n6\n', next_query_info=True) yield clickhouse_grpc_pb2.QueryInfo(cancel=True) stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel) result = stub.ExecuteQueryWithStreamInput(send_query_info()) @@ -343,7 +352,7 @@ def test_cancel_while_generating_output(): results = list(stub.ExecuteQueryWithStreamIO(send_query_info())) assert len(results) >= 1 assert results[-1].cancelled == True - output = '' + output = b'' for result in results: output += result.output - assert output == '0\t0\n1\t0\n2\t0\n3\t0\n' + assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n' diff --git a/tests/integration/test_grpc_protocol_ssl/test.py b/tests/integration/test_grpc_protocol_ssl/test.py index c040ccd041f..1f21fbe5f8a 100644 --- a/tests/integration/test_grpc_protocol_ssl/test.py +++ b/tests/integration/test_grpc_protocol_ssl/test.py @@ -4,7 +4,10 @@ import sys import grpc from helpers.cluster import ClickHouseCluster, run_and_check +GRPC_PORT = 9100 +NODE_IP = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf). SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__)) +DEFAULT_ENCODING = 'utf-8' # Use grpcio-tools to generate *pb2.py files from *.proto. @@ -23,12 +26,10 @@ import clickhouse_grpc_pb2_grpc # Utilities -node_ip = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf). -grpc_port = 9100 -node_ip_with_grpc_port = node_ip + ':' + str(grpc_port) +node_ip_with_grpc_port = NODE_IP + ':' + str(GRPC_PORT) config_dir = os.path.join(SCRIPT_DIR, './configs') cluster = ClickHouseCluster(__file__) -node = cluster.add_instance('node', ipv4_address=node_ip, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem']) +node = cluster.add_instance('node', ipv4_address=NODE_IP, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem']) def create_secure_channel(): ca_cert = open(os.path.join(config_dir, 'ca-cert.pem'), 'rb').read() @@ -59,7 +60,7 @@ def query(query_text, channel): result = stub.ExecuteQuery(query_info) if result and result.HasField('exception'): raise Exception(result.exception.display_text) - return result.output + return result.output.decode(DEFAULT_ENCODING) @pytest.fixture(scope="module", autouse=True) def start_cluster(): diff --git a/utils/grpc-client/clickhouse-grpc-client.py b/utils/grpc-client/clickhouse-grpc-client.py index 19d213a8e3f..dfaa7ed4e01 100755 --- a/utils/grpc-client/clickhouse-grpc-client.py +++ b/utils/grpc-client/clickhouse-grpc-client.py @@ -14,13 +14,14 @@ import grpc # pip3 install grpcio import grpc_tools # pip3 install grpcio-tools import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid -default_host = 'localhost' -default_port = 9100 -default_user_name = 'default' -default_output_format_for_interactive_mode = 'PrettyCompact' -history_filename = '~/.clickhouse_grpc_history' -history_size = 1000 -stdin_bufsize = 1048576 +DEFAULT_HOST = 'localhost' +DEFAULT_PORT = 9100 +DEFAULT_USER_NAME = 'default' +DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE = 'PrettyCompact' +HISTORY_FILENAME = '~/.clickhouse_grpc_history' +HISTORY_SIZE = 1000 +STDIN_BUFFER_SIZE = 1048576 +DEFAULT_ENCODING = 'utf-8' class ClickHouseGRPCError(Exception): @@ -52,7 +53,7 @@ def error_print(*args, **kwargs): class ClickHouseGRPCClient(cmd.Cmd): prompt="grpc :) " - def __init__(self, host=default_host, port=default_port, user_name=default_user_name, password='', + def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, user_name=DEFAULT_USER_NAME, password='', database='', output_format='', settings='', verbatim=False, show_debug_info=False): super(ClickHouseGRPCClient, self).__init__(completekey=None) self.host = host @@ -85,7 +86,7 @@ class ClickHouseGRPCClient(cmd.Cmd): if self.show_debug_info: print('\nresult={}'.format(result)) ClickHouseGRPCClient.__check_no_errors(result) - return result.output + return result.output.decode(DEFAULT_ENCODING) # Executes a query using the stream IO and with ability to cancel it by pressing Ctrl+C. def run_query(self, query_text, raise_exceptions=True, allow_cancel=False): @@ -117,7 +118,7 @@ class ClickHouseGRPCClient(cmd.Cmd): # send input data if not sys.stdin.isatty(): while True: - info.input_data = sys.stdin.buffer.read(stdin_bufsize) + info.input_data = sys.stdin.buffer.read(STDIN_BUFFER_SIZE) if not info.input_data: break info.next_query_info = True @@ -134,7 +135,8 @@ class ClickHouseGRPCClient(cmd.Cmd): if self.show_debug_info: print('\nresult={}'.format(result)) ClickHouseGRPCClient.__check_no_errors(result) - print(result.output, end='') + sys.stdout.buffer.write(result.output) + sys.stdout.flush() if result.cancelled: cancelled = True self.verbatim_print("Query was cancelled.") @@ -244,7 +246,7 @@ class ClickHouseGRPCClient(cmd.Cmd): import readline except ImportError: readline = None - histfile = os.path.expanduser(history_filename) + histfile = os.path.expanduser(HISTORY_FILENAME) if readline and os.path.exists(histfile): readline.read_history_file(histfile) @@ -252,8 +254,8 @@ class ClickHouseGRPCClient(cmd.Cmd): def __write_history(): global readline if readline: - readline.set_history_length(history_size) - histfile = os.path.expanduser(history_filename) + readline.set_history_length(HISTORY_SIZE) + histfile = os.path.expanduser(HISTORY_FILENAME) readline.write_history_file(histfile) @@ -281,7 +283,7 @@ def main(args): output_format = args.output_format if not output_format and interactive_mode: - output_format = default_output_format_for_interactive_mode + output_format = DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE try: with ClickHouseGRPCClient(host=args.host, port=args.port, user_name=args.user_name, password=args.password,