Mirror of https://github.com/ClickHouse/ClickHouse.git
Merge pull request #27431 from vitlibar/grpc-protocol-bytes-instead-of-strings
Use bytes instead of strings in the GRPC protocol.
Commit: 5a7fe51532
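For client code, the practical effect of this change is that the data-carrying fields (ExternalTable.data, QueryInfo.input_data, QueryInfo.input_data_delimiter, Result.output, Result.totals, Result.extremes) are now `bytes` instead of UTF-8 `string` values, so binary formats can pass through unmodified, and Python clients built from the generated clickhouse_grpc_pb2 modules must encode their inputs and decode textual outputs themselves. The sketch below is illustrative only and not part of the diff; it assumes stubs generated from clickhouse_grpc.proto with grpcio-tools and a server whose gRPC port is reachable on localhost:9100 (the port used by the integration tests below).

import grpc
import clickhouse_grpc_pb2          # generated with grpcio-tools from clickhouse_grpc.proto
import clickhouse_grpc_pb2_grpc

def run_query(query_text, input_data=b''):
    # Connect to a ClickHouse server with the gRPC interface enabled (address assumed here).
    channel = grpc.insecure_channel('localhost:9100')
    grpc.channel_ready_future(channel).result(timeout=10)
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
    # input_data is now a bytes field: encode str payloads before sending.
    if isinstance(input_data, str):
        input_data = input_data.encode('utf-8')
    info = clickhouse_grpc_pb2.QueryInfo(query=query_text, input_data=input_data,
                                         output_format='TabSeparated')
    result = stub.ExecuteQuery(info)
    if result.HasField('exception'):
        raise RuntimeError(result.exception.display_text)
    # Result.output is now bytes as well; decode only when the output format is textual.
    return result.output.decode('utf-8')

print(run_query("SELECT 1"))   # expected to print "1" on such a setup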
@@ -32,7 +32,7 @@ message ExternalTable {
   // Data to insert to the external table.
   // If a method with streaming input (i.e. ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()) is used,
   // then data for insertion to the same external table can be split between multiple QueryInfos.
-  string data = 3;
+  bytes data = 3;
 
   // Format of the data to insert to the external table.
   string format = 4;
@@ -53,10 +53,10 @@ message QueryInfo {
   string database = 4;
 
   // Input data, used both as data for INSERT query and as data for the input() function.
-  string input_data = 5;
+  bytes input_data = 5;
 
   // Delimiter for input_data, inserted between input_data from adjacent QueryInfos.
-  string input_data_delimiter = 6;
+  bytes input_data_delimiter = 6;
 
   // Default output format. If not specified, 'TabSeparated' is used.
   string output_format = 7;
@@ -128,9 +128,9 @@ message Exception {
 // Result of execution of a query which is sent back by the ClickHouse server to the client.
 message Result {
   // Output of the query, represented in the `output_format` or in a format specified in `query`.
-  string output = 1;
-  string totals = 2;
-  string extremes = 3;
+  bytes output = 1;
+  bytes totals = 2;
+  bytes extremes = 3;
 
   repeated LogEntry logs = 4;
   Progress progress = 5;
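The comments on ExternalTable.data and QueryInfo.input_data above note that, with the streaming-input calls (ExecuteQueryWithStreamInput() or ExecuteQueryWithStreamIO()), the data for one table can be split across several QueryInfo messages, and each chunk is now a bytes value. The following is a hedged sketch of that pattern, mirroring what test_cancel_while_processing_input and test_external_table_streaming below do; it assumes the generated clickhouse_grpc_pb2 / clickhouse_grpc_pb2_grpc modules, an already-open gRPC channel, at least one chunk, and an existing table t (a UInt8).

import clickhouse_grpc_pb2
import clickhouse_grpc_pb2_grpc

def insert_in_chunks(channel, chunks):
    # chunks: a list of bytes payloads, e.g. [b'1\n2\n3\n', b'4\n5\n6\n'].
    def send_query_info():
        # The first QueryInfo carries the query; next_query_info=True announces more chunks.
        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated",
                                            input_data=chunks[0],
                                            next_query_info=len(chunks) > 1)
        # Follow-up QueryInfos carry only the remaining input_data chunks (bytes after this change).
        for i, chunk in enumerate(chunks[1:], start=2):
            yield clickhouse_grpc_pb2.QueryInfo(input_data=chunk,
                                                next_query_info=i < len(chunks))
    stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
    return stub.ExecuteQueryWithStreamInput(send_query_info())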
@@ -6,7 +6,9 @@ import grpc
 from helpers.cluster import ClickHouseCluster, run_and_check
 from threading import Thread
 
+GRPC_PORT = 9100
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_ENCODING = 'utf-8'
 
 
 # Use grpcio-tools to generate *pb2.py files from *.proto.
@@ -28,11 +30,10 @@ import clickhouse_grpc_pb2_grpc
 config_dir = os.path.join(SCRIPT_DIR, './configs')
 cluster = ClickHouseCluster(__file__)
 node = cluster.add_instance('node', main_configs=['configs/grpc_config.xml'])
-grpc_port = 9100
 main_channel = None
 
 def create_channel():
-    node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(grpc_port)
+    node_ip_with_grpc_port = cluster.get_instance_ip('node') + ':' + str(GRPC_PORT)
     channel = grpc.insecure_channel(node_ip_with_grpc_port)
     grpc.channel_ready_future(channel).result(timeout=10)
     global main_channel
@@ -42,20 +43,27 @@ def create_channel():
 
 def query_common(query_text, settings={}, input_data=[], input_data_delimiter='', output_format='TabSeparated', external_tables=[],
                  user_name='', password='', query_id='123', session_id='', stream_output=False, channel=None):
-    if type(input_data) == str:
+    if type(input_data) is not list:
         input_data = [input_data]
+    if type(input_data_delimiter) is str:
+        input_data_delimiter=input_data_delimiter.encode(DEFAULT_ENCODING)
     if not channel:
         channel = main_channel
     stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(channel)
     def query_info():
-        input_data_part = input_data.pop(0) if input_data else ''
-        return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part, input_data_delimiter=input_data_delimiter,
-                                             output_format=output_format, external_tables=external_tables, user_name=user_name, password=password,
-                                             query_id=query_id, session_id=session_id, next_query_info=bool(input_data))
+        input_data_part = input_data.pop(0) if input_data else b''
+        if type(input_data_part) is str:
+            input_data_part = input_data_part.encode(DEFAULT_ENCODING)
+        return clickhouse_grpc_pb2.QueryInfo(query=query_text, settings=settings, input_data=input_data_part,
+                                             input_data_delimiter=input_data_delimiter, output_format=output_format,
+                                             external_tables=external_tables, user_name=user_name, password=password, query_id=query_id,
+                                             session_id=session_id, next_query_info=bool(input_data))
     def send_query_info():
         yield query_info()
         while input_data:
             input_data_part = input_data.pop(0)
+            if type(input_data_part) is str:
+                input_data_part = input_data_part.encode(DEFAULT_ENCODING)
             yield clickhouse_grpc_pb2.QueryInfo(input_data=input_data_part, next_query_info=bool(input_data))
     stream_input = len(input_data) > 1
     if stream_input and stream_output:
@@ -74,10 +82,10 @@ def query_no_errors(*args, **kwargs):
     return results
 
 def query(*args, **kwargs):
-    output = ""
+    output = b''
     for result in query_no_errors(*args, **kwargs):
         output += result.output
-    return output
+    return output.decode(DEFAULT_ENCODING)
 
 def query_and_get_error(*args, **kwargs):
     results = query_common(*args, **kwargs)
@@ -86,16 +94,16 @@ def query_and_get_error(*args, **kwargs):
     return results[-1].exception
 
 def query_and_get_totals(*args, **kwargs):
-    totals = ""
+    totals = b''
     for result in query_no_errors(*args, **kwargs):
         totals += result.totals
-    return totals
+    return totals.decode(DEFAULT_ENCODING)
 
 def query_and_get_extremes(*args, **kwargs):
-    extremes = ""
+    extremes = b''
     for result in query_no_errors(*args, **kwargs):
         extremes += result.extremes
-    return extremes
+    return extremes.decode(DEFAULT_ENCODING)
 
 def query_and_get_logs(*args, **kwargs):
     logs = ""
@@ -135,6 +143,7 @@ def reset_after_test():
     yield
     node.query_with_retry("DROP TABLE IF EXISTS t")
 
+
 # Actual tests
 
 def test_select_one():
@@ -270,18 +279,18 @@ def test_input_function():
 
 def test_external_table():
     columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
-    ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
+    ext1 = clickhouse_grpc_pb2.ExternalTable(name='ext1', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n', format='TabSeparated')
     assert query("SELECT * FROM ext1 ORDER BY UserID", external_tables=[ext1]) == "1\tAlex\n"\
                                                                                   "2\tBen\n"\
                                                                                   "3\tCarl\n"
-    ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data='4,Daniel\n5,Ethan\n', format='CSV')
+    ext2 = clickhouse_grpc_pb2.ExternalTable(name='ext2', columns=columns, data=b'4,Daniel\n5,Ethan\n', format='CSV')
     assert query("SELECT * FROM (SELECT * FROM ext1 UNION ALL SELECT * FROM ext2) ORDER BY UserID", external_tables=[ext1, ext2]) == "1\tAlex\n"\
                                                                                                                                      "2\tBen\n"\
                                                                                                                                      "3\tCarl\n"\
                                                                                                                                      "4\tDaniel\n"\
                                                                                                                                      "5\tEthan\n"
     unnamed_columns = [clickhouse_grpc_pb2.NameAndType(type='UInt64'), clickhouse_grpc_pb2.NameAndType(type='String')]
-    unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data='6\tGeorge\n7\tFred\n')
+    unnamed_table = clickhouse_grpc_pb2.ExternalTable(columns=unnamed_columns, data=b'6\tGeorge\n7\tFred\n')
     assert query("SELECT * FROM _data ORDER BY _2", external_tables=[unnamed_table]) == "7\tFred\n"\
                                                                                         "6\tGeorge\n"
 
@@ -289,16 +298,16 @@ def test_external_table_streaming():
     columns = [clickhouse_grpc_pb2.NameAndType(name='UserID', type='UInt64'), clickhouse_grpc_pb2.NameAndType(name='UserName', type='String')]
     def send_query_info():
         yield clickhouse_grpc_pb2.QueryInfo(query="SELECT * FROM exts ORDER BY UserID",
-                                            external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data='1\tAlex\n2\tBen\n3\tCarl\n')],
+                                            external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', columns=columns, data=b'1\tAlex\n2\tBen\n3\tCarl\n')],
                                             next_query_info=True)
-        yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data='4\tDaniel\n5\tEthan\n')])
+        yield clickhouse_grpc_pb2.QueryInfo(external_tables=[clickhouse_grpc_pb2.ExternalTable(name='exts', data=b'4\tDaniel\n5\tEthan\n')])
     stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
     result = stub.ExecuteQueryWithStreamInput(send_query_info())
-    assert result.output == "1\tAlex\n"\
-                            "2\tBen\n"\
-                            "3\tCarl\n"\
-                            "4\tDaniel\n"\
-                            "5\tEthan\n"
+    assert result.output == b'1\tAlex\n'\
+                            b'2\tBen\n'\
+                            b'3\tCarl\n'\
+                            b'4\tDaniel\n'\
+                            b'5\tEthan\n'
 
 def test_simultaneous_queries_same_channel():
     threads=[]
@@ -325,8 +334,8 @@ def test_simultaneous_queries_multiple_channels():
 def test_cancel_while_processing_input():
     query("CREATE TABLE t (a UInt8) ENGINE = Memory")
     def send_query_info():
-        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data="1\n2\n3\n", next_query_info=True)
-        yield clickhouse_grpc_pb2.QueryInfo(input_data="4\n5\n6\n", next_query_info=True)
+        yield clickhouse_grpc_pb2.QueryInfo(query="INSERT INTO t FORMAT TabSeparated", input_data=b'1\n2\n3\n', next_query_info=True)
+        yield clickhouse_grpc_pb2.QueryInfo(input_data=b'4\n5\n6\n', next_query_info=True)
         yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
     stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(main_channel)
     result = stub.ExecuteQueryWithStreamInput(send_query_info())
@@ -343,7 +352,7 @@ def test_cancel_while_generating_output():
     results = list(stub.ExecuteQueryWithStreamIO(send_query_info()))
     assert len(results) >= 1
     assert results[-1].cancelled == True
-    output = ''
+    output = b''
     for result in results:
         output += result.output
-    assert output == '0\t0\n1\t0\n2\t0\n3\t0\n'
+    assert output == b'0\t0\n1\t0\n2\t0\n3\t0\n'
@@ -4,7 +4,10 @@ import sys
 import grpc
 from helpers.cluster import ClickHouseCluster, run_and_check
 
+GRPC_PORT = 9100
+NODE_IP = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
 SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
+DEFAULT_ENCODING = 'utf-8'
 
 
 # Use grpcio-tools to generate *pb2.py files from *.proto.
@@ -23,12 +26,10 @@ import clickhouse_grpc_pb2_grpc
 
 # Utilities
 
-node_ip = '10.5.172.77' # It's important for the node to work at this IP because 'server-cert.pem' requires that (see server-ext.cnf).
-grpc_port = 9100
-node_ip_with_grpc_port = node_ip + ':' + str(grpc_port)
+node_ip_with_grpc_port = NODE_IP + ':' + str(GRPC_PORT)
 config_dir = os.path.join(SCRIPT_DIR, './configs')
 cluster = ClickHouseCluster(__file__)
-node = cluster.add_instance('node', ipv4_address=node_ip, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem'])
+node = cluster.add_instance('node', ipv4_address=NODE_IP, main_configs=['configs/grpc_config.xml', 'configs/server-key.pem', 'configs/server-cert.pem', 'configs/ca-cert.pem'])
 
 def create_secure_channel():
     ca_cert = open(os.path.join(config_dir, 'ca-cert.pem'), 'rb').read()
@@ -59,7 +60,7 @@ def query(query_text, channel):
     result = stub.ExecuteQuery(query_info)
     if result and result.HasField('exception'):
         raise Exception(result.exception.display_text)
-    return result.output
+    return result.output.decode(DEFAULT_ENCODING)
 
 @pytest.fixture(scope="module", autouse=True)
 def start_cluster():
@@ -14,13 +14,14 @@ import grpc # pip3 install grpcio
 import grpc_tools # pip3 install grpcio-tools
 import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid
 
-default_host = 'localhost'
-default_port = 9100
-default_user_name = 'default'
-default_output_format_for_interactive_mode = 'PrettyCompact'
-history_filename = '~/.clickhouse_grpc_history'
-history_size = 1000
-stdin_bufsize = 1048576
+DEFAULT_HOST = 'localhost'
+DEFAULT_PORT = 9100
+DEFAULT_USER_NAME = 'default'
+DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE = 'PrettyCompact'
+HISTORY_FILENAME = '~/.clickhouse_grpc_history'
+HISTORY_SIZE = 1000
+STDIN_BUFFER_SIZE = 1048576
+DEFAULT_ENCODING = 'utf-8'
 
 
 class ClickHouseGRPCError(Exception):
@@ -52,7 +53,7 @@ def error_print(*args, **kwargs):
 class ClickHouseGRPCClient(cmd.Cmd):
     prompt="grpc :) "
 
-    def __init__(self, host=default_host, port=default_port, user_name=default_user_name, password='',
+    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, user_name=DEFAULT_USER_NAME, password='',
                  database='', output_format='', settings='', verbatim=False, show_debug_info=False):
         super(ClickHouseGRPCClient, self).__init__(completekey=None)
         self.host = host
@@ -85,7 +86,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
         if self.show_debug_info:
             print('\nresult={}'.format(result))
         ClickHouseGRPCClient.__check_no_errors(result)
-        return result.output
+        return result.output.decode(DEFAULT_ENCODING)
 
     # Executes a query using the stream IO and with ability to cancel it by pressing Ctrl+C.
     def run_query(self, query_text, raise_exceptions=True, allow_cancel=False):
@@ -117,7 +118,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
             # send input data
             if not sys.stdin.isatty():
                 while True:
-                    info.input_data = sys.stdin.buffer.read(stdin_bufsize)
+                    info.input_data = sys.stdin.buffer.read(STDIN_BUFFER_SIZE)
                     if not info.input_data:
                         break
                     info.next_query_info = True
@@ -134,7 +135,8 @@ class ClickHouseGRPCClient(cmd.Cmd):
                 if self.show_debug_info:
                     print('\nresult={}'.format(result))
                 ClickHouseGRPCClient.__check_no_errors(result)
-                print(result.output, end='')
+                sys.stdout.buffer.write(result.output)
+                sys.stdout.flush()
                 if result.cancelled:
                     cancelled = True
                     self.verbatim_print("Query was cancelled.")
@@ -244,7 +246,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
             import readline
         except ImportError:
             readline = None
-        histfile = os.path.expanduser(history_filename)
+        histfile = os.path.expanduser(HISTORY_FILENAME)
         if readline and os.path.exists(histfile):
             readline.read_history_file(histfile)
 
@@ -252,8 +254,8 @@ class ClickHouseGRPCClient(cmd.Cmd):
     def __write_history():
         global readline
         if readline:
-            readline.set_history_length(history_size)
-            histfile = os.path.expanduser(history_filename)
+            readline.set_history_length(HISTORY_SIZE)
+            histfile = os.path.expanduser(HISTORY_FILENAME)
             readline.write_history_file(histfile)
 
 
@@ -281,7 +283,7 @@ def main(args):
 
     output_format = args.output_format
     if not output_format and interactive_mode:
-        output_format = default_output_format_for_interactive_mode
+        output_format = DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE
 
     try:
         with ClickHouseGRPCClient(host=args.host, port=args.port, user_name=args.user_name, password=args.password,
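One consequence visible in the client utility above: since Result.output is now bytes, results are written to sys.stdout.buffer instead of being passed to print(), which keeps binary output formats intact on the way to the terminal or a pipe. A standalone illustration (hedged, not part of the diff; the payload below is just an example value):

import sys

# print() would need a decode step and would mangle non-text output formats,
# so raw bytes go straight to the underlying binary stdout buffer.
output = b'1\tAlex\n2\tBen\n'   # e.g. a Result.output payload in TabSeparated format
sys.stdout.buffer.write(output)
sys.stdout.flush()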