2020-11-05 14:17:13 +00:00
#!/usr/bin/env python3
# This utility provides an interface similar to clickhouse-client.
# It can work in two modes: interactive and non-interactive (batch).
#
# For example,
# ./clickhouse_grpc_client.py - runs interactive mode; and
# ./clickhouse_grpc_client.py -u John -q "SELECT * FROM mytable" - runs only a specified query from the user John.
#
# Most of the command line options are the same, for more information type
# ./clickhouse_grpc_client.py --help
2020-11-26 07:07:04 +00:00
import grpc # pip3 install grpcio
import grpc_tools # pip3 install grpcio-tools
import argparse , cmd , os , signal , subprocess , sys , threading , time , uuid
2020-11-05 14:17:13 +00:00
2021-08-06 10:55:49 +00:00
# Defaults used by ClickHouseGRPCClient when the caller does not override them.
DEFAULT_HOST = 'localhost'
DEFAULT_PORT = 9100
DEFAULT_USER_NAME = 'default'
DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE = 'PrettyCompact'

# Readline history of the interactive mode; must start with '~' so that os.path.expanduser() can expand it.
HISTORY_FILENAME = '~/.clickhouse_grpc_history'
HISTORY_SIZE = 1000

# Size (in bytes) of a single chunk read from stdin when input data is streamed to the server.
STDIN_BUFFER_SIZE = 1048576

# Encoding used to decode the output of simple queries.
DEFAULT_ENCODING = 'utf-8'
2020-11-05 14:17:13 +00:00
class ClickHouseGRPCError(Exception):
    """Error raised by this client, e.g. when a query result carries an exception."""
# Temporarily overrides the reaction on Ctrl+C (SIGINT) for the duration of a `with` block.
# The handler that was installed before entering the block is restored on exit.
class KeyboardInterruptHandlerOverride:
    # `handler` is called without arguments when Ctrl+C is pressed.
    # If `handler` returns True that means pressing Ctrl+C has been handled, no need to call the previous handler.
    def __init__(self, handler):
        self.handler = handler
        self.previous_handler = None

    # Installs the override and remembers whatever was installed before.
    def __enter__(self):
        self.previous_handler = signal.signal(signal.SIGINT, self.__handler)
        return self

    # Puts the remembered handler back.
    def __exit__(self, exc_type, exc_value, traceback):
        signal.signal(signal.SIGINT, self.previous_handler)

    # The actual signal handler: gives `handler` the first chance, then falls back to the previous reaction.
    def __handler(self, signum, frame):
        if self.handler():
            return
        previous = self.previous_handler
        if callable(previous):
            previous(signum, frame)
        elif previous == signal.SIG_DFL:
            # signal.signal() returned the SIG_DFL constant, which cannot be called;
            # Python's default SIGINT handler raises KeyboardInterrupt instead.
            signal.default_int_handler(signum, frame)
        # Otherwise the previous reaction was SIG_IGN (ignore) or a handler not installed
        # from Python (None): there is nothing that can be called here.
# Same as print(), but the text always goes to the standard error stream.
def error_print(*args, **kwargs):
    target_stream = sys.stderr
    print(*args, file=target_stream, **kwargs)
