diff --git a/src/Server/MySQLHandler.cpp b/src/Server/MySQLHandler.cpp index a97182f15fc..9e42f5ebc05 100644 --- a/src/Server/MySQLHandler.cpp +++ b/src/Server/MySQLHandler.cpp @@ -45,6 +45,10 @@ namespace ErrorCodes extern const int SUPPORT_IS_DISABLED; } +static String select_empty_replacement_query(const String & query); +static String show_table_status_replacement_query(const String & query); +static String kill_connection_id_replacement_query(const String & query); + MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & socket_, bool ssl_enabled, size_t connection_id_) : Poco::Net::TCPServerConnection(socket_) @@ -57,6 +61,10 @@ MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & so server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF; if (ssl_enabled) server_capability_flags |= CLIENT_SSL; + + replacements.emplace("KILL QUERY", kill_connection_id_replacement_query); + replacements.emplace("SHOW TABLE STATUS LIKE", show_table_status_replacement_query); + replacements.emplace("SHOW VARIABLES", select_empty_replacement_query); } void MySQLHandler::run() @@ -103,7 +111,8 @@ void MySQLHandler::run() { if (!handshake_response.database.empty()) connection_context.setCurrentDatabase(handshake_response.database); - connection_context.setCurrentQueryId(""); + connection_context.setCurrentQueryId(Poco::format("mysql:%lu", connection_id)); + } catch (const Exception & exc) { @@ -284,20 +293,18 @@ void MySQLHandler::comQuery(ReadBuffer & payload) } else { - String replacement_query = "SELECT ''"; + String replacement_query; bool should_replace = false; bool with_output = false; - // This is a workaround in order to support adding ClickHouse to MySQL using federated server. - if (0 == strncasecmp("SHOW TABLE STATUS LIKE", query.c_str(), 22)) + for (auto const & x : replacements) { - should_replace = true; - replacement_query = boost::replace_all_copy(query, "SHOW TABLE STATUS LIKE ", show_table_status_replacement_query); - } - - if (0 == strncasecmp("SHOW VARIABLES", query.c_str(), 13)) - { - should_replace = true; + if (0 == strncasecmp(x.first.c_str(), query.c_str(), x.first.size())) + { + should_replace = true; + replacement_query = x.second(query); + break; + } } ReadBufferFromString replacement(replacement_query); @@ -372,26 +379,63 @@ static bool isFederatedServerSetupSetCommand(const String & query) return 1 == std::regex_match(query, expr); } -const String MySQLHandler::show_table_status_replacement_query("SELECT" - " name AS Name," - " engine AS Engine," - " '10' AS Version," - " 'Dynamic' AS Row_format," - " 0 AS Rows," - " 0 AS Avg_row_length," - " 0 AS Data_length," - " 0 AS Max_data_length," - " 0 AS Index_length," - " 0 AS Data_free," - " 'NULL' AS Auto_increment," - " metadata_modification_time AS Create_time," - " metadata_modification_time AS Update_time," - " metadata_modification_time AS Check_time," - " 'utf8_bin' AS Collation," - " 'NULL' AS Checksum," - " '' AS Create_options," - " '' AS Comment" - " FROM system.tables" - " WHERE name LIKE "); +/// Replace "[query(such as SHOW VARIABLES...)]" into "". +static String select_empty_replacement_query(const String & query) +{ + std::ignore = query; + return "select ''"; +} + +/// Replace "SHOW TABLE STATUS LIKE 'xx'" into "SELECT ... FROM system.tables WHERE name LIKE 'xx'". +static String show_table_status_replacement_query(const String & query) +{ + const String prefix = "SHOW TABLE STATUS LIKE "; + if (query.size() > prefix.size()) + { + String suffix = query.data() + prefix.length(); + return ( + "SELECT" + " name AS Name," + " engine AS Engine," + " '10' AS Version," + " 'Dynamic' AS Row_format," + " 0 AS Rows," + " 0 AS Avg_row_length," + " 0 AS Data_length," + " 0 AS Max_data_length," + " 0 AS Index_length," + " 0 AS Data_free," + " 'NULL' AS Auto_increment," + " metadata_modification_time AS Create_time," + " metadata_modification_time AS Update_time," + " metadata_modification_time AS Check_time," + " 'utf8_bin' AS Collation," + " 'NULL' AS Checksum," + " '' AS Create_options," + " '' AS Comment" + " FROM system.tables" + " WHERE name LIKE " + + suffix); + } + return query; +} + +/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id = 'mysql:[connection_id]'". +static String kill_connection_id_replacement_query(const String & query) +{ + const String prefix = "KILL QUERY "; + if (query.size() > prefix.size()) + { + String suffix = query.data() + prefix.length(); + static const std::regex expr{"^[0-9]"}; + if (std::regex_match(suffix, expr)) + { + String replacement = Poco::format("KILL QUERY WHERE query_id = 'mysql:%s'", suffix); + return replacement; + } + } + return query; +} } + diff --git a/src/Server/MySQLHandler.h b/src/Server/MySQLHandler.h index 5f506089493..f7596850a8b 100644 --- a/src/Server/MySQLHandler.h +++ b/src/Server/MySQLHandler.h @@ -72,7 +72,9 @@ protected: bool secure_connection = false; private: - static const String show_table_status_replacement_query; + using ReplacementFn = std::function; + using Replacements = std::unordered_map; + Replacements replacements; }; #if USE_SSL diff --git a/tests/integration/test_mysql_protocol/test.py b/tests/integration/test_mysql_protocol/test.py index 4ab225aee20..507445537b8 100644 --- a/tests/integration/test_mysql_protocol/test.py +++ b/tests/integration/test_mysql_protocol/test.py @@ -138,6 +138,34 @@ def test_mysql_client(mysql_client, server_address): assert stdout == '\n'.join(['column', '0', '0', '1', '1', '5', '5', 'tmp_column', '0', '1', '']) + # Show table status. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "show table status like 'xx';" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + # show variables. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "show variables;" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + # Kill query. + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "kill query 0;" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + code, (stdout, stderr) = mysql_client.exec_run(''' + mysql --protocol tcp -h {host} -P {port} default -u default + --password=123 -e "kill query where query_id='mysql:0';" + '''.format(host=server_address, port=server_port), demux=True) + assert code == 0 + + def test_mysql_federated(mysql_server, server_address): # For some reason it occasionally fails without retries. retries = 100