This commit is contained in:
Attila Szakacs 2024-11-20 15:10:04 -08:00 committed by GitHub
commit 1874dca374
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 81 additions and 7 deletions

View File

@ -14,6 +14,7 @@ ClickHouse supports [gRPC](https://grpc.io/) interface. It is an open source rem
- authentication; - authentication;
- sessions; - sessions;
- compression; - compression;
- query parameters;
- parallel queries through the same channel; - parallel queries through the same channel;
- cancellation of queries; - cancellation of queries;
- getting progress and logs; - getting progress and logs;
@ -21,6 +22,14 @@ ClickHouse supports [gRPC](https://grpc.io/) interface. It is an open source rem
The specification of the interface is described in [clickhouse_grpc.proto](https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/grpc_protos/clickhouse_grpc.proto). The specification of the interface is described in [clickhouse_grpc.proto](https://github.com/ClickHouse/ClickHouse/blob/master/src/Server/grpc_protos/clickhouse_grpc.proto).
Query parameters can be set in the Metadata fields of the Client Context, with `param_` prefix for the key, for example:
```py
[
("param_table", "my_table"),
("param_column", "my_column"),
]
```
## gRPC Configuration {#grpc-interface-configuration} ## gRPC Configuration {#grpc-interface-configuration}
To use the gRPC interface set `grpc_port` in the main [server configuration](../operations/configuration-files.md). Other configuration options see in the following example: To use the gRPC interface set `grpc_port` in the main [server configuration](../operations/configuration-files.md). Other configuration options see in the following example:
@ -72,6 +81,7 @@ The client supports the following arguments:
- `--query QUERY, -q QUERY` A query to process when using non-interactive mode. - `--query QUERY, -q QUERY` A query to process when using non-interactive mode.
- `--database DATABASE, -d DATABASE` A default database. If not specified, the current database set in the server settings is used (`default` by default). - `--database DATABASE, -d DATABASE` A default database. If not specified, the current database set in the server settings is used (`default` by default).
- `--format OUTPUT_FORMAT, -f OUTPUT_FORMAT` A result output [format](formats.md). Default value for interactive mode: `PrettyCompact`. - `--format OUTPUT_FORMAT, -f OUTPUT_FORMAT` A result output [format](formats.md). Default value for interactive mode: `PrettyCompact`.
- `--param PARAM` Query parameters in `key=value` format. Can be used multiple times.
- `--debug` Enables showing debug information. - `--debug` Enables showing debug information.
To run the client in an interactive mode call it without `--query` argument. To run the client in an interactive mode call it without `--query` argument.

View File

@ -369,6 +369,11 @@ namespace
return std::nullopt; return std::nullopt;
} }
const std::multimap<::grpc::string_ref, ::grpc::string_ref> & getClientHeaders() const
{
return grpc_context.client_metadata();
}
void setTransportCompression(const TransportCompression & transport_compression) void setTransportCompression(const TransportCompression & transport_compression)
{ {
grpc_context.set_compression_algorithm(transport_compression.algorithm); grpc_context.set_compression_algorithm(transport_compression.algorithm);
@ -872,6 +877,16 @@ namespace
query_context = session->makeQueryContext(std::move(client_info)); query_context = session->makeQueryContext(std::move(client_info));
/// Extract query params from gRPC client context.
for (const auto & [key, value] : responder->getClientHeaders())
{
if (key.starts_with("param_"))
{
query_context->setQueryParameter(
String{key.data() + 6, key.size() - 6}, String{value.data(), value.size()});
}
}
/// Prepare settings. /// Prepare settings.
SettingsChanges settings_changes; SettingsChanges settings_changes;
for (const auto & [key, value] : query_info.settings()) for (const auto & [key, value] : query_info.settings())

View File

@ -72,6 +72,7 @@ def query_common(
session_id="", session_id="",
stream_output=False, stream_output=False,
channel=None, channel=None,
query_params={},
): ):
if type(input_data) is not list: if type(input_data) is not list:
input_data = [input_data] input_data = [input_data]
@ -110,15 +111,24 @@ def query_common(
input_data=input_data_part, next_query_info=bool(input_data) input_data=input_data_part, next_query_info=bool(input_data)
) )
def metadata():
return [("param_" + param, value) for param, value in query_params.items()]
stream_input = len(input_data) > 1 stream_input = len(input_data) > 1
if stream_input and stream_output: if stream_input and stream_output:
return list(stub.ExecuteQueryWithStreamIO(send_query_info())) return list(
stub.ExecuteQueryWithStreamIO(send_query_info(), metadata=metadata())
)
elif stream_input: elif stream_input:
return [stub.ExecuteQueryWithStreamInput(send_query_info())] return [
stub.ExecuteQueryWithStreamInput(send_query_info(), metadata=metadata())
]
elif stream_output: elif stream_output:
return list(stub.ExecuteQueryWithStreamOutput(query_info())) return list(
stub.ExecuteQueryWithStreamOutput(query_info(), metadata=metadata())
)
else: else:
return [stub.ExecuteQuery(query_info())] return [stub.ExecuteQuery(query_info(), metadata=metadata())]
def query_no_errors(*args, **kwargs): def query_no_errors(*args, **kwargs):
@ -273,6 +283,24 @@ def test_insert_splitted_row():
assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n" assert query("SELECT a FROM t ORDER BY a") == "1\n2\n3\n4\n5\n6\n"
def test_query_params():
query(
r"CREATE TABLE {table:Identifier} (a UInt8) ENGINE = Memory",
query_params={"table": "t"},
)
query(
r"INSERT INTO {table:Identifier} VALUES (1),(2),(3)",
query_params={"table": "t"},
)
assert (
query(
r"SELECT {column:Identifier} FROM {table:Identifier} ORDER BY {column:Identifier}",
query_params={"table": "t", "column": "a"},
)
== "1\n2\n3\n"
)
def test_output_format(): def test_output_format():
query("CREATE TABLE t (a UInt8) ENGINE = Memory") query("CREATE TABLE t (a UInt8) ENGINE = Memory")
query("INSERT INTO t VALUES (1),(2),(3)") query("INSERT INTO t VALUES (1),(2),(3)")

View File

@ -79,6 +79,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
settings="", settings="",
verbatim=False, verbatim=False,
show_debug_info=False, show_debug_info=False,
params=[],
): ):
super(ClickHouseGRPCClient, self).__init__(completekey=None) super(ClickHouseGRPCClient, self).__init__(completekey=None)
self.host = host self.host = host
@ -93,6 +94,7 @@ class ClickHouseGRPCClient(cmd.Cmd):
self.channel = None self.channel = None
self.stub = None self.stub = None
self.session_id = None self.session_id = None
self.params = params
def __enter__(self): def __enter__(self):
self.__connect() self.__connect()
@ -101,8 +103,11 @@ class ClickHouseGRPCClient(cmd.Cmd):
def __exit__(self, exc_type, exc_value, traceback): def __exit__(self, exc_type, exc_value, traceback):
self.__disconnect() self.__disconnect()
def format_metadata(self):
return [("param_" + param, value) for param, value in self.params]
# Executes a simple query and returns its output. # Executes a simple query and returns its output.
def get_simple_query_output(self, query_text): def get_simple_query_output(self, query_text, params=[]):
result = self.stub.ExecuteQuery( result = self.stub.ExecuteQuery(
clickhouse_grpc_pb2.QueryInfo( clickhouse_grpc_pb2.QueryInfo(
query=query_text, query=query_text,
@ -113,7 +118,8 @@ class ClickHouseGRPCClient(cmd.Cmd):
settings=self.settings, settings=self.settings,
session_id=self.session_id, session_id=self.session_id,
query_id=str(uuid.uuid4()), query_id=str(uuid.uuid4()),
) ),
metadata=self.format_metadata(),
) )
if self.show_debug_info: if self.show_debug_info:
print("\nresult={}".format(result)) print("\nresult={}".format(result))
@ -171,7 +177,9 @@ class ClickHouseGRPCClient(cmd.Cmd):
if cancel_tries > 0: if cancel_tries > 0:
yield clickhouse_grpc_pb2.QueryInfo(cancel=True) yield clickhouse_grpc_pb2.QueryInfo(cancel=True)
for result in self.stub.ExecuteQueryWithStreamIO(send_query_info()): for result in self.stub.ExecuteQueryWithStreamIO(
send_query_info(), metadata=self.format_metadata()
):
if self.show_debug_info: if self.show_debug_info:
print("\nresult={}".format(result)) print("\nresult={}".format(result))
ClickHouseGRPCClient.__check_no_errors(result) ClickHouseGRPCClient.__check_no_errors(result)
@ -290,6 +298,11 @@ class ClickHouseGRPCClient(cmd.Cmd):
readline.write_history_file(histfile) readline.write_history_file(histfile)
def parse_param(arg):
key, value = arg.split("=")
return key, value
# MAIN # MAIN
@ -341,6 +354,13 @@ def main(args):
help="Use the specified default format to output the result.", help="Use the specified default format to output the result.",
default="", default="",
) )
parser.add_argument(
"--param",
action="append",
type=parse_param,
help="Query parameters in key=value format. Can be used multiple times.",
default=[],
)
parser.add_argument( parser.add_argument(
"--debug", "--debug",
dest="show_debug_info", dest="show_debug_info",
@ -370,6 +390,7 @@ def main(args):
output_format=output_format, output_format=output_format,
verbatim=verbatim, verbatim=verbatim,
show_debug_info=args.show_debug_info, show_debug_info=args.show_debug_info,
params=args.param,
) as client: ) as client:
if interactive_mode: if interactive_mode:
client.cmdloop() client.cmdloop()