ISSUES-4006 suport synchronous MySQL incremental data

This commit is contained in:
zhang2014 2020-05-29 10:39:22 +08:00
parent 7203c6e186
commit 0c52d425ba
16 changed files with 1011 additions and 333 deletions

View File

@ -17,6 +17,12 @@ MySQLClient::MySQLClient(const String & host_, UInt16 port_, const String & user
client_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_PLUGIN_AUTH | CLIENT_SECURE_CONNECTION;
}
MySQLClient::MySQLClient(MySQLClient && other)
: host(other.host), port(other.port), user(other.user), password(other.password)
, client_capability_flags(other.client_capability_flags)
{
}
void MySQLClient::connect()
{
if (connected)

View File

@ -28,6 +28,8 @@ class MySQLClient
{
public:
MySQLClient(const String & host_, UInt16 port_, const String & user_, const String & password_);
MySQLClient(MySQLClient && other);
void connect();
void disconnect();
void ping();

View File

@ -0,0 +1,55 @@
#include <DataStreams/AddingVersionsBlockOutputStream.h>
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
void AddingVersionsBlockOutputStream::writePrefix()
{
output->writePrefix();
}
void AddingVersionsBlockOutputStream::writeSuffix()
{
output->writeSuffix();
}
void AddingVersionsBlockOutputStream::flush()
{
output->flush();
}
void AddingVersionsBlockOutputStream::write(const Block & block)
{
/// create_version and delete_version are always in the last place
Block res;
size_t rows = block.rows();
for (size_t index = 0; index < block.columns(); ++index)
res.insert(block.getByPosition(index));
DataTypePtr data_type = std::make_shared<DataTypeUInt64>();
auto create_version = ColumnUInt64::create(rows);
for (size_t index = 0; index < rows; ++index)
create_version->getData()[index] = ++version;
Block header = output->getHeader();
res.insert(ColumnWithTypeAndName(create_version->getPtr(), data_type, header.getByPosition(header.columns() - 2).name));
res.insert(ColumnWithTypeAndName(data_type->createColumnConstWithDefaultValue(rows)->convertToFullColumnIfConst(), data_type, header.getByPosition(header.columns() - 1).name));
output->write(res);
}
Block AddingVersionsBlockOutputStream::getHeader() const
{
Block res;
Block header = output->getHeader();
for (size_t index = 0; index < header.columns() - 2; ++index)
res.insert(header.getByPosition(index));
return res;
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
class AddingVersionsBlockOutputStream : public IBlockOutputStream
{
public:
AddingVersionsBlockOutputStream(size_t & version_, const BlockOutputStreamPtr & output_)
: version(version_), output(output_)
{
}
Block getHeader() const override;
void write(const Block & block) override;
void flush() override;
void writePrefix() override;
void writeSuffix() override;
private:
size_t & version;
BlockOutputStreamPtr output;
};
}

View File

@ -23,6 +23,7 @@
# include <Interpreters/evaluateConstantExpression.h>
# include <Common/parseAddress.h>
# include <mysqlxx/Pool.h>
# include <Core/MySQLClient.h>
#endif
namespace DB
@ -118,8 +119,12 @@ DatabasePtr DatabaseFactory::getImpl(const ASTCreateQuery & create, const String
auto mysql_pool = mysqlxx::Pool(mysql_database_name, remote_host_name, mysql_user_name, mysql_user_password, remote_port);
if (materializeMySQLDatabase(engine_define->settings))
{
MySQLClient client(remote_host_name, remote_port, mysql_user_name, mysql_user_password);
return std::make_shared<DatabaseMaterializeMySQL>(
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));
context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool), std::move(client));
}
return std::make_shared<DatabaseConnectionMySQL>(context, database_name, metadata_path, engine_define, mysql_database_name, std::move(mysql_pool));
}

View File

@ -0,0 +1,135 @@
#include <Databases/MySQL/DataBuffers.h>
#include <Storages/IStorage.h>
#include <common/sleep.h>
namespace DB
{
DataBuffers::DataBuffers(size_t & version_, IDatabase * database_, const std::function<void(const std::unordered_map<String, Block> &)> & flush_function_)
: version(version_), database(database_), flush_function(flush_function_)
{
/// TODO: 定时刷新
}
void DataBuffers::flush()
{
flush_function(buffers);
buffers.clear();
}
void DataBuffers::writeData(const std::string & table_name, const std::vector<Field> & rows_data)
{
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
delete_version_column->insertManyDefaults(rows_data.size());
for (size_t index = 0; index < rows_data.size(); ++index)
create_version_column->insert(new_version);
}
void DataBuffers::updateData(const String & table_name, const std::vector<Field> & rows_data)
{
if (rows_data.size() % 2 != 0)
throw Exception("LOGICAL ERROR: ", ErrorCodes::LOGICAL_ERROR);
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
{
if (index % 2 == 0)
{
create_version_column->insertDefault();
delete_version_column->insert(new_version);
}
else
{
delete_version_column->insertDefault();
create_version_column->insert(new_version);
}
}
}
void DataBuffers::deleteData(const String & table_name, const std::vector<Field> & rows_data)
{
Block & to = getTableBuffer(table_name, rows_data.size());
for (size_t column = 0; column < to.columns() - 2; ++column)
{
/// normally columns
MutableColumnPtr col_to = (*std::move(to.getByPosition(column).column)).mutate();
for (size_t index = 0; index < rows_data.size(); ++index)
col_to->insert(DB::get<const Tuple &>(rows_data[index])[column]);
}
Field new_version(UInt64(++version));
MutableColumnPtr create_version_column = (*std::move(to.getByPosition(to.columns() - 2)).column).mutate();
MutableColumnPtr delete_version_column = (*std::move(to.getByPosition(to.columns() - 1)).column).mutate();
create_version_column->insertManyDefaults(rows_data.size());
for (size_t index = 0; index < rows_data.size(); ++index)
delete_version_column->insert(new_version);
}
Block & DataBuffers::getTableBuffer(const String & table_name, size_t write_size)
{
if (buffers.find(table_name) == buffers.end())
{
StoragePtr write_storage = database->tryGetTable(table_name);
buffers[table_name] = write_storage->getSampleBlockNonMaterialized();
}
/// TODO: settings
if (buffers[table_name].rows() + write_size > 8192)
flush();
return buffers[table_name];
}
[[noreturn]] void DataBuffers::scheduleFlush()
{
while (1)
{
try
{
flush();
sleepForSeconds(1);
}
catch (...)
{
// ++error_count;
// sleep_time = std::min(
// std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
// max_sleep_time);
tryLogCurrentException("");
}
}
}
}

View File

@ -0,0 +1,42 @@
#pragma once
#include <unordered_map>
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <Databases/IDatabase.h>
#include <boost/noncopyable.hpp>
#include <Common/ThreadPool.h>
namespace DB
{
class DataBuffers : private boost::noncopyable
{
public:
DataBuffers(size_t & version_, IDatabase * database_, const std::function<void(const std::unordered_map<String, Block> &)> & flush_function_);
void flush();
void writeData(const std::string & table_name, const std::vector<Field> & rows_data);
void updateData(const std::string & table_name, const std::vector<Field> & rows_data);
void deleteData(const std::string & table_name, const std::vector<Field> & rows_data);
private:
size_t & version;
IDatabase * database;
std::function<void(const std::unordered_map<String, Block> &)> flush_function;
mutable std::mutex mutex;
std::unordered_map<String, Block> buffers;
[[noreturn]] void scheduleFlush();
Block & getTableBuffer(const String & table_name, size_t write_size);
ThreadFromGlobalPool thread{&DataBuffers::scheduleFlush, this};
};
}

View File

