MaterializedMySQL: Support unquoted utf-8 strings in DDL

Since ClickHouse does not support unquoted utf-8 strings but MySQL does.

Instead of fixing Lexer to recognize utf-8 chars as TokenType::BareWord,
suggesting to quote all unrecognized tokens before applying any DDL.

Actual parsing and validating the syntax will be done by particular Parser.

If there is any TokenType::Error, the query is unable to be parsed anyway.
Quoting such tokens can provide the support of utf-8 names.

See `tryQuoteUnrecognizedTokens` and `QuoteUnrecognizedTokensTest`.

mysql> CREATE TABLE 道.渠(...

is converted to

CREATE TABLE `道`.`渠`(...

Also fixed the bug with missing * while doing SELECT in full sync because db or table name are back quoted when not needed.
This commit is contained in:
Val Doroshchuk 2023-07-19 12:53:27 +02:00 committed by Valentyn Doroshchuk WX1158589
parent b225f9c34b
commit efa638ef3c
9 changed files with 542 additions and 13 deletions

View File

@ -44,4 +44,15 @@ String backQuoteIfNeed(StringRef x)
return res;
}
String backQuoteMySQL(StringRef x)
{
String res(x.size, '\0');
{
WriteBufferFromString wb(res);
writeBackQuotedStringMySQL(x, wb);
}
return res;
}
}

View File

@ -24,4 +24,7 @@ String backQuote(StringRef x);
/// Quote the identifier with backquotes, if required.
String backQuoteIfNeed(StringRef x);
/// Quote the identifier with backquotes, for use in MySQL queries.
String backQuoteMySQL(StringRef x);
}

View File

