Merge pull request #12152 from BohuTANG/mysql_kill_query

Support MySQL 'KILL QUERY [connection_id]'
This commit is contained in:
Alexander Kuzmenkov 2020-07-07 11:07:20 +03:00 committed by GitHub
commit dce7709405
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 107 additions and 33 deletions

View File

@ -45,6 +45,10 @@ namespace ErrorCodes
extern const int SUPPORT_IS_DISABLED;
}
static String select_empty_replacement_query(const String & query);
static String show_table_status_replacement_query(const String & query);
static String kill_connection_id_replacement_query(const String & query);
MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_,
bool ssl_enabled, size_t connection_id_)
: Poco::Net::TCPServerConnection(socket_)
@ -57,6 +61,10 @@ MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & so
server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
if (ssl_enabled)
server_capability_flags |= CLIENT_SSL;
replacements.emplace("KILL QUERY", kill_connection_id_replacement_query);
replacements.emplace("SHOW TABLE STATUS LIKE", show_table_status_replacement_query);
replacements.emplace("SHOW VARIABLES", select_empty_replacement_query);
}
void MySQLHandler::run()
@ -103,7 +111,8 @@ void MySQLHandler::run()
{
if (!handshake_response.database.empty())
connection_context.setCurrentDatabase(handshake_response.database);
connection_context.setCurrentQueryId("");
connection_context.setCurrentQueryId(Poco::format("mysql:%lu", connection_id));
}
catch (const Exception & exc)
{
@ -284,20 +293,18 @@ void MySQLHandler::comQuery(ReadBuffer & payload)
}
else
{
String replacement_query = "SELECT ''";
String replacement_query;
bool should_replace = false;
bool with_output = false;
// This is a workaround in order to support adding ClickHouse to MySQL using federated server.
if (0 == strncasecmp("SHOW TABLE STATUS LIKE", query.c_str(), 22))
for (auto const & x : replacements)
{
if (0 == strncasecmp(x.first.c_str(), query.c_str(), x.first.size()))
{
should_replace = true;
replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query);
replacement_query = x.second(query);
break;
}
if (0 == strncasecmp("SHOW VARIABLES", query.c_str(), 13))
{
should_replace = true;
}
ReadBufferFromString replacement(replacement_query);
@ -372,7 +379,22 @@ static bool isFederatedServerSetupSetCommand(const String & query)
return 1 == std::regex_match(query, expr);
}
const String MySQLHandler::show_table_status_replacement_query("SELECT"
/// Replace "[query(such as SHOW VARIABLES...)]" into "".
static String select_empty_replacement_query(const String & query)
{
std::ignore = query;
return "select ''";
}
/// Replace "SHOW TABLE STATUS LIKE 'xx'" into "SELECT ... FROM system.tables WHERE name LIKE 'xx'".
static String show_table_status_replacement_query(const String & query)
{
const String prefix = "SHOW TABLE STATUS LIKE ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
return (
"SELECT"
" name AS Name,"
" engine AS Engine,"
" '10' AS Version,"
@ -392,6 +414,28 @@ const String MySQLHandler::show_table_status_replacement_query("SELECT"
" '' AS Create_options,"
" '' AS Comment"
" FROM system.tables"
" WHERE name LIKE ");
" WHERE name LIKE "
+ suffix);
}
return query;
}
/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id = 'mysql:[connection_id]'".
static String kill_connection_id_replacement_query(const String & query)
{
const String prefix = "KILL QUERY ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
static const std::regex expr{"^[0-9]"};
if (std::regex_match(suffix, expr))
{
String replacement = Poco::format("KILL QUERY WHERE query_id = 'mysql:%s'", suffix);
return replacement;
}
}
return query;
}
}

View File

@ -72,7 +72,9 @@ protected:
bool secure_connection = false;
private:
static const String show_table_status_replacement_query;
using ReplacementFn = std::function<String(const String & query)>;
using Replacements = std::unordered_map<std::string, ReplacementFn>;
Replacements replacements;
};
#if USE_SSL

View File

@ -138,6 +138,34 @@ def test_mysql_client(mysql_client, server_address):
assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', 'tmp_column', '0', '1', ''])
# Show table status.
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default
--password=123 -e "show table status like 'xx';"
'''.format(host=server_address, port=server_port), demux=True)
assert code == 0
# show variables.
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default
--password=123 -e "show variables;"
'''.format(host=server_address, port=server_port), demux=True)
assert code == 0
# Kill query.
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default
--password=123 -e "kill query 0;"
'''.format(host=server_address, port=server_port), demux=True)
assert code == 0
code, (stdout, stderr) = mysql_client.exec_run('''
mysql --protocol tcp -h {host} -P {port} default -u default
--password=123 -e "kill query where query_id='mysql:0';"
'''.format(host=server_address, port=server_port), demux=True)
assert code == 0
def test_mysql_federated(mysql_server, server_address):
# For some reason it occasionally fails without retries.
retries = 100