@ -6,21 +6,25 @@
#include <Databases/MySQL/DatabaseMaterializeMySQL.h>
# include <cstdlib>
# include <Columns/ColumnTuple.h>
# include <DataStreams/AddingVersionsBlockOutputStream.h>
# include <DataStreams/copyData.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseFactory.h>
# include <Databases/MySQL/DataBuffers.h>
# include <Databases/MySQL/MasterStatusInfo.h>
# include <Databases/MySQL/queryConvert.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/MySQL/CreateQueryConvertVisitor.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Interpreters/executeQuery.h>
# include <Parsers/MySQL/ASTCreateQuery.h>
# include <Parsers/parseQuery.h>
# include <Poco/File.h>
# include <Common/quoteString.h>
# include <Common/setThreadName.h>
# include <common/sleep.h>
namespace DB
{
@ -32,26 +36,25 @@ namespace ErrorCodes
DatabaseMaterializeMySQL::DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_)
: DatabaseOrdinary(database_name_, metadata_path_, context)
/*, global_context(context.getGlobalContext()), metadata_path(metadata_path_)*/
, database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_), pool(std::move(pool_))
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_, MySQLClient && client_)
: IDatabase(database_name_), global_context(context.getGlobalContext()), metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone()), mysql_database_name(mysql_database_name_)
, nested_database(std::make_shared<DatabaseOrdinary>(database_name_, metadata_path, global_context))
, pool(std::move(pool_)), client(std::move(client_)), log(&Logger::get("DatabaseMaterializeMySQL"))
{
/// TODO: 做简单的check, 失败即报错
}
void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute)
BlockIO DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute, const String & comment)
{
ReadBufferFromString istr(query_to_execute);
String dummy_string;
WriteBufferFromString ostr(dummy_string);
String query_prefix = "/*" + comment + " for " + backQuoteIfNeed(database_name) + " Database */ ";
try
{
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
executeQuery(istr, ostr, false, context, {});
return executeQuery(query_prefix + query_to_execute, context, true);
}
catch (...)
{
@ -62,7 +65,7 @@ void DatabaseMaterializeMySQL::tryToExecuteQuery(const String & query_to_execute
LOG_DEBUG(log, "Executed query: " << query_to_execute);
}
String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name)
String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name)
{
Block show_create_table_header{
{std::make_shared<DataTypeString>(), "Table"},
@ -77,57 +80,82 @@ String DatabaseMaterializeMySQL::getCreateQuery(const mysqlxx::Pool::Entry & con
if (!create_query_block || create_query_block.rows() != 1)
throw Exception("LOGICAL ERROR mysql show create return more rows.", ErrorCodes::LOGICAL_ERROR);
const auto & create_query = create_query_block.getByName("Create Table").column->getDataAt(0);
MySQLParser::ParserCreateQuery p_create_query;
ASTPtr ast = parseQuery(p_create_query, create_query.data, create_query.data + create_query.size, "", 0, 0);
if (!ast || !ast->as<MySQLParser::ASTCreateQuery>())
throw Exception("LOGICAL ERROR: ast cannot cast to MySQLParser::ASTCreateQuery.", ErrorCodes::LOGICAL_ERROR);
WriteBufferFromOwnString out;
ast->as<MySQLParser::ASTCreateQuery>()->database = database;
MySQLVisitor::CreateQueryConvertVisitor::Data data{.out = out, .context = global_context};
MySQLVisitor::CreateQueryConvertVisitor visitor(data);
visitor.visit(ast);
return out.str();
return create_query_block.getByName("Create Table").column->getDataAt(0).toString();
}
void DatabaseMaterializeMySQL::dumpMySQLDatabase(const std::function<bool()> & is_cancelled)
BlockOutputStreamPtr DatabaseMaterializeMySQL::getTableOutput(const String & table_name, bool fill_version)
{
mysqlxx::PoolWithFailover::Entry connection = pool.get();
Context context = global_context;
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
context.setCurrentQueryId(""); // generate random query_id
MasterStatusInfo info(connection, mysql_database_name);
StoragePtr write_storage = nested_database->tryGetTable(table_name);
auto table_lock = write_storage->lockStructureForShare(true, context.getInitialQueryId(), context.getSettingsRef().lock_acquire_timeout);
for (const auto & dumping_table_name : info.need_dumping_tables)
BlockOutputStreamPtr output = write_storage->write(ASTPtr{}, global_context);
output->addTableLock(table_lock);
return fill_version ? std::make_shared<AddingVersionsBlockOutputStream>(version, output) : output;
}
void DatabaseMaterializeMySQL::dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector<String> & tables_name, const std::function<bool()> & is_cancelled)
{
std::unordered_map<String, String> tables_create_query;
for (size_t index = 0; index < tables_name.size() && !is_cancelled(); ++index)
tables_create_query[tables_name[index]] = getCreateQuery(connection, tables_name[index]);
auto iterator = tables_create_query.begin();
for (; iterator != tables_create_query.end() && !is_cancelled(); ++iterator)
{
if (is_cancelled())
return;
const auto & table_name = iterator->first;
MySQLTableStruct table_struct = visitCreateQuery(iterator->second, global_context, database_name);
String comment = String("Dumping ") + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name);
tryToExecuteQuery(toCreateQuery(table_struct, global_context), comment);
const auto & table_name = backQuoteIfNeed(database_name) + "." + backQuoteIfNeed(dumping_table_name);
String query_prefix = "/* Dumping " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name) + " for "
+ backQuoteIfNeed(database_name) + " Database */ ";
tryToExecuteQuery(query_prefix + " DROP TABLE IF EXISTS " + table_name);
tryToExecuteQuery(query_prefix + getCreateQuery(connection, database_name, dumping_table_name));
Context context = global_context;
context.setCurrentQueryId(""); // generate random query_id
context.getClientInfo().query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
BlockIO streams = executeQuery(query_prefix + " INSERT INTO " + table_name + " VALUES", context, true);
if (!streams.out)
throw Exception("LOGICAL ERROR out stream is undefined.", ErrorCodes::LOGICAL_ERROR);
MySQLBlockInputStream input(
connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(dumping_table_name),
streams.out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *streams.out, is_cancelled);
/// TODO: 启动slave, 监听事件
BlockOutputStreamPtr out = getTableOutput(table_name, true);
MySQLBlockInputStream input(connection, "SELECT * FROM " + backQuoteIfNeed(mysql_database_name) + "." + backQuoteIfNeed(table_name), out->getHeader(), DEFAULT_BLOCK_SIZE);
copyData(input, *out, is_cancelled);
}
}
void DatabaseMaterializeMySQL::synchronization()
MasterStatusInfo DatabaseMaterializeMySQL::prepareSynchronized(std::unique_lock<std::mutex> & lock)
{
while(!sync_quit.load(std::memory_order_seq_cst))
{
try
{
LOG_DEBUG(log, "Checking " + database_name + " database status.");
while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1));
LOG_DEBUG(log, database_name + " database status is OK.");
mysqlxx::PoolWithFailover::Entry connection = pool.get();
MasterStatusInfo master_info(connection, getMetadataPath() + "/.master_status", mysql_database_name);
if (!master_info.need_dumping_tables.empty())
{
/// TODO: 删除所有表结构, 这可能需要考虑到仍然有查询在使用这个表.
dumpDataForTables(connection, master_info.need_dumping_tables, [&]() { return sync_quit.load(std::memory_order_seq_cst); });
master_info.finishDump();
}
client.connect();
client.startBinlogDump(std::rand(), mysql_database_name, master_info.binlog_file, master_info.binlog_position);
return master_info;
}
catch(...)
{
tryLogCurrentException(log, "Prepare MySQL Synchronized exception and retry");
sleepForSeconds(1);
}
}
throw Exception("", ErrorCodes::LOGICAL_ERROR);
}
void DatabaseMaterializeMySQL::synchronized()
{
setThreadName("MySQLDBSync");
@ -135,18 +163,55 @@ void DatabaseMaterializeMySQL::synchronization()
{
std::unique_lock<std::mutex> lock{sync_mutex};
LOG_DEBUG(log, "Checking " + database_name + " database status.");
while (!sync_quit && !DatabaseCatalog::instance().isDatabaseExist(database_name))
sync_cond.wait_for(lock, std::chrono::seconds(1));
MasterStatusInfo master_status = prepareSynchronized(lock);
LOG_DEBUG(log, database_name + " database status is OK.");
Poco::File dumped_flag(getMetadataPath() + "/dumped.flag");
if (!dumped_flag.exists())
DataBuffers buffers(version, this, [&](const std::unordered_map<String, Block> & tables_data)
{
dumpMySQLDatabase([&]() { return sync_quit.load(std::memory_order_seq_cst); });
dumped_flag.createFile();
master_status.transaction(client.getPosition(), [&]() /// At least once, There is only one possible reference: https://github.com/ClickHouse/ClickHouse/pull/8467
{
for (const auto & [table_name, data] : tables_data)
{
if (!sync_quit.load(std::memory_order_seq_cst))
{
LOG_DEBUG(log, "Prepare to flush data.");
BlockOutputStreamPtr output = getTableOutput(table_name, false);
output->writePrefix();
output->write(data);
output->writeSuffix();
output->flush();
LOG_DEBUG(log, "Finish data flush.");
}
}
});
});
while (!sync_quit.load(std::memory_order_seq_cst))
{
const auto & event = client.readOneBinlogEvent();
if (event->type() == MYSQL_WRITE_ROWS_EVENT)
{
WriteRowsEvent & write_rows_event = static_cast<WriteRowsEvent &>(*event);
write_rows_event.dump();
buffers.writeData(write_rows_event.table, write_rows_event.rows);
}
else if (event->type() == MYSQL_UPDATE_ROWS_EVENT)
{
UpdateRowsEvent & update_rows_event = static_cast<UpdateRowsEvent &>(*event);
update_rows_event.dump();
buffers.updateData(update_rows_event.table, update_rows_event.rows);
}
else if (event->type() == MYSQL_DELETE_ROWS_EVENT)
{
DeleteRowsEvent & delete_rows_event = static_cast<DeleteRowsEvent &>(*event);
delete_rows_event.dump();
buffers.deleteData(delete_rows_event.table, delete_rows_event.rows);
}
else if (event->type() == MYSQL_QUERY_EVENT)
{
/// TODO: 识别, 查看是否支持的DDL, 支持的话立即刷新当前的数据, 然后执行DDL.
buffers.flush();
}
}
}
catch(...)

View File

@ -3,46 +3,165 @@
#include "config_core.h"
#if USE_MYSQL
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Databases/IDatabase.h>
#include <Databases/DatabaseOrdinary.h>
#include <Parsers/ASTCreateQuery.h>
#include <mysqlxx/Pool.h>
#include <mutex>
# include <mutex>
# include <Core/MySQLClient.h>
# include <DataStreams/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>
# include <Databases/IDatabase.h>
# include <Databases/MySQL/MasterStatusInfo.h>
# include <Interpreters/MySQL/CreateQueryVisitor.h>
# include <Parsers/ASTCreateQuery.h>
# include <mysqlxx/Pool.h>
# include <mysqlxx/PoolWithFailover.h>
namespace DB
{
class DatabaseMaterializeMySQL : public DatabaseOrdinary
class DatabaseMaterializeMySQL : public IDatabase
{
public:
DatabaseMaterializeMySQL(
const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_);
const ASTStorage * database_engine_define_, const String & mysql_database_name_, mysqlxx::Pool && pool_,
MySQLClient && client_);
String getEngineName() const override { return "MySQL"; }
void shutdown() override { nested_database->shutdown(); }
bool empty() const override { return nested_database->empty(); }
String getDataPath() const override { return nested_database->getDataPath(); }
String getTableDataPath(const String & string) const override { return nested_database->getTableDataPath(string); }
String getTableDataPath(const ASTCreateQuery & query) const override { return nested_database->getTableDataPath(query); }
UUID tryGetTableUUID(const String & string) const override { return nested_database->tryGetTableUUID(string); }
bool isDictionaryExist(const String & string) const override { return nested_database->isDictionaryExist(string); }
DatabaseDictionariesIteratorPtr getDictionariesIterator(const FilterByNameFunction & filter_by_dictionary_name) override
{
return nested_database->getDictionariesIterator(filter_by_dictionary_name);
}
void createTable(const Context & context, const String & string, const StoragePtr & ptr, const ASTPtr & astPtr) override
{
nested_database->createTable(context, string, ptr, astPtr);
}
void createDictionary(const Context & context, const String & string, const ASTPtr & ptr) override
{
nested_database->createDictionary(context, string, ptr);
}
void dropTable(const Context & context, const String & string, bool no_delay) override
{
nested_database->dropTable(context, string, no_delay);
}
void removeDictionary(const Context & context, const String & string) override { nested_database->removeDictionary(context, string); }
void attachTable(const String & string, const StoragePtr & ptr, const String & relative_table_path) override
{
nested_database->attachTable(string, ptr, relative_table_path);
}
void attachDictionary(const String & string, const DictionaryAttachInfo & info) override { nested_database->attachDictionary(string, info); }
StoragePtr detachTable(const String & string) override { return nested_database->detachTable(string); }
void detachDictionary(const String & string) override { nested_database->detachDictionary(string); }
void renameTable(const Context & context, const String & string, IDatabase & database, const String & string1, bool b) override
{
nested_database->renameTable(context, string, database, string1, b);
}
void alterTable(const Context & context, const StorageID & id, const StorageInMemoryMetadata & metadata) override
{
nested_database->alterTable(context, id, metadata);
}
time_t getObjectMetadataModificationTime(const String & string) const override
{
return nested_database->getObjectMetadataModificationTime(string);
}
Poco::AutoPtr<Poco::Util::AbstractConfiguration> getDictionaryConfiguration(const String & string) const override
{
return nested_database->getDictionaryConfiguration(string);
}
String getMetadataPath() const override { return nested_database->getMetadataPath(); }
String getObjectMetadataPath(const String & string) const override { return nested_database->getObjectMetadataPath(string); }
bool shouldBeEmptyOnDetach() const override { return nested_database->shouldBeEmptyOnDetach(); }
void drop(const Context & context) override { nested_database->drop(context); }
bool isTableExist(const String & name) const override { return nested_database->isTableExist(name); }
StoragePtr tryGetTable(const String & name) const override { return nested_database->tryGetTable(name); }
void loadStoredObjects(Context & context, bool b) override
{
try
{
LOG_DEBUG(log, "Loading MySQL nested database stored objects.");
nested_database->loadStoredObjects(context, b);
LOG_DEBUG(log, "Loaded MySQL nested database stored objects.");
}
catch (...)
{
tryLogCurrentException(log, "Cannot load MySQL nested database stored objects.");
throw;
}
}
ASTPtr getCreateDatabaseQuery() const override
{
const auto & create_query = std::make_shared<ASTCreateQuery>();
create_query->database = database_name;
create_query->set(create_query->storage, database_engine_define);
return create_query;
}
DatabaseTablesIteratorPtr getTablesIterator(const FilterByNameFunction & filter_by_table_name) override { return nested_database->getTablesIterator(filter_by_table_name); }
private:
// const Context & global_context;
// String metadata_path;
const Context & global_context;
String metadata_path;
ASTPtr database_engine_define;
String mysql_database_name;
DatabasePtr nested_database;
size_t version{0};
mutable mysqlxx::Pool pool;
mutable MySQLClient client;
void synchronization();
Poco::Logger * log;
void tryToExecuteQuery(const String & query_to_execute);
void synchronized();
void dumpMySQLDatabase(const std::function<bool()> & is_cancelled);
MasterStatusInfo prepareSynchronized(std::unique_lock<std::mutex> & lock);
String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & database, const String & table_name);
BlockOutputStreamPtr getTableOutput(const String & table_name, bool fill_version);
BlockIO tryToExecuteQuery(const String & query_to_execute, const String & comment);
String getCreateQuery(const mysqlxx::Pool::Entry & connection, const String & table_name);
void dumpDataForTables(mysqlxx::Pool::Entry & connection, const std::vector<String> & tables_name, const std::function<bool()> & is_cancelled);
mutable std::mutex sync_mutex;
std::atomic<bool> sync_quit{false};
std::condition_variable sync_cond;
ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronization, this};
ThreadFromGlobalPool thread{&DatabaseMaterializeMySQL::synchronized, this};
};
}

