MySql datatypes dateTime64 and decimal (#11512)

This commit is contained in:
Vasily Nemkov 2020-09-09 15:18:02 +03:00 committed by GitHub
parent f5bef34be6
commit 3973a17530
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 1137 additions and 179 deletions

View File

@ -1,9 +1,7 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <mysqlxx/Types.h>
namespace mysqlxx
{
@ -22,6 +20,11 @@ class ResultBase
public:
ResultBase(MYSQL_RES * res_, Connection * conn_, const Query * query_);
ResultBase(const ResultBase &) = delete;
ResultBase & operator=(const ResultBase &) = delete;
ResultBase(ResultBase &&) = default;
ResultBase & operator=(ResultBase &&) = default;
Connection * getConnection() { return conn; }
MYSQL_FIELDS getFields() { return fields; }
unsigned getNumFields() { return num_fields; }

View File

@ -254,7 +254,23 @@ template <> inline std::string Value::get<std::string >() cons
template <> inline LocalDate Value::get<LocalDate >() const { return getDate(); }
template <> inline LocalDateTime Value::get<LocalDateTime >() const { return getDateTime(); }
template <typename T> inline T Value::get() const { return T(*this); }
namespace details
{
// To avoid stack overflow when converting to type with no appropriate c-tor,
// resulting in endless recursive calls from `Value::get<T>()` to `Value::operator T()` to `Value::get<T>()` to ...
template <typename T, typename std::enable_if_t<std::is_constructible_v<T, Value>>>
inline T contructFromValue(const Value & val)
{
return T(val);
}
}
template <typename T>
inline T Value::get() const
{
return details::contructFromValue<T>(*this);
}
inline std::ostream & operator<< (std::ostream & ostr, const Value & x)

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int UNKNOWN_TYPE;
}
@ -86,6 +87,8 @@ namespace
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.convert<std::string>()));
break;
default:
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
}
}

View File

@ -13,6 +13,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
}
namespace
{
using ValueType = ExternalResultDescription::ValueType;
@ -79,6 +84,9 @@ namespace
return Poco::Dynamic::Var(std::to_string(LocalDateTime(time_t(field.get<UInt64>())))).convert<String>();
case ValueType::vtUUID:
return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
default:
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
}
__builtin_unreachable();
}

View File

@ -507,6 +507,7 @@ namespace ErrorCodes
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE = 540;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING = 541;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE = 542;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL = 543;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -1,9 +1,11 @@
#include "ExternalResultDescription.h"
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/typeid_cast.h>
@ -64,6 +66,14 @@ void ExternalResultDescription::init(const Block & sample_block_)
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeEnum16 *>(type))
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeDateTime64 *>(type))
types.emplace_back(ValueType::vtDateTime64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal32> *>(type))
types.emplace_back(ValueType::vtDecimal32, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal64> *>(type))
types.emplace_back(ValueType::vtDecimal64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal128> *>(type))
types.emplace_back(ValueType::vtDecimal128, is_nullable);
else
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
}

View File

@ -26,6 +26,10 @@ struct ExternalResultDescription
vtDate,
vtDateTime,
vtUUID,
vtDateTime64,
vtDecimal32,
vtDecimal64,
vtDecimal128
};
Block sample_block;

99
src/Core/MultiEnum.h Normal file
View File

@ -0,0 +1,99 @@
#pragma once
#include <cstdint>
#include <type_traits>
// Wrapper around enum that can have multiple values (or none) set at once.
template <typename EnumTypeT, typename StorageTypeT = std::uint64_t>
struct MultiEnum
{
using StorageType = StorageTypeT;
using EnumType = EnumTypeT;
MultiEnum() = default;
template <typename ... EnumValues, typename = std::enable_if_t<std::conjunction_v<std::is_same<EnumTypeT, EnumValues>...>>>
explicit MultiEnum(EnumValues ... v)
: MultiEnum((toBitFlag(v) | ... | 0u))
{}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
explicit MultiEnum(ValueType v)
: bitset(v)
{
static_assert(std::is_unsigned_v<ValueType>);
static_assert(std::is_unsigned_v<StorageType> && std::is_integral_v<StorageType>);
}
MultiEnum(const MultiEnum & other) = default;
MultiEnum & operator=(const MultiEnum & other) = default;
bool isSet(EnumType value) const
{
return bitset & toBitFlag(value);
}
void set(EnumType value)
{
bitset |= toBitFlag(value);
}
void unSet(EnumType value)
{
bitset &= ~(toBitFlag(value));
}
void reset()
{
bitset = 0;
}
StorageType getValue() const
{
return bitset;
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
void setValue(ValueType new_value)
{
// Can't set value from any enum avoid confusion
static_assert(!std::is_enum_v<ValueType>);
bitset = new_value;
}
bool operator==(const MultiEnum & other) const
{
return bitset == other.bitset;
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
bool operator==(ValueType other) const
{
// Shouldn't be comparable with any enum to avoid confusion
static_assert(!std::is_enum_v<ValueType>);
return bitset == other;
}
template <typename U>
bool operator!=(U && other) const
{
return !(*this == other);
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
friend bool operator==(ValueType left, MultiEnum right)
{
return right == left;
}
template <typename L>
friend bool operator!=(L left, MultiEnum right)
{
return !(right == left);
}
private:
StorageType bitset = 0;
static StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast<StorageType>(v); }
};

View File

@ -382,6 +382,7 @@ class IColumn;
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precison are seen as String on ClickHouse's side.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -11,6 +11,7 @@ namespace ErrorCodes
extern const int UNKNOWN_DISTRIBUTED_PRODUCT_MODE;
extern const int UNKNOWN_JOIN;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL;
}
@ -91,4 +92,8 @@ IMPLEMENT_SETTING_ENUM_WITH_RENAME(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUME
{{"Ordinary", DefaultDatabaseEngine::Ordinary},
{"Atomic", DefaultDatabaseEngine::Atomic}})
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64}})
}

