to MaterializeMySQL_support_set_and_other_dataType

This commit is contained in:
zzsmdfj 2022-01-21 11:37:34 +08:00
parent e380e59b16
commit 411f43ec4d
7 changed files with 60 additions and 56 deletions

View File

@ -78,18 +78,21 @@ When working with the `MaterializedMySQL` database engine, [ReplacingMergeTree](
| DATE, NEWDATE | [Date](../../sql-reference/data-types/date.md) |
| DATETIME, TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |
| DATETIME2, TIMESTAMP2 | [DateTime64](../../sql-reference/data-types/datetime64.md) |
| YEAR | [String](../../sql-reference/data-types/string.md) |
| TIME | [String](../../sql-reference/data-types/string.md) |
| YEAR | [UInt16](../../sql-reference/data-types/int-uint.md) |
| TIME | [Int64](../../sql-reference/data-types/int-uint.md) |
| ENUM | [Enum](../../sql-reference/data-types/enum.md) |
| STRING | [String](../../sql-reference/data-types/string.md) |
| VARCHAR, VAR_STRING | [String](../../sql-reference/data-types/string.md) |
| BLOB | [String](../../sql-reference/data-types/string.md) |
| GEOMETRY | [String](../../sql-reference/data-types/string.md) |
| BINARY | [FixedString](../../sql-reference/data-types/fixedstring.md) |
| BIT | [UInt64](../../sql-reference/data-types/int-uint.md) |
| SET | [UInt64](../../sql-reference/data-types/int-uint.md) |
[Nullable](../../sql-reference/data-types/nullable.md) is supported.
The data of TIME type in MySQL is converted to microseconds in ClickHouse.
Other types are not supported. If MySQL table contains a column of such type, ClickHouse throws exception "Unhandled data type" and stops replication.
## Specifics and Recommendations {#specifics-and-recommendations}

View File

@ -16,7 +16,15 @@ using MYSQL_ROW = char**;
struct st_mysql_field;
using MYSQL_FIELD = st_mysql_field;
enum struct enum_field_types;
enum struct enum_field_types { MYSQL_TYPE_DECIMAL, MYSQL_TYPE_TINY,
MYSQL_TYPE_SHORT, MYSQL_TYPE_LONG,
MYSQL_TYPE_FLOAT, MYSQL_TYPE_DOUBLE,
MYSQL_TYPE_NULL, MYSQL_TYPE_TIMESTAMP,
MYSQL_TYPE_LONGLONG,MYSQL_TYPE_INT24,
MYSQL_TYPE_DATE, MYSQL_TYPE_TIME,
MYSQL_TYPE_DATETIME, MYSQL_TYPE_YEAR,
MYSQL_TYPE_NEWDATE, MYSQL_TYPE_VARCHAR,
MYSQL_TYPE_BIT };
#endif

View File

@ -217,7 +217,6 @@ namespace MySQLReplication
case MYSQL_TYPE_DATETIME2:
case MYSQL_TYPE_TIME2:
case MYSQL_TYPE_BLOB:
case MYSQL_TYPE_JSON:
case MYSQL_TYPE_GEOMETRY:
{
column_meta.emplace_back(UInt16(meta[pos]));
@ -437,19 +436,15 @@ namespace MySQLReplication
break;
}
case MYSQL_TYPE_YEAR: {
Int32 val = 0;
Int16 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 1);
String time_str;
time_str.resize(4);
sprintf(time_str.data(), "%04d", (val + 1900));
row.push_back(Field{String{time_str}});
row.push_back(Field{UInt16{static_cast<UInt16>(val + 1900)}});
break;
}
case MYSQL_TYPE_TIME2:
{
UInt64 uintpart = 0UL;
UInt32 frac = 0U;
Int32 frac = 0U;
Int64 ltime;
Int64 intpart;
switch (meta)
@ -508,50 +503,28 @@ namespace MySQLReplication
break;
}
}
String hh, mm, ss, ff;
Int64 hh, mm, ss;
bool negative = false;
if (intpart == 0)
{
hh = "00";
mm = "00";
ss = "00";
hh = 0;
mm = 0;
ss = 0;
}
else
{
if (ltime < 0) negative= true;
UInt64 ultime = std::abs(ltime);
intpart = ultime >> 24;
UInt32 d = (intpart >> 12) % (1 << 10);
if (d >= 100)
{
hh.resize(3);
sprintf(hh.data(), "%3d", d);
}
else
{
hh.resize(2);
sprintf(hh.data(), "%02d", d);
}
mm.resize(2);
ss.resize(2);
sprintf(mm.data(), "%02d", static_cast<int> (intpart >> 6) % (1 << 6));
sprintf(ss.data(), "%02d", static_cast<int> (intpart % (1 << 6)));
hh = (intpart >> 12) % (1 << 10);
mm = (intpart >> 6) % (1 << 6);
ss = intpart % (1 << 6);
}
if (meta > 1)
{
ff.resize(6);
sprintf(ff.data(), "%06d", frac);
}
String time_buff;
if (negative) time_buff += '-';
time_buff.append(hh).append(":");
time_buff.append(mm).append(":");
time_buff.append(ss);
if (meta > 1)
{
time_buff.append(".").append(ff);
}
row.push_back(Field{String{time_buff}});
Int64 time_micro = 0;
time_micro = (hh * 3600 + mm * 60 + ss) * 1000000 + std::abs(frac);
if (negative) time_micro = - time_micro;
row.push_back(Field{Int64{time_micro}});
break;
}
case MYSQL_TYPE_DATETIME2:
@ -715,10 +688,6 @@ namespace MySQLReplication
row.push_back(Field{UInt64{bitmap1.to_ulong()}});
break;
}
//todo parse binlog from json type is is a little complex, Unless we find a library that we can use it to parse here.
// case MYSQL_TYPE_JSON:
// {
// }
case MYSQL_TYPE_BIT:
{
UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff);