View File

@ -4,15 +4,13 @@
#include <Databases/MySQL/MasterStatusInfo.h>
#include <Formats/MySQLBlockInputStream.h>
#include <Common/quoteString.h>
#include <Poco/File.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
namespace DB
{
/*MasterStatusInfo::MasterStatusInfo(
String binlog_file_, UInt64 binlog_position_, String binlog_do_db_, String binlog_ignore_db_, String executed_gtid_set_)
: binlog_file(binlog_file_), binlog_position(binlog_position_), binlog_do_db(binlog_do_db_), binlog_ignore_db(binlog_ignore_db_),
executed_gtid_set(executed_gtid_set_)
{
}*/
static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entry & connection, const std::string & database)
{
@ -31,42 +29,15 @@ static std::vector<String> fetchTablesInDB(const mysqlxx::PoolWithFailover::Entr
return tables_in_db;
}
MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database)
{
bool locked_tables = false;
try
{
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
locked_tables = true;
fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
need_dumping_tables = fetchTablesInDB(connection, database);
connection->query("UNLOCK TABLES;").execute();
}
catch (...)
{
if (locked_tables)
connection->query("UNLOCK TABLES;").execute();
throw;
}
}
void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection)
{
Block header
{
{std::make_shared<DataTypeString>(), "File"},
{std::make_shared<DataTypeUInt64>(), "Position"},
{std::make_shared<DataTypeString>(), "Binlog_Do_DB"},
{std::make_shared<DataTypeString>(), "Binlog_Ignore_DB"},
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
Block header{
{std::make_shared<DataTypeString>(), "File"},
{std::make_shared<DataTypeUInt64>(), "Position"},
{std::make_shared<DataTypeString>(), "Binlog_Do_DB"},
{std::make_shared<DataTypeString>(), "Binlog_Ignore_DB"},
{std::make_shared<DataTypeString>(), "Executed_Gtid_Set"},
};
MySQLBlockInputStream input(connection, "SHOW MASTER STATUS;", header, DEFAULT_BLOCK_SIZE);
Block master_status = input.read();
@ -81,4 +52,99 @@ void MasterStatusInfo::fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & conn
executed_gtid_set = (*master_status.getByPosition(4).column)[0].safeGet<String>();
}
bool MasterStatusInfo::checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection)
{
Block header{
{std::make_shared<DataTypeString>(), "Log_name"},
{std::make_shared<DataTypeUInt64>(), "File_size"},
{std::make_shared<DataTypeString>(), "Encrypted"}
};
MySQLBlockInputStream input(connection, "SHOW MASTER LOGS", header, DEFAULT_BLOCK_SIZE);
while (Block block = input.read())
{
for (size_t index = 0; index < block.rows(); ++index)
{
const auto & log_name = (*block.getByPosition(0).column)[index].safeGet<String>();
if (log_name == binlog_file)
return true;
}
}
return false;
}
void MasterStatusInfo::finishDump()
{
WriteBufferFromFile out(persistent_path);
out << "Version:\t1\n"
<< "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
<< "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;
out.next();
out.sync();
}
void MasterStatusInfo::transaction(const MySQLReplication::Position & position, const std::function<void()> & fun)
{
binlog_file = position.binlog_name;
binlog_position = position.binlog_pos;
{
Poco::File temp_file(persistent_path + ".temp");
if (temp_file.exists())
temp_file.remove();
}
WriteBufferFromFile out(persistent_path + ".temp");
out << "Version:\t1\n"
<< "Binlog File:\t" << binlog_file << "\nBinlog Position:\t" << binlog_position << "\nBinlog Do DB:\t" << binlog_do_db
<< "\nBinlog Ignore DB:\t" << binlog_ignore_db << "\nExecuted GTID SET:\t" << executed_gtid_set;
out.next();
out.sync();
fun();
Poco::File(persistent_path + ".temp").renameTo(persistent_path);
}
MasterStatusInfo::MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path_, const String & database)
: persistent_path(path_)
{
if (Poco::File(persistent_path).exists())
{
ReadBufferFromFile in(persistent_path);
in >> "Version:\t1\n" >> "Binlog File:\t" >> binlog_file >> "\nBinlog Position:\t" >> binlog_position >> "\nBinlog Do DB:\t"
>> binlog_do_db >> "\nBinlog Ignore DB:\t" >> binlog_ignore_db >> "\nExecuted GTID SET:\t" >> executed_gtid_set;
if (checkBinlogFileExists(connection))
{
std::cout << "Load From File \n";
return;
}
}
bool locked_tables = false;
try
{
connection->query("FLUSH TABLES;").execute();
connection->query("FLUSH TABLES WITH READ LOCK;").execute();
locked_tables = true;
fetchMasterStatus(connection);
connection->query("SET SESSION TRANSACTION ISOLATION LEVEL REPEATABLE READ;").execute();
connection->query("START TRANSACTION /*!40100 WITH CONSISTENT SNAPSHOT */;").execute();
need_dumping_tables = fetchTablesInDB(connection, database);
connection->query("UNLOCK TABLES;").execute();
/// TODO: 拉取建表语句, 解析并构建出表结构(列列表, 主键, 唯一索引, 分区键)
}
catch (...)
{
if (locked_tables)
connection->query("UNLOCK TABLES;").execute();
throw;
}
}
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Core/Types.h>
#include <Core/MySQLReplication.h>
#include <mysqlxx/Connection.h>
#include <mysqlxx/PoolWithFailover.h>
@ -9,6 +10,8 @@ namespace DB
struct MasterStatusInfo
{
const String persistent_path;
String binlog_file;
UInt64 binlog_position;
String binlog_do_db;
@ -17,10 +20,16 @@ struct MasterStatusInfo
std::vector<String> need_dumping_tables;
MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & database);
void finishDump();
void fetchMasterStatus(mysqlxx::PoolWithFailover::Entry & connection);
bool checkBinlogFileExists(mysqlxx::PoolWithFailover::Entry & connection);
void transaction(const MySQLReplication::Position & position, const std::function<void()> & fun);
MasterStatusInfo(mysqlxx::PoolWithFailover::Entry & connection, const String & path, const String & database);
};