# Client sending queries to a ClickHouse server through the gRPC protocol.
# It is built on cmd.Cmd: in interactive mode every entered line is executed as a query.
# The client is a context manager: entering it generates and imports the protobuf modules
# and connects to the server, leaving it closes the channel.
class ClickHouseGRPCClient(cmd.Cmd):
    prompt = "grpc :) "

    # Only stores the connection parameters; nothing is connected until the `with` block is entered.
    # `verbatim` enables informational messages (see verbatim_print()),
    # `show_debug_info` enables dumping of every received result.
    def __init__(self, host=DEFAULT_HOST, port=DEFAULT_PORT, user_name=DEFAULT_USER_NAME, password='',
                 database='', output_format='', settings='', verbatim=False, show_debug_info=False):
        super().__init__(completekey=None)
        self.host = host
        self.port = port
        self.user_name = user_name
        self.password = password
        self.database = database
        self.output_format = output_format
        self.settings = settings
        self.verbatim = verbatim
        self.show_debug_info = show_debug_info
        self.channel = None
        self.stub = None
        self.session_id = None

    def __enter__(self):
        ClickHouseGRPCClient.__generate_pb2()
        ClickHouseGRPCClient.__import_pb2()
        self.__connect()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.__disconnect()

    # Executes a simple query and returns its output as text in the TabSeparated format.
    # Raises ClickHouseGRPCError if the result carries an exception.
    def get_simple_query_output(self, query_text):
        result = self.stub.ExecuteQuery(clickhouse_grpc_pb2.QueryInfo(
            query=query_text, user_name=self.user_name, password=self.password,
            database=self.database, output_format='TabSeparated', settings=self.settings,
            session_id=self.session_id, query_id=str(uuid.uuid4())))
        if self.show_debug_info:
            print('\nresult={}'.format(result))
        ClickHouseGRPCClient.__check_no_errors(result)
        return result.output.decode(DEFAULT_ENCODING)

    # Executes a query using the stream IO and with ability to cancel it by pressing Ctrl+C.
    # The output of the query is written to stdout as raw bytes.
    # If `raise_exceptions` is False, errors are printed to stderr instead of being raised.
    # If `allow_cancel` is True, the first two Ctrl+C presses try to cancel the query
    # and the third one is passed to the previous SIGINT handler.
    def run_query(self, query_text, raise_exceptions=True, allow_cancel=False):
        start_time = time.time()
        cancelled = False
        cancel_tries = 0
        cancel_event = threading.Event()

        def keyboard_interrupt_handler():
            nonlocal cancel_tries
            if allow_cancel:
                cancel_tries = cancel_tries + 1
                if cancel_tries < 3:
                    self.verbatim_print("Cancelling query.")
                    if cancel_tries == 1:
                        cancel_event.set()
                    return True

            # third attempt to cancel - we use previous handler which will terminate the client.
            self.verbatim_print("Couldn't cancel the query, terminating.")
            return False

        # Generator producing the request stream: the query itself, then the input data, then an optional cancel.
        def send_query_info():
            # send main query info
            info = clickhouse_grpc_pb2.QueryInfo(
                query=query_text, user_name=self.user_name, password=self.password,
                database=self.database, output_format=self.output_format, settings=self.settings,
                session_id=self.session_id, query_id=str(uuid.uuid4()))
            # send input data
            if not sys.stdin.isatty():
                while True:
                    info.input_data = sys.stdin.buffer.read(STDIN_BUFFER_SIZE)
                    if not info.input_data:
                        break
                    info.next_query_info = True
                    yield info
                    info = clickhouse_grpc_pb2.QueryInfo()
            yield info
            # wait for possible cancel
            if allow_cancel:
                cancel_event.wait()
                if cancel_tries > 0:
                    yield clickhouse_grpc_pb2.QueryInfo(cancel=True)

        with KeyboardInterruptHandlerOverride(keyboard_interrupt_handler):
            try:
                try:
                    for result in self.stub.ExecuteQueryWithStreamIO(send_query_info()):
                        if self.show_debug_info:
                            print('\nresult={}'.format(result))
                        ClickHouseGRPCClient.__check_no_errors(result)
                        sys.stdout.buffer.write(result.output)
                        sys.stdout.flush()
                        if result.cancelled:
                            cancelled = True
                            self.verbatim_print("Query was cancelled.")
                finally:
                    # Always release send_query_info(), otherwise it stays blocked in cancel_event.wait()
                    # when processing of the results fails.
                    cancel_event.set()

                if not cancelled:
                    execution_time = time.time() - start_time
                    self.verbatim_print('\nElapsed: {execution_time} sec.\n'.format(execution_time=execution_time))

            except Exception as e:
                if raise_exceptions:
                    raise
                error_print(e)

    # Establish connection. Keeps waiting (reporting every 5 seconds) until the channel is ready.
    def __connect(self):
        self.verbatim_print("Connecting to {host}:{port} as user {user_name}.".format(
            host=self.host, port=self.port, user_name=self.user_name))
        # Secure channels are supported by server but not supported by this client.
        start_time = time.time()
        self.channel = grpc.insecure_channel(self.host + ':' + str(self.port))
        connection_time = 0
        timeout = 5
        while True:
            try:
                grpc.channel_ready_future(self.channel).result(timeout=timeout)
                break
            except grpc.FutureTimeoutError:
                connection_time += timeout
                self.verbatim_print("Couldn't connect to ClickHouse server in {connection_time} seconds.".format(
                    connection_time=connection_time))
        self.stub = clickhouse_grpc_pb2_grpc.ClickHouseStub(self.channel)
        connection_time = time.time() - start_time
        if self.verbatim:
            version = self.get_simple_query_output("SELECT version() FORMAT TabSeparated").rstrip('\n')
            self.verbatim_print("Connected to ClickHouse server version {version} via gRPC protocol in {connection_time}.".format(
                version=version, connection_time=connection_time))

    # Closes the channel (if any) and forgets the stub and the session.
    def __disconnect(self):
        if self.channel:
            self.channel.close()
        self.channel = None
        self.stub = None
        self.session_id = None

    # Raises ClickHouseGRPCError if `result` has the `exception` field set.
    @staticmethod
    def __check_no_errors(result):
        if result.HasField('exception'):
            raise ClickHouseGRPCError(result.exception.display_text)

    # Use grpcio-tools to generate *pb2.py files from *.proto.
    # Does nothing if the generated files already exist.
    @staticmethod
    def __generate_pb2():
        script_dir = os.path.dirname(os.path.realpath(__file__))
        proto_dir = os.path.join(script_dir, './protos')
        gen_dir = os.path.join(script_dir, './_gen')
        if os.path.exists(os.path.join(gen_dir, 'clickhouse_grpc_pb2_grpc.py')):
            return
        os.makedirs(gen_dir, exist_ok=True)
        # Run protoc with the interpreter executing this script, falling back to `python3` from PATH.
        command = [sys.executable or 'python3', '-m', 'grpc_tools.protoc', '-I' + proto_dir,
                   '--python_out=' + gen_dir, '--grpc_python_out=' + gen_dir,
                   proto_dir + '/clickhouse_grpc.proto']
        process = subprocess.Popen(command, stderr=subprocess.PIPE)
        # communicate() also waits for the process, so it has finished before the generated files get imported.
        _, stderr_data = process.communicate()
        # We don't want to show grpc_tools warnings.
        errors = [line for line in stderr_data.decode(errors='replace').split('\n') if line.strip()]
        only_warnings = all(('Warning' in error) for error in errors)
        if not only_warnings:
            error_print('\n'.join(errors))

    # Import the generated *pb2.py files and publish them as module-level names.
    @staticmethod
    def __import_pb2():
        script_dir = os.path.dirname(os.path.realpath(__file__))
        gen_dir = os.path.join(script_dir, './_gen')
        if gen_dir not in sys.path:
            sys.path.append(gen_dir)
        global clickhouse_grpc_pb2, clickhouse_grpc_pb2_grpc
        import clickhouse_grpc_pb2, clickhouse_grpc_pb2_grpc

    # Prints only if interactive mode is activated.
    def verbatim_print(self, *args, **kwargs):
        if self.verbatim:
            print(*args, **kwargs)

    # Overrides Cmd.preloop(). Executed once when cmdloop() is called.
    def preloop(self):
        super().preloop()
        ClickHouseGRPCClient.__read_history()
        # we use session for interactive mode
        self.session_id = str(uuid.uuid4())

    # Overrides Cmd.postloop(). Executed once when cmdloop() is about to return.
    def postloop(self):
        super().postloop()
        ClickHouseGRPCClient.__write_history()

    # Overrides Cmd.onecmd(). Runs single command.
    # Returns True to stop the command loop, False to continue.
    def onecmd(self, line):
        stripped = line.strip()
        # Cmd.cmdloop() passes the line 'EOF' at end of input (Ctrl+D or exhausted stdin);
        # it must finish the loop instead of being sent to the server as a query again and again.
        if stripped in ('exit', 'quit', 'EOF'):
            return True
        if stripped == '':
            return False
        self.run_query(line, raise_exceptions=False, allow_cancel=True)
        return False

    # Enables history of commands for interactive mode.
    # If the `readline` module is not available the history is silently disabled.
    @staticmethod
    def __read_history():
        global readline
        try:
            import readline
        except ImportError:
            readline = None
        histfile = os.path.expanduser(HISTORY_FILENAME)
        if readline and os.path.exists(histfile):
            readline.read_history_file(histfile)

    # Saves at most HISTORY_SIZE entries of the history, if the history is enabled.
    @staticmethod
    def __write_history():
        global readline
        if readline:
            readline.set_history_length(HISTORY_SIZE)
            histfile = os.path.expanduser(HISTORY_FILENAME)
            readline.write_history_file(histfile)