View File

@ -126,4 +126,15 @@ enum class DefaultDatabaseEngine
};
DECLARE_SETTING_ENUM(DefaultDatabaseEngine)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable
DATETIME64, // convert MySQL's DATETIME and TIMESTAMP and ClickHouse DateTime64 if precision is > 0 or range is greater that for DateTime.
// ENUM
};
DECLARE_SETTING_MULTI_ENUM(MySQLDataTypesSupport)
}

View File

@ -4,9 +4,11 @@
#include <Poco/URI.h>
#include <Core/Types.h>
#include <Core/Field.h>
#include <Core/MultiEnum.h>
#include <boost/range/adaptor/map.hpp>
#include <chrono>
#include <unordered_map>
#include <string_view>
namespace DB
@ -328,6 +330,113 @@ void SettingFieldEnum<EnumT, Traits>::readBinary(ReadBuffer & in)
throw Exception(msg, ERROR_CODE_FOR_UNEXPECTED_NAME); \
}
// Mostly like SettingFieldEnum, but can have multiple enum values (or none) set at once.
template <typename Enum, typename Traits>
struct SettingFieldMultiEnum
{
using EnumType = Enum;
using ValueType = MultiEnum<Enum>;
using StorageType = typename ValueType::StorageType;
ValueType value;
bool changed = false;
explicit SettingFieldMultiEnum(ValueType v = ValueType{}) : value{v} {}
explicit SettingFieldMultiEnum(EnumType e) : value{e} {}
explicit SettingFieldMultiEnum(StorageType s) : value(s) {}
explicit SettingFieldMultiEnum(const Field & f) : value(parseValueFromString(f.safeGet<const String &>())) {}
operator ValueType() const { return value; }
explicit operator StorageType() const { return value.getValue(); }
explicit operator Field() const { return toString(); }
SettingFieldMultiEnum & operator= (StorageType x) { changed = x != value.getValue(); value.setValue(x); return *this; }
SettingFieldMultiEnum & operator= (ValueType x) { changed = !(x == value); value = x; return *this; }
SettingFieldMultiEnum & operator= (const Field & x) { parseFromString(x.safeGet<const String &>()); return *this; }
String toString() const
{
static const String separator = ",";
String result;
for (StorageType i = 0; i < Traits::getEnumSize(); ++i)
{
const auto v = static_cast<Enum>(i);
if (value.isSet(v))
{
result += Traits::toString(v);
result += separator;
}
}
if (result.size() > 0)
result.erase(result.size() - separator.size());
return result;
}
void parseFromString(const String & str) { *this = parseValueFromString(str); }
void writeBinary(WriteBuffer & out) const;
void readBinary(ReadBuffer & in);
private:
static ValueType parseValueFromString(const std::string_view str)
{
static const String separators=", ";
ValueType result;
//to avoid allocating memory on substr()
const std::string_view str_view{str};
auto value_start = str_view.find_first_not_of(separators);
while (value_start != std::string::npos)
{
auto value_end = str_view.find_first_of(separators, value_start + 1);
if (value_end == std::string::npos)
value_end = str_view.size();
result.set(Traits::fromString(str_view.substr(value_start, value_end - value_start)));
value_start = str_view.find_first_not_of(separators, value_end);
}
return result;
}
};
template <typename EnumT, typename Traits>
void SettingFieldMultiEnum<EnumT, Traits>::writeBinary(WriteBuffer & out) const
{
SettingFieldEnumHelpers::writeBinary(toString(), out);
}
template <typename EnumT, typename Traits>
void SettingFieldMultiEnum<EnumT, Traits>::readBinary(ReadBuffer & in)
{
parseFromString(SettingFieldEnumHelpers::readBinary(in));
}
#define DECLARE_SETTING_MULTI_ENUM(ENUM_TYPE) \
DECLARE_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, ENUM_TYPE)
#define DECLARE_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, NEW_NAME) \
struct SettingField##NEW_NAME##Traits \
{ \
using EnumType = ENUM_TYPE; \
static size_t getEnumSize(); \
static const String & toString(EnumType value); \
static EnumType fromString(const std::string_view & str); \
}; \
\
using SettingField##NEW_NAME = SettingFieldMultiEnum<ENUM_TYPE, SettingField##NEW_NAME##Traits>;
#define IMPLEMENT_SETTING_MULTI_ENUM(ENUM_TYPE, ERROR_CODE_FOR_UNEXPECTED_NAME, ...) \
IMPLEMENT_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, ERROR_CODE_FOR_UNEXPECTED_NAME, __VA_ARGS__)
#define IMPLEMENT_SETTING_MULTI_ENUM_WITH_RENAME(NEW_NAME, ERROR_CODE_FOR_UNEXPECTED_NAME, ...) \
IMPLEMENT_SETTING_ENUM_WITH_RENAME(NEW_NAME, ERROR_CODE_FOR_UNEXPECTED_NAME, __VA_ARGS__)\
size_t SettingField##NEW_NAME##Traits::getEnumSize() {\
return std::initializer_list<std::pair<const char*, NEW_NAME>> __VA_ARGS__ .size();\
}
/// Can keep a value of any type. Used for user-defined settings.
struct SettingFieldCustom

View File