View File

@ -0,0 +1,152 @@
#include <Databases/MySQL/queryConvert.h>
#include <IO/Operators.h>
#include <Common/quoteString.h>
namespace DB
{
ASTPtr getFormattedOrderByExpression(const MySQLTableStruct & table_struct)
{
if (table_struct.primary_keys.empty())
return makeASTFunction("tuple");
/// TODO: support unique key & key
const auto function = std::make_shared<ASTFunction>();
function->name = "tuple";
function->arguments = std::make_shared<ASTExpressionList>();
function->children.push_back(function->arguments);
function->arguments->children = table_struct.primary_keys;
return function;
}
template <typename TType>
Field choiceBetterRangeSize(TType min, TType max, size_t max_ranges, size_t min_size_pre_range)
{
UInt64 interval = UInt64(max) - min;
size_t calc_rows_pre_range = std::ceil(interval / double(max_ranges));
size_t rows_pre_range = std::max(min_size_pre_range, calc_rows_pre_range);
if (rows_pre_range >= interval)
return Null();
return rows_pre_range > std::numeric_limits<TType>::max() ? Field(UInt64(rows_pre_range)) : Field(TType(rows_pre_range));
}
ASTPtr getFormattedPartitionByExpression(const MySQLTableStruct & table_struct, const Context & context, size_t max_ranges, size_t min_rows_pre_range)
{
ASTPtr partition_columns = std::make_shared<ASTExpressionList>();
if (!table_struct.partition_keys.empty())
partition_columns->children = table_struct.partition_keys;
else if (!table_struct.primary_keys.empty())
{
ASTPtr expr_list = std::make_shared<ASTExpressionList>();
expr_list->children = table_struct.primary_keys;
auto syntax = SyntaxAnalyzer(context).analyze(expr_list, table_struct.columns_name_and_type);
auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
const NamesAndTypesList & required_names_and_types = index_expr->getRequiredColumnsWithTypes();
const auto & addPartitionColumn = [&](const String & column_name, const DataTypePtr & type, Field better_pre_range_size)
{
partition_columns->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
if (type->isNullable())
partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back());
if (!better_pre_range_size.isNull())
partition_columns->children.back()
= makeASTFunction("divide", partition_columns->children.back(), std::make_shared<ASTLiteral>(better_pre_range_size));
};
for (const auto & required_name_and_type : required_names_and_types)
{
DataTypePtr assume_not_null = required_name_and_type.type;
if (assume_not_null->isNullable())
assume_not_null = (static_cast<const DataTypeNullable &>(*assume_not_null)).getNestedType();
WhichDataType which(assume_not_null);
if (which.isInt8())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int8>(
std::numeric_limits<Int8>::min(), std::numeric_limits<Int8>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt16())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int16>(
std::numeric_limits<Int16>::min(), std::numeric_limits<Int16>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt32())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int32>(
std::numeric_limits<Int32>::min(), std::numeric_limits<Int32>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt64())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int64>(
std::numeric_limits<Int64>::min(), std::numeric_limits<Int64>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt8())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt8>(
std::numeric_limits<UInt8>::min(), std::numeric_limits<UInt8>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt16())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt16>(
std::numeric_limits<UInt16>::min(), std::numeric_limits<UInt16>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt32())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt32>(
std::numeric_limits<UInt32>::min(), std::numeric_limits<UInt32>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt64())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt64>(
std::numeric_limits<UInt64>::min(), std::numeric_limits<UInt64>::max(), max_ranges, min_rows_pre_range));
else if (which.isDateOrDateTime())
{
partition_columns->children.emplace_back(std::make_shared<ASTIdentifier>(required_name_and_type.name));
if (required_name_and_type.type->isNullable())
partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back());
partition_columns->children.back() = makeASTFunction("toYYYYMM", partition_columns->children.back());
}
}
}
const auto function = std::make_shared<ASTFunction>();
function->name = "tuple";
function->arguments = partition_columns;
function->children.push_back(function->arguments);
return function;
}
String getUniqueColumnName(NamesAndTypesList columns_name_and_type, const String & prefix)
{
const auto & is_unique = [&](const String & column_name)
{
for (const auto & column_name_and_type : columns_name_and_type)
{
if (column_name_and_type.name == column_name)
return false;
}
return true;
};
if (is_unique(prefix))
return prefix;
for (size_t index = 0; ; ++index)
{
const String & cur_name = prefix + "_" + toString(index);
if (is_unique(cur_name))
return cur_name;
}
}
String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context)
{
/// TODO: settings
String create_version = getUniqueColumnName(table_struct.columns_name_and_type, "_create_version");
String delete_version = getUniqueColumnName(table_struct.columns_name_and_type, "_delete_version");
WriteBufferFromOwnString out;
out << "CREATE TABLE " << (table_struct.if_not_exists ? "IF NOT EXISTS" : "")
<< backQuoteIfNeed(table_struct.database_name) + "." << backQuoteIfNeed(table_struct.table_name) << "("
<< queryToString(InterpreterCreateQuery::formatColumns(table_struct.columns_name_and_type))
<< ", " << create_version << " UInt64, " << delete_version << " UInt64" << ") ENGINE = MergeTree()"
<< " PARTITION BY " << queryToString(getFormattedPartitionByExpression(table_struct, context, 1000, 50000))
<< " ORDER BY " << queryToString(getFormattedOrderByExpression(table_struct));
return out.str();
}
}

