From e9eed1f92779c694e6fd687ebf5dad1b398419c9 Mon Sep 17 00:00:00 2001 From: zzsmdfj Date: Fri, 17 Dec 2021 14:48:49 +0800 Subject: [PATCH] to #15182_MaterializeMySQL_support_bit_type --- base/mysqlxx/Row.cpp | 10 ++- base/mysqlxx/Row.h | 2 + base/mysqlxx/Types.h | 2 + .../database-engines/materialized-mysql.md | 1 + src/Core/MySQL/MySQLReplication.cpp | 14 ++++ src/Core/MySQL/MySQLReplication.h | 13 ++-- src/DataTypes/DataTypesNumber.cpp | 1 + src/DataTypes/convertMySQLDataType.cpp | 4 ++ .../MySQL/tests/gtest_create_rewritten.cpp | 2 +- src/Processors/Sources/MySQLSource.cpp | 27 ++++++-- .../materialize_with_ddl.py | 65 +++++++++++++++++++ .../test_materialized_mysql_database/test.py | 4 ++ 12 files changed, 133 insertions(+), 12 deletions(-) diff --git a/base/mysqlxx/Row.cpp b/base/mysqlxx/Row.cpp index aecec46e519..0c2854b1700 100644 --- a/base/mysqlxx/Row.cpp +++ b/base/mysqlxx/Row.cpp @@ -4,8 +4,6 @@ #include #endif #include - - namespace mysqlxx { @@ -21,4 +19,12 @@ Value Row::operator[] (const char * name) const throw Exception(std::string("Unknown column ") + name); } +enum enum_field_types Row::getFieldType(size_t i) +{ + if (i >= res->getNumFields()) + throw Exception(std::string("Array Index Overflow")); + MYSQL_FIELDS fields = res->getFields(); + return fields[i].type; +} + } diff --git a/base/mysqlxx/Row.h b/base/mysqlxx/Row.h index d668fdbd29a..b11d7d628ef 100644 --- a/base/mysqlxx/Row.h +++ b/base/mysqlxx/Row.h @@ -79,6 +79,8 @@ public: */ operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; } + enum enum_field_types getFieldType(size_t i); + private: MYSQL_ROW row{}; ResultBase * res{}; diff --git a/base/mysqlxx/Types.h b/base/mysqlxx/Types.h index b5ed70916fa..5fd9aa8bbc8 100644 --- a/base/mysqlxx/Types.h +++ b/base/mysqlxx/Types.h @@ -16,6 +16,8 @@ using MYSQL_ROW = char**; struct st_mysql_field; using MYSQL_FIELD = st_mysql_field; +enum struct enum_field_types; + #endif namespace mysqlxx diff --git a/docs/en/engines/database-engines/materialized-mysql.md b/docs/en/engines/database-engines/materialized-mysql.md index 944264b68a3..b8b49634735 100644 --- a/docs/en/engines/database-engines/materialized-mysql.md +++ b/docs/en/engines/database-engines/materialized-mysql.md @@ -83,6 +83,7 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree]( | VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) | | BLOB | [String](../../sql-reference/data-types/string.md) | | BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) | +| BIT | [UInt64](../../sql-reference/data-types/int-uint.md) | [Nullable](../../sql-reference/data-types/nullable.md) is supported. diff --git a/src/Core/MySQL/MySQLReplication.cpp b/src/Core/MySQL/MySQLReplication.cpp index f734154f4ba..70446573718 100644 --- a/src/Core/MySQL/MySQLReplication.cpp +++ b/src/Core/MySQL/MySQLReplication.cpp @@ -230,6 +230,7 @@ namespace MySQLReplication pos += 2; break; } + case MYSQL_TYPE_BIT: case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { /// Little-Endian @@ -584,6 +585,19 @@ namespace MySQLReplication } break; } + case MYSQL_TYPE_BIT: + { + UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff); + UInt32 size = (bits + 7) / 8; + + Bitmap bitmap_value; + String byte_buffer; + byte_buffer.resize(size); + readBigEndianStrict(payload, reinterpret_cast(byte_buffer.data()), size); + readBitmapFromStr(byte_buffer.c_str(), bitmap_value, size); + row.push_back(Field{UInt64{bitmap_value.to_ulong()}}); + break; + } case MYSQL_TYPE_VARCHAR: case MYSQL_TYPE_VAR_STRING: { diff --git a/src/Core/MySQL/MySQLReplication.h b/src/Core/MySQL/MySQLReplication.h index cb67ce73de9..38c099bb2f9 100644 --- a/src/Core/MySQL/MySQLReplication.h +++ b/src/Core/MySQL/MySQLReplication.h @@ -78,11 +78,8 @@ namespace MySQLReplication } } - inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size) + inline void readBitmapFromStr(const char * byte_buffer, Bitmap & bitmap, size_t bitmap_size) { - String byte_buffer; - byte_buffer.resize(bitmap_size); - payload.readStrict(reinterpret_cast(byte_buffer.data()), bitmap_size); bitmap.resize(bitmap_size * 8, false); for (size_t i = 0; i < bitmap_size; ++i) { @@ -109,6 +106,14 @@ namespace MySQLReplication } } + inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size) + { + String byte_buffer; + byte_buffer.resize(bitmap_size); + payload.readStrict(reinterpret_cast(byte_buffer.data()), bitmap_size); + readBitmapFromStr(byte_buffer.c_str(), bitmap, bitmap_size); + } + class EventBase; using BinlogEventPtr = std::shared_ptr; diff --git a/src/DataTypes/DataTypesNumber.cpp b/src/DataTypes/DataTypesNumber.cpp index fef4c34d8b0..0c9a410077f 100644 --- a/src/DataTypes/DataTypesNumber.cpp +++ b/src/DataTypes/DataTypesNumber.cpp @@ -86,6 +86,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory) factory.registerAlias("INT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive); factory.registerAlias("INTEGER UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive); factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive); + factory.registerAlias("BIT", "UInt64", DataTypeFactory::CaseInsensitive); } } diff --git a/src/DataTypes/convertMySQLDataType.cpp b/src/DataTypes/convertMySQLDataType.cpp index 1b5e20bddce..ee897de9597 100644 --- a/src/DataTypes/convertMySQLDataType.cpp +++ b/src/DataTypes/convertMySQLDataType.cpp @@ -91,6 +91,10 @@ DataTypePtr convertMySQLDataType(MultiEnum type_support, res = std::make_shared(scale); } } + else if (type_name == "bit") + { + res = std::make_shared(); + } else if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal")) { if (precision <= DecimalUtils::max_precision) diff --git a/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp b/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp index 5e18b0de2e0..02af07bc00c 100644 --- a/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp +++ b/src/Interpreters/MySQL/tests/gtest_create_rewritten.cpp @@ -40,7 +40,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType) {"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"}, {"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"}, {"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"}, - {"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"} + {"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}, {"BIT", "UInt64"} }; for (const auto & [test_type, mapped_type] : test_types) diff --git a/src/Processors/Sources/MySQLSource.cpp b/src/Processors/Sources/MySQLSource.cpp index 8e9cdcfda48..94440fd7cdd 100644 --- a/src/Processors/Sources/MySQLSource.cpp +++ b/src/Processors/Sources/MySQLSource.cpp @@ -2,6 +2,7 @@ #if USE_MYSQL #include +#include #include #include #include @@ -126,7 +127,7 @@ namespace { using ValueType = ExternalResultDescription::ValueType; - void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size) + void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type) { switch (type) { @@ -143,9 +144,25 @@ namespace read_bytes_size += 4; break; case ValueType::vtUInt64: - assert_cast(column).insertValue(value.getUInt()); - read_bytes_size += 8; + { + //we don't have enum enum_field_types definition in mysqlxx/Types.h, so we use literal values directly here. + if (static_cast(mysql_type) == 16) + { + size_t n = value.size(); + char *start = const_cast(value.data()), *end = start + n; + std::reverse(start, end); + MySQLReplication::Bitmap bitmap; + MySQLReplication::readBitmapFromStr(value.data(), bitmap, n); + assert_cast(column).insertValue(bitmap.to_ulong()); + read_bytes_size += n; + } + else + { + assert_cast(column).insertValue(value.getUInt()); + read_bytes_size += 8; + } break; + } case ValueType::vtInt8: assert_cast(column).insertValue(value.getInt()); read_bytes_size += 1; @@ -258,12 +275,12 @@ Chunk MySQLSource::generate() { ColumnNullable & column_nullable = assert_cast(*columns[index]); const auto & data_type = assert_cast(*sample.type); - insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size); + insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index])); column_nullable.getNullMapData().emplace_back(false); } else { - insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size); + insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index])); } } else diff --git a/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py b/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py index 43fab165c53..879d09623ac 100644 --- a/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py +++ b/tests/integration/test_materialized_mysql_database/materialize_with_ddl.py @@ -1072,3 +1072,68 @@ def table_overrides(clickhouse_node, mysql_node, service_name): check_query(clickhouse_node, "SELECT type FROM system.columns WHERE database = 'table_overrides' AND table = 't1' AND name = 'sensor_id'", "UInt64\n") clickhouse_node.query("DROP DATABASE IF EXISTS table_overrides") mysql_node.query("DROP DATABASE IF EXISTS table_overrides") + +def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, mysql_node, service_name): + mysql_node.query("DROP DATABASE IF EXISTS test_database_datatype") + clickhouse_node.query("DROP DATABASE IF EXISTS test_database_datatype") + mysql_node.query("CREATE DATABASE test_database_datatype DEFAULT CHARACTER SET 'utf8'") + mysql_node.query(""" + CREATE TABLE test_database_datatype.t1 ( + `v1` int(10) unsigned AUTO_INCREMENT, + `v2` TINYINT, + `v3` SMALLINT, + `v4` BIGINT, + `v5` int, + `v6` TINYINT unsigned, + `v7` SMALLINT unsigned, + `v8` BIGINT unsigned, + `v9` FLOAT, + `v10` FLOAT unsigned, + `v11` DOUBLE, + `v12` DOUBLE unsigned, + `v13` DECIMAL(5,4), + `v14` date, + `v15` TEXT, + `v16` varchar(100) , + `v17` BLOB, + `v18` datetime DEFAULT CURRENT_TIMESTAMP, + `v19` datetime(6) DEFAULT CURRENT_TIMESTAMP(6), + `v20` TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + `v21` TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6), + /* todo support */ + # `v22` YEAR, + # `v23` TIME, + # `v24` TIME(3), + # `v25` GEOMETRY, + `v26` bit(4), + # `v27` JSON DEFAULT NULL, + # `v28` set('a', 'c', 'f', 'd', 'e', 'b'), + `v29` mediumint(4) unsigned NOT NULL DEFAULT '0', + `v30` varbinary(255) DEFAULT NULL COMMENT 'varbinary support', + `v31` binary(200) DEFAULT NULL, + `v32` ENUM('RED','GREEN','BLUE'), + PRIMARY KEY (`v1`) + ) ENGINE=InnoDB; + """) + + mysql_node.query(""" + INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values + (1, 11, 9223372036854775807, -1, 1, 11, 18446744073709551615, -1.1, 1.1, -1.111, 1.111, 1.1111, '2021-10-06', 'text', 'varchar', 'BLOB', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', b'1010', 11, 'varbinary', 'binary', 'RED'); + """) + clickhouse_node.query( + "CREATE DATABASE test_database_datatype ENGINE = MaterializeMySQL('{}:3306', 'test_database_datatype', 'root', 'clickhouse')".format( + service_name)) + + assert "test_database_datatype" in clickhouse_node.query("SHOW DATABASES") + # full synchronization check + check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 FORMAT TSV", + "1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n") + + mysql_node.query(""" + INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values + (2, 22, 9223372036854775807, -2, 2, 22, 18446744073709551615, -2.2, 2.2, -2.22, 2.222, 2.2222, '2021-10-07', 'text', 'varchar', 'BLOB', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', b'1011', 22, 'varbinary', 'binary', 'GREEN' ); + """) + # increment synchronization check + check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 ORDER BY v1 FORMAT TSV", + "1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n" + + "2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t11\t22\tvarbinary\tGREEN\n") diff --git a/tests/integration/test_materialized_mysql_database/test.py b/tests/integration/test_materialized_mysql_database/test.py index 5142a613799..501c0cd78fa 100644 --- a/tests/integration/test_materialized_mysql_database/test.py +++ b/tests/integration/test_materialized_mysql_database/test.py @@ -253,3 +253,7 @@ def test_table_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clic def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_5_7, "mysql57") materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_8_0, "mysql80") + +def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node): + materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80") + materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57")