@ -0,0 +1,158 @@
#include <gtest/gtest.h>
#include <Core/Types.h>
#include <type_traits>
#include <Core/MultiEnum.h>
namespace
{
using namespace DB;
enum class TestEnum : UInt8
{
// name represents which bit is going to be set
ZERO,
ONE,
TWO,
THREE,
FOUR,
FIVE
};
}
GTEST_TEST(MultiEnum, WithDefault)
{
MultiEnum<TestEnum, UInt8> multi_enum;
ASSERT_EQ(0, multi_enum.getValue());
ASSERT_EQ(0, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WitheEnum)
{
MultiEnum<TestEnum, UInt8> multi_enum(TestEnum::FOUR);
ASSERT_EQ(16, multi_enum.getValue());
ASSERT_EQ(16, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_TRUE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithValue)
{
const MultiEnum<TestEnum> multi_enum(13u); // (1 | (1 << 2 | 1 << 3)
ASSERT_TRUE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_TRUE(multi_enum.isSet(TestEnum::TWO));
ASSERT_TRUE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithMany)
{
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::FIVE};
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum.getValue());
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_TRUE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_TRUE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithCopyConstructor)
{
const MultiEnum<TestEnum> multi_enum_source{TestEnum::ONE, TestEnum::FIVE};
MultiEnum<TestEnum> multi_enum{multi_enum_source};
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum.getValue());
}
GTEST_TEST(MultiEnum, SetAndUnSet)
{
MultiEnum<TestEnum> multi_enum;
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
multi_enum.set(TestEnum::TWO);
ASSERT_EQ(1 << 1| (1 << 2), multi_enum);
multi_enum.unSet(TestEnum::ONE);
ASSERT_EQ(1 << 2, multi_enum);
}
GTEST_TEST(MultiEnum, SetValueOnDifferentTypes)
{
MultiEnum<TestEnum> multi_enum;
multi_enum.setValue(static_cast<UInt8>(1));
ASSERT_EQ(1, multi_enum);
multi_enum.setValue(static_cast<UInt16>(2));
ASSERT_EQ(2, multi_enum);
multi_enum.setValue(static_cast<UInt32>(3));
ASSERT_EQ(3, multi_enum);
multi_enum.setValue(static_cast<UInt64>(4));
ASSERT_EQ(4, multi_enum);
}
// shouldn't compile
//GTEST_TEST(MultiEnum, WithOtherEnumType)
//{
// MultiEnum<TestEnum> multi_enum;
// enum FOO {BAR, FOOBAR};
// MultiEnum<TestEnum> multi_enum2(BAR);
// MultiEnum<TestEnum> multi_enum3(BAR, FOOBAR);
// multi_enum.setValue(FOO::BAR);
// multi_enum == FOO::BAR;
// FOO::BAR == multi_enum;
//}
GTEST_TEST(MultiEnum, SetSameValueMultipleTimes)
{
// Setting same value is idempotent.
MultiEnum<TestEnum> multi_enum;
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
}
GTEST_TEST(MultiEnum, UnSetValuesThatWerentSet)
{
// Unsetting values that weren't set shouldn't change other flags nor aggregate value.
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::THREE};
multi_enum.unSet(TestEnum::TWO);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
multi_enum.unSet(TestEnum::FOUR);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
multi_enum.unSet(TestEnum::FIVE);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
}
GTEST_TEST(MultiEnum, Reset)
{
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::THREE};
multi_enum.reset();
ASSERT_EQ(0, multi_enum);
}

View File

