to #15182_MaterializeMySQL_support_bit_type

This commit is contained in:
zzsmdfj 2021-12-17 14:48:49 +08:00
parent db01e94f66
commit e9eed1f927
12 changed files with 133 additions and 12 deletions

View File

@ -4,8 +4,6 @@
#include <mysql/mysql.h>
#endif
#include <mysqlxx/Row.h>
namespace mysqlxx
{
@ -21,4 +19,12 @@ Value Row::operator[] (const char * name) const
throw Exception(std::string("Unknown column ") + name);
}
enum enum_field_types Row::getFieldType(size_t i)
{
if (i >= res->getNumFields())
throw Exception(std::string("Array Index Overflow"));
MYSQL_FIELDS fields = res->getFields();
return fields[i].type;
}
}

View File

@ -79,6 +79,8 @@ public:
*/
operator private_bool_type() const { return row == nullptr ? nullptr : &Row::row; }
enum enum_field_types getFieldType(size_t i);
private:
MYSQL_ROW row{};
ResultBase * res{};

View File

@ -16,6 +16,8 @@ using MYSQL_ROW = char**;
struct st_mysql_field;
using MYSQL_FIELD = st_mysql_field;
enum struct enum_field_types;
#endif
namespace mysqlxx

View File

@ -83,6 +83,7 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |
| BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) |
| BIT | [UInt64](../../sql-reference/data-types/int-uint.md) |
[Nullable](../../sql-reference/data-types/nullable.md) is supported.

View File

@ -230,6 +230,7 @@ namespace MySQLReplication
pos += 2;
break;
}
case MYSQL_TYPE_BIT:
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING: {
/// Little-Endian
@ -584,6 +585,19 @@ namespace MySQLReplication
}
break;
}
case MYSQL_TYPE_BIT:
{
UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff);
UInt32 size = (bits + 7) / 8;
Bitmap bitmap_value;
String byte_buffer;
byte_buffer.resize(size);
readBigEndianStrict(payload, reinterpret_cast<char *>(byte_buffer.data()), size);
readBitmapFromStr(byte_buffer.c_str(), bitmap_value, size);
row.push_back(Field{UInt64{bitmap_value.to_ulong()}});
break;
}
case MYSQL_TYPE_VARCHAR:
case MYSQL_TYPE_VAR_STRING:
{

View File

@ -78,11 +78,8 @@ namespace MySQLReplication
}
}
inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size)
inline void readBitmapFromStr(const char * byte_buffer, Bitmap & bitmap, size_t bitmap_size)
{
String byte_buffer;
byte_buffer.resize(bitmap_size);
payload.readStrict(reinterpret_cast<char *>(byte_buffer.data()), bitmap_size);
bitmap.resize(bitmap_size * 8, false);
for (size_t i = 0; i < bitmap_size; ++i)
{
@ -109,6 +106,14 @@ namespace MySQLReplication
}
}
inline void readBitmap(ReadBuffer & payload, Bitmap & bitmap, size_t bitmap_size)
{
String byte_buffer;
byte_buffer.resize(bitmap_size);
payload.readStrict(reinterpret_cast<char *>(byte_buffer.data()), bitmap_size);
readBitmapFromStr(byte_buffer.c_str(), bitmap, bitmap_size);
}
class EventBase;
using BinlogEventPtr = std::shared_ptr<EventBase>;

View File

@ -86,6 +86,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
factory.registerAlias("INT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INTEGER UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIT", "UInt64", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -91,6 +91,10 @@ DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support,
res = std::make_shared<DataTypeDateTime64>(scale);
}
}
else if (type_name == "bit")
{
res = std::make_shared<DataTypeUInt64>();
}
else if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal"))
{
if (precision <= DecimalUtils::max_precision<Decimal32>)

View File

@ -40,7 +40,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{"TINYINT", "Int8"}, {"SMALLINT", "Int16"}, {"MEDIUMINT", "Int32"}, {"INT", "Int32"},
{"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"},
{"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"},
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}, {"BIT", "UInt64"}
};
for (const auto & [test_type, mapped_type] : test_types)

View File

@ -2,6 +2,7 @@
#if USE_MYSQL
#include <vector>
#include <Core/MySQL/MySQLReplication.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
@ -126,7 +127,7 @@ namespace
{
using ValueType = ExternalResultDescription::ValueType;
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size)
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value, size_t & read_bytes_size, enum enum_field_types mysql_type)
{
switch (type)
{
@ -143,9 +144,25 @@ namespace
read_bytes_size += 4;
break;
case ValueType::vtUInt64:
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
read_bytes_size += 8;
{
//we don't have enum enum_field_types definition in mysqlxx/Types.h, so we use literal values directly here.
if (static_cast<int>(mysql_type) == 16)
{
size_t n = value.size();
char *start = const_cast<char *>(value.data()), *end = start + n;
std::reverse(start, end);
MySQLReplication::Bitmap bitmap;
MySQLReplication::readBitmapFromStr(value.data(), bitmap, n);
assert_cast<ColumnUInt64 &>(column).insertValue(bitmap.to_ulong());
read_bytes_size += n;
}
else
{
assert_cast<ColumnUInt64 &>(column).insertValue(value.getUInt());
read_bytes_size += 8;
}
break;
}
case ValueType::vtInt8:
assert_cast<ColumnInt8 &>(column).insertValue(value.getInt());
read_bytes_size += 1;
@ -258,12 +275,12 @@ Chunk MySQLSource::generate()
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[index]);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));
column_nullable.getNullMapData().emplace_back(false);
}
else
{
insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size);
insertValue(*sample.type, *columns[index], description.types[index].first, value, read_bytes_size, row.getFieldType(position_mapping[index]));
}
}
else

