mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-24 08:32:02 +00:00
Implemented clickhouse grpc client.
This commit is contained in:
parent
950bccb130
commit
1bd19bb124
1
utils/grpc-client/.gitignore
vendored
Normal file
1
utils/grpc-client/.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
||||
_gen
|
301
utils/grpc-client/clickhouse-grpc-client.py
Executable file
301
utils/grpc-client/clickhouse-grpc-client.py
Executable file
@ -0,0 +1,301 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# This utility provides similar interface to clickhouse-client.
|
||||
# This utility also can work in two modes: interactive and non-interactive (batch).
|
||||
#
|
||||
# For example,
|
||||
# ./clickhouse_grpc_client.py - runs interactive mode; and
|
||||
# ./clickhouse_grpc_client.py -u John -q "SELECT * FROM mytable" - runs only a specified query from the user John.
|
||||
#
|
||||
# Most of the command line options are the same, for more information type
|
||||
# ./clickhouse_grpc_client.py --help
|
||||
|
||||
import argparse, cmd, os, signal, subprocess, sys, threading, time, uuid, grpc
|
||||
|
||||
default_host = 'localhost'
|
||||
default_port = 9001
|
||||
default_user_name = 'default'
|
||||
default_output_format_for_interactive_mode = 'PrettyCompact'
|
||||
history_filename = '~/.clickhouse_grpc_history'
|
||||
history_size = 1000
|
||||
stdin_bufsize = 1048576
|
||||
|
||||
|
||||
class ClickHouseGRPCError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
# Temporary override reaction on Ctrl+C.
|
||||
class KeyboardInterruptHandlerOverride:
|
||||
# If `handler` return True that means pressing Ctrl+C has been handled, no need to call previous handler.
|
||||
def __init__(self, handler):
|
||||
self.handler = handler
|
||||
|
||||
def __enter__(self):
|
||||
self.previous_handler = signal.signal(signal.SIGINT, self.__handler)
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
signal.signal(signal.SIGINT, self.previous_handler)
|
||||
|
||||
def __handler(self, signum, frame):
|
||||
if not self.handler():
|
||||
self.previous_handler(signum, frame)
|
||||
|
||||
|
||||
# Print to stderr
|
||||
def error_print(*args, **kwargs):
|
||||
print(*args, file=sys.stderr, **kwargs)
|
||||
|
||||
|
||||
class ClickHouseGRPCClient(cmd.Cmd):
|
||||
prompt="grpc :) "
|
||||
|
||||
def __init__(self, host=default_host, port=default_port, user_name=default_user_name, password='',
|
||||
database='', output_format='', settings='', verbatim=False, show_debug_info=False):
|
||||
super(ClickHouseGRPCClient, self).__init__(completekey=None)
|
||||
self.host = host
|
||||
self.port = port
|
||||
self.user_name = user_name
|
||||
self.password = password
|
||||
self.database = database
|
||||
self.output_format = output_format
|
||||
self.settings = settings
|
||||
self.verbatim = verbatim
|
||||
self.show_debug_info = show_debug_info
|
||||
self.channel = None
|
||||
self.stub = None
|
||||
self.session_id = None
|
||||
|
||||
def __enter__(self):
|
||||
ClickHouseGRPCClient.__generate_pb2()
|
||||
ClickHouseGRPCClient.__import_pb2()
|
||||
self.__connect()
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.__disconnect()
|
||||
|
||||
# Executes a simple query and returns its output.
|
||||
def get_simple_query_output(self, query_text):
|
||||
result = self.stub.ExecuteQuery(clickhouse_grpc_pb2.QueryInfo(query=query_text, user_name=self.user_name, password=self.password,
|
||||
database=self.database, output_format='TabSeparated', settings=self.settings,
|
||||
session_id=self.session_id, query_id=str(uuid.uuid4())))
|
||||
if self.show_debug_info:
|
||||
print('\nresult={}'.format(result))
|
||||
ClickHouseGRPCClient.__check_no_errors(result)
|
||||
return result.output
|
||||
|
||||
# Executes a query using the stream IO and with ability to cancel it by pressing Ctrl+C.
|
||||
def run_query(self, query_text, raise_exceptions=True, allow_cancel=False):
|
||||
start_time = time.time()
|
||||
cancelled = False
|
||||
cancel_tries = 0
|
||||
cancel_event = threading.Event()
|
||||
|
||||
def keyboard_interrupt_handler():
|
||||
if allow_cancel:
|
||||
nonlocal cancel_tries
|
||||
cancel_tries = cancel_tries + 1
|
||||
if cancel_tries < 3:
|
||||
self.verbatim_print("Cancelling query.")
|
||||
if cancel_tries == 1:
|
||||
cancel_event.set()
|
||||
return True
|
||||
# third attempt to cancel - we use previous handler which will terminate the client.
|
||||
self.verbatim_print("Couldn't cancel the query, terminating.")
|
||||
return False
|
||||
|
||||
with KeyboardInterruptHandlerOverride(keyboard_interrupt_handler):
|
||||
try:
|
||||
def send_query_info():
|
||||
# send main query info
|
||||
info = clickhouse_grpc_pb2.QueryInfo(query=query_text, user_name=self.user_name, password=self.password,
|
||||
database=self.database, output_format=self.output_format, settings=self.settings,
|
||||
session_id=self.session_id, query_id=str(uuid.uuid4()))
|
||||
# send input data
|
||||
if not sys.stdin.isatty():
|
||||
while True:
|
||||
info.input_data = sys.stdin.buffer.read(stdin_bufsize)
|
||||
if not info.input_data:
|
||||
break
|
||||
info.next_query_info = True
|
||||
yield info
|
||||
info = clickhouse_grpc_pb2.QueryInfo()
|
||||
yield info
|
||||
# wait for possible cancel
|
||||
if allow_cancel:
|
||||
cancel_event.wait()
|
||||
if cancel_tries > 0:
|
||||
yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
|
||||
|
||||
for result in self.stub.ExecuteQueryWithStreamIO(send_query_info()):
|
||||
if self.show_debug_info:
|
||||
print('\nresult={}'.format(result))
|
||||
ClickHouseGRPCClient.__check_no_errors(result)
|
||||
print(result.output, end='')
|
||||
if result.cancelled:
|
||||
cancelled = True
|
||||
self.verbatim_print("Query was cancelled.")
|
||||
|
||||
cancel_event.set()
|
||||
if not cancelled:
|
||||
execution_time = time.time() - start_time
|
||||
self.verbatim_print('\nElapsed: {execution_time} sec.\n'.format(execution_time=execution_time))
|
||||
|
||||
except Exception as e:
|
||||
if raise_exceptions:
|
||||
raise
|
||||
error_print(e)
|
||||
|
||||
# Establish connection.
|
||||
def __connect(self):
|
||||
self.verbatim_print("Connecting to {host}:{port} as user {user_name}.".format(host=self.host, port=self.port, user_name=self.user_name))
|
||||
# Secure channels are supported by server but not supported by this client.
|
||||
start_time = time.time()
|
||||
self.channel = grpc.insecure_channel(self.host + ':' + str(self.port))
|
||||
connection_time = 0
|
||||
timeout=5
|
||||
while True:
|
||||
try:
|
||||
grpc.channel_ready_future(self.channel).result(timeout=timeout)
|
||||
break;
|
||||
except grpc.FutureTimeoutError:
|
||||
connection_time += timeout
|
||||
self.verbatim_print("Couldn't connect to ClickHouse server in {connection_time} seconds.".format(connection_time=connection_time))
|
||||
self.stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(self.channel)
|
||||
connection_time = time.time() - start_time
|
||||
if self.verbatim:
|
||||
version = self.get_simple_query_output("SELECT version() FORMAT TabSeparated").rstrip('\n')
|
||||
self.verbatim_print("Connected to ClickHouse server version {version} via gRPC protocol in {connection_time}.".format(version=version, connection_time=connection_time))
|
||||
|
||||
def __disconnect(self):
|
||||
if self.channel:
|
||||
self.channel.close()
|
||||
self.channel = None
|
||||
self.stub = None
|
||||
self.session_id = None
|
||||
|
||||
@staticmethod
|
||||
def __check_no_errors(result):
|
||||
if result.HasField('exception'):
|
||||
raise ClickHouseGRPCError(result.exception.display_text)
|
||||
|
||||
# Use grpcio-tools to generate *pb2.py files from *.proto.
|
||||
@staticmethod
|
||||
def __generate_pb2():
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
proto_dir = os.path.join(script_dir, './protos')
|
||||
gen_dir = os.path.join(script_dir, './_gen')
|
||||
if os.path.exists(os.path.join(gen_dir, 'clickhouse_grpc_pb2_grpc.py')):
|
||||
return
|
||||
os.makedirs(gen_dir, exist_ok=True)
|
||||
cmd = ['python3', '-m', 'grpc_tools.protoc', '-I'+proto_dir, '--python_out='+gen_dir, '--grpc_python_out='+gen_dir,
|
||||
proto_dir+'/clickhouse_grpc.proto']
|
||||
p = subprocess.Popen(cmd, stderr=subprocess.PIPE)
|
||||
# We don't want to show grpc_tools warnings.
|
||||
errors = p.stderr.read().decode().strip('\n').split('\n')
|
||||
only_warnings = all(('Warning' in error) for error in errors)
|
||||
if not only_warnings:
|
||||
error_print(errors.join('\n'))
|
||||
|
||||
# Import the generated *pb2.py files.
|
||||
@staticmethod
|
||||
def __import_pb2():
|
||||
script_dir = os.path.dirname(os.path.realpath(__file__))
|
||||
gen_dir = os.path.join(script_dir, './_gen')
|
||||
sys.path.append(gen_dir)
|
||||
global clickhouse_grpc_pb2, clickhouse_grpc_pb2_grpc
|
||||
import clickhouse_grpc_pb2, clickhouse_grpc_pb2_grpc
|
||||
|
||||
# Prints only if interactive mode is activated.
|
||||
def verbatim_print(self, *args, **kwargs):
|
||||
if self.verbatim:
|
||||
print(*args, **kwargs)
|
||||
|
||||
# Overrides Cmd.preloop(). Executed once when cmdloop() is called.
|
||||
def preloop(self):
|
||||
super(ClickHouseGRPCClient, self).preloop()
|
||||
ClickHouseGRPCClient.__read_history()
|
||||
# we use session for interactive mode
|
||||
self.session_id = str(uuid.uuid4())
|
||||
|
||||
# Overrides Cmd.postloop(). Executed once when cmdloop() is about to return.
|
||||
def postloop(self):
|
||||
super(ClickHouseGRPCClient, self).postloop()
|
||||
ClickHouseGRPCClient.__write_history()
|
||||
|
||||
# Overrides Cmd.onecmd(). Runs single command.
|
||||
def onecmd(self, line):
|
||||
stripped = line.strip()
|
||||
if stripped == 'exit' or stripped == 'quit':
|
||||
return True
|
||||
if stripped == '':
|
||||
return False
|
||||
self.run_query(line, raise_exceptions=False, allow_cancel=True)
|
||||
return False
|
||||
|
||||
# Enables history of commands for interactive mode.
|
||||
@staticmethod
|
||||
def __read_history():
|
||||
global readline
|
||||
try:
|
||||
import readline
|
||||
except ImportError:
|
||||
readline = None
|
||||
histfile = os.path.expanduser(history_filename)
|
||||
if readline and os.path.exists(histfile):
|
||||
readline.read_history_file(histfile)
|
||||
|
||||
@staticmethod
|
||||
def __write_history():
|
||||
global readline
|
||||
if readline:
|
||||
readline.set_history_length(history_size)
|
||||
histfile = os.path.expanduser(history_filename)
|
||||
readline.write_history_file(histfile)
|
||||
|
||||
|
||||
# MAIN
|
||||
|
||||
def main(args):
|
||||
parser = argparse.ArgumentParser(description='ClickHouse client accessing server through gRPC protocol.', add_help=False)
|
||||
parser.add_argument('--help', help='Show this help message and exit', action='store_true')
|
||||
parser.add_argument('--host', '-h', help='The server name, ‘localhost’ by default. You can use either the name or the IPv4 or IPv6 address.', default='localhost')
|
||||
parser.add_argument('--port', help='The port to connect to. This port should be enabled on the ClickHouse server (see grpc_port in the config).', default=9001)
|
||||
parser.add_argument('--user', '-u', dest='user_name', help='The username. Default value: ‘default’.', default='default')
|
||||
parser.add_argument('--password', help='The password. Default value: empty string.', default='')
|
||||
parser.add_argument('--query', '-q', help='The query to process when using non-interactive mode.', default='')
|
||||
parser.add_argument('--database', '-d', help='Select the current default database. Default value: the current database from the server settings (‘default’ by default).', default='')
|
||||
parser.add_argument('--format', '-f', dest='output_format', help='Use the specified default format to output the result.', default='')
|
||||
parser.add_argument('--debug', dest='show_debug_info', help='Enables showing the debug information.', action='store_true')
|
||||
args = parser.parse_args(args)
|
||||
|
||||
if args.help:
|
||||
parser.print_help()
|
||||
sys.exit(0)
|
||||
|
||||
interactive_mode = not args.query
|
||||
verbatim = interactive_mode
|
||||
|
||||
output_format = args.output_format
|
||||
if not output_format and interactive_mode:
|
||||
output_format = default_output_format_for_interactive_mode
|
||||
|
||||
try:
|
||||
with ClickHouseGRPCClient(host=args.host, port=args.port, user_name=args.user_name, password=args.password,
|
||||
database=args.database, output_format=output_format, verbatim=verbatim,
|
||||
show_debug_info=args.show_debug_info) as client:
|
||||
if interactive_mode:
|
||||
client.cmdloop()
|
||||
else:
|
||||
client.run_query(args.query)
|
||||
except KeyboardInterrupt:
|
||||
pass
|
||||
except Exception as e:
|
||||
error_print(e)
|
||||
|
||||
if verbatim:
|
||||
print("\nBye")
|
||||
|
||||
if __name__ == '__main__':
|
||||
main(sys.argv[1:])
|
1
utils/grpc-client/protos/clickhouse_grpc.proto
Symbolic link
1
utils/grpc-client/protos/clickhouse_grpc.proto
Symbolic link
@ -0,0 +1 @@
|
||||
../../../src/Server/grpc_protos/clickhouse_grpc.proto
|
Loading…
Reference in New Issue
Block a user