refactor InterpreterCreateQuery::createTable(...)

This commit is contained in:
Alexander Tokmakov 2019-10-23 21:39:07 +03:00
parent 3eaa96e666
commit 9c461ff143
6 changed files with 160 additions and 158 deletions

View File

@ -269,16 +269,8 @@ void DatabaseOrdinary::alterTable(
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
if (ast_create_query.columns_list->indices)
ast_create_query.columns_list->replace(ast_create_query.columns_list->indices, new_indices);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->indices, new_indices);
if (ast_create_query.columns_list->constraints)
ast_create_query.columns_list->replace(ast_create_query.columns_list->constraints, new_constraints);
else
ast_create_query.columns_list->set(ast_create_query.columns_list->constraints, new_constraints);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);

View File

@ -402,84 +402,97 @@ ConstraintsDescription InterpreterCreateQuery::getConstraintsDescription(const A
}
ColumnsDescription InterpreterCreateQuery::setProperties(
ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const
InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(ASTCreateQuery & create) const
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
TableProperties properties;
TableStructureReadLockHolder as_storage_lock;
if (create.columns_list)
{
if (create.columns_list->columns)
columns = getColumnsDescription(*create.columns_list->columns, context);
properties.columns = getColumnsDescription(*create.columns_list->columns, context);
if (create.columns_list->indices)
for (const auto & index : create.columns_list->indices->children)
indices.indices.push_back(
properties.indices.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
if (create.columns_list->constraints)
for (const auto & constraint : create.columns_list->constraints->children)
constraints.constraints.push_back(
std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint->clone()));
properties.constraints = getConstraintsDescription(create.columns_list->constraints);
}
else if (!create.as_table.empty())
{
columns = as_storage->getColumns();
String as_database_name = create.as_database.empty() ? context.getCurrentDatabase() : create.as_database;
StoragePtr as_storage = context.getTable(as_database_name, create.as_table);
/// as_storage->getColumns() and setEngine(...) must be called under structure lock of other_table for CREATE ... AS other_table.
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
properties.columns = as_storage->getColumns();
/// Secondary indices make sense only for MergeTree family of storage engines.
/// We should not copy them for other storages.
if (create.storage && endsWith(create.storage->engine->name, "MergeTree"))
indices = as_storage->getIndices();
properties.indices = as_storage->getIndices();
constraints = as_storage->getConstraints();
properties.constraints = as_storage->getConstraints();
}
else if (create.select)
{
columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
Block as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
properties.columns = ColumnsDescription(as_select_sample.getNamesAndTypesList());
}
else if (create.as_table_function)
return {};
else
throw Exception("Incorrect CREATE query: required list of column descriptions or AS section or SELECT.", ErrorCodes::INCORRECT_QUERY);
/// Even if query has list of columns, canonicalize it (unfold Nested columns).
ASTPtr new_columns = formatColumns(columns);
ASTPtr new_indices = formatIndices(indices);
ASTPtr new_constraints = formatConstraints(constraints);
if (!create.columns_list)
{
auto new_columns_list = std::make_shared<ASTColumns>();
create.set(create.columns_list, new_columns_list);
}
create.set(create.columns_list, std::make_shared<ASTColumns>());
if (create.columns_list->columns)
create.columns_list->replace(create.columns_list->columns, new_columns);
else
create.columns_list->set(create.columns_list->columns, new_columns);
ASTPtr new_columns = formatColumns(properties.columns);
ASTPtr new_indices = formatIndices(properties.indices);
ASTPtr new_constraints = formatConstraints(properties.constraints);
if (new_indices && create.columns_list->indices)
create.columns_list->replace(create.columns_list->indices, new_indices);
else if (new_indices)
create.columns_list->set(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->columns, new_columns);
create.columns_list->setOrReplace(create.columns_list->indices, new_indices);
create.columns_list->setOrReplace(create.columns_list->constraints, new_constraints);
if (new_constraints && create.columns_list->constraints)
create.columns_list->replace(create.columns_list->constraints, new_constraints);
else if (new_constraints)
create.columns_list->set(create.columns_list->constraints, new_constraints);
validateTableStructure(create, properties);
/// Set the table engine if it was not specified explicitly.
setEngine(create);
return properties;
}
void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties) const
{
/// Check for duplicates
std::set<String> all_columns;
for (const auto & column : columns)
for (const auto & column : properties.columns)
{
if (!all_columns.emplace(column.name).second)
throw Exception("Column " + backQuoteIfNeed(column.name) + " already exists", ErrorCodes::DUPLICATE_COLUMN);
}
return columns;
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : properties.columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default "
"due to expected negative impact on performance. "
"It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.storage)
@ -541,12 +554,10 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
throw Exception("Temporary tables cannot be inside a database. You should not specify a database for a temporary table.",
ErrorCodes::BAD_DATABASE_FOR_TEMPORARY_TABLE);
String path = context.getPath();
String current_database = context.getCurrentDatabase();
String database_name = create.database.empty() ? current_database : create.database;
String table_name = create.table;
String table_name_escaped = escapeForFileName(table_name);
// If this is a stub ATTACH query, read the query definition from the database
if (create.attach && !create.storage && !create.columns_list)
@ -566,26 +577,66 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
visitor.visit(*create.select);
}
Block as_select_sample;
if (create.select && (!create.attach || !create.columns_list))
as_select_sample = InterpreterSelectWithUnionQuery::getSampleBlock(create.select->clone(), context);
/// Set and retrieve list of columns, indices and constraints. Set table engine if needed. Rewrite query in canonical way.
TableProperties properties = setProperties(create);
String as_database_name = create.as_database.empty() ? current_database : create.as_database;
String as_table_name = create.as_table;
/// Actually creates table
bool created = doCreateTable(create, properties, database_name);
if (!created)
return {};
StoragePtr as_storage;
TableStructureReadLockHolder as_storage_lock;
return fillTableIfNeeded(create, database_name);
}
if (!as_table_name.empty())
bool InterpreterCreateQuery::doCreateTable(const ASTCreateQuery & create,
const InterpreterCreateQuery::TableProperties & properties,
const String & database_name)
{
std::unique_ptr<DDLGuard> guard;
String data_path;
DatabasePtr database;
const String & table_name = create.table;
bool need_add_to_database = !create.temporary || create.is_live_view;
if (need_add_to_database)
{
as_storage = context.getTable(as_database_name, as_table_name);
as_storage_lock = as_storage->lockStructureForShare(false, context.getCurrentQueryId());
database = context.getDatabase(database_name);
if (!create.uuid.empty() && database->getEngineName() != "Atomic")
throw Exception("Table UUID specified, but engine of database " + database_name + " is not Atomic",
ErrorCodes::INCORRECT_QUERY);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If table doesnt exist, one thread is creating table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(database_name, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
if (create.if_not_exists)
return false;
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = database_name;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return false;
ColumnsDescription columns;
ConstraintsDescription constraints;
StoragePtr res;
if (create.as_table_function)
{
const auto & table_function = create.as_table_function->as<ASTFunction &>();
@ -594,101 +645,38 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
}
else
{
/// Set and retrieve list of columns.
columns = setProperties(create, as_select_sample, as_storage);
constraints = getConstraintsDescription(create.columns_list->constraints);
/// Check low cardinality types in creating table if it was not allowed in setting
if (!create.attach && !context.getSettingsRef().allow_suspicious_low_cardinality_types && !create.is_materialized_view)
{
for (const auto & name_and_type_pair : columns.getAllPhysical())
{
if (const auto * current_type_ptr = typeid_cast<const DataTypeLowCardinality *>(name_and_type_pair.type.get()))
{
if (!isStringOrFixedString(*removeNullable(current_type_ptr->getDictionaryType())))
throw Exception("Creating columns of type " + current_type_ptr->getName() + " is prohibited by default due to expected negative impact on performance. It can be enabled with the \"allow_suspicious_low_cardinality_types\" setting.",
ErrorCodes::SUSPICIOUS_TYPE_FOR_LOW_CARDINALITY);
}
}
}
/// Set the table engine if it was not specified explicitly.
setEngine(create);
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
properties.columns,
properties.constraints,
create.attach,
false);
}
{
std::unique_ptr<DDLGuard> guard;
if (need_add_to_database)
database->createTable(context, table_name, res, query_ptr);
else
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
String data_path;
DatabasePtr database;
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
if (!create.temporary || create.is_live_view)
{
database = context.getDatabase(database_name);
if (!create.uuid.empty() && database->getEngineName() != "Atomic")
throw Exception("Table UUID specified, but engine of database " + database_name + " is not Atomic",
ErrorCodes::INCORRECT_QUERY);
data_path = database->getDataPath();
/** If the request specifies IF NOT EXISTS, we allow concurrent CREATE queries (which do nothing).
* If table doesnt exist, one thread is creating table, while others wait in DDLGuard.
*/
guard = context.getDDLGuard(database_name, table_name);
/// Table can be created before or it can be created concurrently in another thread, while we were waiting in DDLGuard.
if (database->isTableExist(context, table_name))
{
if (create.if_not_exists)
return {};
else if (create.replace_view)
{
/// when executing CREATE OR REPLACE VIEW, drop current existing view
auto drop_ast = std::make_shared<ASTDropQuery>();
drop_ast->database = database_name;
drop_ast->table = table_name;
drop_ast->no_ddl_lock = true;
InterpreterDropQuery interpreter(drop_ast, context);
interpreter.execute();
}
else
throw Exception("Table " + database_name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
}
}
else if (context.tryGetExternalTable(table_name) && create.if_not_exists)
return {};
if (!create.as_table_function)
{
res = StorageFactory::instance().get(create,
data_path,
table_name,
database_name,
context,
context.getGlobalContext(),
columns,
constraints,
create.attach,
false);
}
if (create.temporary && !create.is_live_view)
context.getSessionContext().addExternalTable(table_name, res, query_ptr);
else
database->createTable(context, table_name, res, query_ptr);
/// We must call "startup" and "shutdown" while holding DDLGuard.
/// Because otherwise method "shutdown" (from InterpreterDropQuery) can be called before startup
/// (in case when table was created and instantly dropped before started up)
///
/// Method "startup" may create background tasks and method "shutdown" will wait for them.
/// But if "shutdown" is called before "startup", it will exit early, because there are no background tasks to wait.
/// Then background task is created by "startup" method. And when destructor of a table object is called, background task is still active,
/// and the task will use references to freed data.
res->startup();
}
res->startup();
return true;
}
BlockIO InterpreterCreateQuery::fillTableIfNeeded(const ASTCreateQuery & create, const String & database_name)
{
/// If the query is a CREATE SELECT, insert the data into the table.
if (create.select && !create.attach
&& !create.is_view && !create.is_live_view && (!create.is_materialized_view || create.is_populate))
@ -698,7 +686,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
if (!create.temporary)
insert->database = database_name;
insert->table = table_name;
insert->table = create.table;
insert->select = create.select->clone();
if (create.temporary && !context.getSessionContext().hasQueryContext())

View File

@ -49,14 +49,27 @@ public:
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
private:
struct TableProperties
{
ColumnsDescription columns;
IndicesDescription indices;
ConstraintsDescription constraints;
};
BlockIO createDatabase(ASTCreateQuery & create);
BlockIO createTable(ASTCreateQuery & create);
/// Calculate list of columns, constraints, indices, etc... of table and return columns.
ColumnsDescription setProperties(ASTCreateQuery & create, const Block & as_select_sample, const StoragePtr & as_storage) const;
/// Calculate list of columns, constraints, indices, etc... of table. Rewrite query in canonical way.
TableProperties setProperties(ASTCreateQuery & create) const;
void validateTableStructure(const ASTCreateQuery & create, const TableProperties & properties) const;
void setEngine(ASTCreateQuery & create) const;
void checkAccess(const ASTCreateQuery & create);
/// Create IStorage and add it to database. If table already exists and IF NOT EXISTS specified, do nothing and return false.
bool doCreateTable(const ASTCreateQuery & create, const TableProperties & properties, const String & database_name);
/// Inserts data in created table if it's CREATE ... SELECT
BlockIO fillTableIfNeeded(const ASTCreateQuery & create, const String & database_name);
ASTPtr query_ptr;
Context & context;

View File

@ -146,6 +146,15 @@ public:
throw Exception("AST subtree not found in children", ErrorCodes::LOGICAL_ERROR);
}
template <typename T>
void setOrReplace(T * & field, const ASTPtr & child)
{
if (field)
replace(field, child);
else
set(field, child);
}
/// Convert to a string.
/// Format settings.

View File

@ -39,7 +39,7 @@ void StorageFactory::registerStorage(const std::string & name, Creator creator)
StoragePtr StorageFactory::get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,

View File

@ -46,7 +46,7 @@ public:
using Creator = std::function<StoragePtr(const Arguments & arguments)>;
StoragePtr get(
ASTCreateQuery & query,
const ASTCreateQuery & query,
const String & data_path,
const String & table_name,
const String & database_name,