@ -0,0 +1,146 @@
#include <gtest/gtest.h>
#include <Core/SettingsFields.h>
#include <Core/SettingsEnums.h>
#include <Core/Field.h>
namespace
{
using namespace DB;
using SettingMySQLDataTypesSupport = SettingFieldMultiEnum<MySQLDataTypesSupport, SettingFieldMySQLDataTypesSupportTraits>;
}
namespace DB
{
template <typename Enum, typename Traits>
bool operator== (const SettingFieldMultiEnum<Enum, Traits> & setting, const Field & f)
{
return Field(setting) == f;
}
template <typename Enum, typename Traits>
bool operator== (const Field & f, const SettingFieldMultiEnum<Enum, Traits> & setting)
{
return f == Field(setting);
}
}
GTEST_TEST(MySQLDataTypesSupport, WithDefault)
{
// Setting can be default-initialized and that means all values are unset.
const SettingMySQLDataTypesSupport setting;
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ("", setting.toString());
ASSERT_EQ(setting, Field(""));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithDECIMAL)
{
// Setting can be initialized with MySQLDataTypesSupport::DECIMAL
// and this value can be obtained in varios forms with getters.
const SettingMySQLDataTypesSupport setting(MySQLDataTypesSupport::DECIMAL);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, With1)
{
// Setting can be initialized with int value corresponding to DECIMAL
// and rest of the test is the same as for that value.
const SettingMySQLDataTypesSupport setting(1u);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithMultipleValues)
{
// Setting can be initialized with int value corresponding to (DECIMAL | DATETIME64)
const SettingMySQLDataTypesSupport setting(3u);
ASSERT_EQ(3, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, SetString)
{
SettingMySQLDataTypesSupport setting;
setting = String("decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = "datetime64,decimal";
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
// comma with spaces
setting = " datetime64 , decimal ";
ASSERT_FALSE(setting.changed); // false since value is the same as previous one.
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
setting = String(",,,,,,,, ,decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String(",decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,");
ASSERT_FALSE(setting.changed); //since previous value was DECIMAL
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String("");
ASSERT_TRUE(setting.changed);
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("", setting.toString());
ASSERT_EQ(Field(""), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, SetInvalidString)
{
// Setting can be initialized with int value corresponding to (DECIMAL | DATETIME64)
SettingMySQLDataTypesSupport setting;
EXPECT_THROW(setting = String("FOOBAR"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
EXPECT_THROW(setting = String("decimal,datetime64,123"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
EXPECT_NO_THROW(setting = String(", "));
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
}

View File

@ -37,6 +37,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int MONGODB_CANNOT_AUTHENTICATE;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int UNKNOWN_TYPE;
}
@ -298,6 +299,8 @@ namespace
ErrorCodes::TYPE_MISMATCH};
break;
}
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}

View File

@ -2,11 +2,16 @@
#include <Core/Field.h>
#include <Core/Types.h>
#include <Core/MultiEnum.h>
#include <Core/SettingsEnums.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/IAST.h>
#include "DataTypeDate.h"
#include "DataTypeDateTime.h"
#include "DataTypeDateTime64.h"
#include "DataTypeEnum.h"
#include "DataTypesDecimal.h"
#include "DataTypeFixedString.h"
#include "DataTypeNullable.h"
#include "DataTypeString.h"
@ -25,52 +30,88 @@ ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type)
return makeASTFunction("Nullable", dataTypeConvertToQuery(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
}
DataTypePtr convertMySQLDataType(const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length)
DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support,
const std::string & mysql_data_type,
bool is_nullable,
bool is_unsigned,
size_t length,
size_t precision,
size_t scale)
{
DataTypePtr res;
if (mysql_data_type == "tinyint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt8>();
else
res = std::make_shared<DataTypeInt8>();
}
else if (mysql_data_type == "smallint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt16>();
else
res = std::make_shared<DataTypeInt16>();
}
else if (mysql_data_type == "int" || mysql_data_type == "mediumint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt32>();
else
res = std::make_shared<DataTypeInt32>();
}
else if (mysql_data_type == "bigint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt64>();
else
res = std::make_shared<DataTypeInt64>();
}
else if (mysql_data_type == "float")
res = std::make_shared<DataTypeFloat32>();
else if (mysql_data_type == "double")
res = std::make_shared<DataTypeFloat64>();
else if (mysql_data_type == "date")
res = std::make_shared<DataTypeDate>();
else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp")
res = std::make_shared<DataTypeDateTime>();
else if (mysql_data_type == "binary")
res = std::make_shared<DataTypeFixedString>(length);
else
// we expect mysql_data_type to be either "basic_type" or "type_with_params(param1, param2, ...)"
auto data_type = std::string_view(mysql_data_type);
const auto param_start_pos = data_type.find("(");
const auto type_name = data_type.substr(0, param_start_pos);
DataTypePtr res = [&]() -> DataTypePtr {
if (type_name == "tinyint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt8>();
else
return std::make_shared<DataTypeInt8>();
}
if (type_name == "smallint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt16>();
else
return std::make_shared<DataTypeInt16>();
}
if (type_name == "int" || type_name == "mediumint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt32>();
else
return std::make_shared<DataTypeInt32>();
}
if (type_name == "bigint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt64>();
else
return std::make_shared<DataTypeInt64>();
}
if (type_name == "float")
return std::make_shared<DataTypeFloat32>();
if (type_name == "double")
return std::make_shared<DataTypeFloat64>();
if (type_name == "date")
return std::make_shared<DataTypeDate>();
if (type_name == "binary")
return std::make_shared<DataTypeFixedString>(length);
if (type_name == "datetime" || type_name == "timestamp")
{
if (!type_support.isSet(MySQLDataTypesSupport::DATETIME64))
return std::make_shared<DataTypeDateTime>();
if (type_name == "timestamp" && scale == 0)
{
return std::make_shared<DataTypeDateTime>();
}
else if (type_name == "datetime" || type_name == "timestamp")
{
return std::make_shared<DataTypeDateTime64>(scale);
}
}
if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal"))
{
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
return std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
return std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
return std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
}
/// Also String is fallback for all unknown types.
res = std::make_shared<DataTypeString>();
return std::make_shared<DataTypeString>();
}();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
return res;
}

View File

@ -1,17 +1,20 @@
#pragma once
#include <string>
#include <Core/MultiEnum.h>
#include <Parsers/IAST.h>
#include "IDataType.h"
namespace DB
{
enum class MySQLDataTypesSupport;
/// Convert data type to query. for example
/// DataTypeUInt8 -> ASTIdentifier(UInt8)
/// DataTypeNullable(DataTypeUInt8) -> ASTFunction(ASTIdentifier(UInt8))
ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type);
/// Convert MySQL type to ClickHouse data type.
DataTypePtr convertMySQLDataType(const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length);
DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support, const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length, size_t precision, size_t scale);
}

View File

@ -0,0 +1,101 @@
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/getMostSubtype.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#pragma GCC diagnostic ignored "-Wmissing-declarations"
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <Core/iostream_debug_helpers.h>
namespace std
{
template <typename T>
inline std::ostream& operator<<(std::ostream & ostr, const std::vector<T> & v)
{
ostr << "[";
for (const auto & i : v)
{
ostr << i << ", ";
}
return ostr << "] (" << v.size() << ") items";
}
}
using namespace DB;
struct ParseDataTypeTestCase
{
const char * type_name;
std::vector<String> values;
FieldVector expected_values;
};
std::ostream & operator<<(std::ostream & ostr, const ParseDataTypeTestCase & test_case)
{
return ostr << "ParseDataTypeTestCase{\"" << test_case.type_name << "\", " << test_case.values << "}";
}
class ParseDataTypeTest : public ::testing::TestWithParam<ParseDataTypeTestCase>
{
public:
void SetUp() override
{
const auto & p = GetParam();
data_type = DataTypeFactory::instance().get(p.type_name);
}
DataTypePtr data_type;
};
TEST_P(ParseDataTypeTest, parseStringValue)
{
const auto & p = GetParam();
auto col = data_type->createColumn();
for (const auto & value : p.values)
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type->deserializeAsWholeText(*col, buffer, FormatSettings{});
}
ASSERT_EQ(p.expected_values.size(), col->size()) << "Actual items: " << *col;
for (size_t i = 0; i < col->size(); ++i)
{
ASSERT_EQ(p.expected_values[i], (*col)[i]);
}
}
INSTANTIATE_TEST_SUITE_P(ParseDecimal,
ParseDataTypeTest,
::testing::ValuesIn(
std::initializer_list<ParseDataTypeTestCase>{
{
"Decimal(8, 0)",
{"0", "5", "8", "-5", "-8", "12345678", "-12345678"},
std::initializer_list<Field>{
DecimalField<Decimal32>(0, 0),
DecimalField<Decimal32>(5, 0),
DecimalField<Decimal32>(8, 0),
DecimalField<Decimal32>(-5, 0),
DecimalField<Decimal32>(-8, 0),
DecimalField<Decimal32>(12345678, 0),
DecimalField<Decimal32>(-12345678, 0)
}
}
}
)
);

