mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-20 08:40:50 +00:00
Merge pull request #14535 from zhang2014/fix/datetime
ISSUES-4006 support decimal data type for MaterializedMySQL
This commit is contained in:
commit
6e0bdaf46d
@ -74,6 +74,8 @@ void ExternalResultDescription::init(const Block & sample_block_)
|
|||||||
types.emplace_back(ValueType::vtDecimal64, is_nullable);
|
types.emplace_back(ValueType::vtDecimal64, is_nullable);
|
||||||
else if (typeid_cast<const DataTypeDecimal<Decimal128> *>(type))
|
else if (typeid_cast<const DataTypeDecimal<Decimal128> *>(type))
|
||||||
types.emplace_back(ValueType::vtDecimal128, is_nullable);
|
types.emplace_back(ValueType::vtDecimal128, is_nullable);
|
||||||
|
else if (typeid_cast<const DataTypeDecimal<Decimal256> *>(type))
|
||||||
|
types.emplace_back(ValueType::vtDecimal256, is_nullable);
|
||||||
else
|
else
|
||||||
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
|
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
|
||||||
}
|
}
|
||||||
|
@ -29,7 +29,8 @@ struct ExternalResultDescription
|
|||||||
vtDateTime64,
|
vtDateTime64,
|
||||||
vtDecimal32,
|
vtDecimal32,
|
||||||
vtDecimal64,
|
vtDecimal64,
|
||||||
vtDecimal128
|
vtDecimal128,
|
||||||
|
vtDecimal256
|
||||||
};
|
};
|
||||||
|
|
||||||
Block sample_block;
|
Block sample_block;
|
||||||
|
@ -195,10 +195,9 @@ namespace MySQLReplication
|
|||||||
case MYSQL_TYPE_LONGLONG:
|
case MYSQL_TYPE_LONGLONG:
|
||||||
case MYSQL_TYPE_INT24:
|
case MYSQL_TYPE_INT24:
|
||||||
case MYSQL_TYPE_DATE:
|
case MYSQL_TYPE_DATE:
|
||||||
case MYSQL_TYPE_TIME:
|
|
||||||
case MYSQL_TYPE_DATETIME:
|
case MYSQL_TYPE_DATETIME:
|
||||||
case MYSQL_TYPE_YEAR:
|
case MYSQL_TYPE_NEWDATE:
|
||||||
case MYSQL_TYPE_NEWDATE: {
|
{
|
||||||
/// No data here.
|
/// No data here.
|
||||||
column_meta.emplace_back(0);
|
column_meta.emplace_back(0);
|
||||||
break;
|
break;
|
||||||
@ -208,16 +207,15 @@ namespace MySQLReplication
|
|||||||
case MYSQL_TYPE_DOUBLE:
|
case MYSQL_TYPE_DOUBLE:
|
||||||
case MYSQL_TYPE_TIMESTAMP2:
|
case MYSQL_TYPE_TIMESTAMP2:
|
||||||
case MYSQL_TYPE_DATETIME2:
|
case MYSQL_TYPE_DATETIME2:
|
||||||
case MYSQL_TYPE_TIME2:
|
|
||||||
case MYSQL_TYPE_JSON:
|
|
||||||
case MYSQL_TYPE_BLOB:
|
case MYSQL_TYPE_BLOB:
|
||||||
case MYSQL_TYPE_GEOMETRY: {
|
{
|
||||||
column_meta.emplace_back(UInt16(meta[pos]));
|
column_meta.emplace_back(UInt16(meta[pos]));
|
||||||
pos += 1;
|
pos += 1;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_NEWDECIMAL:
|
case MYSQL_TYPE_NEWDECIMAL:
|
||||||
case MYSQL_TYPE_STRING: {
|
case MYSQL_TYPE_STRING:
|
||||||
|
{
|
||||||
/// Big-Endian
|
/// Big-Endian
|
||||||
auto b0 = UInt16(meta[pos] << 8);
|
auto b0 = UInt16(meta[pos] << 8);
|
||||||
auto b1 = UInt8(meta[pos + 1]);
|
auto b1 = UInt8(meta[pos + 1]);
|
||||||
@ -225,8 +223,6 @@ namespace MySQLReplication
|
|||||||
pos += 2;
|
pos += 2;
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case MYSQL_TYPE_BIT:
|
|
||||||
case MYSQL_TYPE_VARCHAR:
|
case MYSQL_TYPE_VARCHAR:
|
||||||
case MYSQL_TYPE_VAR_STRING: {
|
case MYSQL_TYPE_VAR_STRING: {
|
||||||
/// Little-Endian
|
/// Little-Endian
|
||||||
@ -355,71 +351,65 @@ namespace MySQLReplication
|
|||||||
|
|
||||||
switch (field_type)
|
switch (field_type)
|
||||||
{
|
{
|
||||||
case MYSQL_TYPE_TINY: {
|
case MYSQL_TYPE_TINY:
|
||||||
|
{
|
||||||
UInt8 val = 0;
|
UInt8 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 1);
|
payload.readStrict(reinterpret_cast<char *>(&val), 1);
|
||||||
row.push_back(Field{UInt8{val}});
|
row.push_back(Field{UInt8{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_SHORT: {
|
case MYSQL_TYPE_SHORT:
|
||||||
|
{
|
||||||
UInt16 val = 0;
|
UInt16 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 2);
|
payload.readStrict(reinterpret_cast<char *>(&val), 2);
|
||||||
row.push_back(Field{UInt16{val}});
|
row.push_back(Field{UInt16{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_INT24: {
|
case MYSQL_TYPE_INT24:
|
||||||
|
{
|
||||||
Int32 val = 0;
|
Int32 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 3);
|
payload.readStrict(reinterpret_cast<char *>(&val), 3);
|
||||||
row.push_back(Field{Int32{val}});
|
row.push_back(Field{Int32{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_LONG: {
|
case MYSQL_TYPE_LONG:
|
||||||
|
{
|
||||||
UInt32 val = 0;
|
UInt32 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
||||||
row.push_back(Field{UInt32{val}});
|
row.push_back(Field{UInt32{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_LONGLONG: {
|
case MYSQL_TYPE_LONGLONG:
|
||||||
|
{
|
||||||
UInt64 val = 0;
|
UInt64 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 8);
|
payload.readStrict(reinterpret_cast<char *>(&val), 8);
|
||||||
row.push_back(Field{UInt64{val}});
|
row.push_back(Field{UInt64{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_FLOAT: {
|
case MYSQL_TYPE_FLOAT:
|
||||||
|
{
|
||||||
Float32 val = 0;
|
Float32 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
||||||
row.push_back(Field{Float32{val}});
|
row.push_back(Field{Float32{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_DOUBLE: {
|
case MYSQL_TYPE_DOUBLE:
|
||||||
|
{
|
||||||
Float64 val = 0;
|
Float64 val = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 8);
|
payload.readStrict(reinterpret_cast<char *>(&val), 8);
|
||||||
row.push_back(Field{Float64{val}});
|
row.push_back(Field{Float64{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_TIMESTAMP: {
|
case MYSQL_TYPE_TIMESTAMP:
|
||||||
|
{
|
||||||
UInt32 val = 0;
|
UInt32 val = 0;
|
||||||
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
payload.readStrict(reinterpret_cast<char *>(&val), 4);
|
||||||
row.push_back(Field{val});
|
row.push_back(Field{val});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_TIME: {
|
case MYSQL_TYPE_DATE:
|
||||||
UInt32 i24 = 0;
|
{
|
||||||
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
|
|
||||||
|
|
||||||
String time_buff;
|
|
||||||
time_buff.resize(8);
|
|
||||||
sprintf(
|
|
||||||
time_buff.data(),
|
|
||||||
"%02d:%02d:%02d",
|
|
||||||
static_cast<int>(i24 / 10000),
|
|
||||||
static_cast<int>(i24 % 10000) / 100,
|
|
||||||
static_cast<int>(i24 % 100));
|
|
||||||
row.push_back(Field{String{time_buff}});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case MYSQL_TYPE_DATE: {
|
|
||||||
UInt32 i24 = 0;
|
UInt32 i24 = 0;
|
||||||
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
|
payload.readStrict(reinterpret_cast<char *>(&i24), 3);
|
||||||
|
|
||||||
@ -429,60 +419,12 @@ namespace MySQLReplication
|
|||||||
row.push_back(Field(date_day_number.toUnderType()));
|
row.push_back(Field(date_day_number.toUnderType()));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_YEAR: {
|
case MYSQL_TYPE_DATETIME2:
|
||||||
Int32 val = 0;
|
{
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 1);
|
Int64 val = 0;
|
||||||
|
UInt32 fsp = 0;
|
||||||
String time_buff;
|
|
||||||
time_buff.resize(4);
|
|
||||||
sprintf(time_buff.data(), "%04d", (val + 1900));
|
|
||||||
row.push_back(Field{String{time_buff}});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case MYSQL_TYPE_TIME2: {
|
|
||||||
UInt32 val = 0, frac_part = 0;
|
|
||||||
|
|
||||||
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 3);
|
|
||||||
if (readBits(val, 0, 1, 24) == 0)
|
|
||||||
{
|
|
||||||
val = ~val + 1;
|
|
||||||
}
|
|
||||||
UInt32 hour = readBits(val, 2, 10, 24);
|
|
||||||
UInt32 minute = readBits(val, 12, 6, 24);
|
|
||||||
UInt32 second = readBits(val, 18, 6, 24);
|
|
||||||
readTimeFractionalPart(payload, reinterpret_cast<char *>(&frac_part), meta);
|
|
||||||
|
|
||||||
if (frac_part != 0)
|
|
||||||
{
|
|
||||||
String time_buff;
|
|
||||||
time_buff.resize(15);
|
|
||||||
sprintf(
|
|
||||||
time_buff.data(),
|
|
||||||
"%02d:%02d:%02d.%06d",
|
|
||||||
static_cast<int>(hour),
|
|
||||||
static_cast<int>(minute),
|
|
||||||
static_cast<int>(second),
|
|
||||||
static_cast<int>(frac_part));
|
|
||||||
row.push_back(Field{String{time_buff}});
|
|
||||||
}
|
|
||||||
else
|
|
||||||
{
|
|
||||||
String time_buff;
|
|
||||||
time_buff.resize(8);
|
|
||||||
sprintf(
|
|
||||||
time_buff.data(),
|
|
||||||
"%02d:%02d:%02d",
|
|
||||||
static_cast<int>(hour),
|
|
||||||
static_cast<int>(minute),
|
|
||||||
static_cast<int>(second));
|
|
||||||
row.push_back(Field{String{time_buff}});
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case MYSQL_TYPE_DATETIME2: {
|
|
||||||
Int64 val = 0, fsp = 0;
|
|
||||||
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 5);
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 5);
|
||||||
readTimeFractionalPart(payload, reinterpret_cast<char *>(&fsp), meta);
|
readTimeFractionalPart(payload, fsp, meta);
|
||||||
|
|
||||||
UInt32 year_month = readBits(val, 1, 17, 40);
|
UInt32 year_month = readBits(val, 1, 17, 40);
|
||||||
time_t date_time = DateLUT::instance().makeDateTime(
|
time_t date_time = DateLUT::instance().makeDateTime(
|
||||||
@ -490,138 +432,130 @@ namespace MySQLReplication
|
|||||||
, readBits(val, 23, 5, 40), readBits(val, 28, 6, 40), readBits(val, 34, 6, 40)
|
, readBits(val, 23, 5, 40), readBits(val, 28, 6, 40), readBits(val, 34, 6, 40)
|
||||||
);
|
);
|
||||||
|
|
||||||
row.push_back(Field{UInt32(date_time)});
|
if (!meta)
|
||||||
|
row.push_back(Field{UInt32(date_time)});
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DB::DecimalUtils::DecimalComponents<DateTime64::NativeType> components{
|
||||||
|
static_cast<DateTime64::NativeType>(date_time), 0};
|
||||||
|
|
||||||
|
components.fractional = fsp;
|
||||||
|
row.push_back(Field(DecimalUtils::decimalFromComponents<DateTime64>(components, meta)));
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_TIMESTAMP2: {
|
case MYSQL_TYPE_TIMESTAMP2:
|
||||||
|
{
|
||||||
UInt32 sec = 0, fsp = 0;
|
UInt32 sec = 0, fsp = 0;
|
||||||
readBigEndianStrict(payload, reinterpret_cast<char *>(&sec), 4);
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&sec), 4);
|
||||||
readTimeFractionalPart(payload, reinterpret_cast<char *>(&fsp), meta);
|
readTimeFractionalPart(payload, fsp, meta);
|
||||||
row.push_back(Field{sec});
|
|
||||||
|
if (!meta)
|
||||||
|
row.push_back(Field{sec});
|
||||||
|
else
|
||||||
|
{
|
||||||
|
DB::DecimalUtils::DecimalComponents<DateTime64::NativeType> components{
|
||||||
|
static_cast<DateTime64::NativeType>(sec), 0};
|
||||||
|
|
||||||
|
components.fractional = fsp;
|
||||||
|
row.push_back(Field(DecimalUtils::decimalFromComponents<DateTime64>(components, meta)));
|
||||||
|
}
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_NEWDECIMAL: {
|
case MYSQL_TYPE_NEWDECIMAL:
|
||||||
Int8 digits_per_integer = 9;
|
{
|
||||||
Int8 precision = meta >> 8;
|
const auto & dispatch = [](const size_t & precision, const size_t & scale, const auto & function) -> Field
|
||||||
Int8 decimals = meta & 0xff;
|
|
||||||
const char compressed_byte_map[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
|
|
||||||
|
|
||||||
Int8 integral = (precision - decimals);
|
|
||||||
UInt32 uncompressed_integers = integral / digits_per_integer;
|
|
||||||
UInt32 uncompressed_decimals = decimals / digits_per_integer;
|
|
||||||
UInt32 compressed_integers = integral - (uncompressed_integers * digits_per_integer);
|
|
||||||
UInt32 compressed_decimals = decimals - (uncompressed_decimals * digits_per_integer);
|
|
||||||
|
|
||||||
String buff;
|
|
||||||
UInt32 bytes_to_read = uncompressed_integers * 4 + compressed_byte_map[compressed_integers]
|
|
||||||
+ uncompressed_decimals * 4 + compressed_byte_map[compressed_decimals];
|
|
||||||
buff.resize(bytes_to_read);
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(buff.data()), bytes_to_read);
|
|
||||||
|
|
||||||
String format;
|
|
||||||
format.resize(0);
|
|
||||||
|
|
||||||
bool is_negative = ((buff[0] & 0x80) == 0);
|
|
||||||
if (is_negative)
|
|
||||||
{
|
{
|
||||||
format += "-";
|
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
|
||||||
}
|
return Field(function(precision, scale, Decimal32()));
|
||||||
buff[0] ^= 0x80;
|
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
|
||||||
|
return Field(function(precision, scale, Decimal64()));
|
||||||
|
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
|
||||||
|
return Field(function(precision, scale, Decimal128()));
|
||||||
|
|
||||||
ReadBufferFromString reader(buff);
|
return Field(function(precision, scale, Decimal256()));
|
||||||
/// Compressed part.
|
};
|
||||||
if (compressed_integers != 0)
|
|
||||||
{
|
|
||||||
Int64 val = 0;
|
|
||||||
UInt8 to_read = compressed_byte_map[compressed_integers];
|
|
||||||
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), to_read);
|
|
||||||
format += std::to_string(val);
|
|
||||||
}
|
|
||||||
|
|
||||||
for (auto k = 0U; k < uncompressed_integers; k++)
|
const auto & read_decimal = [&](const size_t & precision, const size_t & scale, auto decimal)
|
||||||
{
|
{
|
||||||
UInt32 val = 0;
|
using DecimalType = decltype(decimal);
|
||||||
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 4);
|
static constexpr size_t digits_per_integer = 9;
|
||||||
format += std::to_string(val);
|
static const size_t compressed_bytes_map[] = {0, 1, 1, 2, 2, 3, 3, 4, 4, 4};
|
||||||
}
|
static const size_t compressed_integer_align_numbers[] = {
|
||||||
format += ".";
|
0x0, 0xFF, 0xFF, 0xFFFF, 0xFFFF, 0xFFFFFF, 0xFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF, 0xFFFFFFFF};
|
||||||
for (auto k = 0U; k < uncompressed_decimals; k++)
|
|
||||||
{
|
UInt32 mask = 0;
|
||||||
UInt32 val = 0;
|
DecimalType res(0);
|
||||||
reader.readStrict(reinterpret_cast<char *>(&val), 4);
|
|
||||||
format += std::to_string(val);
|
if ((*payload.position() & 0x80) == 0)
|
||||||
}
|
mask = UInt32(-1);
|
||||||
|
|
||||||
|
*payload.position() ^= 0x80;
|
||||||
|
|
||||||
/// Compressed part.
|
|
||||||
if (compressed_decimals != 0)
|
|
||||||
{
|
|
||||||
Int64 val = 0;
|
|
||||||
String compressed_buff;
|
|
||||||
UInt8 to_read = compressed_byte_map[compressed_decimals];
|
|
||||||
switch (to_read)
|
|
||||||
{
|
{
|
||||||
case 1: {
|
size_t integral = (precision - scale);
|
||||||
reader.readStrict(reinterpret_cast<char *>(&val), 1);
|
size_t uncompressed_integers = integral / digits_per_integer;
|
||||||
break;
|
size_t compressed_integers = integral - (uncompressed_integers * digits_per_integer);
|
||||||
}
|
|
||||||
case 2: {
|
|
||||||
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 2);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 3: {
|
|
||||||
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 3);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 4: {
|
|
||||||
readBigEndianStrict(reader, reinterpret_cast<char *>(&val), 4);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
format += std::to_string(val);
|
|
||||||
}
|
|
||||||
row.push_back(Field{String{format}});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case MYSQL_TYPE_ENUM: {
|
|
||||||
Int32 val = 0;
|
|
||||||
Int32 len = (meta & 0xff);
|
|
||||||
switch (len)
|
|
||||||
{
|
|
||||||
case 1: {
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 1);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case 2: {
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(&val), 2);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
row.push_back(Field{Int32{val}});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
case MYSQL_TYPE_BIT: {
|
|
||||||
UInt32 bits = ((meta >> 8) * 8) + (meta & 0xff);
|
|
||||||
UInt32 size = (bits + 7) / 8;
|
|
||||||
|
|
||||||
Bitmap bitmap1;
|
/// Compressed part.
|
||||||
readBitmap(payload, bitmap1, size);
|
if (compressed_integers != 0)
|
||||||
row.push_back(Field{UInt64{bitmap1.to_ulong()}});
|
{
|
||||||
break;
|
UInt32 val = 0;
|
||||||
}
|
size_t to_read = compressed_bytes_map[compressed_integers];
|
||||||
case MYSQL_TYPE_SET: {
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), to_read);
|
||||||
UInt32 size = (meta & 0xff);
|
res += (val ^ (mask & compressed_integer_align_numbers[compressed_integers]));
|
||||||
|
}
|
||||||
|
|
||||||
Bitmap bitmap1;
|
for (auto k = 0U; k < uncompressed_integers; k++)
|
||||||
readBitmap(payload, bitmap1, size);
|
{
|
||||||
row.push_back(Field{UInt64{bitmap1.to_ulong()}});
|
UInt32 val = 0;
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 4);
|
||||||
|
res *= intExp10OfSize<DecimalType>(digits_per_integer);
|
||||||
|
res += (val ^ mask);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
size_t uncompressed_decimals = scale / digits_per_integer;
|
||||||
|
size_t compressed_decimals = scale - (uncompressed_decimals * digits_per_integer);
|
||||||
|
|
||||||
|
for (auto k = 0U; k < uncompressed_decimals; k++)
|
||||||
|
{
|
||||||
|
UInt32 val = 0;
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), 4);
|
||||||
|
res *= intExp10OfSize<DecimalType>(digits_per_integer);
|
||||||
|
res += (val ^ mask);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compressed part.
|
||||||
|
if (compressed_decimals != 0)
|
||||||
|
{
|
||||||
|
UInt32 val = 0;
|
||||||
|
size_t to_read = compressed_bytes_map[compressed_decimals];
|
||||||
|
|
||||||
|
if (to_read)
|
||||||
|
{
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&val), to_read);
|
||||||
|
res *= intExp10OfSize<DecimalType>(compressed_decimals);
|
||||||
|
res += (val ^ (mask & compressed_integer_align_numbers[compressed_decimals]));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (mask != 0)
|
||||||
|
res *= -1;
|
||||||
|
|
||||||
|
return res;
|
||||||
|
};
|
||||||
|
|
||||||
|
row.push_back(dispatch((meta >> 8) & 0xFF, meta & 0xFF, read_decimal));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_VARCHAR:
|
case MYSQL_TYPE_VARCHAR:
|
||||||
case MYSQL_TYPE_VAR_STRING: {
|
case MYSQL_TYPE_VAR_STRING:
|
||||||
|
{
|
||||||
uint32_t size = 0;
|
uint32_t size = 0;
|
||||||
if (meta < 256)
|
if (meta < 256)
|
||||||
{
|
{
|
||||||
@ -638,7 +572,8 @@ namespace MySQLReplication
|
|||||||
row.push_back(Field{String{val}});
|
row.push_back(Field{String{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_STRING: {
|
case MYSQL_TYPE_STRING:
|
||||||
|
{
|
||||||
UInt32 size = 0;
|
UInt32 size = 0;
|
||||||
if (field_len < 256)
|
if (field_len < 256)
|
||||||
{
|
{
|
||||||
@ -655,8 +590,8 @@ namespace MySQLReplication
|
|||||||
row.push_back(Field{String{val}});
|
row.push_back(Field{String{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_GEOMETRY:
|
case MYSQL_TYPE_BLOB:
|
||||||
case MYSQL_TYPE_BLOB: {
|
{
|
||||||
UInt32 size = 0;
|
UInt32 size = 0;
|
||||||
switch (meta)
|
switch (meta)
|
||||||
{
|
{
|
||||||
@ -686,16 +621,6 @@ namespace MySQLReplication
|
|||||||
row.push_back(Field{String{val}});
|
row.push_back(Field{String{val}});
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case MYSQL_TYPE_JSON: {
|
|
||||||
UInt32 size = 0;
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(&size), meta);
|
|
||||||
|
|
||||||
String val;
|
|
||||||
val.resize(size);
|
|
||||||
payload.readStrict(reinterpret_cast<char *>(val.data()), size);
|
|
||||||
row.push_back(Field{String{val}});
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
default:
|
default:
|
||||||
throw ReplicationError(
|
throw ReplicationError(
|
||||||
"ParseRow: Unhandled MySQL field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION);
|
"ParseRow: Unhandled MySQL field type:" + std::to_string(field_type), ErrorCodes::UNKNOWN_EXCEPTION);
|
||||||
|
@ -36,23 +36,41 @@ namespace MySQLReplication
|
|||||||
std::reverse(start, end);
|
std::reverse(start, end);
|
||||||
}
|
}
|
||||||
|
|
||||||
inline void readTimeFractionalPart(ReadBuffer & payload, char * to, UInt16 meta)
|
inline void readTimeFractionalPart(ReadBuffer & payload, UInt32 & factional, UInt16 meta)
|
||||||
{
|
{
|
||||||
switch (meta)
|
switch (meta)
|
||||||
{
|
{
|
||||||
case 1:
|
case 1:
|
||||||
case 2: {
|
{
|
||||||
readBigEndianStrict(payload, to, 1);
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 1);
|
||||||
|
factional /= 10;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 2:
|
||||||
|
{
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 1);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 3:
|
case 3:
|
||||||
case 4: {
|
{
|
||||||
readBigEndianStrict(payload, to, 2);
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 2);
|
||||||
|
factional /= 10;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 4:
|
||||||
|
{
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 2);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case 5:
|
case 5:
|
||||||
case 6: {
|
{
|
||||||
readBigEndianStrict(payload, to, 3);
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 3);
|
||||||
|
factional /= 10;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
case 6:
|
||||||
|
{
|
||||||
|
readBigEndianStrict(payload, reinterpret_cast<char *>(&factional), 3);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
default:
|
default:
|
||||||
|
@ -9,6 +9,7 @@
|
|||||||
# include <cstdlib>
|
# include <cstdlib>
|
||||||
# include <random>
|
# include <random>
|
||||||
# include <Columns/ColumnTuple.h>
|
# include <Columns/ColumnTuple.h>
|
||||||
|
# include <Columns/ColumnDecimal.h>
|
||||||
# include <DataStreams/CountingBlockOutputStream.h>
|
# include <DataStreams/CountingBlockOutputStream.h>
|
||||||
# include <DataStreams/OneBlockInputStream.h>
|
# include <DataStreams/OneBlockInputStream.h>
|
||||||
# include <DataStreams/copyData.h>
|
# include <DataStreams/copyData.h>
|
||||||
@ -453,6 +454,14 @@ static void writeFieldsToColumn(
|
|||||||
write_data_to_column(casted_float32_column, Float64(), Float32());
|
write_data_to_column(casted_float32_column, Float64(), Float32());
|
||||||
else if (ColumnFloat64 * casted_float64_column = typeid_cast<ColumnFloat64 *>(&column_to))
|
else if (ColumnFloat64 * casted_float64_column = typeid_cast<ColumnFloat64 *>(&column_to))
|
||||||
write_data_to_column(casted_float64_column, Float64(), Float64());
|
write_data_to_column(casted_float64_column, Float64(), Float64());
|
||||||
|
else if (ColumnDecimal<Decimal32> * casted_decimal_32_column = typeid_cast<ColumnDecimal<Decimal32> *>(&column_to))
|
||||||
|
write_data_to_column(casted_decimal_32_column, Decimal32(), Decimal32());
|
||||||
|
else if (ColumnDecimal<Decimal64> * casted_decimal_64_column = typeid_cast<ColumnDecimal<Decimal64> *>(&column_to))
|
||||||
|
write_data_to_column(casted_decimal_64_column, Decimal64(), Decimal64());
|
||||||
|
else if (ColumnDecimal<Decimal128> * casted_decimal_128_column = typeid_cast<ColumnDecimal<Decimal128> *>(&column_to))
|
||||||
|
write_data_to_column(casted_decimal_128_column, Decimal128(), Decimal128());
|
||||||
|
else if (ColumnDecimal<Decimal256> * casted_decimal_256_column = typeid_cast<ColumnDecimal<Decimal256> *>(&column_to))
|
||||||
|
write_data_to_column(casted_decimal_256_column, Decimal256(), Decimal256());
|
||||||
else if (ColumnInt32 * casted_int32_column = typeid_cast<ColumnInt32 *>(&column_to))
|
else if (ColumnInt32 * casted_int32_column = typeid_cast<ColumnInt32 *>(&column_to))
|
||||||
{
|
{
|
||||||
for (size_t index = 0; index < rows_data.size(); ++index)
|
for (size_t index = 0; index < rows_data.size(); ++index)
|
||||||
|
@ -90,7 +90,8 @@ namespace
|
|||||||
case ValueType::vtDateTime64:[[fallthrough]];
|
case ValueType::vtDateTime64:[[fallthrough]];
|
||||||
case ValueType::vtDecimal32: [[fallthrough]];
|
case ValueType::vtDecimal32: [[fallthrough]];
|
||||||
case ValueType::vtDecimal64: [[fallthrough]];
|
case ValueType::vtDecimal64: [[fallthrough]];
|
||||||
case ValueType::vtDecimal128:
|
case ValueType::vtDecimal128:[[fallthrough]];
|
||||||
|
case ValueType::vtDecimal256:
|
||||||
{
|
{
|
||||||
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
|
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
|
||||||
data_type.deserializeAsWholeText(column, buffer, FormatSettings{});
|
data_type.deserializeAsWholeText(column, buffer, FormatSettings{});
|
||||||
|
@ -99,6 +99,38 @@ def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_nam
|
|||||||
mysql_node.query("DROP DATABASE test_database")
|
mysql_node.query("DROP DATABASE test_database")
|
||||||
|
|
||||||
|
|
||||||
|
def materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, mysql_node, service_name):
|
||||||
|
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
|
||||||
|
mysql_node.query("CREATE TABLE test_database.test_table_1 (`key` INT NOT NULL PRIMARY KEY, _datetime DateTime(6), _timestamp TIMESTAMP(3), _decimal DECIMAL(65, 30)) ENGINE = InnoDB;")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_1 VALUES(1, '2020-01-01 01:02:03.999999', '2020-01-01 01:02:03.999', " + ('9' * 35) + "." + ('9' * 30) + ")")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_1 VALUES(2, '2020-01-01 01:02:03.000000', '2020-01-01 01:02:03.000', ." + ('0' * 29) + "1)")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_1 VALUES(3, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.99', -" + ('9' * 35) + "." + ('9' * 30) + ")")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_1 VALUES(4, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.9999', -." + ('0' * 29) + "1)")
|
||||||
|
|
||||||
|
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
|
||||||
|
assert "test_database" in clickhouse_node.query("SHOW DATABASES")
|
||||||
|
check_query(clickhouse_node, "SHOW TABLES FROM test_database FORMAT TSV", "test_table_1\n")
|
||||||
|
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
|
||||||
|
"1\t2020-01-01 01:02:03.999999\t2020-01-01 01:02:03.999\t" + ('9' * 35) + "." + ('9' * 30) + "\n"
|
||||||
|
"2\t2020-01-01 01:02:03.000000\t2020-01-01 01:02:03.000\t0." + ('0' * 29) + "1\n"
|
||||||
|
"3\t2020-01-01 01:02:03.999900\t2020-01-01 01:02:03.990\t-" + ('9' * 35) + "." + ('9' * 30) + "\n"
|
||||||
|
"4\t2020-01-01 01:02:03.999900\t2020-01-01 01:02:04.000\t-0." + ('0' * 29) + "1\n")
|
||||||
|
|
||||||
|
mysql_node.query("CREATE TABLE test_database.test_table_2 (`key` INT NOT NULL PRIMARY KEY, _datetime DateTime(6), _timestamp TIMESTAMP(3), _decimal DECIMAL(65, 30)) ENGINE = InnoDB;")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(1, '2020-01-01 01:02:03.999999', '2020-01-01 01:02:03.999', " + ('9' * 35) + "." + ('9' * 30) + ")")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(2, '2020-01-01 01:02:03.000000', '2020-01-01 01:02:03.000', ." + ('0' * 29) + "1)")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(3, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.99', -" + ('9' * 35) + "." + ('9' * 30) + ")")
|
||||||
|
mysql_node.query("INSERT INTO test_database.test_table_2 VALUES(4, '2020-01-01 01:02:03.9999', '2020-01-01 01:02:03.9999', -." + ('0' * 29) + "1)")
|
||||||
|
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_2 ORDER BY key FORMAT TSV",
|
||||||
|
"1\t2020-01-01 01:02:03.999999\t2020-01-01 01:02:03.999\t" + ('9' * 35) + "." + ('9' * 30) + "\n"
|
||||||
|
"2\t2020-01-01 01:02:03.000000\t2020-01-01 01:02:03.000\t0." + ('0' * 29) + "1\n"
|
||||||
|
"3\t2020-01-01 01:02:03.999900\t2020-01-01 01:02:03.990\t-" + ('9' * 35) + "." + ('9' * 30) + "\n"
|
||||||
|
"4\t2020-01-01 01:02:03.999900\t2020-01-01 01:02:04.000\t-0." + ('0' * 29) + "1\n")
|
||||||
|
clickhouse_node.query("DROP DATABASE test_database")
|
||||||
|
mysql_node.query("DROP DATABASE test_database")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
|
def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
|
||||||
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
|
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
|
||||||
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
|
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")
|
||||||
|
@ -94,10 +94,13 @@ def started_mysql_8_0():
|
|||||||
|
|
||||||
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7):
|
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7):
|
||||||
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
|
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql1")
|
||||||
|
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_5_7, "mysql1")
|
||||||
|
|
||||||
|
|
||||||
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0):
|
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0):
|
||||||
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
|
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
|
||||||
|
materialize_with_ddl.materialize_mysql_database_with_datetime_and_decimal(clickhouse_node, started_mysql_8_0, "mysql8_0")
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7):
|
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7):
|
||||||
|
Loading…
Reference in New Issue
Block a user