View File

@ -0,0 +1,19 @@
#pragma once
#include <Core/Types.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/MySQL/CreateQueryVisitor.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTIdentifier.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
String toCreateQuery(const MySQLTableStruct & table_struct, const Context & context);
}

View File

@ -1,13 +1,13 @@
#include <Interpreters/MySQL/CreateQueryConvertVisitor.h>
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTLiteral.h>
#include <Interpreters/MySQL/CreateQueryVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Parsers/MySQL/ASTDeclareOption.h>
#include <Parsers/queryToString.h>
#include <Poco/String.h>
#include <Common/quoteString.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeNullable.h>
@ -94,14 +94,9 @@ void CreateQueryMatcher::visit(const MySQLParser::ASTCreateQuery & create, const
if (create.partition_options)
visit(*create.partition_options->as<MySQLParser::ASTDeclarePartitionOptions>(), create.partition_options, data);
auto expression_list = std::make_shared<ASTExpressionList>();
expression_list->children = data.primary_keys;
data.out << "CREATE TABLE " << (create.if_not_exists ? "IF NOT EXISTS" : "")
<< (create.database.empty() ? "" : backQuoteIfNeed(create.database) + ".") << backQuoteIfNeed(create.table)
<< "(" << queryToString(InterpreterCreateQuery::formatColumns(data.columns_name_and_type)) << ") ENGINE = MergeTree()"
" PARTITION BY " << queryToString(data.getFormattedPartitionByExpression())
<< " ORDER BY " << queryToString(data.getFormattedOrderByExpression());
data.table_name = create.table;
data.database_name = create.database;
data.if_not_exists = create.if_not_exists;
}
void CreateQueryMatcher::visit(const MySQLParser::ASTDeclareIndex & declare_index, const ASTPtr &, Data & data)
@ -201,111 +196,39 @@ void CreateQueryMatcher::Data::addPartitionKey(const ASTPtr & partition_key)
partition_keys.emplace_back(partition_key);
}
ASTPtr CreateQueryMatcher::Data::getFormattedOrderByExpression()
}
bool MySQLTableStruct::operator==(const MySQLTableStruct & other) const
{
if (primary_keys.empty())
return makeASTFunction("tuple");
const auto & this_expression = std::make_shared<ASTExpressionList>();
this_expression->children.insert(this_expression->children.begin(), primary_keys.begin(), primary_keys.end());
this_expression->children.insert(this_expression->children.begin(), partition_keys.begin(), partition_keys.end());
/// TODO: support unique key & key
const auto function = std::make_shared<ASTFunction>();
function->name = "tuple";
function->arguments = std::make_shared<ASTExpressionList>();
function->children.push_back(function->arguments);
function->arguments->children = primary_keys;
const auto & other_expression = std::make_shared<ASTExpressionList>();
other_expression->children.insert(other_expression->children.begin(), other.primary_keys.begin(), other.primary_keys.end());
other_expression->children.insert(other_expression->children.begin(), other.partition_keys.begin(), other.partition_keys.end());
return function;
return queryToString(this_expression) == queryToString(other_expression) && columns_name_and_type == other.columns_name_and_type;
}
template <typename TType>
Field choiceBetterRangeSize(TType min, TType max, size_t max_ranges, size_t min_size_pre_range)
MySQLTableStruct visitCreateQuery(ASTPtr & create_query, const Context & context, const std::string & new_database)
{
UInt64 interval = UInt64(max) - min;
size_t calc_rows_pre_range = std::ceil(interval / double(max_ranges));
size_t rows_pre_range = std::max(min_size_pre_range, calc_rows_pre_range);
if (rows_pre_range >= interval)
return Null();
return rows_pre_range > std::numeric_limits<TType>::max() ? Field(UInt64(rows_pre_range)) : Field(TType(rows_pre_range));
create_query->as<MySQLParser::ASTCreateQuery>()->database = new_database;
MySQLVisitor::CreateQueryVisitor::Data table_struct(context);
MySQLVisitor::CreateQueryVisitor visitor(table_struct);
visitor.visit(create_query);
return std::move(table_struct);
}
ASTPtr CreateQueryMatcher::Data::getFormattedPartitionByExpression()
MySQLTableStruct visitCreateQuery(const String & create_query, const Context & context, const std::string & new_database)
{
ASTPtr partition_columns = std::make_shared<ASTExpressionList>();
MySQLParser::ParserCreateQuery p_create_query;
ASTPtr ast_create_query = parseQuery(p_create_query, create_query.data(), create_query.data() + create_query.size(), "", 0, 0);
if (!partition_keys.empty())
partition_columns->children = partition_keys;
else if (!primary_keys.empty())
{
ASTPtr expr_list = std::make_shared<ASTExpressionList>();
expr_list->children = primary_keys;
auto syntax = SyntaxAnalyzer(context).analyze(expr_list, columns_name_and_type);
auto index_expr = ExpressionAnalyzer(expr_list, syntax, context).getActions(false);
const NamesAndTypesList & required_names_and_types = index_expr->getRequiredColumnsWithTypes();
const auto & addPartitionColumn = [&](const String & column_name, const DataTypePtr & type, Field better_pre_range_size)
{
partition_columns->children.emplace_back(std::make_shared<ASTIdentifier>(column_name));
if (type->isNullable())
partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back());
if (!better_pre_range_size.isNull())
partition_columns->children.back()
= makeASTFunction("divide", partition_columns->children.back(), std::make_shared<ASTLiteral>(better_pre_range_size));
};
for (const auto & required_name_and_type : required_names_and_types)
{
DataTypePtr assume_not_null = required_name_and_type.type;
if (assume_not_null->isNullable())
assume_not_null = (static_cast<const DataTypeNullable &>(*assume_not_null)).getNestedType();
WhichDataType which(assume_not_null);
if (which.isInt8())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int8>(
std::numeric_limits<Int8>::min(), std::numeric_limits<Int8>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt16())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int16>(
std::numeric_limits<Int16>::min(), std::numeric_limits<Int16>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt32())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int32>(
std::numeric_limits<Int32>::min(), std::numeric_limits<Int32>::max(), max_ranges, min_rows_pre_range));
else if (which.isInt64())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<Int64>(
std::numeric_limits<Int64>::min(), std::numeric_limits<Int64>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt8())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt8>(
std::numeric_limits<UInt8>::min(), std::numeric_limits<UInt8>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt16())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt16>(
std::numeric_limits<UInt16>::min(), std::numeric_limits<UInt16>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt32())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt32>(
std::numeric_limits<UInt32>::min(), std::numeric_limits<UInt32>::max(), max_ranges, min_rows_pre_range));
else if (which.isUInt64())
addPartitionColumn(required_name_and_type.name, required_name_and_type.type, choiceBetterRangeSize<UInt64>(
std::numeric_limits<UInt64>::min(), std::numeric_limits<UInt64>::max(), max_ranges, min_rows_pre_range));
else if (which.isDateOrDateTime())
{
partition_columns->children.emplace_back(std::make_shared<ASTIdentifier>(required_name_and_type.name));
if (required_name_and_type.type->isNullable())
partition_columns->children.back() = makeASTFunction("assumeNotNull", partition_columns->children.back());
partition_columns->children.back() = makeASTFunction("toYYYYMM", partition_columns->children.back());
}
}
}
const auto function = std::make_shared<ASTFunction>();
function->name = "tuple";
function->arguments = partition_columns;
function->children.push_back(function->arguments);
return function;
}
if (!ast_create_query || !ast_create_query->as<MySQLParser::ASTCreateQuery>())
throw Exception("LOGICAL ERROR: ast cannot cast to MySQLParser::ASTCreateQuery.", ErrorCodes::LOGICAL_ERROR);
return visitCreateQuery(ast_create_query, context, new_database);
}
}

