ISSUES-4006 add mysql dml test

This commit is contained in:
zhang2014 2020-07-21 01:44:38 +08:00
parent 9996166860
commit b76f218d90
8 changed files with 335 additions and 44 deletions

View File

@ -385,9 +385,9 @@ namespace MySQLReplication
break;
}
case MYSQL_TYPE_FLOAT: {
Float64 val = 0;
Float32 val = 0;
payload.readStrict(reinterpret_cast<char *>(&val), 4);
row.push_back(Field{Float64{val}});
row.push_back(Field{Float32{val}});
break;
}
case MYSQL_TYPE_DOUBLE: {

View File

@ -62,6 +62,7 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
factory.registerAlias("REAL", "Float32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("SINGLE", "Float32", DataTypeFactory::CaseInsensitive); /// MS Access
factory.registerAlias("DOUBLE", "Float64", DataTypeFactory::CaseInsensitive);
factory.registerAlias("MEDIUMINT", "Int32", DataTypeFactory::CaseInsensitive); /// MySQL
factory.registerAlias("DOUBLE PRECISION", "Float64", DataTypeFactory::CaseInsensitive);
@ -69,16 +70,17 @@ void registerDataTypeNumbers(DataTypeFactory & factory)
factory.registerAlias("TINYINT SIGNED", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT1 SIGNED", "Int8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("SMALLINT SIGNED", "Int16", DataTypeFactory::CaseInsensitive);
factory.registerAlias("MEDIUMINT SIGNED", "Int32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT SIGNED", "Int32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INTEGER SIGNED", "Int32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIGINT SIGNED", "Int64", DataTypeFactory::CaseInsensitive);
factory.registerAlias("TINYINT UNSIGNED", "UInt8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT1 UNSIGNED", "UInt8", DataTypeFactory::CaseInsensitive);
factory.registerAlias("SMALLINT UNSIGNED", "UInt16", DataTypeFactory::CaseInsensitive);
factory.registerAlias("MEDIUMINT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INT UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("INTEGER UNSIGNED", "UInt32", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BIGINT UNSIGNED", "UInt64", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int ILLEGAL_MYSQL_VARIABLE;
}
@ -357,6 +358,114 @@ static inline void fillSignAndVersionColumnsData(Block & data, Int8 sign_value,
data.getByPosition(data.columns() - 1).column = std::move(version_mutable_column);
}
template <bool assert_nullable = false>
static void writeFieldsToColumn(
IColumn & column_to, const std::vector<Field> & rows_data, size_t column_index, const std::vector<bool> & mask, ColumnUInt8 * null_map_column = nullptr)
{
if (ColumnNullable * column_nullable = typeid_cast<ColumnNullable *>(&column_to))
writeFieldsToColumn<true>(column_nullable->getNestedColumn(), rows_data, column_index, mask, &column_nullable->getNullMapColumn());
else
{
const auto & write_data_to_null_map = [&](const Field & field, size_t row_index)
{
if (!mask.empty() && !mask.at(row_index))
return false;
if constexpr (assert_nullable)
{
if (field.isNull())
{
column_to.insertDefault();
null_map_column->insertDefault();
return false;
}
null_map_column->insertValue(0);
}
return true;
};
const auto & write_data_to_column = [&](auto * casted_column, auto from_type, auto to_type) {
for (size_t index = 0; index < rows_data.size(); ++index)
{
const Field & value = DB::get<const Tuple &>(rows_data[index])[column_index];
if (write_data_to_null_map(value, index))
casted_column->insertValue(static_cast<decltype(to_type)>(value.template get<decltype(from_type)>()));
}
};
if (ColumnInt8 * casted_column = typeid_cast<ColumnInt8 *>(&column_to))
write_data_to_column(casted_column, UInt8(), Int8());
else if (ColumnInt16 * casted_column = typeid_cast<ColumnInt16 *>(&column_to))
write_data_to_column(casted_column, UInt16(), Int16());
else if (ColumnInt64 * casted_column = typeid_cast<ColumnInt64 *>(&column_to))
write_data_to_column(casted_column, UInt64(), Int64());
else if (ColumnUInt8 * casted_column = typeid_cast<ColumnUInt8 *>(&column_to))
write_data_to_column(casted_column, UInt8(), UInt8());
else if (ColumnUInt16 * casted_column = typeid_cast<ColumnUInt16 *>(&column_to))
write_data_to_column(casted_column, UInt16(), UInt16());
else if (ColumnUInt32 * casted_column = typeid_cast<ColumnUInt32 *>(&column_to))
write_data_to_column(casted_column, UInt32(), UInt32());
else if (ColumnUInt64 * casted_column = typeid_cast<ColumnUInt64 *>(&column_to))
write_data_to_column(casted_column, UInt64(), UInt64());
else if (ColumnFloat32 * casted_column = typeid_cast<ColumnFloat32 *>(&column_to))
write_data_to_column(casted_column, Float32(), Float32());
else if (ColumnFloat64 * casted_column = typeid_cast<ColumnFloat64 *>(&column_to))
write_data_to_column(casted_column, Float64(), Float64());
else if (ColumnInt32 * casted_column = typeid_cast<ColumnInt32 *>(&column_to))
{
for (size_t index = 0; index < rows_data.size(); ++index)
{
const Field & value = DB::get<const Tuple &>(rows_data[index])[column_index];
if (write_data_to_null_map(value, index))
{
if (value.getType() == Field::Types::UInt64)
casted_column->insertValue(value.get<Int32>());
else if (value.getType() == Field::Types::Int64)
{
/// For MYSQL_TYPE_INT24
const Int32 & num = value.get<Int32>();
casted_column->insertValue(num & 0x800000 ? num | 0xFF000000 : num);
}
else
throw Exception("LOGICAL ERROR: it is a bug.", ErrorCodes::LOGICAL_ERROR);
}
}
}
else if (ColumnString * casted_column = typeid_cast<ColumnString *>(&column_to))
{
for (size_t index = 0; index < rows_data.size(); ++index)
{
const Field & value = DB::get<const Tuple &>(rows_data[index])[column_index];
if (write_data_to_null_map(value, index))
{
const String & data = value.get<const String &>();
casted_column->insertData(data.data(), data.size());
}
}
}
else if (ColumnFixedString * casted_column = typeid_cast<ColumnFixedString *>(&column_to))
{
for (size_t index = 0; index < rows_data.size(); ++index)
{
const Field & value = DB::get<const Tuple &>(rows_data[index])[column_index];
if (write_data_to_null_map(value, index))
{
const String & data = value.get<const String &>();
casted_column->insertData(data.data(), data.size());
}
}
}
else
throw Exception("Unsupported data type from MySQL.", ErrorCodes::NOT_IMPLEMENTED);
}
}
template <Int8 sign>
static size_t onWriteOrDeleteData(const std::vector<Field> & rows_data, Block & buffer, size_t version)
{
@ -365,9 +474,7 @@ static size_t onWriteOrDeleteData(const std::vector<Field> & rows_data, Block &
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
writeFieldsToColumn(*col_to, rows_data, column, {});
buffer.getByPosition(column).column = std::move(col_to);
}
@ -387,31 +494,23 @@ static inline bool differenceSortingKeys(const Tuple & row_old_data, const Tuple
static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block & buffer, size_t version, const std::vector<size_t> & sorting_columns_index)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
throw Exception("LOGICAL ERROR: It is a bug.", ErrorCodes::LOGICAL_ERROR);
size_t prev_bytes = buffer.bytes();
std::vector<bool> difference_sorting_keys_mark(rows_data.size() / 2);
std::vector<bool> writeable_rows_mask(rows_data.size());
for (size_t index = 0; index < rows_data.size(); index += 2)
difference_sorting_keys_mark[index / 2] = differenceSortingKeys(
{
writeable_rows_mask[index + 1] = true;
writeable_rows_mask[index] = differenceSortingKeys(
DB::get<const Tuple &>(rows_data[index]), DB::get<const Tuple &>(rows_data[index + 1]), sorting_columns_index);
}
for (size_t column = 0; column < buffer.columns() - 2; ++column)
{
MutableColumnPtr col_to = IColumn::mutate(std::move(buffer.getByPosition(column).column));
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
else
{
/// If the sorting keys is modified, we should cancel the old data, but this should not happen frequently
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
col_to->insert(DB::get<const Tuple &>(rows_data[index + 1])[column]);
}
}
writeFieldsToColumn(*col_to, rows_data, column, writeable_rows_mask);
buffer.getByPosition(column).column = std::move(col_to);
}
@ -423,7 +522,7 @@ static inline size_t onUpdateData(const std::vector<Field> & rows_data, Block &
for (size_t index = 0; index < rows_data.size(); index += 2)
{
if (likely(!difference_sorting_keys_mark[index / 2]))
if (likely(!writeable_rows_mask[index]))
{
sign_column_data.emplace_back(1);
version_column_data.emplace_back(version);

View File

@ -47,15 +47,36 @@ static inline NamesAndTypesList getColumnsList(ASTExpressionList * columns_defin
throw Exception("Missing type in definition of column.", ErrorCodes::UNKNOWN_TYPE);
bool is_nullable = true;
bool is_unsigned = false;
if (declare_column->column_options)
{
if (const auto * options = declare_column->column_options->as<MySQLParser::ASTDeclareOptions>();
options && options->changes.count("is_null"))
is_nullable = options->changes.at("is_null")->as<ASTLiteral>()->value.safeGet<UInt64>();
if (const auto * options = declare_column->column_options->as<MySQLParser::ASTDeclareOptions>())
{
if (options->changes.count("is_null"))
is_nullable = options->changes.at("is_null")->as<ASTLiteral>()->value.safeGet<UInt64>();
if (options->changes.count("is_unsigned"))
is_unsigned = options->changes.at("is_unsigned")->as<ASTLiteral>()->value.safeGet<UInt64>();
}
}
ASTPtr data_type = declare_column->data_type;
if (is_unsigned)
{
auto data_type_function = data_type->as<ASTFunction>();
if (data_type_function)
{
String type_name_upper = Poco::toUpper(data_type_function->name);
/// For example(in MySQL): CREATE TABLE test(column_name INT NOT NULL ... UNSIGNED)
if (type_name_upper.find("INT") != std::string::npos && !endsWith(type_name_upper, "SIGNED")
&& !endsWith(type_name_upper, "UNSIGNED"))
data_type_function->name = type_name_upper + " UNSIGNED";
}
}
if (is_nullable)
data_type = makeASTFunction("Nullable", data_type);
@ -72,7 +93,7 @@ static NamesAndTypesList getNames(const ASTFunction & expr, const Context & cont
ASTPtr temp_ast = expr.clone();
auto syntax = SyntaxAnalyzer(context).analyze(temp_ast, columns);
auto expression = ExpressionAnalyzer(temp_ast, syntax, context).getActions(true);
auto expression = ExpressionAnalyzer(temp_ast, syntax, context).getActions(false);
return expression->getRequiredColumnsWithTypes();
}
@ -160,43 +181,68 @@ static ASTPtr getPartitionPolicy(const NamesAndTypesList & primary_keys)
if (type->isNullable())
column = makeASTFunction("assumeNotNull", column);
return makeASTFunction("divide", column, std::make_shared<ASTLiteral>(UInt64(type_max_size / 1000)));
return makeASTFunction("intDiv", column, std::make_shared<ASTLiteral>(UInt64(type_max_size / 1000)));
};
ASTPtr best_partition;
size_t index = 0, best_size = 0;
for (const auto & primary_key : primary_keys)
{
WhichDataType which(primary_key.type);
DataTypePtr type = primary_key.type;
WhichDataType which(type);
if (which.isNullable())
which = WhichDataType((static_cast<const DataTypeNullable &>(*primary_key.type)).getNestedType());
{
type = (static_cast<const DataTypeNullable &>(*type)).getNestedType();
which = WhichDataType(type);
}
if (which.isDateOrDateTime())
{
/// In any case, date or datetime is always the best partitioning key
ASTPtr res = std::make_shared<ASTIdentifier>(primary_key.name);
return makeASTFunction("toYYYYMM", primary_key.type->isNullable() ? makeASTFunction("assumeNotNull", res) : res);
}
if (which.isInt8() || which.isUInt8())
return std::make_shared<ASTIdentifier>(primary_key.name);
else if (which.isInt16() || which.isUInt16())
return numbers_partition(primary_key.name, primary_key.type, std::numeric_limits<UInt16>::max());
else if (which.isInt32() || which.isUInt32())
return numbers_partition(primary_key.name, primary_key.type, std::numeric_limits<UInt32>::max());
else if (which.isInt64() || which.isUInt64())
return numbers_partition(primary_key.name, primary_key.type, std::numeric_limits<UInt64>::max());
if (type->haveMaximumSizeOfValue() && (!best_size || type->getSizeOfValueInMemory() < best_size))
{
if (which.isInt8() || which.isUInt8())
{
best_size = type->getSizeOfValueInMemory();
best_partition = std::make_shared<ASTIdentifier>(primary_key.name);
}
else if (which.isInt16() || which.isUInt16())
{
best_size = type->getSizeOfValueInMemory();
best_partition = numbers_partition(primary_key.name, type, std::numeric_limits<UInt16>::max());
}
else if (which.isInt32() || which.isUInt32())
{
best_size = type->getSizeOfValueInMemory();
best_partition = numbers_partition(primary_key.name, type, std::numeric_limits<UInt32>::max());
}
else if (which.isInt64() || which.isUInt64())
{
best_size = type->getSizeOfValueInMemory();
best_partition = numbers_partition(primary_key.name, type, std::numeric_limits<UInt64>::max());
}
}
}
return {};
return best_partition;
}
static ASTPtr getOrderByPolicy(
const NamesAndTypesList & primary_keys, const NamesAndTypesList & unique_keys, const NamesAndTypesList & keys, const NameSet & increment_columns)
{
NameSet order_by_columns_set;
std::deque<String> order_by_columns;
std::deque<std::vector<String>> order_by_columns_list;
const auto & add_order_by_expression = [&](const NamesAndTypesList & names_and_types)
{
std::vector<String> increment_keys;
std::vector<String> non_increment_keys;
for (const auto & [name, type] : names_and_types)
{
if (order_by_columns_set.contains(name))
@ -204,18 +250,21 @@ static ASTPtr getOrderByPolicy(
if (increment_columns.contains(name))
{
increment_keys.emplace_back(name);
order_by_columns_set.emplace(name);
order_by_columns.emplace_back(name);
}
else
{
order_by_columns_set.emplace(name);
order_by_columns.emplace_front(name);
non_increment_keys.emplace_back(name);
}
}
order_by_columns_list.emplace_back(increment_keys);
order_by_columns_list.emplace_front(non_increment_keys);
};
/// primary_key[not increment], key[not increment], unique[not increment], key[increment], unique[increment], primary_key[increment]
/// primary_key[not increment], key[not increment], unique[not increment], unique[increment], key[increment], primary_key[increment]
add_order_by_expression(unique_keys);
add_order_by_expression(keys);
add_order_by_expression(primary_keys);
@ -224,8 +273,11 @@ static ASTPtr getOrderByPolicy(
order_by_expression->name = "tuple";
order_by_expression->arguments = std::make_shared<ASTExpressionList>();
for (const auto & order_by_column : order_by_columns)
order_by_expression->arguments->children.emplace_back(std::make_shared<ASTIdentifier>(order_by_column));
for (const auto & order_by_columns : order_by_columns_list)
{
for (const auto & order_by_column : order_by_columns)
order_by_expression->arguments->children.emplace_back(std::make_shared<ASTIdentifier>(order_by_column));
}
return order_by_expression;
}

View File

@ -0,0 +1,59 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#include <gtest/gtest.h>
#include <Parsers/IAST.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTExternalDDLQuery.h>
#include <Parsers/ParserExternalDDLQuery.h>
#include <Parsers/parseQuery.h>
#include <Interpreters/Context.h>
#include <Functions/registerFunctions.h>
#include <Interpreters/MySQL/InterpretersMySQLDDLQuery.h>
using namespace DB;
static inline ASTPtr tryRewrittenCreateQuery(const String & query, Context & context)
{
ParserExternalDDLQuery external_ddl_parser;
ASTPtr ast = parseQuery(external_ddl_parser, query, 0, 0);
context.unsafeSetCurrentDatabase("default");
return MySQLInterpreter::InterpreterCreateImpl::getRewrittenQuery(
*ast->as<ASTExternalDDLQuery>()->external_ddl->as<MySQLParser::ASTCreateQuery>(),
context, "test_database", "test_database");
}
TEST(MySQLCreateRewritten, RewrittenQueryWithPrimaryKey)
{
registerFunctions();
auto shared_context = Context::createShared();
auto global_context = std::make_unique<Context>(Context::createGlobal(shared_context.get()));
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"EXTERNAL DDL FROM MySQL(test_database, test_database) CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL PRIMARY "
"KEY) ENGINE=InnoDB DEFAULT CHARSET=utf8", *global_context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8, `_version` UInt64) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"EXTERNAL DDL FROM MySQL(test_database, test_database) CREATE TABLE `test_database`.`test_table_1` (`key` int NOT NULL, "
" PRIMARY KEY (`key`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", *global_context)),
"CREATE TABLE test_database.test_table_1 (`key` Int32, `_sign` Int8, `_version` UInt64) ENGINE = ReplacingMergeTree(_version) "
"PARTITION BY intDiv(key, 4294967) ORDER BY tuple(key)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"EXTERNAL DDL FROM MySQL(test_database, test_database) CREATE TABLE `test_database`.`test_table_1` (`key_1` int NOT NULL, "
" key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", *global_context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int32, `key_2` Int32, `_sign` Int8, `_version` UInt64) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_1, 4294967) ORDER BY (key_1, key_2)");
EXPECT_EQ(queryToString(tryRewrittenCreateQuery(
"EXTERNAL DDL FROM MySQL(test_database, test_database) CREATE TABLE `test_database`.`test_table_1` (`key_1` BIGINT NOT NULL, "
" key_2 INT NOT NULL, PRIMARY KEY (`key_1`, `key_2`)) ENGINE=InnoDB DEFAULT CHARSET=utf8", *global_context)),
"CREATE TABLE test_database.test_table_1 (`key_1` Int64, `key_2` Int32, `_sign` Int8, `_version` UInt64) ENGINE = "
"ReplacingMergeTree(_version) PARTITION BY intDiv(key_2, 4294967) ORDER BY (key_1, key_2)");
}

View File

@ -70,6 +70,8 @@ bool ParserDeclareColumn::parseColumnDeclareOptions(IParser::Pos & pos, ASTPtr &
ParserDeclareOptions p_non_generate_options{
{
OptionDescribe("ZEROFILL", "zero_fill", std::make_unique<ParserAlwaysTrue>()),
OptionDescribe("SIGNED", "is_unsigned", std::make_unique<ParserAlwaysFalse>()),
OptionDescribe("UNSIGNED", "is_unsigned", std::make_unique<ParserAlwaysTrue>()),
OptionDescribe("NULL", "is_null", std::make_unique<ParserAlwaysTrue>()),
OptionDescribe("NOT NULL", "is_null", std::make_unique<ParserAlwaysFalse>()),
OptionDescribe("DEFAULT", "default", std::make_unique<ParserExpression>()),

View File

@ -15,6 +15,74 @@ def check_query(clickhouse_node, query, result_set, retry_count=3, interval_seco
assert lastest_result == result_set
def dml_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
# existed before the mapping was created
# TODO: Add check test BIT[(M)] BOOL, BOOLEAN, Enum
mysql_node.query("CREATE TABLE test_database.test_table_1 ("
"`key` INT NOT NULL PRIMARY KEY, "
"unsigned_tiny_int TINYINT UNSIGNED, tiny_int TINYINT, "
"unsigned_small_int SMALLINT UNSIGNED, small_int SMALLINT, "
"unsigned_medium_int MEDIUMINT UNSIGNED, medium_int MEDIUMINT, "
"unsigned_int INT UNSIGNED, _int INT, "
"unsigned_integer INTEGER UNSIGNED, _integer INTEGER, "
"unsigned_bigint BIGINT UNSIGNED, _bigint BIGINT, "
"/* Need ClickHouse support read mysql decimal unsigned_decimal DECIMAL(19, 10) UNSIGNED, _decimal DECIMAL(19, 10), */"
"unsigned_float FLOAT UNSIGNED, _float FLOAT, "
"unsigned_double DOUBLE UNSIGNED, _double DOUBLE, "
"_varchar VARCHAR(10), _char CHAR(10), "
"_date Date, _datetime DateTime, _timestamp TIMESTAMP) ENGINE = InnoDB;")
# it already has some data
mysql_node.query(
"INSERT INTO test_database.test_table_1 VALUES(1, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', "
"'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00');")
clickhouse_node.query("CREATE DATABASE test_database ENGINE = MaterializeMySQL('{}:3306', 'test_database', 'root', 'clickhouse')".format(service_name))
assert "test_database" in clickhouse_node.query("SHOW DATABASES")
check_query(clickhouse_node, "SHOW TABLES FROM test_database FORMAT TSV", "test_table_1\n")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\n")
mysql_node.query(
"INSERT INTO test_database.test_table_1 VALUES(2, 1, -1, 2, -2, 3, -3, 4, -4, 5, -5, 6, -6, 3.2, -3.2, 3.4, -3.4, 'varchar', 'char', "
"'2020-01-01', '2020-01-01 00:00:00', '2020-01-01 00:00:00');")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"1\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\t"
"varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t2020-01-01 00:00:00\n")
mysql_node.query("UPDATE test_database.test_table_1 SET unsigned_tiny_int = 2 WHERE `key` = 1")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"1\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\n2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\t"
"varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t2020-01-01 00:00:00\n")
# update primary key
mysql_node.query("UPDATE test_database.test_table_1 SET `key` = 3 WHERE `tiny_int` = -1")
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"2\t1\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\t"
"varchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t2020-01-01 00:00:00\n3\t2\t-1\t2\t-2\t3\t-3\t"
"4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t2020-01-01 00:00:00\t2020-01-01 00:00:00\n")
mysql_node.query('DELETE FROM test_database.test_table_1 WHERE `key` = 2')
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV",
"3\t2\t-1\t2\t-2\t3\t-3\t4\t-4\t5\t-5\t6\t-6\t-1.0842022e-19\t-1.0842022e-19\t3.4\t-3.4\tvarchar\tchar\t2020-01-01\t"
"2020-01-01 00:00:00\t2020-01-01 00:00:00\n")
mysql_node.query('DELETE FROM test_database.test_table_1 WHERE `unsigned_tiny_int` = 2')
check_query(clickhouse_node, "SELECT * FROM test_database.test_table_1 ORDER BY key FORMAT TSV", "")
mysql_node.query("DROP DATABASE test_database")
clickhouse_node.query("DROP DATABASE test_database")
def drop_table_with_materialize_mysql_database(clickhouse_node, mysql_node, service_name):
mysql_node.query("CREATE DATABASE test_database DEFAULT CHARACTER SET 'utf8'")
mysql_node.query("CREATE TABLE test_database.test_table_1 (id INT NOT NULL PRIMARY KEY) ENGINE = InnoDB;")

View File

@ -87,6 +87,14 @@ def started_mysql_8_0():
subprocess.check_call(['docker-compose', '-p', cluster.project_name, '-f', docker_compose, 'down', '--volumes', '--remove-orphans'])
def test_materialize_database_dml_with_mysql_5_7(started_cluster, started_mysql_5_7):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql5_7")
def test_materialize_database_dml_with_mysql_8_0(started_cluster, started_mysql_8_0):
materialize_with_ddl.dml_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
def test_materialize_database_ddl_with_mysql_5_7(started_cluster, started_mysql_5_7):
materialize_with_ddl.drop_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql5_7")
materialize_with_ddl.create_table_with_materialize_mysql_database(clickhouse_node, started_mysql_5_7, "mysql5_7")
@ -106,3 +114,4 @@ def test_materialize_database_ddl_with_mysql_8_0(started_cluster, started_mysql_
materialize_with_ddl.alter_drop_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.alter_rename_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")
materialize_with_ddl.alter_modify_column_with_materialize_mysql_database(clickhouse_node, started_mysql_8_0, "mysql8_0")