View File

@ -1072,3 +1072,68 @@ def table_overrides(clickhouse_node, mysql_node, service_name):
check_query(clickhouse_node, "SELECT type FROM system.columns WHERE database = 'table_overrides' AND table = 't1' AND name = 'sensor_id'", "UInt64\n")
clickhouse_node.query("DROP DATABASE IF EXISTS table_overrides")
mysql_node.query("DROP DATABASE IF EXISTS table_overrides")
def materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, mysql_node, service_name):
mysql_node.query("DROP DATABASE IF EXISTS test_database_datatype")
clickhouse_node.query("DROP DATABASE IF EXISTS test_database_datatype")
mysql_node.query("CREATE DATABASE test_database_datatype DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("""
CREATE TABLE test_database_datatype.t1 (
`v1` int(10) unsigned AUTO_INCREMENT,
`v2` TINYINT,
`v3` SMALLINT,
`v4` BIGINT,
`v5` int,
`v6` TINYINT unsigned,
`v7` SMALLINT unsigned,
`v8` BIGINT unsigned,
`v9` FLOAT,
`v10` FLOAT unsigned,
`v11` DOUBLE,
`v12` DOUBLE unsigned,
`v13` DECIMAL(5,4),
`v14` date,
`v15` TEXT,
`v16` varchar(100) ,
`v17` BLOB,
`v18` datetime DEFAULT CURRENT_TIMESTAMP,
`v19` datetime(6) DEFAULT CURRENT_TIMESTAMP(6),
`v20` TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
`v21` TIMESTAMP(6) DEFAULT CURRENT_TIMESTAMP(6),
/* todo support */
# `v22` YEAR,
# `v23` TIME,
# `v24` TIME(3),
# `v25` GEOMETRY,
`v26` bit(4),
# `v27` JSON DEFAULT NULL,
# `v28` set('a', 'c', 'f', 'd', 'e', 'b'),
`v29` mediumint(4) unsigned NOT NULL DEFAULT '0',
`v30` varbinary(255) DEFAULT NULL COMMENT 'varbinary support',
`v31` binary(200) DEFAULT NULL,
`v32` ENUM('RED','GREEN','BLUE'),
PRIMARY KEY (`v1`)
) ENGINE=InnoDB;
""")
mysql_node.query("""
INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values
(1, 11, 9223372036854775807, -1, 1, 11, 18446744073709551615, -1.1, 1.1, -1.111, 1.111, 1.1111, '2021-10-06', 'text', 'varchar', 'BLOB', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', '2021-10-06 18:32:57', '2021-10-06 18:32:57.482786', b'1010', 11, 'varbinary', 'binary', 'RED');
""")
clickhouse_node.query(
"CREATE DATABASE test_database_datatype ENGINE = MaterializeMySQL('{}:3306', 'test_database_datatype', 'root', 'clickhouse')".format(
service_name))
assert "test_database_datatype" in clickhouse_node.query("SHOW DATABASES")
# full synchronization check
check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 FORMAT TSV",
"1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n")
mysql_node.query("""
INSERT INTO test_database_datatype.t1 (v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v31, v32) values
(2, 22, 9223372036854775807, -2, 2, 22, 18446744073709551615, -2.2, 2.2, -2.22, 2.222, 2.2222, '2021-10-07', 'text', 'varchar', 'BLOB', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', '2021-10-07 18:32:57', '2021-10-07 18:32:57.482786', b'1011', 22, 'varbinary', 'binary', 'GREEN' );
""")
# increment synchronization check
check_query(clickhouse_node, "SELECT v1, v2, v3, v4, v5, v6, v7, v8, v9, v10, v11, v12, v13, v14, v15, v16, v17, v18, v19, v20, v21, v26, v29, v30, v32 FROM test_database_datatype.t1 ORDER BY v1 FORMAT TSV",
"1\t1\t11\t9223372036854775807\t-1\t1\t11\t18446744073709551615\t-1.1\t1.1\t-1.111\t1.111\t1.1111\t2021-10-06\ttext\tvarchar\tBLOB\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t2021-10-06 18:32:57\t2021-10-06 18:32:57.482786\t10\t11\tvarbinary\tRED\n" +
"2\t2\t22\t9223372036854775807\t-2\t2\t22\t18446744073709551615\t-2.2\t2.2\t-2.22\t2.222\t2.2222\t2021-10-07\ttext\tvarchar\tBLOB\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t2021-10-07 18:32:57\t2021-10-07 18:32:57.482786\t11\t22\tvarbinary\tGREEN\n")

View File

@ -253,3 +253,7 @@ def test_table_table(started_cluster, started_mysql_8_0, started_mysql_5_7, clic
def test_table_overrides(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_5_7, "mysql57")
materialize_with_ddl.table_overrides(clickhouse_node, started_mysql_8_0, "mysql80")
def test_materialized_database_support_all_kinds_of_mysql_datatype(started_cluster, started_mysql_8_0, started_mysql_5_7, clickhouse_node):
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_8_0, "mysql80")
materialize_with_ddl.materialized_database_support_all_kinds_of_mysql_datatype(clickhouse_node, started_mysql_5_7, "mysql57")