View File

@ -10,6 +10,7 @@
# include <DataTypes/DataTypesNumber.h>
# include <DataTypes/convertMySQLDataType.h>
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <Parsers/ASTCreateQuery.h>
@ -43,31 +44,14 @@ constexpr static const auto suffix = ".remove_flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
static const std::chrono::seconds lock_acquire_timeout{10};
static String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
WriteBufferFromOwnString quote_list_query;
quote_list_query << "(";
for (size_t index = 0; index < quote_list.size(); ++index)
{
if (index)
quote_list_query << ",";
quote_list_query << quote << quote_list[index];
}
quote_list_query << ")";
return quote_list_query.str();
}
DatabaseConnectionMySQL::DatabaseConnectionMySQL(
const Context & global_context_, const String & database_name_, const String & metadata_path_,
DatabaseConnectionMySQL::DatabaseConnectionMySQL(const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
: IDatabase(database_name_)
, global_context(global_context_.getGlobalContext())
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, database_name_in_mysql(database_name_in_mysql_)
, mysql_datatypes_support_level(context.getQueryContext().getSettingsRef().mysql_datatypes_support_level)
, mysql_pool(std::move(pool))
{
empty(); /// test database is works fine.
@ -78,7 +62,7 @@ bool DatabaseConnectionMySQL::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(global_context);
if (local_tables_cache.empty())
return true;
@ -90,12 +74,12 @@ bool DatabaseConnectionMySQL::empty() const
return true;
}
DatabaseTablesIteratorPtr DatabaseConnectionMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseConnectionMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
{
Tables tables;
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
@ -109,11 +93,11 @@ bool DatabaseConnectionMySQL::isTableExist(const String & name, const Context &
return bool(tryGetTable(name, context));
}
StoragePtr DatabaseConnectionMySQL::tryGetTable(const String & mysql_table_name, const Context &) const
StoragePtr DatabaseConnectionMySQL::tryGetTable(const String & mysql_table_name, const Context & context) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].second;
@ -157,11 +141,11 @@ static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr
return create_table_query;
}
ASTPtr DatabaseConnectionMySQL::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
ASTPtr DatabaseConnectionMySQL::getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
if (local_tables_cache.find(table_name) == local_tables_cache.end())
{
@ -178,7 +162,7 @@ time_t DatabaseConnectionMySQL::getObjectMetadataModificationTime(const String &
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(global_context);
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
@ -194,12 +178,12 @@ ASTPtr DatabaseConnectionMySQL::getCreateDatabaseQuery() const
return create_query;
}
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache() const
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache(const Context & context) const
{
const auto & tables_with_modification_time = fetchTablesWithModificationTime();
destroyLocalCacheExtraTables(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time, context);
}
void DatabaseConnectionMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
@ -216,7 +200,7 @@ void DatabaseConnectionMySQL::destroyLocalCacheExtraTables(const std::map<String
}
}
void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time) const
void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time, const Context & context) const
{
std::vector<String> wait_update_tables_name;
for (const auto & table_modification_time : tables_modification_time)
@ -228,7 +212,7 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map
wait_update_tables_name.emplace_back(table_modification_time.first);
}
std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name);
std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name, context);
for (const auto & table_and_columns : tables_and_columns)
{
@ -280,53 +264,16 @@ std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTim
return tables_with_modification_time;
}
std::map<String, NamesAndTypesList> DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name) const
std::map<String, NamesAndTypesList> DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name, const Context & context) const
{
std::map<String, NamesAndTypesList> tables_and_columns;
const auto & settings = context.getSettingsRef();
if (tables_name.empty())
return tables_and_columns;
Block tables_columns_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeString>(), "column_name" },
{ std::make_shared<DataTypeString>(), "column_type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
};
WriteBufferFromOwnString query;
query << "SELECT "
" TABLE_NAME AS table_name,"
" COLUMN_NAME AS column_name,"
" DATA_TYPE AS column_type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
String table_name = (*block.getByPosition(0).column)[i].safeGet<String>();
tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet<String>(),
convertMySQLDataType(
(*block.getByPosition(2).column)[i].safeGet<String>(),
(*block.getByPosition(3).column)[i].safeGet<UInt64>() &&
external_table_functions_use_nulls,
(*block.getByPosition(4).column)[i].safeGet<UInt64>(),
(*block.getByPosition(5).column)[i].safeGet<UInt64>()));
}
}
return tables_and_columns;
return DB::fetchTablesColumnsList(
mysql_pool,
database_name_in_mysql,
tables_name,
settings.external_table_functions_use_nulls,
mysql_datatypes_support_level);
}
void DatabaseConnectionMySQL::shutdown()

View File