# MAIN
# Parses the command line `args` (without the program name), connects to the server and then either
# starts the interactive mode (no --query specified) or executes the query passed with --query.
# Returns the exit status of the process: 0 on success, 1 if an error has been reported.
def main(args):
    parser = argparse.ArgumentParser(description='ClickHouse client accessing server through gRPC protocol.', add_help=False)
    # `-h` is taken by --host, that's why the standard help option is disabled and --help is handled manually.
    parser.add_argument('--help', help='Show this help message and exit', action='store_true')
    parser.add_argument('--host', '-h', help='The server name, ‘localhost’ by default. You can use either the name or the IPv4 or IPv6 address.', default='localhost')
    parser.add_argument('--port', help='The port to connect to. This port should be enabled on the ClickHouse server (see grpc_port in the config).', default=9100, type=int)
    parser.add_argument('--user', '-u', dest='user_name', help='The username. Default value: ‘default’.', default='default')
    parser.add_argument('--password', help='The password. Default value: empty string.', default='')
    parser.add_argument('--query', '-q', help='The query to process when using non-interactive mode.', default='')
    parser.add_argument('--database', '-d', help='Select the current default database. Default value: the current database from the server settings (‘default’ by default).', default='')
    parser.add_argument('--format', '-f', dest='output_format', help='Use the specified default format to output the result.', default='')
    parser.add_argument('--debug', dest='show_debug_info', help='Enables showing the debug information.', action='store_true')
    options = parser.parse_args(args)

    if options.help:
        parser.print_help()
        sys.exit(0)

    # Without a query the client is interactive, and only then it prints informational messages.
    interactive_mode = not options.query
    verbatim = interactive_mode

    output_format = options.output_format
    if not output_format and interactive_mode:
        output_format = DEFAULT_OUTPUT_FORMAT_FOR_INTERACTIVE_MODE

    exit_code = 0
    try:
        with ClickHouseGRPCClient(host=options.host, port=options.port, user_name=options.user_name,
                                  password=options.password, database=options.database,
                                  output_format=output_format, verbatim=verbatim,
                                  show_debug_info=options.show_debug_info) as client:
            if interactive_mode:
                client.cmdloop()
            else:
                client.run_query(options.query)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        error_print(e)
        # Let the caller (e.g. a shell script) see that the query or the connection failed.
        exit_code = 1

    if verbatim:
        print("\nBye")
    return exit_code


if __name__ == '__main__':
    sys.exit(main(sys.argv[1:]))