View File

@ -92,8 +92,7 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerAlias("BINARY LARGE OBJECT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BINARY VARYING", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARBINARY", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("YEAR", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("GEOMETRY", "String", DataTypeFactory::CaseInsensitive); //mysql
factory.registerAlias("TIME", "String", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -88,6 +88,8 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIT", "UInt64", DataTypeFactory::CaseInsensitive); /// MySQL
factory.registerAlias("SET", "UInt64", DataTypeFactory::CaseInsensitive); /// MySQL
factory.registerAlias("YEAR", "UInt16", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TIME", "Int64", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -41,7 +41,7 @@ TEST(MySQLCreateRewritten, ColumnsDataType)
{"INTEGER", "Int32"}, {"BIGINT", "Int64"}, {"FLOAT", "Float32"}, {"DOUBLE", "Float64"},
{"VARCHAR(10)", "String"}, {"CHAR(10)", "String"}, {"Date", "Date"}, {"DateTime", "DateTime"},
{"TIMESTAMP", "DateTime"}, {"BOOLEAN", "Bool"}, {"BIT", "UInt64"}, {"SET", "UInt64"},
{"YEAR", "String"}, {"TIME", "String"}
{"YEAR", "UInt16"}, {"TIME", "Int64"}, {"GEOMETRY", "String"}
};
for (const auto & [test_type, mapped_type] : test_types)

View File

@ -19,6 +19,7 @@
#include <base/range.h>
#include <base/logger_useful.h>
#include <Processors/Sources/MySQLSource.h>
#include <boost/algorithm/string.hpp>
namespace DB
@ -145,8 +146,7 @@ namespace
break;
case ValueType::vtUInt64:
{
//we don't have enum enum_field_types definition in mysqlxx/Types.h, so we use literal values directly here.
if (static_cast<int>(mysql_type) == 16)
if (mysql_type == enum_field_types::MYSQL_TYPE_BIT)
{
size_t n = value.size();
UInt64 val = 0UL;
@ -175,9 +175,32 @@ namespace
read_bytes_size += 4;
break;
case ValueType::vtInt64:
assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
read_bytes_size += 8;
{
if (mysql_type == enum_field_types::MYSQL_TYPE_TIME)
{
String time_str(value.data(), value.size());
bool negative = time_str.starts_with("-");
if (negative) time_str = time_str.substr(1);
std::vector<String> hhmmss;
boost::split(hhmmss, time_str, [](char c) { return c == ':'; });
Int64 v = 0;
if (hhmmss.size() == 3)
{
v = (std::stoi(hhmmss[0]) * 3600 + std::stoi(hhmmss[1]) * 60 + std::stold(hhmmss[2])) * 1000000;
}
else
throw Exception("Unsupported value format", ErrorCodes::NOT_IMPLEMENTED);
if (negative) v = -v;
assert_cast<ColumnInt64 &>(column).insertValue(v);
read_bytes_size += value.size();
}
else
{
assert_cast<ColumnInt64 &>(column).insertValue(value.getInt());
read_bytes_size += 8;
}
break;
}
case ValueType::vtFloat32:
assert_cast<ColumnFloat32 &>(column).insertValue(value.getDouble());
read_bytes_size += 4;