Extend protocol with query parameters (#39906)

This commit is contained in:
Nikita Taranov 2022-08-12 14:28:35 +02:00 committed by GitHub
parent 86061d945e
commit 17956cb668
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 213 additions and 21 deletions

View File

@ -133,6 +133,7 @@ std::vector<String> Client::loadWarningMessages()
std::vector<String> messages;
connection->sendQuery(connection_parameters.timeouts,
"SELECT * FROM viewIfPermitted(SELECT message FROM system.warnings ELSE null('message String'))",
{} /* query_parameters */,
"" /* query_id */,
QueryProcessingStage::Complete,
&global_context->getSettingsRef(),

View File

@ -740,8 +740,10 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
/// Rewrite query only when we have query parameters.
/// Note that if query is rewritten, comments in query are lost.
/// But the user often wants to see comments in server logs, query log, processlist, etc.
/// For recent versions of the server query parameters will be transferred by network and applied on the server side.
auto query = query_to_execute;
if (!query_parameters.empty())
if (!query_parameters.empty()
&& connection->getServerRevision(connection_parameters.timeouts) < DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
{
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.
ReplaceQueryParameterVisitor visitor(query_parameters);
@ -762,6 +764,7 @@ void ClientBase::processOrdinaryQuery(const String & query_to_execute, ASTPtr pa
connection->sendQuery(
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),
@ -1087,7 +1090,8 @@ bool ClientBase::receiveSampleBlock(Block & out, ColumnsDescription & columns_de
void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr parsed_query)
{
auto query = query_to_execute;
if (!query_parameters.empty())
if (!query_parameters.empty()
&& connection->getServerRevision(connection_parameters.timeouts) < DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
{
/// Replace ASTQueryParameter with ASTLiteral for prepared statements.
ReplaceQueryParameterVisitor visitor(query_parameters);
@ -1114,6 +1118,7 @@ void ClientBase::processInsertQuery(const String & query_to_execute, ASTPtr pars
connection->sendQuery(
connection_parameters.timeouts,
query,
query_parameters,
global_context->getCurrentQueryId(),
query_processing_stage,
&global_context->getSettingsRef(),

View File

@ -477,6 +477,7 @@ TablesStatusResponse Connection::getTablesStatus(const ConnectionTimeouts & time
void Connection::sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id_,
UInt64 stage,
const Settings * settings,
@ -569,6 +570,14 @@ void Connection::sendQuery(
writeStringBinary(query, *out);
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
{
Settings params;
for (const auto & [name, value] : query_parameters)
params.set(name, value);
params.write(*out, SettingsWriteFormat::STRINGS_WITH_FLAGS);
}
maybe_compressed_in.reset();
maybe_compressed_out.reset();
block_in.reset();

View File

@ -97,6 +97,7 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap& query_parameters,
const String & query_id_/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,

View File

@ -183,7 +183,7 @@ void HedgedConnections::sendQuery(
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;
}
replica.connection->sendQuery(timeouts, query, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
replica.connection->sendQuery(timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
replica.change_replica_timeout.setRelative(timeouts.receive_data_timeout);
replica.packet_receiver->setReceiveTimeout(hedged_connections_factory.getConnectionTimeouts().receive_timeout);
};

View File

@ -86,6 +86,7 @@ public:
virtual void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id_,
UInt64 stage,
const Settings * settings,

View File

@ -75,6 +75,7 @@ void LocalConnection::sendProfileEvents()
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id,
UInt64 stage,
const Settings *,
@ -82,6 +83,9 @@ void LocalConnection::sendQuery(
bool,
std::function<void(const Progress &)> process_progress_callback)
{
if (!query_parameters.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "clickhouse local does not support query parameters");
/// Suggestion comes without client_info.
if (client_info)
query_context = session.makeQueryContext(*client_info);

View File

@ -94,6 +94,7 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const NameToNameMap & query_parameters,
const String & query_id/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,

View File

@ -160,15 +160,15 @@ void MultiplexedConnections::sendQuery(
if (enable_sample_offset_parallel_processing)
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, &client_info, with_pending_data, {});
replica_states[i].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
}
}
else
{
/// Use single replica.
replica_states[0].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, &client_info, with_pending_data, {});
replica_states[0].connection->sendQuery(
timeouts, query, /* query_parameters */ {}, query_id, stage, &modified_settings, &client_info, with_pending_data, {});
}
sent_query = true;

View File

@ -138,7 +138,8 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
void Suggest::fetch(IServerConnection & connection, const ConnectionTimeouts & timeouts, const std::string & query)
{
connection.sendQuery(timeouts, query, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false, {});
connection.sendQuery(
timeouts, query, {} /* query_parameters */, "" /* query_id */, QueryProcessingStage::Complete, nullptr, nullptr, false, {});
while (true)
{

View File

@ -52,7 +52,7 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54458
#define DBMS_TCP_PROTOCOL_VERSION 54459
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449
@ -63,3 +63,5 @@
#define DBMS_MIN_PROTOCOL_VERSION_WITH_ADDENDUM 54458
#define DBMS_MIN_PROTOCOL_VERSION_WITH_QUOTA_KEY 54458
#define DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS 54459

View File

@ -2965,6 +2965,11 @@ void Context::setQueryParameter(const String & name, const String & value)
throw Exception("Duplicate name " + backQuote(name) + " of query parameter", ErrorCodes::BAD_ARGUMENTS);
}
void Context::addQueryParameters(const NameToNameMap & parameters)
{
for (const auto & [name, value] : parameters)
query_parameters.insert_or_assign(name, value);
}
void Context::addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const
{

View File

@ -946,9 +946,14 @@ public:
/// Query parameters for prepared statements.
bool hasQueryParameters() const;
const NameToNameMap & getQueryParameters() const;
/// Throws if parameter with the given name already set.
void setQueryParameter(const String & name, const String & value);
void setQueryParameters(const NameToNameMap & parameters) { query_parameters = parameters; }
/// Overrides values of existing parameters.
void addQueryParameters(const NameToNameMap & parameters);
/// Add started bridge command. It will be killed after context destruction
void addBridgeCommand(std::unique_ptr<ShellCommand> cmd) const;

View File

@ -1,6 +1,6 @@
#include <Parsers/ASTSetQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSetQuery.h>
#include <Parsers/ASTSetQuery.h>
namespace DB
{
@ -10,7 +10,9 @@ BlockIO InterpreterSetQuery::execute()
{
const auto & ast = query_ptr->as<ASTSetQuery &>();
getContext()->checkSettingsConstraints(ast.changes);
getContext()->getSessionContext()->applySettingsChanges(ast.changes);
auto session_context = getContext()->getSessionContext();
session_context->applySettingsChanges(ast.changes);
session_context->addQueryParameters(ast.query_parameters);
return {};
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Common/SettingsChanges.h>
#include <Core/Names.h>
#include <Parsers/IAST.h>
#include <Common/SettingsChanges.h>
namespace DB
{
@ -15,6 +15,7 @@ public:
bool is_standalone = true; /// If false, this AST is a part of another query, such as SELECT.
SettingsChanges changes;
NameToNameMap query_parameters;
/** Get the text that identifies this element. */
String getID(char) const override { return "Set"; }

View File

@ -5,13 +5,38 @@
#include <Parsers/CommonParsers.h>
#include <Parsers/ParserSetQuery.h>
#include <Common/typeid_cast.h>
#include <Core/Names.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <Common/FieldVisitorToString.h>
#include <Common/SettingsChanges.h>
#include <Common/typeid_cast.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
static NameToNameMap::value_type convertToQueryParameter(SettingChange change)
{
auto name = change.name.substr(strlen(QUERY_PARAMETER_NAME_PREFIX));
if (name.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter name cannot be empty");
auto value = applyVisitor(FieldVisitorToString(), change.value);
/// writeQuoted is not always quoted in line with SQL standard https://github.com/ClickHouse/ClickHouse/blob/master/src/IO/WriteHelpers.h
if (value.starts_with('\''))
{
ReadBufferFromOwnString buf(value);
readQuoted(value, buf);
}
return {name, value};
}
class ParserLiteralOrMap : public IParserBase
{
public:
@ -111,16 +136,23 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
SettingsChanges changes;
NameToNameMap query_parameters;
while (true)
{
if (!changes.empty() && !s_comma.ignore(pos))
if ((!changes.empty() || !query_parameters.empty()) && !s_comma.ignore(pos))
break;
changes.push_back(SettingChange{});
/// Either a setting or a parameter for prepared statement (if name starts with QUERY_PARAMETER_NAME_PREFIX)
SettingChange current;
if (!parseNameValuePair(changes.back(), pos, expected))
if (!parseNameValuePair(current, pos, expected))
return false;
if (current.name.starts_with(QUERY_PARAMETER_NAME_PREFIX))
query_parameters.emplace(convertToQueryParameter(std::move(current)));
else
changes.push_back(std::move(current));
}
auto query = std::make_shared<ASTSetQuery>();
@ -128,6 +160,7 @@ bool ParserSetQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
query->is_standalone = !parse_only_internals;
query->changes = std::move(changes);
query->query_parameters = std::move(query_parameters);
return true;
}

View File

@ -9,6 +9,8 @@ namespace DB
struct SettingChange;
constexpr char QUERY_PARAMETER_NAME_PREFIX[] = "param_";
/** Query like this:
* SET name1 = value1, name2 = value2, ...
*/

View File

@ -67,7 +67,8 @@ RemoteInserter::RemoteInserter(
/** Send query and receive "header", that describes table structure.
* Header is needed to know, what structure is required for blocks to be passed to 'write' method.
*/
connection.sendQuery(timeouts, query, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});
connection.sendQuery(
timeouts, query, /* query_parameters */ {}, "", QueryProcessingStage::Complete, &settings, &modified_client_info, false, {});
while (true)
{

View File

@ -26,6 +26,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/setThreadName.h>
#include <Common/typeid_cast.h>
#include <Parsers/ParserSetQuery.h>
#include <base/getFQDNOrHostName.h>
#include <base/scope_guard.h>
@ -1014,10 +1015,10 @@ bool DynamicQueryHandler::customizeQueryParam(ContextMutablePtr context, const s
if (key == param_name)
return true; /// do nothing
if (startsWith(key, "param_"))
if (startsWith(key, QUERY_PARAMETER_NAME_PREFIX))
{
/// Save name and values of substitution in dictionary.
const String parameter_name = key.substr(strlen("param_"));
const String parameter_name = key.substr(strlen(QUERY_PARAMETER_NAME_PREFIX));
if (!context->getQueryParameters().contains(parameter_name))
context->setQueryParameter(parameter_name, value);

View File

@ -57,6 +57,7 @@
#include <Common/config_version.h>
using namespace std::literals;
using namespace DB;
namespace CurrentMetrics
@ -64,6 +65,23 @@ namespace CurrentMetrics
extern const Metric QueryThread;
}
namespace
{
NameToNameMap convertToQueryParameters(const Settings & passed_params)
{
NameToNameMap query_parameters;
for (const auto & param : passed_params)
{
std::string value;
ReadBufferFromOwnString buf(param.getValueString());
readQuoted(value, buf);
query_parameters.emplace(param.getName(), value);
}
return query_parameters;
}
}
namespace DB
{
@ -1334,6 +1352,10 @@ void TCPHandler::receiveQuery()
readStringBinary(state.query, *in);
Settings passed_params;
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
passed_params.read(*in, settings_format);
/// TODO Unify interserver authentication (and make sure that it's secure enough)
if (is_interserver_mode)
{
@ -1424,6 +1446,8 @@ void TCPHandler::receiveQuery()
/// so we have to apply the changes first.
query_context->setCurrentQueryId(state.query_id);
query_context->addQueryParameters(convertToQueryParameters(passed_params));
/// For testing hedged requests
if (unlikely(sleep_after_receiving_query.totalMilliseconds()))
{
@ -1460,6 +1484,9 @@ void TCPHandler::receiveUnexpectedQuery()
readStringBinary(skip_string, *in);
if (client_tcp_protocol_version >= DBMS_MIN_PROTOCOL_VERSION_WITH_PARAMETERS)
skip_settings.read(*in, settings_format);
throw NetException("Unexpected packet Query received from client", ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}

View File

@ -4,3 +4,4 @@
[[10],[10],[10]]
[10,10,10] [[10],[10],[10]] (10,'Test') (10,('dt',10)) 2015-02-15
Code: 457.
Code: 457.

View File

@ -0,0 +1,9 @@
42 hello 2022-08-04 18:30:53 {'2b95a497-3a5d-49af-bf85-15763318cde7':[1.2,3.4]}
UInt64 String DateTime Map(UUID, Array(Float32))
42 [1,2,3] {'abc':22,'def':33} [[4,5,6],[7],[8,9]] {10:[11,12],13:[14,15]} {'ghj':{'klm':[16,17]},'nop':{'rst':[18]}}
5
42
13
13 str 2022-08-04 18:30:53 {'10':[11,12],'13':[14,15]}
1
1

View File

@ -0,0 +1,80 @@
#!/usr/bin/env bash
# shellcheck disable=SC2154
unset CLICKHOUSE_LOG_COMMENT
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_CLIENT \
--param_num="42" \
--param_str="hello" \
--param_date="2022-08-04 18:30:53" \
--param_map="{'2b95a497-3a5d-49af-bf85-15763318cde7': [1.2, 3.4]}" \
-q "select {num:UInt64}, {str:String}, {date:DateTime}, {map:Map(UUID, Array(Float32))}"
$CLICKHOUSE_CLIENT \
--param_num="42" \
--param_str="hello" \
--param_date="2022-08-04 18:30:53" \
--param_map="{'2b95a497-3a5d-49af-bf85-15763318cde7': [1.2, 3.4]}" \
-q "select toTypeName({num:UInt64}), toTypeName({str:String}), toTypeName({date:DateTime}), toTypeName({map:Map(UUID, Array(Float32))})"
table_name="t_02377_extend_protocol_with_query_parameters_$RANDOM$RANDOM"
$CLICKHOUSE_CLIENT -n -q "
create table $table_name(
id Int64,
arr Array(UInt8),
map Map(String, UInt8),
mul_arr Array(Array(UInt8)),
map_arr Map(UInt8, Array(UInt8)),
map_map_arr Map(String, Map(String, Array(UInt8))))
engine = MergeTree
order by (id)"
$CLICKHOUSE_CLIENT \
--param_id="42" \
--param_arr="[1, 2, 3]" \
--param_map="{'abc': 22, 'def': 33}" \
--param_mul_arr="[[4, 5, 6], [7], [8, 9]]" \
--param_map_arr="{10: [11, 12], 13: [14, 15]}" \
--param_map_map_arr="{'ghj': {'klm': [16, 17]}, 'nop': {'rst': [18]}}" \
-q "insert into $table_name values({id: Int64}, {arr: Array(UInt8)}, {map: Map(String, UInt8)}, {mul_arr: Array(Array(UInt8))}, {map_arr: Map(UInt8, Array(UInt8))}, {map_map_arr: Map(String, Map(String, Array(UInt8)))})"
$CLICKHOUSE_CLIENT -q "select * from $table_name"
$CLICKHOUSE_CLIENT \
--param_tbl="numbers" \
--param_db="system" \
--param_col="number" \
-q "select {col:Identifier} from {db:Identifier}.{tbl:Identifier} limit 1 offset 5"
# it is possible to set parameter for the current session
$CLICKHOUSE_CLIENT -n -q "set param_n = 42; select {n: UInt8}"
# and it will not be visible to other sessions
$CLICKHOUSE_CLIENT -n -q "select {n: UInt8} -- { serverError 456 }"
# the same parameter could be set multiple times within one session (new value overrides the previous one)
$CLICKHOUSE_CLIENT -n -q "set param_n = 12; set param_n = 13; select {n: UInt8}"
# but multiple different parameters could be defined within each session
$CLICKHOUSE_CLIENT -n -q "
set param_a = 13, param_b = 'str';
set param_c = '2022-08-04 18:30:53';
set param_d = '{\'10\': [11, 12], \'13\': [14, 15]}';
select {a: UInt32}, {b: String}, {c: DateTime}, {d: Map(String, Array(UInt8))}"
# empty parameter name is not allowed
$CLICKHOUSE_CLIENT --param_="" -q "select 1" 2>&1 | grep -c 'Code: 36'
$CLICKHOUSE_CLIENT -q "set param_ = ''" 2>&1 | grep -c 'Code: 36'