Merge pull request #59293 from slvrtrn/reapply-net-read-write-mysql

Re-apply MySQL net write/read settings changeset
This commit is contained in:
Alexey Milovidov 2024-01-30 06:37:09 +01:00 committed by GitHub
commit 5cf1242a72
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 195 additions and 118 deletions

View File

@ -57,16 +57,109 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
static const size_t PACKET_HEADER_SIZE = 4;
static const size_t SSL_REQUEST_PAYLOAD_SIZE = 32;
static String showWarningsReplacementQuery(const String & query);
static String showCountWarningsReplacementQuery(const String & query);
static String selectEmptyReplacementQuery(const String & query);
static String showTableStatusReplacementQuery(const String & query);
static String killConnectionIdReplacementQuery(const String & query);
static String selectLimitReplacementQuery(const String & query);
static bool checkShouldReplaceQuery(const String & query, const String & prefix)
{
return query.length() >= prefix.length()
&& std::equal(prefix.begin(), prefix.end(), query.begin(), [](char a, char b) { return std::tolower(a) == std::tolower(b); });
}
static bool isFederatedServerSetupSetCommand(const String & query)
{
re2::RE2::Options regexp_options;
regexp_options.set_case_sensitive(false);
static const re2::RE2 expr(
"(^(SET NAMES(.*)))"
"|(^(SET character_set_results(.*)))"
"|(^(SET FOREIGN_KEY_CHECKS(.*)))"
"|(^(SET AUTOCOMMIT(.*)))"
"|(^(SET sql_mode(.*)))"
"|(^(SET @@(.*)))"
"|(^(SET SESSION TRANSACTION ISOLATION LEVEL(.*)))", regexp_options);
assert(expr.ok());
return re2::RE2::FullMatch(query, expr);
}
/// Always return an empty set with appropriate column definitions for SHOW WARNINGS queries
/// See also: https://dev.mysql.com/doc/refman/8.0/en/show-warnings.html
static String showWarningsReplacementQuery([[maybe_unused]] const String & query)
{
return "SELECT '' AS Level, 0::UInt32 AS Code, '' AS Message WHERE false";
}
static String showCountWarningsReplacementQuery([[maybe_unused]] const String & query)
{
return "SELECT 0::UInt64 AS `@@session.warning_count`";
}
/// Replace "[query(such as SHOW VARIABLES...)]" into "".
static String selectEmptyReplacementQuery(const String & query)
{
std::ignore = query;
return "select ''";
}
/// Replace "SHOW TABLE STATUS LIKE 'xx'" into "SELECT ... FROM system.tables WHERE name LIKE 'xx'".
static String showTableStatusReplacementQuery(const String & query)
{
const String prefix = "SHOW TABLE STATUS LIKE ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
return (
"SELECT"
" name AS Name,"
" engine AS Engine,"
" '10' AS Version,"
" 'Dynamic' AS Row_format,"
" 0 AS Rows,"
" 0 AS Avg_row_length,"
" 0 AS Data_length,"
" 0 AS Max_data_length,"
" 0 AS Index_length,"
" 0 AS Data_free,"
" 'NULL' AS Auto_increment,"
" metadata_modification_time AS Create_time,"
" metadata_modification_time AS Update_time,"
" metadata_modification_time AS Check_time,"
" 'utf8_bin' AS Collation,"
" 'NULL' AS Checksum,"
" '' AS Create_options,"
" '' AS Comment"
" FROM system.tables"
" WHERE name LIKE "
+ suffix);
}
return query;
}
static std::optional<String> setSettingReplacementQuery(const String & query, const String & mysql_setting, const String & clickhouse_setting)
{
const String prefix = "SET " + mysql_setting;
// if (query.length() >= prefix.length() && boost::iequals(std::string_view(prefix), std::string_view(query.data(), 3)))
if (checkShouldReplaceQuery(query, prefix))
return "SET " + clickhouse_setting + String(query.data() + prefix.length());
return std::nullopt;
}
/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id LIKE 'mysql:[connection_id]:xxx'".
static String killConnectionIdReplacementQuery(const String & query)
{
const String prefix = "KILL QUERY ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
static const re2::RE2 expr("^[0-9]");
if (re2::RE2::FullMatch(suffix, expr))
{
String replacement = fmt::format("KILL QUERY WHERE query_id LIKE 'mysql:{}:%'", suffix);
return replacement;
}
}
return query;
}
MySQLHandler::MySQLHandler(
IServer & server_,
@ -88,12 +181,14 @@ MySQLHandler::MySQLHandler(
if (ssl_enabled)
server_capabilities |= CLIENT_SSL;
replacements.emplace("SHOW WARNINGS", showWarningsReplacementQuery);
replacements.emplace("SHOW COUNT(*) WARNINGS", showCountWarningsReplacementQuery);
replacements.emplace("KILL QUERY", killConnectionIdReplacementQuery);
replacements.emplace("SHOW TABLE STATUS LIKE", showTableStatusReplacementQuery);
replacements.emplace("SHOW VARIABLES", selectEmptyReplacementQuery);
replacements.emplace("SET SQL_SELECT_LIMIT", selectLimitReplacementQuery);
queries_replacements.emplace("SHOW WARNINGS", showWarningsReplacementQuery);
queries_replacements.emplace("SHOW COUNT(*) WARNINGS", showCountWarningsReplacementQuery);
queries_replacements.emplace("KILL QUERY", killConnectionIdReplacementQuery);
queries_replacements.emplace("SHOW TABLE STATUS LIKE", showTableStatusReplacementQuery);
queries_replacements.emplace("SHOW VARIABLES", selectEmptyReplacementQuery);
settings_replacements.emplace("SQL_SELECT_LIMIT", "limit");
settings_replacements.emplace("NET_WRITE_TIMEOUT", "send_timeout");
settings_replacements.emplace("NET_READ_TIMEOUT", "receive_timeout");
}
void MySQLHandler::run()
@ -324,8 +419,6 @@ void MySQLHandler::comPing()
packet_endpoint->sendPacket(OKPacket(0x0, client_capabilities, 0, 0, 0), true);
}
static bool isFederatedServerSetupSetCommand(const String & query);
void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
{
String query = String(payload.position(), payload.buffer().end());
@ -342,17 +435,29 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
bool should_replace = false;
bool with_output = false;
for (auto const & x : replacements)
// Queries replacements
for (auto const & [query_to_replace, replacement_fn] : queries_replacements)
{
if (0 == strncasecmp(x.first.c_str(), query.c_str(), x.first.size()))
if (checkShouldReplaceQuery(query, query_to_replace))
{
should_replace = true;
replacement_query = x.second(query);
replacement_query = replacement_fn(query);
break;
}
}
ReadBufferFromString replacement(replacement_query);
// Settings replacements
if (!should_replace)
for (auto const & [mysql_setting, clickhouse_setting] : settings_replacements)
{
const auto replacement_query_opt = setSettingReplacementQuery(query, mysql_setting, clickhouse_setting);
if (replacement_query_opt.has_value())
{
should_replace = true;
replacement_query = replacement_query_opt.value();
break;
}
}
auto query_context = session->makeQueryContext();
query_context->setCurrentQueryId(fmt::format("mysql:{}:{}", connection_id, toString(UUIDHelpers::generateV4())));
@ -385,7 +490,14 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
}
};
executeQuery(should_replace ? replacement : payload, *out, false, query_context, set_result_details, QueryFlags{}, format_settings);
if (should_replace)
{
ReadBufferFromString replacement(replacement_query);
executeQuery(replacement, *out, false, query_context, set_result_details, QueryFlags{}, format_settings);
}
else
executeQuery(payload, *out, false, query_context, set_result_details, QueryFlags{}, format_settings);
if (!with_output)
packet_endpoint->sendPacket(OKPacket(0x00, client_capabilities, affected_rows, 0, 0), true);
@ -531,99 +643,4 @@ void MySQLHandlerSSL::finishHandshakeSSL(
}
#endif
static bool isFederatedServerSetupSetCommand(const String & query)
{
re2::RE2::Options regexp_options;
regexp_options.set_case_sensitive(false);
static const re2::RE2 expr(
"(^(SET NAMES(.*)))"
"|(^(SET character_set_results(.*)))"
"|(^(SET FOREIGN_KEY_CHECKS(.*)))"
"|(^(SET AUTOCOMMIT(.*)))"
"|(^(SET sql_mode(.*)))"
"|(^(SET @@(.*)))"
"|(^(SET SESSION TRANSACTION ISOLATION LEVEL(.*)))", regexp_options);
assert(expr.ok());
return re2::RE2::FullMatch(query, expr);
}
/// Always return an empty set with appropriate column definitions for SHOW WARNINGS queries
/// See also: https://dev.mysql.com/doc/refman/8.0/en/show-warnings.html
static String showWarningsReplacementQuery([[maybe_unused]] const String & query)
{
return "SELECT '' AS Level, 0::UInt32 AS Code, '' AS Message WHERE false";
}
static String showCountWarningsReplacementQuery([[maybe_unused]] const String & query)
{
return "SELECT 0::UInt64 AS `@@session.warning_count`";
}
/// Replace "[query(such as SHOW VARIABLES...)]" into "".
static String selectEmptyReplacementQuery(const String & query)
{
std::ignore = query;
return "select ''";
}
/// Replace "SHOW TABLE STATUS LIKE 'xx'" into "SELECT ... FROM system.tables WHERE name LIKE 'xx'".
static String showTableStatusReplacementQuery(const String & query)
{
const String prefix = "SHOW TABLE STATUS LIKE ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
return (
"SELECT"
" name AS Name,"
" engine AS Engine,"
" '10' AS Version,"
" 'Dynamic' AS Row_format,"
" 0 AS Rows,"
" 0 AS Avg_row_length,"
" 0 AS Data_length,"
" 0 AS Max_data_length,"
" 0 AS Index_length,"
" 0 AS Data_free,"
" 'NULL' AS Auto_increment,"
" metadata_modification_time AS Create_time,"
" metadata_modification_time AS Update_time,"
" metadata_modification_time AS Check_time,"
" 'utf8_bin' AS Collation,"
" 'NULL' AS Checksum,"
" '' AS Create_options,"
" '' AS Comment"
" FROM system.tables"
" WHERE name LIKE "
+ suffix);
}
return query;
}
static String selectLimitReplacementQuery(const String & query)
{
const String prefix = "SET SQL_SELECT_LIMIT";
if (query.starts_with(prefix))
return "SET limit" + std::string(query.data() + prefix.length());
return query;
}
/// Replace "KILL QUERY [connection_id]" into "KILL QUERY WHERE query_id LIKE 'mysql:[connection_id]:xxx'".
static String killConnectionIdReplacementQuery(const String & query)
{
const String prefix = "KILL QUERY ";
if (query.size() > prefix.size())
{
String suffix = query.data() + prefix.length();
static const re2::RE2 expr("^[0-9]");
if (re2::RE2::FullMatch(suffix, expr))
{
String replacement = fmt::format("KILL QUERY WHERE query_id LIKE 'mysql:{}:%'", suffix);
return replacement;
}
}
return query;
}
}

