Merge pull request #25676 from sand6255/MaterializeMySQL-Support-Enum-Data-Type

MaterializeMySQL: support ENUM data type
This commit is contained in:
Kseniia Sumarokova 2021-06-28 19:15:00 +03:00 committed by GitHub
commit 9d02af7d8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 183 additions and 10 deletions

View File

@ -49,6 +49,7 @@ When working with the `MaterializeMySQL` database engine, [ReplacingMergeTree](.
| DATE, NEWDATE | [Date](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |
| DATETIME2, TIMESTAMP2 | [DateTime64](../../sql-reference/data-types/datetime64.md) |
| ENUM | [Enum](../../sql-reference/data-types/enum.md) |
| STRING | [String](../../sql-reference/data-types/string.md) |
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |

View File

@ -115,6 +115,8 @@ void ODBCBlockInputStream::insertValue(
assert_cast<ColumnFloat64 &>(column).insertValue(row.get<double>(idx));
break;
case ValueType::vtFixedString:[[fallthrough]];
case ValueType::vtEnum8:
case ValueType::vtEnum16:
case ValueType::vtString:
assert_cast<ColumnString &>(column).insert(row.get<std::string>(idx));
break;

View File

@ -67,9 +67,9 @@ void ExternalResultDescription::init(const Block & sample_block_)
else if (which.isUUID())
types.emplace_back(ValueType::vtUUID, is_nullable);
else if (which.isEnum8())
types.emplace_back(ValueType::vtString, is_nullable);
types.emplace_back(ValueType::vtEnum8, is_nullable);
else if (which.isEnum16())
types.emplace_back(ValueType::vtString, is_nullable);
types.emplace_back(ValueType::vtEnum16, is_nullable);
else if (which.isDateTime64())
types.emplace_back(ValueType::vtDateTime64, is_nullable);
else if (which.isDecimal32())

View File

@ -22,6 +22,8 @@ struct ExternalResultDescription
vtInt64,
vtFloat32,
vtFloat64,
vtEnum8,
vtEnum16,
vtString,
vtDate,
vtDateTime,

View File

@ -298,7 +298,6 @@ namespace MySQLReplication
}
/// Types that do not used in the binlog event:
/// MYSQL_TYPE_ENUM
/// MYSQL_TYPE_SET
/// MYSQL_TYPE_TINY_BLOB
/// MYSQL_TYPE_MEDIUM_BLOB
@ -562,6 +561,22 @@ namespace MySQLReplication
row.push_back(dispatch((meta >> 8) & 0xFF, meta & 0xFF, read_decimal));
break;
}
case MYSQL_TYPE_ENUM:
{
if ((meta & 0xFF) == 1)
{
UInt8 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 1);
row.push_back(Field{UInt8{val}});
}
else
{
UInt16 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 2);
row.push_back(Field{UInt16{val}});
}
break;
}
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
{

View File

@ -243,6 +243,8 @@ namespace
insertNumber<Float64>(column, value, name);
break;
case ValueType::vtEnum8:
case ValueType::vtEnum16:
case ValueType::vtString:
{
if (value.type() == Poco::MongoDB::ElementTraits<ObjectId::Ptr>::TypeId)

View File

@ -157,6 +157,8 @@ void PostgreSQLBlockInputStream::insertValue(IColumn & column, std::string_view
assert_cast<ColumnFloat64 &>(column).insertValue(pqxx::from_string<double>(value));
break;
case ValueType::vtFixedString:[[fallthrough]];
case ValueType::vtEnum8:
case ValueType::vtEnum16:
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
break;

View File

@ -262,6 +262,9 @@ void registerDataTypeEnum(DataTypeFactory & factory)
factory.registerDataType("Enum8", createExact<DataTypeEnum<Int8>>);
factory.registerDataType("Enum16", createExact<DataTypeEnum<Int16>>);
factory.registerDataType("Enum", create);
/// MySQL
factory.registerAlias("ENUM", "Enum", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -110,6 +110,8 @@ void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, co
assert_cast<ColumnFloat64 &>(column).insertValue(value);
break;
}
case ValueType::vtEnum8:
case ValueType::vtEnum16:
case ValueType::vtString:
{
const char * value = nullptr;

View File

@ -91,6 +91,8 @@ namespace DB
case ValueType::vtFloat64:
insert<Float64>(column, string_value);
break;
case ValueType::vtEnum8:
case ValueType::vtEnum16:
case ValueType::vtString:
assert_cast<ColumnString &>(column).insert(parse<String>(string_value));
break;

View File

@ -10,6 +10,7 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnFixedString.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeNullable.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
@ -157,6 +158,14 @@ namespace
assert_cast<ColumnFloat64 &>(column).insertValue(value.getDouble());
read_bytes_size += 8;
break;
case ValueType::vtEnum8:
assert_cast<ColumnInt8 &>(column).insertValue(assert_cast<const DataTypeEnum<Int8> &>(data_type).castToValue(value.data()).get<Int8>());
read_bytes_size += assert_cast<ColumnInt8 &>(column).byteSize();
break;
case ValueType::vtEnum16:
assert_cast<ColumnInt16 &>(column).insertValue(assert_cast<const DataTypeEnum<Int16> &>(data_type).castToValue(value.data()).get<Int16>());
read_bytes_size += assert_cast<ColumnInt16 &>(column).byteSize();
break;
case ValueType::vtString:
assert_cast<ColumnString &>(column).insertData(value.data(), value.size());
read_bytes_size += assert_cast<ColumnString &>(column).byteSize();

View File

@ -92,22 +92,40 @@ static NamesAndTypesList getColumnsList(const ASTExpressionList * columns_defini
}
ASTPtr data_type = declare_column->data_type;
if (is_unsigned)
{
auto * data_type_function = data_type->as<ASTFunction>();
if (data_type_function)
{
String type_name_upper = Poco::toUpper(data_type_function->name);
if (is_unsigned)
{
/// For example(in MySQL): CREATE TABLE test(column_name INT NOT NULL ... UNSIGNED)
if (type_name_upper.find("INT") != std::string::npos && !endsWith(type_name_upper, "SIGNED")
if (type_name_upper.find("INT") != String::npos && !endsWith(type_name_upper, "SIGNED")
&& !endsWith(type_name_upper, "UNSIGNED"))
data_type_function->name = type_name_upper + " UNSIGNED";
}
}
/// Transforms MySQL ENUM's list of strings to ClickHouse string-integer pairs
/// For example ENUM('a', 'b', 'c') -> ENUM('a'=1, 'b'=2, 'c'=3)
/// Elements on a position further than 32767 are assigned negative values, starting with -32768.
/// Note: Enum would be transformed to Enum8 if number of elements is less then 128, otherwise it would be transformed to Enum16.
if (type_name_upper.find("ENUM") != String::npos)
{
UInt16 i = 0;
for (ASTPtr & child : data_type_function->arguments->children)
{
auto new_child = std::make_shared<ASTFunction>();
new_child->name = "equals";
auto * literal = child->as<ASTLiteral>();
new_child->arguments = std::make_shared<ASTExpressionList>();
new_child->arguments->children.push_back(std::make_shared<ASTLiteral>(literal->value.get<String>()));
new_child->arguments->children.push_back(std::make_shared<ASTLiteral>(Int16(++i)));
child = new_child;
}
}
}
if (is_nullable)
data_type = makeASTFunction("Nullable", data_type);

View File

@ -235,3 +235,25 @@ TEST(MySQLCreateRewritten, QueryWithColumnComments)
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}
TEST(MySQLCreateRewritten, QueryWithEnum)
{
tryRegisterFunctions();
const auto & context_holder = getContext();
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c'))", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(Enum8('a' = 1, 'b' = 2, 'c' = 3))" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c') NOT NULL)", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Enum8('a' = 1, 'b' = 2, 'c' = 3)" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"CREATE TABLE `test_database`.`test_table_1`(`key` INT NOT NULL PRIMARY KEY, `test` ENUM('a','b','c') COMMENT 'test_comment')", context_holder.context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `test` Nullable(Enum8('a' = 1, 'b' = 2, 'c' = 3)) COMMENT 'test_comment'" +
std::string(MATERIALIZEMYSQL_TABLE_COLUMNS) +
") ENGINE = ReplacingMergeTree(_version) PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
}

View File

@ -177,6 +177,8 @@
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",

View File

@ -181,6 +181,8 @@
"test_materialize_mysql_database/test.py::test_system_tables_table[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_column_comments[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_materialize_with_enum[clickhouse_node1]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node0]",
"test_materialize_mysql_database/test.py::test_utf8mb4[clickhouse_node1]",
"test_parts_delete_zookeeper/test.py::test_merge_doesnt_work_without_zookeeper",

View File

@ -853,6 +853,86 @@ def materialize_with_column_comments_test(clickhouse_node, mysql_node, service_n
clickhouse_node.query("DROP DATABASE materialize_with_column_comments_test")
mysql_node.query("DROP DATABASE materialize_with_column_comments_test")
def materialize_with_enum8_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS materialize_with_enum8_test")
clickhouse_node.query("DROP DATABASE IF EXISTS materialize_with_enum8_test")
mysql_node.query("CREATE DATABASE materialize_with_enum8_test")
enum8_values_count = 127
enum8_values = ""
enum8_values_with_backslash = ""
for i in range(1, enum8_values_count):
enum8_values += '\'' + str(i) + "\', "
enum8_values_with_backslash += "\\\'" + str(i) +"\\\' = " + str(i) + ", "
enum8_values += '\'' + str(enum8_values_count) + '\''
enum8_values_with_backslash += "\\\'" + str(enum8_values_count) +"\\\' = " + str(enum8_values_count)
mysql_node.query("CREATE TABLE materialize_with_enum8_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum8_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO materialize_with_enum8_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE materialize_with_enum8_test ENGINE = MaterializeMySQL('{}:3306', 'materialize_with_enum8_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum8_test.test ORDER BY id", "1\n2\n")
mysql_node.query("INSERT INTO materialize_with_enum8_test.test (id, value) VALUES (3, '127')")
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum8_test.test ORDER BY id", "1\n2\n127\n")
check_query(clickhouse_node, "DESCRIBE TABLE materialize_with_enum8_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(Enum8(" + enum8_values_with_backslash + "))\t\t\t\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
clickhouse_node.query("DROP DATABASE materialize_with_enum8_test")
mysql_node.query("DROP DATABASE materialize_with_enum8_test")
def materialize_with_enum16_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS materialize_with_enum16_test")
clickhouse_node.query("DROP DATABASE IF EXISTS materialize_with_enum16_test")
mysql_node.query("CREATE DATABASE materialize_with_enum16_test")
enum16_values_count = 600
enum16_values = ""
enum16_values_with_backslash = ""
for i in range(1, enum16_values_count):
enum16_values += '\'' + str(i) + "\', "
enum16_values_with_backslash += "\\\'" + str(i) +"\\\' = " + str(i) + ", "
enum16_values += '\'' + str(enum16_values_count) + '\''
enum16_values_with_backslash += "\\\'" + str(enum16_values_count) +"\\\' = " + str(enum16_values_count)
mysql_node.query("CREATE TABLE materialize_with_enum16_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum16_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO materialize_with_enum16_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE materialize_with_enum16_test ENGINE = MaterializeMySQL('{}:3306', 'materialize_with_enum16_test', 'root', 'clickhouse')".format(service_name))
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum16_test.test ORDER BY id", "1\n2\n")
mysql_node.query("INSERT INTO materialize_with_enum16_test.test (id, value) VALUES (3, '500')")
check_query(clickhouse_node, "SELECT value FROM materialize_with_enum16_test.test ORDER BY id", "1\n2\n500\n")
check_query(clickhouse_node, "DESCRIBE TABLE materialize_with_enum16_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(Enum16(" + enum16_values_with_backslash + "))\t\t\t\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
clickhouse_node.query("DROP DATABASE materialize_with_enum16_test")
mysql_node.query("DROP DATABASE materialize_with_enum16_test")
def alter_enum8_to_enum16_test(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS alter_enum8_to_enum16_test")
clickhouse_node.query("DROP DATABASE IF EXISTS alter_enum8_to_enum16_test")
mysql_node.query("CREATE DATABASE alter_enum8_to_enum16_test")
enum8_values_count = 100
enum8_values = ""
enum8_values_with_backslash = ""
for i in range(1, enum8_values_count):
enum8_values += '\'' + str(i) + "\', "
enum8_values_with_backslash += "\\\'" + str(i) +"\\\' = " + str(i) + ", "
enum8_values += '\'' + str(enum8_values_count) + '\''
enum8_values_with_backslash += "\\\'" + str(enum8_values_count) +"\\\' = " + str(enum8_values_count)
mysql_node.query("CREATE TABLE alter_enum8_to_enum16_test.test (id int NOT NULL PRIMARY KEY, value ENUM(" + enum8_values + ")) ENGINE=InnoDB")
mysql_node.query("INSERT INTO alter_enum8_to_enum16_test.test (id, value) VALUES (1, '1'),(2, '2')")
clickhouse_node.query("CREATE DATABASE alter_enum8_to_enum16_test ENGINE = MaterializeMySQL('{}:3306', 'alter_enum8_to_enum16_test', 'root', 'clickhouse')".format(service_name))
mysql_node.query("INSERT INTO alter_enum8_to_enum16_test.test (id, value) VALUES (3, '75')")
check_query(clickhouse_node, "SELECT value FROM alter_enum8_to_enum16_test.test ORDER BY id", "1\n2\n75\n")
check_query(clickhouse_node, "DESCRIBE TABLE alter_enum8_to_enum16_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(Enum8(" + enum8_values_with_backslash + "))\t\t\t\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
enum16_values_count = 600
enum16_values = ""
enum16_values_with_backslash = ""
for i in range(1, enum16_values_count):
enum16_values += '\'' + str(i) + "\', "
enum16_values_with_backslash += "\\\'" + str(i) +"\\\' = " + str(i) + ", "
enum16_values += '\'' + str(enum16_values_count) + '\''
enum16_values_with_backslash += "\\\'" + str(enum16_values_count) +"\\\' = " + str(enum16_values_count)
mysql_node.query("ALTER TABLE alter_enum8_to_enum16_test.test MODIFY COLUMN value ENUM(" + enum16_values + ")")
check_query(clickhouse_node, "DESCRIBE TABLE alter_enum8_to_enum16_test.test", "id\tInt32\t\t\t\t\t\nvalue\tNullable(Enum16(" + enum16_values_with_backslash + "))\t\t\t\t\t\n_sign\tInt8\tMATERIALIZED\t1\t\t\t\n_version\tUInt64\tMATERIALIZED\t1\t\t\t\n")
mysql_node.query("INSERT INTO alter_enum8_to_enum16_test.test (id, value) VALUES (4, '500')")
check_query(clickhouse_node, "SELECT value FROM alter_enum8_to_enum16_test.test ORDER BY id", "1\n2\n75\n500\n")
clickhouse_node.query("DROP DATABASE alter_enum8_to_enum16_test")
mysql_node.query("DROP DATABASE alter_enum8_to_enum16_test")
def move_to_prewhere_and_column_filtering(clickhouse_node, mysql_node, service_name):
clickhouse_node.query("DROP DATABASE IF EXISTS cond_on_key_col")
mysql_node.query("DROP DATABASE IF EXISTS cond_on_key_col")

View File

@ -223,6 +223,15 @@ def test_materialize_with_column_comments(started_cluster, started_mysql_8_0, st
materialize_with_ddl.materialize_with_column_comments_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_with_column_comments_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_db_ordinary, node_db_ordinary])
def test_materialize_with_enum(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialize_with_enum8_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_with_enum16_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.alter_enum8_to_enum16_test(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.materialize_with_enum8_test(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialize_with_enum16_test(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.alter_enum8_to_enum16_test(clickhouse_node, started_mysql_8_0, "mysql80")
@pytest.mark.parametrize(('clickhouse_node'), [node_disable_bytes_settings, node_disable_rows_settings])
def test_mysql_settings(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):