View File

@ -2,15 +2,34 @@
#include <Core/NamesAndTypes.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/MySQL/ASTCreateDefines.h>
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/MySQL/ASTDeclareColumn.h>
#include <Parsers/MySQL/ASTDeclareIndex.h>
#include <Parsers/MySQL/ASTDeclarePartitionOptions.h>
#include <Parsers/parseQuery.h>
namespace DB
{
struct MySQLTableStruct
{
bool if_not_exists;
String table_name;
String database_name;
ASTs primary_keys;
ASTs partition_keys;
NamesAndTypesList columns_name_and_type;
MySQLTableStruct() {}
MySQLTableStruct(const ASTs & primary_keys_, const ASTs & partition_keys_, const NamesAndTypesList & columns_name_and_type_)
: primary_keys(primary_keys_), partition_keys(partition_keys_), columns_name_and_type(columns_name_and_type_)
{}
bool operator==(const MySQLTableStruct & other) const;
};
namespace MySQLVisitor
{
@ -20,25 +39,17 @@ class CreateQueryMatcher
public:
using Visitor = InDepthNodeVisitor<CreateQueryMatcher, false>;
struct Data
struct Data : public MySQLTableStruct
{
/// SETTINGS
WriteBuffer & out;
const Context & context;
size_t max_ranges;
size_t min_rows_pre_range;
ASTs primary_keys;
ASTs partition_keys;
NamesAndTypesList columns_name_and_type;
Data(const Context & context_) : MySQLTableStruct(), context(context_) {}
void addPrimaryKey(const ASTPtr & primary_key);
void addPartitionKey(const ASTPtr & partition_key);
ASTPtr getFormattedOrderByExpression();
ASTPtr getFormattedPartitionByExpression();
};
static void visit(ASTPtr & ast, Data & data);
@ -56,10 +67,12 @@ private:
static void visit(const MySQLParser::ASTDeclarePartitionOptions & declare_partition_options, const ASTPtr &, Data & data);
};
using CreateQueryConvertVisitor = CreateQueryMatcher::Visitor;
using CreateQueryVisitor = CreateQueryMatcher::Visitor;
}
MySQLTableStruct visitCreateQuery(ASTPtr & create_query, const Context & context, const std::string & new_database);
MySQLTableStruct visitCreateQuery(const String & create_query, const Context & context, const std::string & new_database);
}

View File