View File

@ -92,9 +92,13 @@ protected:
MySQLProtocol::PacketEndpointPtr packet_endpoint;
std::unique_ptr<Session> session;
using ReplacementFn = std::function<String(const String & query)>;
using Replacements = std::unordered_map<std::string, ReplacementFn>;
Replacements replacements;
using QueryReplacementFn = std::function<String(const String & query)>;
using QueriesReplacements = std::unordered_map<std::string, QueryReplacementFn>;
QueriesReplacements queries_replacements;
/// MySQL setting name --> ClickHouse setting name
using SettingsReplacements = std::unordered_map<std::string, std::string>;
SettingsReplacements settings_replacements;
std::mutex prepared_statements_mutex;
UInt32 current_prepared_statement_id TSA_GUARDED_BY(prepared_statements_mutex) = 0;

View File

@ -0,0 +1,23 @@
-- Init
s
a
b
c
d
-- Uppercase setting name
s
a
b
name value
send_timeout 22
name value
receive_timeout 33
-- Lowercase setting name
s
a
b
c
name value
send_timeout 55
name value
receive_timeout 66

View File

@ -0,0 +1,33 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires mysql client
# Tests that certain MySQL-proprietary settings are mapped to ClickHouse-native settings.
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
CHANGED_SETTINGS_QUERY="SELECT name, value FROM system.settings WHERE name IN ('send_timeout', 'receive_timeout') AND changed;"
TEST_TABLE="mysql_settings_override_test"
DROP_TABLE="DROP TABLE IF EXISTS $TEST_TABLE;"
CREATE_TABLE="CREATE TABLE $TEST_TABLE (s String) ENGINE MergeTree ORDER BY s;"
INSERT_STMT="INSERT INTO $TEST_TABLE VALUES ('a'), ('b'), ('c'), ('d');"
SELECT_STMT="SELECT * FROM $TEST_TABLE ORDER BY s;"
echo "-- Init"
${MYSQL_CLIENT} --execute "$DROP_TABLE $CREATE_TABLE $INSERT_STMT $SELECT_STMT" # should fetch all 4 records
echo "-- Uppercase setting name"
${MYSQL_CLIENT} --execute "SET SQL_SELECT_LIMIT = 2; $SELECT_STMT" # should fetch 2 records out of 4
${MYSQL_CLIENT} --execute "SET NET_WRITE_TIMEOUT = 22; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "SET NET_READ_TIMEOUT = 33; $CHANGED_SETTINGS_QUERY"
echo "-- Lowercase setting name"
${MYSQL_CLIENT} --execute "set sql_select_limit=3; $SELECT_STMT" # should fetch 3 records out of 4
${MYSQL_CLIENT} --execute "set net_write_timeout=55; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "set net_read_timeout=66; $CHANGED_SETTINGS_QUERY"
${MYSQL_CLIENT} --execute "$DROP_TABLE"