From 37ac45643947e2ecf085e0c840341e526a3b44f1 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Mon, 6 Jul 2020 09:02:02 +0800 Subject: [PATCH 1/3] Support KILL QUERY [connection_id] for MySQL --- src/Server/MySQLHandler.cpp | 30 ++++++++++++++++++++++++++++-- src/Server/MySQLHandler.h | 1 + 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index a97182f15fc..6892ebbd31a 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -103,7 +103,8 @@ void MySQLHandler::run() { if (!handshake_response.database.empty()) connection_context.setCurrentDatabase(handshake_response.database); - connection_context.setCurrentQueryId(""); + connection_context.setCurrentQueryId(Poco::format("mysql:%lu", connection_id)); + } catch (const Exception & exc) { @@ -295,6 +296,12 @@ void MySQLHandler::comQuery(ReadBuffer & payload) replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query); } + if (0 == strncasecmp("KILL QUERY", query.c_str(), 10)) + { + should_replace = true; + replacement_query = kill_connection_id_replacement_query(query); + } + if (0 == strncasecmp("SHOW VARIABLES", query.c_str(), 13)) { should_replace = true; @@ -379,7 +386,7 @@ const String MySQLHandler::show_table_status_replacement_query("SELECT" " 'Dynamic' AS Row_format," " 0 AS Rows," " 0 AS Avg_row_length," - " 0 AS Data_length," + " 0 AS Data_length," " 0 AS Max_data_length," " 0 AS Index_length," " 0 AS Data_free," @@ -394,4 +401,23 @@ const String MySQLHandler::show_table_status_replacement_query("SELECT" " FROM system.tables" " WHERE name LIKE "); +String MySQLHandler::kill_connection_id_replacement_query(const String & query) +{ + const String s = "KILL QUERY "; + + if (query.size() > s.size()) + { + String process_id = query.data() + s.length(); + + static const std::regex expr{"^[0-9]"}; + if (std::regex_match(process_id, expr)) + { + String replacement = Poco::format("KILL QUERY WHERE query_id = 'mysql:%s'", process_id); + return replacement; + } + } + return query; } + +} + diff --git a/src/Server/MySQLHandler.h b/src/Server/MySQLHandler.h index 5f506089493..41d4cc9a483 100644 --- a/src/Server/MySQLHandler.h +++ b/src/Server/MySQLHandler.h @@ -73,6 +73,7 @@ protected: private: static const String show_table_status_replacement_query; + String kill_connection_id_replacement_query(const String & query); }; #if USE_SSL From cb04c503d7a9acbe88154034ab20cb445f1d6938 Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Mon, 6 Jul 2020 10:07:38 +0800 Subject: [PATCH 2/3] Add MySQL to ClickHouse query replacement mapping table --- src/Server/MySQLHandler.cpp | 106 +++++++++++++++++++++--------------- src/Server/MySQLHandler.h | 5 +- 2 files changed, 65 insertions(+), 46 deletions(-) diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index 6892ebbd31a..9e42f5ebc05 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -45,6 +45,10 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } +static String select_empty_replacement_query(const String & query); +static String show_table_status_replacement_query(const String & query); +static String kill_connection_id_replacement_query(const String & query); + MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_) : Poco::Net::TCPServerConnection(socket_) @@ -57,6 +61,10 @@ MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & so server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF; if (ssl_enabled) server_capability_flags |= CLIENT_SSL; + + replacements.emplace("KILL QUERY", kill_connection_id_replacement_query); + replacements.emplace("SHOW TABLE STATUS LIKE", show_table_status_replacement_query); + replacements.emplace("SHOW VARIABLES", select_empty_replacement_query); } void MySQLHandler::run() @@ -285,26 +293,18 @@ void MySQLHandler::comQuery(ReadBuffer & payload) } else { - String replacement_query = "SELECT ''"; + String replacement_query; bool should_replace = false; bool with_output = false; - // This is a workaround in order to support adding ClickHouse to MySQL using federated server. - if (0 == strncasecmp("SHOW TABLE STATUS LIKE", query.c_str(), 22)) + for (auto const & x : replacements) { - should_replace = true; - replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query); - } - - if (0 == strncasecmp("KILL QUERY", query.c_str(), 10)) - { - should_replace = true; - replacement_query = kill_connection_id_replacement_query(query); - } - - if (0 == strncasecmp("SHOW VARIABLES", query.c_str(), 13)) - { - should_replace = true; + if (0 == strncasecmp(x.first.c_str(), query.c_str(), x.first.size())) + { + should_replace = true; + replacement_query = x.second(query); + break; + } } ReadBufferFromString replacement(replacement_query); @@ -379,40 +379,58 @@ static bool isFederatedServerSetupSetCommand(const String & query) return 1 == std::regex_match(query, expr); } -const String MySQLHandler::show_table_status_replacement_query("SELECT" - " name AS Name," - " engine AS Engine," - " '10' AS Version," - " 'Dynamic' AS Row_format," - " 0 AS Rows," - " 0 AS Avg_row_length," - " 0 AS Data_length," - " 0 AS Max_data_length," - " 0 AS Index_length," - " 0 AS Data_free," - " 'NULL' AS Auto_increment," - " metadata_modification_time AS Create_time," - " metadata_modification_time AS Update_time," - " metadata_modification_time AS Check_time," - " 'utf8_bin' AS Collation," - " 'NULL' AS Checksum," - " '' AS Create_options," - " '' AS Comment" - " FROM system.tables" - " WHERE name LIKE "); - -String MySQLHandler::kill_connection_id_replacement_query(const String & query) +/// Replace "[query(such as SHOW VARIABLES...)]" into "". +static String select_empty_replacement_query(const String & query) { - const String s = "KILL QUERY "; + std::ignore = query; + return "select ''"; +} - if (query.size() > s.size()) +/// Replace "SHOW TABLE STATUS LIKE 'xx'" into "SELECT ... FROM system.tables WHERE name LIKE 'xx'". +static String show_table_status_replacement_query(const String & query) +{ + const String prefix = "SHOW TABLE STATUS LIKE "; + if (query.size() > prefix.size()) { - String process_id = query.data() + s.length(); + String suffix = query.data() + prefix.length(); + return ( + "SELECT" + " name AS Name," + " engine AS Engine," + " '10' AS Version," + " 'Dynamic' AS Row_format," + " 0 AS Rows," + " 0 AS Avg_row_length," + " 0 AS Data_length," + " 0 AS Max_data_length," + " 0 AS Index_length," + " 0 AS Data_free," + " 'NULL' AS Auto_increment," + " metadata_modification_time AS Create_time," + " metadata_modification_time AS Update_time," + " metadata_modification_time AS Check_time," + " 'utf8_bin' AS Collation," + " 'NULL' AS Checksum," + " '' AS Create_options," + " '' AS Comment" + " FROM system.tables" + " WHERE name LIKE " + + suffix); + } + return query; +} +/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id = 'mysql:[connection_id]'". +static String kill_connection_id_replacement_query(const String & query) +{ + const String prefix = "KILL QUERY "; + if (query.size() > prefix.size()) + { + String suffix = query.data() + prefix.length(); static const std::regex expr{"^[0-9]"}; - if (std::regex_match(process_id, expr)) + if (std::regex_match(suffix, expr)) { - String replacement = Poco::format("KILL QUERY WHERE query_id = 'mysql:%s'", process_id); + String replacement = Poco::format("KILL QUERY WHERE query_id = 'mysql:%s'", suffix); return replacement; } } diff --git a/src/Server/MySQLHandler.h b/src/Server/MySQLHandler.h index 41d4cc9a483..f7596850a8b 100644 --- a/src/Server/MySQLHandler.h +++ b/src/Server/MySQLHandler.h @@ -72,8 +72,9 @@ protected: bool secure_connection = false; private: - static const String show_table_status_replacement_query; - String kill_connection_id_replacement_query(const String & query); + using ReplacementFn = std::function; + using Replacements = std::unordered_map; + Replacements replacements; }; #if USE_SSL From 18c48ce12c9d86b1626c168a113993b32b3f131e Mon Sep 17 00:00:00 2001 From: BohuTANG Date: Mon, 6 Jul 2020 16:26:06 +0800 Subject: [PATCH 3/3] Add integration test for mysql replacement query --- tests/integration/test_mysql_protocol/test.py | 28 +++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/tests/integration/test_mysql_protocol/test.py b/tests/integration/test_mysql_protocol/test.py index 4ab225aee20..507445537b8 100644 --- a/tests/integration/test_mysql_protocol/test.py +++ b/tests/integration/test_mysql_protocol/test.py @@ -138,6 +138,34 @@ def test_mysql_client(mysql_client, server_address): assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', 'tmp_column', '0', '1', '']) + # Show table status. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "show table status like 'xx';" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + # show variables. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "show variables;" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + # Kill query. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "kill query 0;" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "kill query where query_id='mysql:0';" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + def test_mysql_federated(mysql_server, server_address): # For some reason it occasionally fails without retries. retries = 100