@ -4,17 +4,27 @@
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <Databases/DatabasesCommon.h>
#include <memory>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadPool.h>
#include <Core/MultiEnum.h>
#include <Common/ThreadPool.h>
#include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h>
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>
namespace DB
{
class Context;
enum class MySQLDataTypesSupport;
/** Real-time access to table list and table structure from remote MySQL
* It doesn't make any manipulations with filesystem.
* All tables are created by calling code after real-time pull-out structure from remote MySQL
@ -25,7 +35,7 @@ public:
~DatabaseConnectionMySQL() override;
DatabaseConnectionMySQL(
const Context & global_context, const String & database_name, const String & metadata_path,
const Context & context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, mysqlxx::Pool && pool);
String getEngineName() const override { return "MySQL"; }
@ -66,6 +76,9 @@ private:
String metadata_path;
ASTPtr database_engine_define;
String database_name_in_mysql;
// Cache setting for later from query context upon creation,
// so column types depend on the settings set at query-level.
MultiEnum<MySQLDataTypesSupport> mysql_datatypes_support_level;
std::atomic<bool> quit{false};
std::condition_variable cond;
@ -81,15 +94,15 @@ private:
void cleanOutdatedTables();
void fetchTablesIntoLocalCache() const;
void fetchTablesIntoLocalCache(const Context & context) const;
std::map<String, UInt64> fetchTablesWithModificationTime() const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name, const Context & context) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, const Context & context) const;
ThreadFromGlobalPool thread;
};

View File

@ -0,0 +1,114 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
#include <Core/Block.h>
#include <Databases/MySQL/FetchTablesColumnsList.h>
#include <DataTypes/convertMySQLDataType.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLBlockInputStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <memory>
namespace
{
using namespace DB;
String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
WriteBufferFromOwnString quote_list_query;
quote_list_query << "(";
for (size_t index = 0; index < quote_list.size(); ++index)
{
if (index)
quote_list_query << ",";
quote_list_query << quote << quote_list[index];
}
quote_list_query << ")";
return quote_list_query.str();
}
}
namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
MultiEnum<MySQLDataTypesSupport> type_support)
{
std::map<String, NamesAndTypesList> tables_and_columns;
if (tables_name.empty())
return tables_and_columns;
Block tables_columns_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeString>(), "column_name" },
{ std::make_shared<DataTypeString>(), "column_type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
{ std::make_shared<DataTypeUInt64>(), "precision" },
{ std::make_shared<DataTypeUInt64>(), "scale" },
};
WriteBufferFromOwnString query;
query << "SELECT "
" TABLE_NAME AS table_name,"
" COLUMN_NAME AS column_name,"
" COLUMN_TYPE AS column_type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length,"
" NUMERIC_PRECISION as '',"
" IF(ISNULL(NUMERIC_SCALE), DATETIME_PRECISION, NUMERIC_SCALE) AS scale" // we know DATETIME_PRECISION as a scale in CH
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
const auto & table_name_col = *block.getByPosition(0).column;
const auto & column_name_col = *block.getByPosition(1).column;
const auto & column_type_col = *block.getByPosition(2).column;
const auto & is_nullable_col = *block.getByPosition(3).column;
const auto & is_unsigned_col = *block.getByPosition(4).column;
const auto & char_max_length_col = *block.getByPosition(5).column;
const auto & precision_col = *block.getByPosition(6).column;
const auto & scale_col = *block.getByPosition(7).column;
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
String table_name = table_name_col[i].safeGet<String>();
tables_and_columns[table_name].emplace_back(
column_name_col[i].safeGet<String>(),
convertMySQLDataType(
type_support,
column_type_col[i].safeGet<String>(),
external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(),
is_unsigned_col[i].safeGet<UInt64>(),
char_max_length_col[i].safeGet<UInt64>(),
precision_col[i].safeGet<UInt64>(),
scale_col[i].safeGet<UInt64>()));
}
}
return tables_and_columns;
}
}
#endif

View File

@ -0,0 +1,28 @@
#pragma once
#include "config_core.h"
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <common/types.h>
#include <Core/MultiEnum.h>
#include <Core/NamesAndTypes.h>
#include <Core/SettingsEnums.h>
#include <map>
#include <vector>
namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
MultiEnum<MySQLDataTypesSupport> type_support);
}
#endif

View File

@ -19,6 +19,7 @@ SRCS(
DatabaseWithDictionaries.cpp
MySQL/DatabaseConnectionMySQL.cpp
MySQL/DatabaseMaterializeMySQL.cpp
MySQL/FetchTablesColumnsList.cpp
MySQL/MaterializeMetadata.cpp
MySQL/MaterializeMySQLSettings.cpp
MySQL/MaterializeMySQLSyncThread.cpp

View File

@ -19,6 +19,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int UNKNOWN_TYPE;
}
CassandraBlockInputStream::CassandraBlockInputStream(
@ -140,6 +141,8 @@ void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, co
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
break;
}
default:
throw Exception("Unknown type : " + std::to_string(static_cast<int>(type)), ErrorCodes::UNKNOWN_TYPE);
}
}
@ -252,6 +255,8 @@ void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
expected = CASS_VALUE_TYPE_UUID;
expected_text = "uuid";
break;
default:
throw Exception("Unknown type : " + std::to_string(static_cast<int>(description.types[i].first)), ErrorCodes::UNKNOWN_TYPE);
}
CassValueType got = cass_result_column_type(result, i);

View File

@ -26,6 +26,7 @@ namespace DB
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int INTERNAL_REDIS_ERROR;
extern const int UNKNOWN_TYPE;
}
@ -103,6 +104,8 @@ namespace DB
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insertValue(parse<UUID>(string_value));
break;
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}
}

View File

@ -7,13 +7,15 @@
# include <Columns/ColumnNullable.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnsNumber.h>
# include <Columns/ColumnDecimal.h>
# include <DataTypes/IDataType.h>
# include <DataTypes/DataTypeNullable.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Common/assert_cast.h>
# include <ext/range.h>
# include "MySQLBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
@ -39,7 +41,7 @@ namespace
{
using ValueType = ExternalResultDescription::ValueType;
void insertValue(IColumn & column, const ValueType type, const mysqlxx::Value & value)
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value)
{
switch (type)
{
@ -85,6 +87,15 @@ namespace
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type.deserializeAsWholeText(column, buffer, FormatSettings{});
break;
}
}
}
@ -112,19 +123,21 @@ Block MySQLBlockInputStream::readImpl()
for (const auto idx : ext::range(0, row.size()))
{
const auto value = row[idx];
const auto & sample = description.sample_block.getByPosition(idx);
if (!value.isNull())
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value);
insertValue(*sample.type, *columns[idx], description.types[idx].first, value);
}
else
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
insertDefaultValue(*columns[idx], *sample.column);
}
++num_rows;

View File

@ -4,6 +4,7 @@
#if USE_MYSQL
# include <Core/Defines.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <DataTypes/convertMySQLDataType.h>
@ -21,6 +22,8 @@
# include <Common/quoteString.h>
# include "registerTableFunctions.h"
# include <Databases/MySQL/DatabaseConnectionMySQL.h> // for fetchTablesColumnsList
# include <mysqlxx/Pool.h>
@ -74,47 +77,11 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
auto parsed_host_port = parseAddress(host_port, 3306);
mysqlxx::Pool pool(remote_database_name, parsed_host_port.first, user_name, password, parsed_host_port.second);
const auto & settings = context.getSettingsRef();
const auto tables_and_columns = fetchTablesColumnsList(pool, remote_database_name, {remote_table_name}, settings.external_table_functions_use_nulls, settings.mysql_datatypes_support_level);
/// Determine table definition by running a query to INFORMATION_SCHEMA.
Block sample_block
{
{ std::make_shared<DataTypeString>(), "name" },
{ std::make_shared<DataTypeString>(), "type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
};
WriteBufferFromOwnString query;
query << "SELECT"
" COLUMN_NAME AS name,"
" DATA_TYPE AS type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << remote_database_name
<< " AND TABLE_NAME = " << quote << remote_table_name
<< " ORDER BY ORDINAL_POSITION";
NamesAndTypesList columns;
MySQLBlockInputStream result(pool.get(), query.str(), sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
columns.emplace_back(
(*block.getByPosition(0).column)[i].safeGet<String>(),
convertMySQLDataType(
(*block.getByPosition(1).column)[i].safeGet<String>(),
(*block.getByPosition(2).column)[i].safeGet<UInt64>() && context.getSettings().external_table_functions_use_nulls,
(*block.getByPosition(3).column)[i].safeGet<UInt64>(),
(*block.getByPosition(4).column)[i].safeGet<UInt64>()));
}
if (columns.empty())
const auto columns = tables_and_columns.find(remote_table_name);
if (columns == tables_and_columns.end())
throw Exception("MySQL table " + backQuoteIfNeed(remote_database_name) + "." + backQuoteIfNeed(remote_table_name) + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
auto res = StorageMySQL::create(
@ -124,7 +91,7 @@ StoragePtr TableFunctionMySQL::executeImpl(const ASTPtr & ast_function, const Co
remote_table_name,
replace_query,
on_duplicate_clause,
ColumnsDescription{columns},
ColumnsDescription{columns->second},
ConstraintsDescription{},
context);

View File

@ -7,6 +7,8 @@ import pytest
from helpers.cluster import ClickHouseCluster
from helpers.client import QueryRuntimeException
from string import Template
cluster = ClickHouseCluster(__file__)
clickhouse_node = cluster.add_instance('node1', main_configs=['configs/remote_servers.xml'], with_mysql=True)
@ -32,7 +34,21 @@ class MySQLNodeInstance:
if self.mysql_connection is None:
self.mysql_connection = pymysql.connect(user=self.user, password=self.password, host=self.hostname, port=self.port)
with self.mysql_connection.cursor() as cursor:
cursor.execute(execution_query)
def execute(query):
res = cursor.execute(query)
if query.lstrip().lower().startswith(('select', 'show')):
# Mimic output of the ClickHouseInstance, which is:
# tab-sparated values and newline (\n)-separated rows.
rows = []
for row in cursor.fetchall():
rows.append("\t".join(str(item) for item in row))
res = "\n".join(rows)
return res
if isinstance(execution_query, (str, bytes, unicode)):
return execute(execution_query)
else:
return [execute(q) for q in execution_query]
def close(self):
if self.mysql_connection is not None:
@ -96,7 +112,7 @@ def test_clickhouse_dml_for_mysql_database(started_cluster):
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MySQL('mysql1:3306', test_database, 'root', 'clickhouse')")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '0'
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i\`d`) select number from numbers(10000)")
clickhouse_node.query("INSERT INTO `test_database`.`test_table`(`i``d`) select number from numbers(10000)")
assert clickhouse_node.query("SELECT count() FROM `test_database`.`test_table`").rstrip() == '10000'
mysql_node.query("DROP DATABASE test_database")
@ -130,3 +146,132 @@ def test_bad_arguments_for_mysql_database_engine(started_cluster):
clickhouse_node.query("CREATE DATABASE test_database_bad_arguments ENGINE = MySQL('mysql1:3306', test_bad_arguments, root, 'clickhouse')")
assert 'Database engine MySQL requested literal argument.' in str(exception.value)
mysql_node.query("DROP DATABASE test_bad_arguments")
decimal_values = [0.123, 0.4, 5.67, 8.91011, 123456789.123, -0.123, -0.4, -5.67, -8.91011, -123456789.123]
timestamp_values = ['2015-05-18 07:40:01.123', '2019-09-16 19:20:11.123']
timestamp_values_no_subsecond = ['2015-05-18 07:40:01', '2019-09-16 19:20:11']
@pytest.mark.parametrize("case_name, mysql_type, expected_ch_type, mysql_values, setting_mysql_datatypes_support_level",
[
("decimal_default", "decimal NOT NULL", "Decimal(10, 0)", decimal_values, "decimal,datetime64"),
("decimal_default_nullable", "decimal", "Nullable(Decimal(10, 0))", decimal_values, "decimal,datetime64"),
("decimal_18_6", "decimal(18, 6) NOT NULL", "Decimal(18, 6)", decimal_values, "decimal,datetime64"),
("decimal_38_6", "decimal(38, 6) NOT NULL", "Decimal(38, 6)", decimal_values, "decimal,datetime64"),
# Due to python DB driver roundtrip MySQL timestamp and datetime values
# are printed with 6 digits after decimal point, so to simplify tests a bit,
# we only validate precision of 0 and 6.
("timestamp_default", "timestamp", "DateTime", timestamp_values, "decimal,datetime64"),
("timestamp_6", "timestamp(6)", "DateTime64(6)", timestamp_values, "decimal,datetime64"),
("datetime_default", "DATETIME NOT NULL", "DateTime64(0)", timestamp_values, "decimal,datetime64"),
("datetime_6", "DATETIME(6) NOT NULL", "DateTime64(6)", timestamp_values, "decimal,datetime64"),
# right now precision bigger than 39 is not supported by ClickHouse's Decimal, hence fall back to String
("decimal_40_6", "decimal(40, 6) NOT NULL", "String", decimal_values, "decimal,datetime64"),
("decimal_18_6", "decimal(18, 6) NOT NULL", "String", decimal_values, "datetime64"),
("decimal_18_6", "decimal(18, 6) NOT NULL", "String", decimal_values, ""),
("datetime_6", "DATETIME(6) NOT NULL", "DateTime", timestamp_values_no_subsecond, "decimal"),
("datetime_6", "DATETIME(6) NOT NULL", "DateTime", timestamp_values_no_subsecond, ""),
])
def test_mysql_types(started_cluster, case_name, mysql_type, expected_ch_type, mysql_values, setting_mysql_datatypes_support_level):
""" Verify that values written to MySQL can be read on ClickHouse side via DB engine MySQL,
or Table engine MySQL, or mysql() table function.
Make sure that type is converted properly and values match exactly.
"""
substitutes = dict(
mysql_db = 'decimal_support',
table_name = case_name,
mysql_type = mysql_type,
mysql_values = ', '.join('({})'.format(repr(x)) for x in mysql_values),
ch_mysql_db = 'mysql_db',
ch_mysql_table = 'mysql_table_engine_' + case_name,
expected_ch_type = expected_ch_type,
)
clickhouse_query_settings = dict(
mysql_datatypes_support_level = setting_mysql_datatypes_support_level
)
def execute_query(node, query, **kwargs):
def do_execute(query):
query = Template(query).safe_substitute(substitutes)
res = node.query(query, **kwargs)
return res if isinstance(res, int) else res.rstrip('\n\r')
if isinstance(query, (str, bytes, unicode)):
return do_execute(query)
else:
return [do_execute(q) for q in query]
with contextlib.closing(MySQLNodeInstance('root', 'clickhouse', '127.0.0.1', port=3308)) as mysql_node:
execute_query(mysql_node, [
"DROP DATABASE IF EXISTS ${mysql_db}",
"CREATE DATABASE ${mysql_db} DEFAULT CHARACTER SET 'utf8'",
"CREATE TABLE `${mysql_db}`.`${table_name}` (value ${mysql_type})",
"INSERT INTO `${mysql_db}`.`${table_name}` (value) VALUES ${mysql_values}",
"SELECT * FROM `${mysql_db}`.`${table_name}`",
"FLUSH TABLES"
])
assert execute_query(mysql_node, "SELECT COUNT(*) FROM ${mysql_db}.${table_name}") \
== \
"{}".format(len(mysql_values))
# MySQL TABLE ENGINE
execute_query(clickhouse_node, [
"DROP TABLE IF EXISTS ${ch_mysql_table};",
"CREATE TABLE ${ch_mysql_table} (value ${expected_ch_type}) ENGINE = MySQL('mysql1:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse')",
], settings=clickhouse_query_settings)
# Validate type
assert \
execute_query(clickhouse_node, "SELECT toTypeName(value) FROM ${ch_mysql_table} LIMIT 1",
settings=clickhouse_query_settings) \
== \
expected_ch_type
# Validate values
assert \
execute_query(clickhouse_node, "SELECT value FROM ${ch_mysql_table}",
settings=clickhouse_query_settings) \
== \
execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}")
# MySQL DATABASE ENGINE
execute_query(clickhouse_node, [
"DROP DATABASE IF EXISTS ${ch_mysql_db}",
"CREATE DATABASE ${ch_mysql_db} ENGINE = MySQL('mysql1:3306', '${mysql_db}', 'root', 'clickhouse')"
], settings=clickhouse_query_settings)
# Validate type
assert \
execute_query(clickhouse_node, "SELECT toTypeName(value) FROM ${ch_mysql_db}.${table_name} LIMIT 1",
settings=clickhouse_query_settings) \
== \
expected_ch_type
# Validate values
assert \
execute_query(clickhouse_node, "SELECT value FROM ${ch_mysql_db}.${table_name}",
settings=clickhouse_query_settings) \
== \
execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}")
# MySQL TABLE FUNCTION
# Validate type
assert \
execute_query(clickhouse_node, "SELECT toTypeName(value) FROM mysql('mysql1:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse') LIMIT 1",
settings=clickhouse_query_settings) \
== \
expected_ch_type
# Validate values
assert \
execute_query(mysql_node, "SELECT value FROM ${mysql_db}.${table_name}") \
== \
execute_query(clickhouse_node, "SELECT value FROM mysql('mysql1:3306', '${mysql_db}', '${table_name}', 'root', 'clickhouse')",
settings=clickhouse_query_settings)