@ -1,150 +1,187 @@
#include <gtest/gtest.h>
#include <Parsers/parseQuery.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/WriteBufferFromString.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/MySQL/CreateQueryConvertVisitor.h>
#include <Interpreters/MySQL/CreateQueryVisitor.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/MySQL/ASTCreateQuery.h>
#include <Parsers/parseQuery.h>
#include <gtest/gtest.h>
using namespace DB;
using namespace MySQLParser;
using namespace MySQLVisitor;
static DataTypePtr getType(const String & data_type)
{
return DataTypeFactory::instance().get(data_type);
}
static ContextShared * contextShared()
{
static SharedContextHolder shared = Context::createShared();
return shared.get();
}
static String convert(const String & input)
static MySQLTableStruct visitQuery(const String & query)
{
ParserCreateQuery p_create_query;
ASTPtr ast = parseQuery(p_create_query, input.data(), input.data() + input.size(), "", 0, 0);
ASTPtr ast = parseQuery(p_create_query, query.data(), query.data() + query.size(), "", 0, 0);
WriteBufferFromOwnString out;
CreateQueryConvertVisitor::Data data{
.out = out, .context = Context::createGlobal(contextShared()), .max_ranges = 1000, .min_rows_pre_range = 1000000};
CreateQueryConvertVisitor visitor(data);
CreateQueryVisitor::Data data(Context::createGlobal(contextShared()));
data.max_ranges = 1000;
data.min_rows_pre_range = 1000000;
CreateQueryVisitor visitor(data);
visitor.visit(ast);
return out.str();
return std::move(data);
}
TEST(CreateQueryConvert, TestConvertNumberColumnsType)
TEST(CreateQueryVisitor, TestWithNumberColumnsType)
{
EXPECT_EQ(
convert("CREATE TABLE test(a tinyint, b SMALLINT, c MEDIUMINT, d INT, e INTEGER, f BIGINT, g DECIMAL, h DEC, i NUMERIC, j FIXED, k "
"FLOAT, l DOUBLE, m DOUBLE PRECISION, n REAL)"),
"CREATE TABLE test(`a` Nullable(Int8), `b` Nullable(Int16), `c` Nullable(Int32), `d` Nullable(Int32), `e` Nullable(Int32), `f` "
"Nullable(Int64), `g` Nullable(Decimal(10, 0)), `h` Nullable(Decimal(10, 0)), `i` Nullable(Decimal(10, 0)), `j` "
"Nullable(Decimal(10, 0)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = "
"MergeTree() PARTITION BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a tinyint, b SMALLINT, c MEDIUMINT, d INT, e INTEGER, f BIGINT, g DECIMAL, h DEC, i NUMERIC, j "
"FIXED, k FLOAT, l DOUBLE, m DOUBLE PRECISION, n REAL)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Int8)")}, {"b", getType("Nullable(Int16)")}
, {"c", getType("Nullable(Int32)")}, {"d", getType("Nullable(Int32)")}, {"e", getType("Nullable(Int32)")}
, {"f", getType("Nullable(Int64)")}, {"g", getType("Nullable(Decimal(10, 0))")}, {"h", getType("Nullable(Decimal(10, 0))")}
, {"i", getType("Nullable(Decimal(10, 0))")}, {"j", getType("Nullable(Decimal(10, 0))")}
, {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")}
, {"n", getType("Nullable(Float64)")}}
)
);
EXPECT_EQ(
convert("CREATE TABLE test(a tinyint(1), b SMALLINT(1), c MEDIUMINT(1), d INT(1), e INTEGER(1), f BIGINT(1), g DECIMAL(1), h "
"DEC(2, 1), i NUMERIC(4, 3), j FIXED(6, 5), k FLOAT(1), l DOUBLE(1, 2), m DOUBLE PRECISION(3, 4), n REAL(5, 6))"),
"CREATE TABLE test(`a` Nullable(Int8), `b` Nullable(Int16), `c` Nullable(Int32), `d` Nullable(Int32), `e` Nullable(Int32), `f` "
"Nullable(Int64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` Nullable(Decimal(6, "
"5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION "
"BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a tinyint(1), b SMALLINT(1), c MEDIUMINT(1), d INT(1), e INTEGER(1), f BIGINT(1), g DECIMAL(1), h "
"DEC(2, 1), i NUMERIC(4, 3), j FIXED(6, 5), k FLOAT(1), l DOUBLE(1, 2), m DOUBLE PRECISION(3, 4), n REAL(5, 6))"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Int8)")}, {"b", getType("Nullable(Int16)")}
, {"c", getType("Nullable(Int32)")}, {"d", getType("Nullable(Int32)")}, {"e", getType("Nullable(Int32)")}
, {"f", getType("Nullable(Int64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")}
, {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")}
, {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")}
, {"n", getType("Nullable(Float64)")}}
)
);
/// UNSIGNED
EXPECT_EQ(
convert("CREATE TABLE test(a tinyint UNSIGNED, b SMALLINT(1) UNSIGNED, c MEDIUMINT(1) UNSIGNED, d INT(1) UNSIGNED, e INTEGER(1), f "
"BIGINT(1) UNSIGNED, g DECIMAL(1) UNSIGNED, h DEC(2, 1) UNSIGNED, i NUMERIC(4, 3) UNSIGNED, j FIXED(6, 5) UNSIGNED, k FLOAT(1) "
"UNSIGNED, l DOUBLE(1, 2) UNSIGNED, m DOUBLE PRECISION(3, 4) UNSIGNED, n REAL(5, 6) UNSIGNED)"),
"CREATE TABLE test(`a` Nullable(UInt8), `b` Nullable(UInt16), `c` Nullable(UInt32), `d` Nullable(UInt32), `e` Nullable(Int32), `f` "
"Nullable(UInt64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` Nullable(Decimal(6, "
"5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION "
"BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a tinyint UNSIGNED, b SMALLINT(1) UNSIGNED, c MEDIUMINT(1) UNSIGNED, d INT(1) UNSIGNED, e INTEGER(1), f "
"BIGINT(1) UNSIGNED, g DECIMAL(1) UNSIGNED, h DEC(2, 1) UNSIGNED, i NUMERIC(4, 3) UNSIGNED, j FIXED(6, 5) UNSIGNED, k FLOAT(1) "
"UNSIGNED, l DOUBLE(1, 2) UNSIGNED, m DOUBLE PRECISION(3, 4) UNSIGNED, n REAL(5, 6) UNSIGNED)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(UInt8)")}, {"b", getType("Nullable(UInt16)")}
, {"c", getType("Nullable(UInt32)")}, {"d", getType("Nullable(UInt32)")}, {"e", getType("Nullable(Int32)")}
, {"f", getType("Nullable(UInt64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")}
, {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")}
, {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")}
, {"n", getType("Nullable(Float64)")}}
)
);
/// NOT NULL
EXPECT_EQ(
convert("CREATE TABLE test(a tinyint NOT NULL, b SMALLINT(1) NOT NULL, c MEDIUMINT(1) NOT NULL, d INT(1) NOT NULL, e INTEGER(1), f "
"BIGINT(1) NOT NULL, g DECIMAL(1) NOT NULL, h DEC(2, 1) NOT NULL, i NUMERIC(4, 3) NOT NULL, j FIXED(6, 5) NOT NULL, k FLOAT(1) NOT "
"NULL, l DOUBLE(1, 2) NOT NULL, m DOUBLE PRECISION(3, 4) NOT NULL, n REAL(5, 6) NOT NULL)"),
"CREATE TABLE test(`a` Int8, `b` Int16, `c` Int32, `d` Int32, `e` Nullable(Int32), `f` Int64, `g` Decimal(1, 0), `h` Decimal(2, "
"1), `i` Decimal(4, 3), `j` Decimal(6, 5), `k` Float32, `l` Float64, `m` Float64, `n` Float64) ENGINE = MergeTree() PARTITION BY "
"tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a tinyint NOT NULL, b SMALLINT(1) NOT NULL, c MEDIUMINT(1) NOT NULL, d INT(1) NOT NULL, e INTEGER(1), f "
"BIGINT(1) NOT NULL, g DECIMAL(1) NOT NULL, h DEC(2, 1) NOT NULL, i NUMERIC(4, 3) NOT NULL, j FIXED(6, 5) NOT NULL, k FLOAT(1) NOT "
"NULL, l DOUBLE(1, 2) NOT NULL, m DOUBLE PRECISION(3, 4) NOT NULL, n REAL(5, 6) NOT NULL)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Int8")}, {"b", getType("Int16")}
, {"c", getType("Int32")}, {"d", getType("Int32")}, {"e", getType("Nullable(Int32)")}
, {"f", getType("Int64")}, {"g", getType("Decimal(1, 0)")}, {"h", getType("Decimal(2, 1)")}
, {"i", getType("Decimal(4, 3)")}, {"j", getType("Decimal(6, 5)")}
, {"k", getType("Float32")}, {"l", getType("Float64")}, {"m", getType("Float64")}, {"n", getType("Float64")}}
)
);
/// ZEROFILL
EXPECT_EQ(
convert("CREATE TABLE test(a tinyint ZEROFILL, b SMALLINT(1) ZEROFILL, c MEDIUMINT(1) ZEROFILL, d INT(1) ZEROFILL, e INTEGER(1), f "
"BIGINT(1) ZEROFILL, g DECIMAL(1) ZEROFILL, h DEC(2, 1) ZEROFILL, i NUMERIC(4, 3) ZEROFILL, j FIXED(6, 5) ZEROFILL, k FLOAT(1) "
"ZEROFILL, l DOUBLE(1, 2) ZEROFILL, m DOUBLE PRECISION(3, 4) ZEROFILL, n REAL(5, 6) ZEROFILL)"),
"CREATE TABLE test(`a` Nullable(UInt8), `b` Nullable(UInt16), `c` Nullable(UInt32), `d` Nullable(UInt32), `e` Nullable(Int32), `f` "
"Nullable(UInt64), `g` Nullable(Decimal(1, 0)), `h` Nullable(Decimal(2, 1)), `i` Nullable(Decimal(4, 3)), `j` Nullable(Decimal(6, "
"5)), `k` Nullable(Float32), `l` Nullable(Float64), `m` Nullable(Float64), `n` Nullable(Float64)) ENGINE = MergeTree() PARTITION "
"BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a tinyint ZEROFILL, b SMALLINT(1) ZEROFILL, c MEDIUMINT(1) ZEROFILL, d INT(1) ZEROFILL, e INTEGER(1), f "
"BIGINT(1) ZEROFILL, g DECIMAL(1) ZEROFILL, h DEC(2, 1) ZEROFILL, i NUMERIC(4, 3) ZEROFILL, j FIXED(6, 5) ZEROFILL, k FLOAT(1) "
"ZEROFILL, l DOUBLE(1, 2) ZEROFILL, m DOUBLE PRECISION(3, 4) ZEROFILL, n REAL(5, 6) ZEROFILL)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(UInt8)")}, {"b", getType("Nullable(UInt16)")}
, {"c", getType("Nullable(UInt32)")}, {"d", getType("Nullable(UInt32)")}, {"e", getType("Nullable(Int32)")}
, {"f", getType("Nullable(UInt64)")}, {"g", getType("Nullable(Decimal(1, 0))")}, {"h", getType("Nullable(Decimal(2, 1))")}
, {"i", getType("Nullable(Decimal(4, 3))")}, {"j", getType("Nullable(Decimal(6, 5))")}
, {"k", getType("Nullable(Float32)")}, {"l", getType("Nullable(Float64)")}, {"m", getType("Nullable(Float64)")}
, {"n", getType("Nullable(Float64)")}}
)
);
}
TEST(CreateQueryConvert, TestConvertDateTimesColumnsType)
TEST(CreateQueryVisitor, TestWithDateTimesColumnsType)
{
EXPECT_EQ(
convert("CREATE TABLE test(a DATE, b DATETIME, c TIMESTAMP, d TIME, e year)"),
"CREATE TABLE test(`a` Nullable(Date), `b` Nullable(DateTime), `c` Nullable(DateTime), `d` Nullable(DateTime64(3)), `e` "
"Nullable(Int16)) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE, b DATETIME, c TIMESTAMP, d TIME, e year)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Date)")}, {"b", getType("Nullable(DateTime)")}
, {"c", getType("Nullable(DateTime)")}, {"d", getType("Nullable(DateTime64(3))")}, {"e", getType("Nullable(Int16)")} }
)
);
EXPECT_EQ(
convert("CREATE TABLE test(a DATE, b DATETIME(1), c TIMESTAMP(1), d TIME(1), e year(4))"),
"CREATE TABLE test(`a` Nullable(Date), `b` Nullable(DateTime), `c` Nullable(DateTime), `d` Nullable(DateTime64(3)), `e` "
"Nullable(Int16)) ENGINE = MergeTree() PARTITION BY tuple() ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE, b DATETIME(1), c TIMESTAMP(1), d TIME(1), e year(4))"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Nullable(Date)")}, {"b", getType("Nullable(DateTime)")}
, {"c", getType("Nullable(DateTime)")}, {"d", getType("Nullable(DateTime64(3))")}, {"e", getType("Nullable(Int16)")} }
)
);
EXPECT_EQ(
convert(
"CREATE TABLE test(a DATE NOT NULL, b DATETIME(1) NOT NULL, c TIMESTAMP(1) NOT NULL, d TIME(1) NOT NULL, e year(4) NOT NULL)"),
"CREATE TABLE test(`a` Date, `b` DateTime, `c` DateTime, `d` DateTime64(3), `e` Int16) ENGINE = MergeTree() PARTITION BY tuple() "
"ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL, b DATETIME(1) NOT NULL, c TIMESTAMP(1) NOT NULL, d TIME(1) NOT NULL, e year(4) NOT NULL)"),
MySQLTableStruct(ASTs{}, ASTs{}, {{"a", getType("Date")}, {"b", getType("DateTime")} , {"c", getType("DateTime")}, {"d", getType("DateTime64")}, {"e", getType("Int16")} }
)
);
}
TEST(CreateQueryConvert, TestConvertParitionOptions)
TEST(CreateQueryVisitor, TestWithParitionOptions)
{
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY HASH a"),
"CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY HASH a"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a")}, {{"a", getType("Date")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LINEAR HASH a"),
"CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LINEAR HASH a"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a")}, {{"a", getType("Date")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY RANGE(a)"),
"CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY RANGE(a)"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a")}, {{"a", getType("Date")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY RANGE COLUMNS(a, b)"),
"CREATE TABLE test(`a` Date, `b` Nullable(Int32)) ENGINE = MergeTree() PARTITION BY (a, b) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY RANGE COLUMNS(a, b)"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a"), std::make_shared<ASTIdentifier>("b")}, {{"a", getType("Date")}, {"b", getType("Nullable(Int32)")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LIST(a)"),
"CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL) PARTITION BY LIST(a)"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a")}, {{"a", getType("Date")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY LIST COLUMNS(a, b)"),
"CREATE TABLE test(`a` Date, `b` Nullable(Int32)) ENGINE = MergeTree() PARTITION BY (a, b) ORDER BY tuple()");
visitQuery("CREATE TABLE test(a DATE NOT NULL, b INT) PARTITION BY LIST COLUMNS(a, b)"),
MySQLTableStruct(ASTs{}, ASTs{std::make_shared<ASTIdentifier>("a"), std::make_shared<ASTIdentifier>("b")},
{{"a", getType("Date")}, {"b", getType("Nullable(Int32)")}}));
}
TEST(CreateQueryConvert, TestConvertPrimaryToPartitionBy)
TEST(CreateQueryVisitor, TestWithPrimaryToPartitionBy)
{
EXPECT_EQ(convert("CREATE TABLE test(a DATE NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` Date) ENGINE = MergeTree() PARTITION BY tuple(toYYYYMM(a)) ORDER BY tuple(a)");
EXPECT_EQ(convert("CREATE TABLE test(a DATETIME NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` DateTime) ENGINE = MergeTree() PARTITION BY tuple(toYYYYMM(a)) ORDER BY tuple(a)");
EXPECT_EQ(convert("CREATE TABLE test(a TINYINT NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` Int8) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple(a)");
EXPECT_EQ(convert("CREATE TABLE test(a SMALLINT NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` Int16) ENGINE = MergeTree() PARTITION BY tuple(a) ORDER BY tuple(a)");
EXPECT_EQ(convert("CREATE TABLE test(a INT NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` Int32) ENGINE = MergeTree() PARTITION BY tuple(a / 4294968) ORDER BY tuple(a)");
EXPECT_EQ(convert("CREATE TABLE test(a BIGINT NOT NULL PRIMARY KEY)"),
"CREATE TABLE test(`a` Int64) ENGINE = MergeTree() PARTITION BY tuple(a / 18446744073709552) ORDER BY tuple(a)");
EXPECT_EQ(
visitQuery("CREATE TABLE test(a DATE NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Date")}}));
EXPECT_EQ(
convert("CREATE TABLE test(a BIGINT PRIMARY KEY)"),
"CREATE TABLE test(`a` Nullable(Int64)) ENGINE = MergeTree() PARTITION BY tuple(assumeNotNull(a) / 18446744073709552) ORDER BY "
"tuple(a)");
visitQuery("CREATE TABLE test(a DATETIME NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("DateTime")}}));
EXPECT_EQ(
visitQuery("CREATE TABLE test(a TINYINT NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Int8")}}));
EXPECT_EQ(
visitQuery("CREATE TABLE test(a SMALLINT NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Int16")}}));
EXPECT_EQ(
visitQuery("CREATE TABLE test(a INT NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Int32")}}));
EXPECT_EQ(
visitQuery("CREATE TABLE test(a BIGINT NOT NULL PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Int64")}}));
EXPECT_EQ(
visitQuery("CREATE TABLE test(a BIGINT PRIMARY KEY)"),
MySQLTableStruct(ASTs{std::make_shared<ASTIdentifier>("a")}, ASTs{}, {{"a", getType("Nullable(Int64)")}}));
}