@ -4,6 +4,7 @@
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
#include <Databases/MySQL/tryParseTableIDFromDDL.h>
#include <Databases/MySQL/tryQuoteUnrecognizedTokens.h>
#include <cstdlib>
#include <random>
#include <string_view>
@ -342,9 +343,8 @@ static inline String rewriteMysqlQueryColumn(mysqlxx::Pool::Entry & connection,
{ std::make_shared<DataTypeString>(), "column_type" }
};
const String & query = "SELECT COLUMN_NAME AS column_name, COLUMN_TYPE AS column_type FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = '" + backQuoteIfNeed(database_name) +
"' AND TABLE_NAME = '" + backQuoteIfNeed(table_name) + "' ORDER BY ORDINAL_POSITION";
String query = "SELECT COLUMN_NAME AS column_name, COLUMN_TYPE AS column_type FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = '" + database_name + "' AND TABLE_NAME = '" + table_name + "' ORDER BY ORDINAL_POSITION";
StreamSettings mysql_input_stream_settings(global_settings, false, true);
auto mysql_source = std::make_unique<MySQLSource>(connection, query, tables_columns_sample_block, mysql_input_stream_settings);
@ -812,6 +812,7 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
CurrentThread::QueryScope query_scope(query_context);
String query = query_event.query;
tryQuoteUnrecognizedTokens(query, query);
if (!materialized_tables_list.empty())
{
auto table_id = tryParseTableIDFromDDL(query, query_event.schema);

View File

@ -0,0 +1,289 @@
#include <gtest/gtest.h>
#include <Databases/MySQL/tryQuoteUnrecognizedTokens.h>
using namespace DB;
struct TestCase
{
String query;
String res;
bool ok;
TestCase(
const String & query_,
const String & res_,
bool ok_)
: query(query_)
, res(res_)
, ok(ok_)
{
}
};
std::ostream & operator<<(std::ostream & ostr, const TestCase & test_case)
{
return ostr << '"' << test_case.query << "\" -> \"" << test_case.res << "\" ok:" << test_case.ok;
}
class QuoteUnrecognizedTokensTest : public ::testing::TestWithParam<TestCase>
{
};
TEST_P(QuoteUnrecognizedTokensTest, escape)
{
const auto & [query, expected, ok] = GetParam();
String actual;
bool res = tryQuoteUnrecognizedTokens(query, actual);
EXPECT_EQ(ok, res);
EXPECT_EQ(expected, actual);
}
INSTANTIATE_TEST_SUITE_P(MaterializedMySQL, QuoteUnrecognizedTokensTest, ::testing::ValuesIn(std::initializer_list<TestCase>{
{
"",
"",
false
},
{
"test '\"`",
"",
false
},
{
"SELECT * FROM db.`table`",
"",
false
},
{
"道渠",
"`道渠`",
true
},
{
"",
"`道`",
true
},
{
"道道(skip) 道(",
"`道道`(skip) `道`(",
true
},
{
"`道渠`",
"",
false
},
{
"'道'",
"",
false
},
{
"\"\"",
"",
false
},
{
"` 道 test 渠 `",
"",
false
},
{
"skip 道 skip 123",
"skip `道` skip 123",
true
},
{
"skip 123 `道` skip",
"",
false
},
{
"skip `道 skip 123",
"",
false
},
{
"skip test道 skip",
"skip `test道` skip",
true
},
{
"test道2test",
"`test道2test`",
true
},
{
"skip test道2test 123",
"skip `test道2test` 123",
true
},
{
"skip 您a您a您a a您a您a您a 1您2您3您4 skip",
"skip `您a您a您a` `a您a您a您a` `1您2您3您4` skip",
true
},
{
"skip 您a 您a您a b您2您c您4 skip",
"skip `您a` `您a您a` `b您2您c您4` skip",
true
},
{
"123您a skip 56_您a 您a2 b_您2_您c123您_a4 skip",
"`123您a` skip `56_您a` `您a2` `b_您2_您c123您_a4` skip",
true
},
{
"_您_ 123 skip 56_您_您_您_您_您_您_您_您_您_a 您a2 abc 123_您_您_321 a1b2c3 aaaaa您您_a4 skip",
"`_您_` 123 skip `56_您_您_您_您_您_您_您_您_您_a` `您a2` abc `123_您_您_321` a1b2c3 `aaaaa您您_a4` skip",
true
},
{
"TABLE 您2 您(",
"TABLE `您2` `您`(",
true
},
{
"TABLE 您.a您2(日2日2 INT",
"TABLE `您`.`a您2`(`日2日2` INT",
true
},
{
"TABLE 您$.a_您2a_($日2日_2 INT, 您Hi好 a您b好c)",
"TABLE `您`$.`a_您2a_`($`日2日_2` INT, `您Hi好` `a您b好c`)",
true
},
{
"TABLE 您a日.您a您a您a(test INT",
"TABLE `您a日`.`您a您a您a`(test INT",
true
},
{
"TABLE 您a日.您a您a您a(Hi您Hi好Hi INT",
"TABLE `您a日`.`您a您a您a`(`Hi您Hi好Hi` INT",
true
},
{
"--TABLE 您a日.您a您a您a(test INT",
"",
false
},
{
"--您a日.您a您a您a(\n您Hi好",
"--您a日.您a您a您a(\n`您Hi好`",
true
},
{
" /* TABLE 您a日.您a您a您a(test INT",
"",
false
},
{
"/*您a日.您a您a您a(*/\n您Hi好",
"/*您a日.您a您a您a(*/\n`您Hi好`",
true
},
{
" 您a日.您您aa您a /* 您a日.您a您a您a */ a您a日a.a您您您a",
" `您a日`.`您您aa您a` /* 您a日.您a您a您a */ `a您a日a`.`a您您您a`",
true
},
//{ TODO
// "TABLE 您2.您a您a您a(test INT",
// "TABLE `您2`.`您a您a您a`(test INT",
// true
//},
{
"skip 您a您a您a skip",
"skip `您a您a您a` skip",
true
},
{
"test 您a2您3a您a 4 again",
"test `您a2您3a您a` 4 again",
true
},
{
"CREATE TABLE db.`道渠`",
"",
false
},
{
"CREATE TABLE db.`道渠",
"",
false
},
{
"CREATE TABLE db.道渠",
"CREATE TABLE db.`道渠`",
true
},
{
"CREATE TABLE db. 道渠",
"CREATE TABLE db. `道渠`",
true
},
{
R"sql(
CREATE TABLE gb2312.`` ( `id` int NOT NULL,
INT,
DATETIME,
test INT, test您 INT, test您test INT,
test INT, test道渠 INT, test道渠test INT,
_ INT, _您 INT, _您_ INT,
__ INT, __您您 INT, __您您__ INT,
2 INT, 2 INT, 22 INT,
22 INT, 22 INT, 2222 INT,
_2 INT, _2您 INT, _2您_2 INT, _2您2_ INT, 2_您_2 INT,
__22 INT, __22您您 INT, __22您您__22 INT, __22您您22__ INT, 22__您您__22 INT,
2_ INT, 2_您 INT, 2_您2_ INT,
22__ INT, 22__您您 INT, 22__您您22__ INT,
_test INT, _test您 INT, _test您_test INT, _test您test_ INT, test_您test_ INT, test_您_test INT,
_test INT, _test您您 INT, _test您您_test INT, _test您您test_ INT, test_您您test_ INT, test_您您_test INT,
test3 INT, test3您 INT, test3您test3 INT, test3您3test INT,
test3 INT, test3您您 INT, test3您您test3 INT, test3您您3test INT,
3test INT, 3test您 INT, 3test您3test INT, 3test您test3 INT,
3test INT, 3test您您 INT, 3test您您3test INT, 3test您您test3 INT,
_test4 INT, _test4您 INT, _test4您_test4 INT, test4_您_test4 INT, _test4您4test_ INT, _test4您test4_ INT,
_test4 INT, _test4您您 INT, _test4您您_test4 INT, test4_您您_test4 INT, _test4您您4test_ INT, _test4您您test4_ INT,
_5test INT, _5test您 INT, _5test您_5test INT, 5test_您_test5 INT, _4test您test4_ INT,
test_日期 varchar(256), test_道_2 varchar(256) NOT NULL ,
test_道渠您_3
BIGINT NOT NULL,
3_test INT,
PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=gb2312;
)sql",
R"sql(
CREATE TABLE gb2312.`` ( `id` int NOT NULL,
`` INT,
`` DATETIME,
`test` INT, `test您` INT, `test您test` INT,
`test` INT, `test道渠` INT, `test道渠test` INT,
`_` INT, `_您` INT, `_您_` INT,
`__` INT, `__您您` INT, `__您您__` INT,
`2` INT, `2` INT, `22` INT,
`22` INT, `22` INT, `2222` INT,
`_2` INT, `_2您` INT, `_2您_2` INT, `_2您2_` INT, `2_您_2` INT,
`__22` INT, `__22您您` INT, `__22您您__22` INT, `__22您您22__` INT, `22__您您__22` INT,
`2_` INT, `2_您` INT, `2_您2_` INT,
`22__` INT, `22__您您` INT, `22__您您22__` INT,
`_test` INT, `_test您` INT, `_test您_test` INT, `_test您test_` INT, `test_您test_` INT, `test_您_test` INT,
`_test` INT, `_test您您` INT, `_test您您_test` INT, `_test您您test_` INT, `test_您您test_` INT, `test_您您_test` INT,
`test3` INT, `test3您` INT, `test3您test3` INT, `test3您3test` INT,
`test3` INT, `test3您您` INT, `test3您您test3` INT, `test3您您3test` INT,
`3test` INT, `3test您` INT, `3test您3test` INT, `3test您test3` INT,
`3test` INT, `3test您您` INT, `3test您您3test` INT, `3test您您test3` INT,
`_test4` INT, `_test4您` INT, `_test4您_test4` INT, `test4_您_test4` INT, `_test4您4test_` INT, `_test4您test4_` INT,
`_test4` INT, `_test4您您` INT, `_test4您您_test4` INT, `test4_您您_test4` INT, `_test4您您4test_` INT, `_test4您您test4_` INT,
`_5test` INT, `_5test您` INT, `_5test您_5test` INT, `5test_您_test5` INT, `_4test您test4_` INT,
`test_日期` varchar(256), `test_道_2` varchar(256) NOT NULL ,
`test_道渠您_3`
BIGINT NOT NULL,
`3_test` INT,
PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=gb2312;
)sql",
true
},
}));

View File

@ -0,0 +1,96 @@
#include <Databases/MySQL/tryQuoteUnrecognizedTokens.h>
#include <Parsers/CommonParsers.h>
#include <Common/quoteString.h>
namespace DB
{
/// Checks if there are no any tokens (like whitespaces) between current and previous pos
static bool noWhitespaces(const char * to, const char * from)
{
return static_cast<size_t>(from - to) == 0;
}
/// Checks if the token should be quoted too together with unrecognized
static bool isWordOrNumber(TokenType type)
{
return type == TokenType::BareWord || type == TokenType::Number;
}
static void quoteLiteral(
IParser::Pos & pos,
IParser::Pos & pos_prev,
const char *& pos_unrecognized,
const char *& copy_from,
String & rewritten_query)
{
/// Copy also whitespaces if any
const auto * end =
isWordOrNumber(pos->type) && noWhitespaces(pos_prev->end, pos->begin)
? pos->end
: pos_prev->end;
String literal(pos_unrecognized, static_cast<size_t>(end - pos_unrecognized));
rewritten_query.append(copy_from, pos_unrecognized - copy_from).append(backQuoteMySQL(literal));
copy_from = end;
}
bool tryQuoteUnrecognizedTokens(const String & query, String & res)
{
Tokens tokens(query.data(), query.data() + query.size());
IParser::Pos pos(tokens, 0);
Expected expected;
String rewritten_query;
const char * copy_from = query.data();
auto pos_prev = pos;
const char * pos_unrecognized = nullptr;
for (;pos->type != TokenType::EndOfStream; ++pos)
{
/// Commit quotes if any whitespaces found or the token is not a word
bool commit = !noWhitespaces(pos_prev->end, pos->begin) || (pos->type != TokenType::Error && !isWordOrNumber(pos->type));
if (pos_unrecognized && commit)
{
quoteLiteral(
pos,
pos_prev,
pos_unrecognized,
copy_from,
rewritten_query);
pos_unrecognized = nullptr;
}
if (pos->type == TokenType::Error)
{
/// Find first appearance of the error token
if (!pos_unrecognized)
{
pos_unrecognized =
isWordOrNumber(pos_prev->type) && noWhitespaces(pos_prev->end, pos->begin)
? pos_prev->begin
: pos->begin;
}
}
pos_prev = pos;
}
/// There was EndOfStream but not committed unrecognized token
if (pos_unrecognized)
{
quoteLiteral(
pos,
pos_prev,
pos_unrecognized,
copy_from,
rewritten_query);
pos_unrecognized = nullptr;
}
/// If no Errors found
if (copy_from == query.data())
return false;
auto size = static_cast<size_t>(pos->end - copy_from);
rewritten_query.append(copy_from, size);
res = rewritten_query;
return true;
}
}

View File

@ -0,0 +1,10 @@
#pragma once
#include <base/types.h>
namespace DB
{
bool tryQuoteUnrecognizedTokens(const String & query, String & res);
}

View File

@ -19,6 +19,7 @@
#include <Processors/Sinks/SinkToStorage.h>
#include <QueryPipeline/Pipe.h>
#include <Common/parseRemoteDescription.h>
#include <Common/quoteString.h>
#include <Common/logger_useful.h>
#include <Storages/NamedCollectionsHelpers.h>
#include <Databases/MySQL/FetchTablesColumnsList.h>
@ -34,16 +35,6 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
}
static String backQuoteMySQL(const String & x)
{
String res(x.size(), '\0');
{
WriteBufferFromString wb(res);
writeBackQuotedStringMySQL(x, wb);
}
return res;
}
StorageMySQL::StorageMySQL(
const StorageID & table_id_,
mysqlxx::PoolWithFailover && pool_,

View File

@ -1581,6 +1581,128 @@ def utf8mb4_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE utf8mb4_test")
def utf8mb4_column_test(clickhouse_node, mysql_node, service_name):
db = "utf8mb4_column_test"
mysql_node.query(f"DROP DATABASE IF EXISTS {db}")
clickhouse_node.query(f"DROP DATABASE IF EXISTS {db}")
mysql_node.query(f"CREATE DATABASE {db}")
# Full sync
mysql_node.query(f"CREATE TABLE {db}.unquoted (id INT primary key, 日期 DATETIME)")
mysql_node.query(f"CREATE TABLE {db}.quoted (id INT primary key, `日期` DATETIME)")
mysql_node.query(f"INSERT INTO {db}.unquoted VALUES(1, now())")
mysql_node.query(f"INSERT INTO {db}.quoted VALUES(1, now())")
clickhouse_node.query(
f"CREATE DATABASE {db} ENGINE = MaterializedMySQL('{service_name}:3306', '{db}', 'root', 'clickhouse')"
)
# Full sync replicated unquoted columns names since they use SHOW CREATE TABLE
# which returns quoted column names
check_query(
clickhouse_node,
f"/* expect: quoted unquoted */ SHOW TABLES FROM {db}",
"quoted\nunquoted\n",
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM {db}.unquoted",
"1\n",
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM {db}.quoted",
"1\n",
)
# Inc sync
mysql_node.query(
f"CREATE TABLE {db}.unquoted_new (id INT primary key, 日期 DATETIME)"
)
mysql_node.query(
f"CREATE TABLE {db}.quoted_new (id INT primary key, `日期` DATETIME)"
)
mysql_node.query(f"INSERT INTO {db}.unquoted_new VALUES(1, now())")
mysql_node.query(f"INSERT INTO {db}.quoted_new VALUES(1, now())")
mysql_node.query(f"INSERT INTO {db}.unquoted VALUES(2, now())")
mysql_node.query(f"INSERT INTO {db}.quoted VALUES(2, now())")
check_query(
clickhouse_node,
f"/* expect: 2 */ SELECT COUNT() FROM {db}.quoted",
"2\n",
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM {db}.quoted_new",
"1\n",
)
check_query(
clickhouse_node,
f"/* expect: 2 */ SELECT COUNT() FROM {db}.unquoted",
"2\n",
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM {db}.unquoted_new",
"1\n",
)
clickhouse_node.query(f"DROP DATABASE IF EXISTS `{db}`")
mysql_node.query(f"DROP DATABASE IF EXISTS `{db}`")
def utf8mb4_name_test(clickhouse_node, mysql_node, service_name):
db = "您Hi您"
table = "日期"
mysql_node.query(f"DROP DATABASE IF EXISTS `{db}`")
clickhouse_node.query(f"DROP DATABASE IF EXISTS `{db}`")
mysql_node.query(f"CREATE DATABASE `{db}`")
mysql_node.query(
f"CREATE TABLE `{db}`.`{table}` (id INT(11) NOT NULL PRIMARY KEY, `{table}` DATETIME) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4"
)
mysql_node.query(f"INSERT INTO `{db}`.`{table}` VALUES(1, now())")
mysql_node.query(
f"CREATE TABLE {db}.{table}_unquoted (id INT(11) NOT NULL PRIMARY KEY, {table} DATETIME) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4"
)
mysql_node.query(f"INSERT INTO {db}.{table}_unquoted VALUES(1, now())")
clickhouse_node.query(
f"CREATE DATABASE `{db}` ENGINE = MaterializedMySQL('{service_name}:3306', '{db}', 'root', 'clickhouse')"
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM `{db}`.`{table}`",
"1\n",
)
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM `{db}`.`{table}_unquoted`",
"1\n",
)
# Inc sync
mysql_node.query(
f"CREATE TABLE `{db}`.`{table}2` (id INT(11) NOT NULL PRIMARY KEY, `{table}` DATETIME) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4"
)
mysql_node.query(f"INSERT INTO `{db}`.`{table}2` VALUES(1, now())")
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM `{db}`.`{table}2`",
"1\n",
)
mysql_node.query(
f"CREATE TABLE {db}.{table}2_unquoted (id INT(11) NOT NULL PRIMARY KEY, {table} DATETIME) ENGINE=InnoDB DEFAULT CHARACTER SET utf8mb4"
)
mysql_node.query(f"INSERT INTO {db}.{table}2_unquoted VALUES(1, now())")
check_query(
clickhouse_node,
f"/* expect: 1 */ SELECT COUNT() FROM `{db}`.`{table}2_unquoted`",
"1\n",
)
clickhouse_node.query(f"DROP DATABASE IF EXISTS `{db}`")
mysql_node.query(f"DROP DATABASE IF EXISTS `{db}`")
def system_parts_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS system_parts_test")
clickhouse_node.query("DROP DATABASE IF EXISTS system_parts_test")

View File

@ -381,6 +381,12 @@ def test_utf8mb4(
):
materialized_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialized_with_ddl.utf8mb4_test(clickhouse_node, started_mysql_8_0, "mysql80")
materialized_with_ddl.utf8mb4_column_test(
clickhouse_node, started_mysql_8_0, "mysql80"
)
materialized_with_ddl.utf8mb4_name_test(
clickhouse_node, started_mysql_8_0, "mysql80"
)
def test_system_parts_table(started_cluster, started_mysql_8_0, clickhouse_node):