Merge remote-tracking branch 'upstream/master' into fix27

This commit is contained in:
proller 2019-12-28 23:54:38 +03:00
commit 76890058b6
41 changed files with 820 additions and 555 deletions

View File

@ -0,0 +1,119 @@
#include <memory>
#include <random>
#include <DataTypes/DataTypesNumber.h>
#include <Common/thread_local_rng.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int AGGREGATE_FUNCTION_THROW;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
struct AggregateFunctionThrowData
{
bool allocated;
AggregateFunctionThrowData() : allocated(true) {}
~AggregateFunctionThrowData()
{
volatile bool * allocated_ptr = &allocated;
if (*allocated_ptr)
*allocated_ptr = false;
else
abort();
}
};
/** Throw on creation with probability specified in parameter.
* It will check correct destruction of the state.
* This is intended to check for exception safety.
*/
class AggregateFunctionThrow final : public IAggregateFunctionDataHelper<AggregateFunctionThrowData, AggregateFunctionThrow>
{
private:
Float64 throw_probability;
public:
AggregateFunctionThrow(const DataTypes & argument_types_, const Array & parameters_, Float64 throw_probability_)
: IAggregateFunctionDataHelper(argument_types_, parameters_), throw_probability(throw_probability_) {}
String getName() const override
{
return "aggThrow";
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeUInt8>();
}
void create(AggregateDataPtr place) const override
{
if (std::uniform_real_distribution<>(0.0, 1.0)(thread_local_rng) <= throw_probability)
throw Exception("Aggregate function " + getName() + " has thrown exception successfully", ErrorCodes::AGGREGATE_FUNCTION_THROW);
new (place) Data;
}
void destroy(AggregateDataPtr place) const noexcept override
{
data(place).~Data();
}
void add(AggregateDataPtr, const IColumn **, size_t, Arena *) const override
{
}
void merge(AggregateDataPtr, ConstAggregateDataPtr, Arena *) const override
{
}
void serialize(ConstAggregateDataPtr, WriteBuffer & buf) const override
{
char c = 0;
buf.write(c);
}
void deserialize(AggregateDataPtr, ReadBuffer & buf, Arena *) const override
{
char c = 0;
buf.read(c);
}
void insertResultInto(ConstAggregateDataPtr, IColumn & to) const override
{
to.insertDefault();
}
};
}
void registerAggregateFunctionAggThrow(AggregateFunctionFactory & factory)
{
factory.registerFunction("aggThrow", [](const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
Float64 throw_probability = 1.0;
if (parameters.size() == 1)
throw_probability = parameters[0].safeGet<Float64>();
else if (parameters.size() > 1)
throw Exception("Aggregate function " + name + " cannot have more than one parameter", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionThrow>(argument_types, parameters, throw_probability);
});
}
}

View File

@ -100,7 +100,18 @@ public:
void create(AggregateDataPtr place) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->create(place + i * size_of_data);
{
try
{
nested_function->create(place + i * size_of_data);
}
catch (...)
{
for (size_t j = 0; j < i; ++j)
nested_function->destroy(place + j * size_of_data);
throw;
}
}
}
void destroy(AggregateDataPtr place) const noexcept override

View File

@ -23,13 +23,13 @@ inline void assertNoParameters(const std::string & name, const Array & parameter
inline void assertUnary(const std::string & name, const DataTypes & argument_types)
{
if (argument_types.size() != 1)
throw Exception("Aggregate function " + name + " require single argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Aggregate function " + name + " requires single argument", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
inline void assertBinary(const std::string & name, const DataTypes & argument_types)
{
if (argument_types.size() != 2)
throw Exception("Aggregate function " + name + " require two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Aggregate function " + name + " requires two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
template<std::size_t maximal_arity>

View File

@ -213,7 +213,7 @@ protected:
public:
IAggregateFunctionDataHelper(const DataTypes & argument_types_, const Array & parameters_)
: IAggregateFunctionHelper<Derived>(argument_types_, parameters_) {}
: IAggregateFunctionHelper<Derived>(argument_types_, parameters_) {}
void create(AggregateDataPtr place) const override
{

View File

@ -42,6 +42,7 @@ void registerAggregateFunctions()
registerAggregateFunctionSimpleLinearRegression(factory);
registerAggregateFunctionMoving(factory);
registerAggregateFunctionCategoricalIV(factory);
registerAggregateFunctionAggThrow(factory);
}
{

View File

@ -34,6 +34,7 @@ void registerAggregateFunctionEntropy(AggregateFunctionFactory &);
void registerAggregateFunctionSimpleLinearRegression(AggregateFunctionFactory &);
void registerAggregateFunctionMoving(AggregateFunctionFactory &);
void registerAggregateFunctionCategoricalIV(AggregateFunctionFactory &);
void registerAggregateFunctionAggThrow(AggregateFunctionFactory &);
class AggregateFunctionCombinatorFactory;
void registerAggregateFunctionCombinatorIf(AggregateFunctionCombinatorFactory &);

View File

@ -477,6 +477,7 @@ namespace ErrorCodes
extern const int CANNOT_CREATE_DICTIONARY_FROM_METADATA = 500;
extern const int CANNOT_CREATE_DATABASE = 501;
extern const int CANNOT_SIGQUEUE = 502;
extern const int AGGREGATE_FUNCTION_THROW = 503;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -175,10 +175,7 @@ ASTPtr DatabaseLazy::getCreateDatabaseQuery(const Context & context) const
void DatabaseLazy::alterTable(
const Context & /* context */,
const String & /* table_name */,
const ColumnsDescription & /* columns */,
const IndicesDescription & /* indices */,
const ConstraintsDescription & /* constraints */,
const ASTModifier & /* storage_modifier */)
const StorageInMemoryMetadata & /* metadata */)
{
SCOPE_EXIT({ clearExpiredTables(); });
throw Exception("ALTER query is not supported for Lazy database.", ErrorCodes::UNSUPPORTED_METHOD);

View File

@ -55,10 +55,7 @@ public:
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(
const Context & context,

View File

@ -20,9 +20,12 @@
#include <Dictionaries/DictionaryFactory.h>
#include <Parsers/parseQuery.h>
#include <Parsers/formatAST.h>
#include <Parsers/ASTSetQuery.h>
#include <Storages/IStorage.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Parsers/queryToString.h>
#include <Poco/DirectoryIterator.h>
#include <Poco/Event.h>
#include <Common/Stopwatch.h>
@ -315,10 +318,7 @@ ASTPtr DatabaseOrdinary::getCreateDatabaseQuery(const Context & context) const
void DatabaseOrdinary::alterTable(
const Context & context,
const String & table_name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & storage_modifier)
const StorageInMemoryMetadata & metadata)
{
/// Read the definition of the table and replace the necessary parts with new ones.
@ -338,19 +338,30 @@ void DatabaseOrdinary::alterTable(
const auto & ast_create_query = ast->as<ASTCreateQuery &>();
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(constraints);
ASTPtr new_columns = InterpreterCreateQuery::formatColumns(metadata.columns);
ASTPtr new_indices = InterpreterCreateQuery::formatIndices(metadata.indices);
ASTPtr new_constraints = InterpreterCreateQuery::formatConstraints(metadata.constraints);
ast_create_query.columns_list->replace(ast_create_query.columns_list->columns, new_columns);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->indices, new_indices);
ast_create_query.columns_list->setOrReplace(ast_create_query.columns_list->constraints, new_constraints);
if (storage_modifier)
storage_modifier(*ast_create_query.storage);
ASTStorage & storage_ast = *ast_create_query.storage;
/// ORDER BY may change, but cannot appear, it's required construction
if (metadata.order_by_ast && storage_ast.order_by)
storage_ast.set(storage_ast.order_by, metadata.order_by_ast);
if (metadata.primary_key_ast)
storage_ast.set(storage_ast.primary_key, metadata.primary_key_ast);
if (metadata.ttl_for_table_ast)
storage_ast.set(storage_ast.ttl_table, metadata.ttl_for_table_ast);
if (metadata.settings_ast)
storage_ast.set(storage_ast.settings, metadata.settings_ast);
statement = getObjectDefinitionFromCreateQuery(ast);
{
WriteBufferFromFile out(table_metadata_tmp_path, statement.size(), O_WRONLY | O_CREAT | O_EXCL);
writeString(statement, out);

View File

@ -51,10 +51,7 @@ public:
void alterTable(
const Context & context,
const String & name,
const ColumnsDescription & columns,
const IndicesDescription & indices,
const ConstraintsDescription & constraints,
const ASTModifier & engine_modifier) override;
const StorageInMemoryMetadata & metadata) override;
time_t getObjectMetadataModificationTime(
const Context & context,

View File

@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Dictionaries/IDictionary.h>
#include <Common/Exception.h>
@ -192,12 +193,9 @@ public:
virtual void alterTable(
const Context & /*context*/,
const String & /*name*/,
const ColumnsDescription & /*columns*/,
const IndicesDescription & /*indices*/,
const ConstraintsDescription & /*constraints*/,
const ASTModifier & /*engine_modifier*/)
const StorageInMemoryMetadata & /*metadata*/)
{
throw Exception(getEngineName() + ": renameTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(getEngineName() + ": alterTable() is not supported", ErrorCodes::NOT_IMPLEMENTED);
}
/// Returns time of table's metadata change, 0 if there is no corresponding metadata file.

View File

@ -168,7 +168,8 @@ void FunctionArrayReduce::executeImpl(Block & block, const ColumnNumbers & argum
}
catch (...)
{
agg_func.destroy(places[i]);
for (size_t j = 0; j < i; ++j)
agg_func.destroy(places[j]);
throw;
}
}

View File

@ -103,7 +103,9 @@ BlockIO InterpreterAlterQuery::execute()
if (!alter_commands.empty())
{
auto table_lock_holder = table->lockAlterIntention(context.getCurrentQueryId());
alter_commands.validate(*table, context);
StorageInMemoryMetadata metadata = table->getInMemoryMetadata();
alter_commands.validate(metadata, context);
alter_commands.prepare(metadata, context);
table->alter(alter_commands, context, table_lock_holder);
}

View File

@ -16,6 +16,7 @@
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
@ -213,9 +214,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
}
void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast, SettingsChanges & changes) const
void AlterCommand::apply(StorageInMemoryMetadata & metadata) const
{
if (type == ADD_COLUMN)
{
@ -231,18 +230,18 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
column.codec = codec;
column.ttl = ttl;
columns_description.add(column, after_column);
metadata.columns.add(column, after_column);
/// Slow, because each time a list is copied
columns_description.flattenNested();
metadata.columns.flattenNested();
}
else if (type == DROP_COLUMN)
{
columns_description.remove(column_name);
metadata.columns.remove(column_name);
}
else if (type == MODIFY_COLUMN)
{
columns_description.modify(column_name, [&](ColumnDescription & column)
metadata.columns.modify(column_name, [&](ColumnDescription & column)
{
if (codec)
{
@ -273,24 +272,24 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
}
else if (type == MODIFY_ORDER_BY)
{
if (!primary_key_ast && order_by_ast)
if (!metadata.primary_key_ast && metadata.order_by_ast)
{
/// Primary and sorting key become independent after this ALTER so we have to
/// save the old ORDER BY expression as the new primary key.
primary_key_ast = order_by_ast->clone();
metadata.primary_key_ast = metadata.order_by_ast->clone();
}
order_by_ast = order_by;
metadata.order_by_ast = order_by;
}
else if (type == COMMENT_COLUMN)
{
columns_description.modify(column_name, [&](ColumnDescription & column) { column.comment = *comment; });
metadata.columns.modify(column_name, [&](ColumnDescription & column) { column.comment = *comment; });
}
else if (type == ADD_INDEX)
{
if (std::any_of(
indices_description.indices.cbegin(),
indices_description.indices.cend(),
metadata.indices.indices.cbegin(),
metadata.indices.indices.cend(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
@ -303,38 +302,38 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
ErrorCodes::ILLEGAL_COLUMN};
}
auto insert_it = indices_description.indices.end();
auto insert_it = metadata.indices.indices.end();
if (!after_index_name.empty())
{
insert_it = std::find_if(
indices_description.indices.begin(),
indices_description.indices.end(),
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == after_index_name;
});
if (insert_it == indices_description.indices.end())
if (insert_it == metadata.indices.indices.end())
throw Exception("Wrong index name. Cannot find index " + backQuote(after_index_name) + " to insert after.",
ErrorCodes::LOGICAL_ERROR);
++insert_it;
}
indices_description.indices.emplace(insert_it, std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl));
metadata.indices.indices.emplace(insert_it, std::dynamic_pointer_cast<ASTIndexDeclaration>(index_decl));
}
else if (type == DROP_INDEX)
{
auto erase_it = std::find_if(
indices_description.indices.begin(),
indices_description.indices.end(),
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
});
if (erase_it == indices_description.indices.end())
if (erase_it == metadata.indices.indices.end())
{
if (if_exists)
return;
@ -342,13 +341,13 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
ErrorCodes::LOGICAL_ERROR);
}
indices_description.indices.erase(erase_it);
metadata.indices.indices.erase(erase_it);
}
else if (type == ADD_CONSTRAINT)
{
if (std::any_of(
constraints_description.constraints.cbegin(),
constraints_description.constraints.cend(),
metadata.constraints.constraints.cbegin(),
metadata.constraints.constraints.cend(),
[this](const ASTPtr & constraint_ast)
{
return constraint_ast->as<ASTConstraintDeclaration &>().name == constraint_name;
@ -360,36 +359,45 @@ void AlterCommand::apply(ColumnsDescription & columns_description, IndicesDescri
ErrorCodes::ILLEGAL_COLUMN);
}
auto insert_it = constraints_description.constraints.end();
auto insert_it = metadata.constraints.constraints.end();
constraints_description.constraints.emplace(insert_it, std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint_decl));
metadata.constraints.constraints.emplace(insert_it, std::dynamic_pointer_cast<ASTConstraintDeclaration>(constraint_decl));
}
else if (type == DROP_CONSTRAINT)
{
auto erase_it = std::find_if(
constraints_description.constraints.begin(),
constraints_description.constraints.end(),
metadata.constraints.constraints.begin(),
metadata.constraints.constraints.end(),
[this](const ASTPtr & constraint_ast)
{
return constraint_ast->as<ASTConstraintDeclaration &>().name == constraint_name;
});
if (erase_it == constraints_description.constraints.end())
if (erase_it == metadata.constraints.constraints.end())
{
if (if_exists)
return;
throw Exception("Wrong constraint name. Cannot find constraint `" + constraint_name + "` to drop.",
ErrorCodes::LOGICAL_ERROR);
}
constraints_description.constraints.erase(erase_it);
metadata.constraints.constraints.erase(erase_it);
}
else if (type == MODIFY_TTL)
{
ttl_table_ast = ttl;
metadata.ttl_for_table_ast = ttl;
}
else if (type == MODIFY_SETTING)
{
changes.insert(changes.end(), settings_changes.begin(), settings_changes.end());
auto & settings_from_storage = metadata.settings_ast->as<ASTSetQuery &>().changes;
for (const auto & change : settings_changes)
{
auto finder = [&change](const SettingChange & c) { return c.name == change.name; };
if (auto it = std::find_if(settings_from_storage.begin(), settings_from_storage.end(), finder);
it != settings_from_storage.end())
it->value = change.value;
else
settings_from_storage.push_back(change);
}
}
else
throw Exception("Wrong parameter type in ALTER query", ErrorCodes::LOGICAL_ERROR);
@ -411,35 +419,72 @@ bool AlterCommand::isSettingsAlter() const
return type == MODIFY_SETTING;
}
void AlterCommands::apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast, SettingsChanges & changes) const
bool AlterCommand::isCommentAlter() const
{
auto new_columns_description = columns_description;
auto new_indices_description = indices_description;
auto new_constraints_description = constraints_description;
auto new_order_by_ast = order_by_ast;
auto new_primary_key_ast = primary_key_ast;
auto new_ttl_table_ast = ttl_table_ast;
auto new_changes = changes;
for (const AlterCommand & command : *this)
if (!command.ignore)
command.apply(new_columns_description, new_indices_description, new_constraints_description, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
columns_description = std::move(new_columns_description);
indices_description = std::move(new_indices_description);
constraints_description = std::move(new_constraints_description);
order_by_ast = std::move(new_order_by_ast);
primary_key_ast = std::move(new_primary_key_ast);
ttl_table_ast = std::move(new_ttl_table_ast);
changes = std::move(new_changes);
if (type == COMMENT_COLUMN)
{
return true;
}
else if (type == MODIFY_COLUMN)
{
return comment.has_value()
&& codec == nullptr
&& data_type == nullptr
&& default_expression == nullptr
&& ttl == nullptr;
}
return false;
}
void AlterCommands::validate(const IStorage & table, const Context & context)
String alterTypeToString(const AlterCommand::Type type)
{
switch (type)
{
case AlterCommand::Type::ADD_COLUMN:
return "ADD COLUMN";
case AlterCommand::Type::ADD_CONSTRAINT:
return "ADD CONSTRAINT";
case AlterCommand::Type::ADD_INDEX:
return "ADD INDEX";
case AlterCommand::Type::COMMENT_COLUMN:
return "COMMENT COLUMN";
case AlterCommand::Type::DROP_COLUMN:
return "DROP COLUMN";
case AlterCommand::Type::DROP_CONSTRAINT:
return "DROP CONSTRAINT";
case AlterCommand::Type::DROP_INDEX:
return "DROP INDEX";
case AlterCommand::Type::MODIFY_COLUMN:
return "MODIFY COLUMN";
case AlterCommand::Type::MODIFY_ORDER_BY:
return "MODIFY ORDER BY";
case AlterCommand::Type::MODIFY_TTL:
return "MODIFY TTL";
case AlterCommand::Type::MODIFY_SETTING:
return "MODIFY SETTING";
}
__builtin_unreachable();
}
void AlterCommands::apply(StorageInMemoryMetadata & metadata) const
{
if (!prepared)
throw DB::Exception("Alter commands is not prepared. Cannot apply. It's a bug", ErrorCodes::LOGICAL_ERROR);
auto metadata_copy = metadata;
for (const AlterCommand & command : *this)
if (!command.ignore)
command.apply(metadata_copy);
metadata = std::move(metadata_copy);
}
void AlterCommands::prepare(const StorageInMemoryMetadata & metadata, const Context & context)
{
/// A temporary object that is used to keep track of the current state of columns after applying a subset of commands.
auto columns = table.getColumns();
auto columns = metadata.columns;
/// Default expressions will be added to this list for type deduction.
auto default_expr_list = std::make_shared<ASTExpressionList>();
@ -461,19 +506,13 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
{
if (command.if_not_exists)
command.ignore = true;
else
throw Exception{"Cannot add column " + column_name + ": column with this name already exists", ErrorCodes::ILLEGAL_COLUMN};
}
}
else if (command.type == AlterCommand::MODIFY_COLUMN)
{
if (!columns.has(column_name))
{
if (command.if_exists)
command.ignore = true;
else
throw Exception{"Wrong column name. Cannot find column " + column_name + " to modify", ErrorCodes::ILLEGAL_COLUMN};
}
if (!command.ignore)
columns.remove(column_name);
@ -513,45 +552,15 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
else if (command.type == AlterCommand::DROP_COLUMN)
{
if (columns.has(command.column_name) || columns.hasNested(command.column_name))
{
for (const ColumnDescription & column : columns)
{
const auto & default_expression = column.default_desc.expression;
if (!default_expression)
continue;
ASTPtr query = default_expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, columns.getAll());
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto required_columns = actions->getRequiredColumns();
if (required_columns.end() != std::find(required_columns.begin(), required_columns.end(), command.column_name))
throw Exception(
"Cannot drop column " + command.column_name + ", because column " + column.name +
" depends on it", ErrorCodes::ILLEGAL_COLUMN);
}
columns.remove(command.column_name);
}
else if (command.if_exists)
command.ignore = true;
else
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to drop",
ErrorCodes::ILLEGAL_COLUMN);
}
else if (command.type == AlterCommand::COMMENT_COLUMN)
{
if (!columns.has(command.column_name))
{
if (command.if_exists)
command.ignore = true;
else
throw Exception{"Wrong column name. Cannot find column " + command.column_name + " to comment", ErrorCodes::ILLEGAL_COLUMN};
}
if (!columns.has(command.column_name) && command.if_exists)
command.ignore = true;
}
else if (command.type == AlterCommand::MODIFY_SETTING)
for (const auto & change : command.settings_changes)
table.checkSettingCanBeChanged(change.name);
}
/** Existing defaulted columns may require default expression extensions with a type conversion,
@ -596,10 +605,25 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
{
if (!command)
{
#if !__clang__
# pragma GCC diagnostic push
# pragma GCC diagnostic ignored "-Wmissing-field-initializers"
#endif
/// We completely sure, that we initialize all required fields
AlterCommand aux_command{
.type = AlterCommand::MODIFY_COLUMN,
.column_name = column.name,
.data_type = explicit_type,
.default_kind = column.default_desc.kind,
.default_expression = column.default_desc.expression
};
#if !__clang__
# pragma GCC diagnostic pop
#endif
/// column has no associated alter command, let's create it
/// add a new alter command to modify existing column
this->emplace_back(AlterCommand{AlterCommand::MODIFY_COLUMN,
column.name, explicit_type, column.default_desc.kind, column.default_desc.expression, {}, {}, {}, {}});
this->emplace_back(aux_command);
command = &back();
}
@ -615,62 +639,71 @@ void AlterCommands::validate(const IStorage & table, const Context & context)
command->data_type = block.getByName(column.name).type;
}
}
prepared = true;
}
void AlterCommands::applyForColumnsOnly(ColumnsDescription & columns_description) const
void AlterCommands::validate(const StorageInMemoryMetadata & metadata, const Context & context) const
{
auto out_columns_description = columns_description;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
ASTPtr out_order_by;
ASTPtr out_primary_key;
ASTPtr out_ttl_table;
SettingsChanges out_changes;
apply(out_columns_description, indices_description, constraints_description,
out_order_by, out_primary_key, out_ttl_table, out_changes);
/// We will save ALTER ADD/MODIFY command indices (only the last for each column) for possible modification
/// (we might need to add deduced types or modify default expressions).
/// Saving indices because we can add new commands later and thus cause vector resize.
std::unordered_map<String, size_t> column_to_command_idx;
if (out_order_by)
throw Exception("Storage doesn't support modifying ORDER BY expression", ErrorCodes::NOT_IMPLEMENTED);
if (out_primary_key)
throw Exception("Storage doesn't support modifying PRIMARY KEY expression", ErrorCodes::NOT_IMPLEMENTED);
if (!indices_description.indices.empty())
throw Exception("Storage doesn't support modifying indices", ErrorCodes::NOT_IMPLEMENTED);
if (!constraints_description.constraints.empty())
throw Exception("Storage doesn't support modifying constraints", ErrorCodes::NOT_IMPLEMENTED);
if (out_ttl_table)
throw Exception("Storage doesn't support modifying TTL expression", ErrorCodes::NOT_IMPLEMENTED);
if (!out_changes.empty())
throw Exception("Storage doesn't support modifying settings", ErrorCodes::NOT_IMPLEMENTED);
for (size_t i = 0; i < size(); ++i)
{
auto & command = (*this)[i];
if (command.type == AlterCommand::ADD_COLUMN || command.type == AlterCommand::MODIFY_COLUMN)
{
const auto & column_name = command.column_name;
if (command.type == AlterCommand::ADD_COLUMN)
{
if (metadata.columns.has(column_name) || metadata.columns.hasNested(column_name))
if (!command.if_not_exists)
throw Exception{"Cannot add column " + column_name + ": column with this name already exists", ErrorCodes::ILLEGAL_COLUMN};
}
else if (command.type == AlterCommand::MODIFY_COLUMN)
{
if (!metadata.columns.has(column_name))
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + column_name + " to modify", ErrorCodes::ILLEGAL_COLUMN};
}
columns_description = std::move(out_columns_description);
}
}
else if (command.type == AlterCommand::DROP_COLUMN)
{
if (metadata.columns.has(command.column_name) || metadata.columns.hasNested(command.column_name))
{
for (const ColumnDescription & column : metadata.columns)
{
const auto & default_expression = column.default_desc.expression;
if (!default_expression)
continue;
ASTPtr query = default_expression->clone();
auto syntax_result = SyntaxAnalyzer(context).analyze(query, metadata.columns.getAll());
const auto actions = ExpressionAnalyzer(query, syntax_result, context).getActions(true);
const auto required_columns = actions->getRequiredColumns();
void AlterCommands::applyForSettingsOnly(SettingsChanges & changes) const
{
ColumnsDescription out_columns_description;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
ASTPtr out_order_by;
ASTPtr out_primary_key;
ASTPtr out_ttl_table;
SettingsChanges out_changes;
apply(out_columns_description, indices_description, constraints_description, out_order_by,
out_primary_key, out_ttl_table, out_changes);
if (out_columns_description.begin() != out_columns_description.end())
throw Exception("Alter modifying columns, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
if (out_order_by)
throw Exception("Alter modifying ORDER BY expression, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
if (out_primary_key)
throw Exception("Alter modifying PRIMARY KEY expression, but only settings change applied.", ErrorCodes::LOGICAL_ERROR);
if (!indices_description.indices.empty())
throw Exception("Alter modifying indices, but only settings change applied.", ErrorCodes::NOT_IMPLEMENTED);
if (out_ttl_table)
throw Exception("Alter modifying TTL, but only settings change applied.", ErrorCodes::NOT_IMPLEMENTED);
changes = std::move(out_changes);
if (required_columns.end() != std::find(required_columns.begin(), required_columns.end(), command.column_name))
throw Exception(
"Cannot drop column " + command.column_name + ", because column " + column.name +
" depends on it", ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (!command.if_exists)
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to drop",
ErrorCodes::ILLEGAL_COLUMN);
}
else if (command.type == AlterCommand::COMMENT_COLUMN)
{
if (!metadata.columns.has(command.column_name))
{
if (!command.if_exists)
throw Exception{"Wrong column name. Cannot find column " + command.column_name + " to comment", ErrorCodes::ILLEGAL_COLUMN};
}
}
}
}
bool AlterCommands::isModifyingData() const
@ -688,4 +721,9 @@ bool AlterCommands::isSettingsAlter() const
{
return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isSettingsAlter(); });
}
bool AlterCommands::isCommentAlter() const
{
return std::all_of(begin(), end(), [](const AlterCommand & c) { return c.isCommentAlter(); });
}
}

View File

@ -2,10 +2,9 @@
#include <optional>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/SettingsChanges.h>
@ -31,11 +30,10 @@ struct AlterCommand
ADD_CONSTRAINT,
DROP_CONSTRAINT,
MODIFY_TTL,
UKNOWN_TYPE,
MODIFY_SETTING,
};
Type type = UKNOWN_TYPE;
Type type;
String column_name;
@ -43,10 +41,12 @@ struct AlterCommand
String partition_name;
/// For ADD and MODIFY, a new column type.
DataTypePtr data_type;
DataTypePtr data_type = nullptr;
ColumnDefaultKind default_kind{};
ASTPtr default_expression{};
/// For COMMENT column
std::optional<String> comment;
/// For ADD - after which column to add a new one. If an empty string, add to the end. To add to the beginning now it is impossible.
@ -59,48 +59,36 @@ struct AlterCommand
bool if_not_exists = false;
/// For MODIFY_ORDER_BY
ASTPtr order_by;
ASTPtr order_by = nullptr;
/// For ADD INDEX
ASTPtr index_decl;
ASTPtr index_decl = nullptr;
String after_index_name;
/// For ADD/DROP INDEX
String index_name;
// For ADD CONSTRAINT
ASTPtr constraint_decl;
ASTPtr constraint_decl = nullptr;
// For ADD/DROP CONSTRAINT
String constraint_name;
/// For MODIFY TTL
ASTPtr ttl;
ASTPtr ttl = nullptr;
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// For ADD and MODIFY
CompressionCodecPtr codec;
CompressionCodecPtr codec = nullptr;
/// For MODIFY SETTING
SettingsChanges settings_changes;
AlterCommand() = default;
AlterCommand(const Type type_, const String & column_name_, const DataTypePtr & data_type_,
const ColumnDefaultKind default_kind_, const ASTPtr & default_expression_,
const String & after_column_, const String & comment_,
const bool if_exists_, const bool if_not_exists_)
: type{type_}, column_name{column_name_}, data_type{data_type_}, default_kind{default_kind_},
default_expression{default_expression_}, comment(comment_), after_column{after_column_},
if_exists(if_exists_), if_not_exists(if_not_exists_)
{}
static std::optional<AlterCommand> parse(const ASTAlterCommand * command);
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast,
ASTPtr & primary_key_ast, ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
void apply(StorageInMemoryMetadata & metadata) const;
/// Checks that alter query changes data. For MergeTree:
/// * column files (data and marks)
@ -110,25 +98,31 @@ struct AlterCommand
/// checks that only settings changed by alter
bool isSettingsAlter() const;
bool isCommentAlter() const;
};
String alterTypeToString(const AlterCommand::Type type);
class Context;
class AlterCommands : public std::vector<AlterCommand>
{
private:
bool prepared = false;
public:
/// Used for primitive table engines, where only columns metadata can be changed
void applyForColumnsOnly(ColumnsDescription & columns_description) const;
void apply(ColumnsDescription & columns_description, IndicesDescription & indices_description,
ConstraintsDescription & constraints_description, ASTPtr & order_by_ast, ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast, SettingsChanges & changes) const;
void apply(StorageInMemoryMetadata & metadata) const;
/// Apply alter commands only for settings. Exception will be thrown if any other part of table structure will be modified.
void applyForSettingsOnly(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
void prepare(const StorageInMemoryMetadata & metadata, const Context & context);
void validate(const StorageInMemoryMetadata & metadata, const Context & context) const;
bool isModifyingData() const;
bool isSettingsAlter() const;
bool isCommentAlter() const;
};
}

View File

@ -0,0 +1,76 @@
#pragma once
#include <Core/Types.h>
#include <optional>
#include <Parsers/IAST.h>
#include <Common/SettingsChanges.h>
namespace DB
{
class AlterMetadataCommand
{
enum Type
{
COMMENT_COLUMN,
MODIFY_ORDER_BY,
ADD_INDEX,
ADD_CONSTRAINT,
DROP_CONSTRAINT,
MODIFY_TTL,
MODIFY_SETTING,
};
Type type;
String column_name;
std::optional<String> comment;
/// For DROP_COLUMN, MODIFY_COLUMN, COMMENT_COLUMN
bool if_exists = false;
/// For MODIFY_ORDER_BY
ASTPtr order_by;
/// For ADD INDEX
ASTPtr index_decl;
String after_index_name;
/// For ADD/DROP INDEX
String index_name;
// For ADD CONSTRAINT
ASTPtr constraint_decl;
// For ADD/DROP CONSTRAINT
String constraint_name;
/// For MODIFY TTL
ASTPtr ttl;
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// For MODIFY SETTING
SettingsChanges settings_changes;
};
class AlterMetadataCommands : public std::vector<AlterMetadataCommand>
{
public:
void apply(
IndicesDescription & indices_description,
ConstraintsDescription & constraints_description,
ASTPtr & order_by_ast,
ASTPtr & primary_key_ast,
ASTPtr & ttl_table_ast,
SettingsChanges & changes) const;
/// Apply alter commands only for settings. Exception will be thrown if any other part of table structure will be modified.
void applyForSettings(SettingsChanges & changes) const;
void validate(const IStorage & table, const Context & context);
bool isModifyingData() const;
bool isSettingsAlter() const;
};
}

View File

@ -27,6 +27,7 @@ namespace ErrorCodes
extern const int SETTINGS_ARE_NOT_SUPPORTED;
extern const int UNKNOWN_SETTING;
extern const int TABLE_IS_DROPPED;
extern const int NOT_IMPLEMENTED;
}
IStorage::IStorage(ColumnsDescription virtuals_) : virtuals(std::move(virtuals_))
@ -313,12 +314,6 @@ bool IStorage::isVirtualColumn(const String & column_name) const
return getColumns().get(column_name).is_virtual;
}
void IStorage::checkSettingCanBeChanged(const String & /* setting_name */) const
{
if (!supportsSettings())
throw Exception("Storage '" + getName() + "' doesn't support settings.", ErrorCodes::SETTINGS_ARE_NOT_SUPPORTED);
}
TableStructureReadLockHolder IStorage::lockStructureForShare(bool will_add_new_data, const String & query_id)
{
TableStructureReadLockHolder result;
@ -373,57 +368,40 @@ TableStructureWriteLockHolder IStorage::lockExclusively(const String & query_id)
return result;
}
IDatabase::ASTModifier IStorage::getSettingsModifier(const SettingsChanges & new_changes) const
StorageInMemoryMetadata IStorage::getInMemoryMetadata() const
{
return [&] (IAST & ast)
return
{
if (!new_changes.empty())
{
auto & storage_changes = ast.as<ASTStorage &>().settings->changes;
/// Make storage settings unique
for (const auto & change : new_changes)
{
checkSettingCanBeChanged(change.name);
auto finder = [&change] (const SettingChange & c) { return c.name == change.name; };
if (auto it = std::find_if(storage_changes.begin(), storage_changes.end(), finder); it != storage_changes.end())
it->value = change.value;
else
storage_changes.push_back(change);
}
}
.columns = getColumns(),
.indices = getIndices(),
.constraints = getConstraints(),
};
}
void IStorage::alter(
const AlterCommands & params,
const Context & context,
TableStructureWriteLockHolder & table_lock_holder)
TableStructureWriteLockHolder & /*table_lock_holder*/)
{
if (params.isModifyingData())
throw Exception("Method alter supports only change comment of column for storage " + getName(), ErrorCodes::NOT_IMPLEMENTED);
checkAlterIsPossible(params, context.getSettingsRef());
const String database_name = getDatabaseName();
const String table_name = getTableName();
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(database_name)->alterTable(context, table_name, metadata);
setColumns(std::move(metadata.columns));
}
if (params.isSettingsAlter())
void IStorage::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
context.getDatabase(database_name)->alterTable(context, table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
else
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
if (!command.isCommentAlter())
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}

View File

@ -13,6 +13,7 @@
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Common/ActionLock.h>
#include <Common/Exception.h>
#include <Common/RWLock.h>
@ -127,6 +128,10 @@ public: /// thread-unsafe part. lockStructure must be acquired
const ConstraintsDescription & getConstraints() const;
void setConstraints(ConstraintsDescription constraints_);
/// Returns storage metadata copy. Direct modification of
/// result structure doesn't affect storage.
virtual StorageInMemoryMetadata getInMemoryMetadata() const;
/// NOTE: these methods should include virtual columns,
/// but should NOT include ALIAS columns (they are treated separately).
virtual NameAndTypePair getColumn(const String & column_name) const;
@ -152,9 +157,6 @@ public: /// thread-unsafe part. lockStructure must be acquired
/// If |need_all| is set, then checks that all the columns of the table are in the block.
void check(const Block & block, bool need_all = false) const;
/// Check storage has setting and setting can be modified.
virtual void checkSettingCanBeChanged(const String & setting_name) const;
protected: /// still thread-unsafe part.
void setIndices(IndicesDescription indices_);
@ -162,8 +164,6 @@ protected: /// still thread-unsafe part.
/// Initially reserved virtual column name may be shadowed by real column.
virtual bool isVirtualColumn(const String & column_name) const;
/// Returns modifier of settings in storage definition
IDatabase::ASTModifier getSettingsModifier(const SettingsChanges & new_changes) const;
private:
ColumnsDescription columns; /// combined real and virtual columns
@ -316,6 +316,11 @@ public:
*/
virtual void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder);
/** Checks that alter commands can be applied to storage. For example, columns can be modified,
* or primary key can be changes, etc.
*/
virtual void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings);
/** ALTER tables with regard to its partitions.
* Should handle locks for each command on its own.
*/

View File

@ -418,15 +418,6 @@ bool StorageKafka::streamToViews()
return limits_applied;
}
void StorageKafka::checkSettingCanBeChanged(const String & setting_name) const
{
if (KafkaSettings::findIndex(setting_name) == KafkaSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void registerStorageKafka(StorageFactory & factory)
{
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)

View File

@ -64,8 +64,6 @@ public:
const auto & getSchemaName() const { return schema_name; }
const auto & skipBroken() const { return skip_broken; }
void checkSettingCanBeChanged(const String & setting_name) const override;
protected:
StorageKafka(
const std::string & table_name_,

View File

@ -114,16 +114,9 @@ MergeTreeData::MergeTreeData(
const String & database_,
const String & table_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> storage_settings_,
bool require_part_metadata_,
@ -131,8 +124,9 @@ MergeTreeData::MergeTreeData(
BrokenPartCallback broken_part_callback_)
: global_context(context_)
, merging_params(merging_params_)
, partition_by_ast(partition_by_ast_)
, sample_by_ast(sample_by_ast_)
, partition_by_ast(metadata.partition_by_ast)
, sample_by_ast(metadata.sample_by_ast)
, settings_ast(metadata.settings_ast)
, require_part_metadata(require_part_metadata_)
, database_name(database_)
, table_name(table_)
@ -147,7 +141,7 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
{
const auto settings = getSettings();
setProperties(order_by_ast_, primary_key_ast_, columns_, indices_, constraints_);
setProperties(metadata);
/// NOTE: using the same columns list as is read when performing actual merges.
merging_params.check(getColumns().getAllPhysical());
@ -189,7 +183,7 @@ MergeTreeData::MergeTreeData(
min_format_version = MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING;
}
setTTLExpressions(columns_.getColumnTTLs(), ttl_table_ast_);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
// format_file always contained on any data path
String version_file_path;
@ -244,6 +238,35 @@ MergeTreeData::MergeTreeData(
}
StorageInMemoryMetadata MergeTreeData::getInMemoryMetadata() const
{
StorageInMemoryMetadata metadata{
.columns = getColumns(),
.indices = getIndices(),
.constraints = getConstraints(),
};
if (partition_by_ast)
metadata.partition_by_ast = partition_by_ast->clone();
if (order_by_ast)
metadata.order_by_ast = order_by_ast->clone();
if (primary_key_ast)
metadata.primary_key_ast = primary_key_ast->clone();
if (ttl_table_ast)
metadata.ttl_for_table_ast = ttl_table_ast->clone();
if (sample_by_ast)
metadata.sample_by_ast = sample_by_ast->clone();
if (settings_ast)
metadata.settings_ast = settings_ast->clone();
return metadata;
}
static void checkKeyExpression(const ExpressionActions & expr, const Block & sample_block, const String & key_name)
{
for (const ExpressionAction & action : expr.getActions())
@ -272,18 +295,14 @@ static void checkKeyExpression(const ExpressionActions & expr, const Block & sam
}
}
void MergeTreeData::setProperties(
const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
const ColumnsDescription & new_columns, const IndicesDescription & indices_description,
const ConstraintsDescription & constraints_description, bool only_check)
void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool only_check)
{
if (!new_order_by_ast)
if (!metadata.order_by_ast)
throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
ASTPtr new_sorting_key_expr_list = extractKeyExpressionList(new_order_by_ast);
ASTPtr new_primary_key_expr_list = new_primary_key_ast
? extractKeyExpressionList(new_primary_key_ast) : new_sorting_key_expr_list->clone();
ASTPtr new_sorting_key_expr_list = extractKeyExpressionList(metadata.order_by_ast);
ASTPtr new_primary_key_expr_list = metadata.primary_key_ast
? extractKeyExpressionList(metadata.primary_key_ast) : new_sorting_key_expr_list->clone();
if (merging_params.mode == MergeTreeData::MergingParams::VersionedCollapsing)
new_sorting_key_expr_list->children.push_back(std::make_shared<ASTIdentifier>(merging_params.version_column));
@ -315,8 +334,9 @@ void MergeTreeData::setProperties(
}
}
auto all_columns = new_columns.getAllPhysical();
auto all_columns = metadata.columns.getAllPhysical();
/// Order by check AST
if (order_by_ast && only_check)
{
/// This is ALTER, not CREATE/ATTACH TABLE. Let us check that all new columns used in the sorting key
@ -352,7 +372,7 @@ void MergeTreeData::setProperties(
"added to the sorting key. You can add expressions that use only the newly added columns",
ErrorCodes::BAD_ARGUMENTS);
if (new_columns.getDefaults().count(col))
if (metadata.columns.getDefaults().count(col))
throw Exception("Newly added column " + col + " has a default expression, so adding "
"expressions that use it to the sorting key is forbidden",
ErrorCodes::BAD_ARGUMENTS);
@ -387,11 +407,11 @@ void MergeTreeData::setProperties(
MergeTreeIndices new_indices;
if (!indices_description.indices.empty())
if (!metadata.indices.indices.empty())
{
std::set<String> indices_names;
for (const auto & index_ast : indices_description.indices)
for (const auto & index_ast : metadata.indices.indices)
{
const auto & index_decl = std::dynamic_pointer_cast<ASTIndexDeclaration>(index_ast);
@ -428,24 +448,24 @@ void MergeTreeData::setProperties(
if (!only_check)
{
setColumns(std::move(new_columns));
setColumns(std::move(metadata.columns));
order_by_ast = new_order_by_ast;
order_by_ast = metadata.order_by_ast;
sorting_key_columns = std::move(new_sorting_key_columns);
sorting_key_expr_ast = std::move(new_sorting_key_expr_list);
sorting_key_expr = std::move(new_sorting_key_expr);
primary_key_ast = new_primary_key_ast;
primary_key_ast = metadata.primary_key_ast;
primary_key_columns = std::move(new_primary_key_columns);
primary_key_expr_ast = std::move(new_primary_key_expr_list);
primary_key_expr = std::move(new_primary_key_expr);
primary_key_sample = std::move(new_primary_key_sample);
primary_key_data_types = std::move(new_primary_key_data_types);
setIndices(indices_description);
setIndices(metadata.indices);
skip_indices = std::move(new_indices);
setConstraints(constraints_description);
setConstraints(metadata.constraints);
primary_key_and_skip_indices_expr = new_indices_with_primary_key_expr;
sorting_key_and_skip_indices_expr = new_indices_with_sorting_key_expr;
@ -1357,17 +1377,15 @@ bool isMetadataOnlyConversion(const IDataType * from, const IDataType * to)
}
void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /*context*/)
void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, const Settings & settings)
{
/// Check that needed transformations can be applied to the list of columns without considering type conversions.
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
commands.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
commands.apply(metadata);
if (getIndices().empty() && !metadata.indices.empty() &&
!settings.allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
/// Set of columns that shouldn't be altered.
NameSet columns_alter_type_forbidden;
@ -1436,13 +1454,32 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /
}
}
setProperties(new_order_by_ast, new_primary_key_ast,
new_columns, new_indices, new_constraints, /* only_check = */ true);
setProperties(metadata, /* only_check = */ true);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast, /* only_check = */ true);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast, /* only_check = */ true);
for (const auto & setting : new_changes)
checkSettingCanBeChanged(setting.name);
if (settings_ast)
{
const auto & current_changes = settings_ast->as<const ASTSetQuery &>().changes;
for (const auto & changed_setting : metadata.settings_ast->as<const ASTSetQuery &>().changes)
{
if (MergeTreeSettings::findIndex(changed_setting.name) == MergeTreeSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + changed_setting.name + "'",
ErrorCodes::UNKNOWN_SETTING};
auto comparator = [&changed_setting](const auto & change) { return change.name == changed_setting.name; };
auto current_setting_it
= std::find_if(current_changes.begin(), current_changes.end(), comparator);
if ((current_setting_it == current_changes.end() || *current_setting_it != changed_setting)
&& MergeTreeSettings::isReadonlySetting(changed_setting.name))
{
throw Exception{"Setting '" + changed_setting.name + "' is readonly for storage '" + getName() + "'",
ErrorCodes::READONLY_SETTING};
}
}
}
if (commands.isModifyingData())
{
@ -1450,8 +1487,8 @@ void MergeTreeData::checkAlter(const AlterCommands & commands, const Context & /
ExpressionActionsPtr unused_expression;
NameToNameMap unused_map;
bool unused_bool;
createConvertExpression(nullptr, getColumns().getAllPhysical(), new_columns.getAllPhysical(),
getIndices().indices, new_indices.indices, unused_expression, unused_map, unused_bool);
createConvertExpression(nullptr, getColumns().getAllPhysical(), metadata.columns.getAllPhysical(),
getIndices().indices, metadata.indices.indices, unused_expression, unused_map, unused_bool);
}
}
@ -1767,26 +1804,19 @@ void MergeTreeData::alterDataPart(
}
void MergeTreeData::changeSettings(
const SettingsChanges & new_changes,
const ASTPtr & new_settings,
TableStructureWriteLockHolder & /* table_lock_holder */)
{
if (!new_changes.empty())
if (new_settings)
{
const auto & new_changes = new_settings->as<const ASTSetQuery &>().changes;
MergeTreeSettings copy = *getSettings();
copy.applyChanges(new_changes);
storage_settings.set(std::make_unique<const MergeTreeSettings>(copy));
settings_ast = new_settings;
}
}
void MergeTreeData::checkSettingCanBeChanged(const String & setting_name) const
{
if (MergeTreeSettings::findIndex(setting_name) == MergeTreeSettings::npos)
throw Exception{"Storage '" + getName() + "' doesn't have setting '" + setting_name + "'", ErrorCodes::UNKNOWN_SETTING};
if (MergeTreeSettings::isReadonlySetting(setting_name))
throw Exception{"Setting '" + setting_name + "' is readonly for storage '" + getName() + "'", ErrorCodes::READONLY_SETTING};
}
void MergeTreeData::removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part)
{
auto & empty_columns = data_part->empty_columns;

View File

@ -332,22 +332,17 @@ public:
/// attach - whether the existing table is attached or the new table is created.
MergeTreeData(const String & database_, const String & table_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool require_part_metadata_,
bool attach,
BrokenPartCallback broken_part_callback_ = [](const String &){});
StorageInMemoryMetadata getInMemoryMetadata() const override;
ASTPtr getPartitionKeyAST() const override { return partition_by_ast; }
ASTPtr getSortingKeyAST() const override { return sorting_key_expr_ast; }
ASTPtr getPrimaryKeyAST() const override { return primary_key_expr_ast; }
@ -545,7 +540,7 @@ public:
/// - all type conversions can be done.
/// - columns corresponding to primary key, indices, sign, sampling expression and date are not affected.
/// If something is wrong, throws an exception.
void checkAlter(const AlterCommands & commands, const Context & context);
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) override;
/// Performs ALTER of the data part, writes the result to temporary files.
/// Returns an object allowing to rename temporary files to permanent files.
@ -559,12 +554,9 @@ public:
/// Change MergeTreeSettings
void changeSettings(
const SettingsChanges & new_changes,
const ASTPtr & new_changes,
TableStructureWriteLockHolder & table_lock_holder);
/// All MergeTreeData children have settings.
void checkSettingCanBeChanged(const String & setting_name) const override;
/// Remove columns, that have been marked as empty after zeroing values with expired ttl
void removeEmptyColumnsFromPart(MergeTreeData::MutableDataPartPtr & data_part);
@ -787,6 +779,7 @@ protected:
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
ASTPtr settings_ast;
bool require_part_metadata;
@ -899,10 +892,7 @@ protected:
std::mutex clear_old_temporary_directories_mutex;
/// Mutex for settings usage
void setProperties(const ASTPtr & new_order_by_ast, const ASTPtr & new_primary_key_ast,
const ColumnsDescription & new_columns,
const IndicesDescription & indices_description,
const ConstraintsDescription & constraints_description, bool only_check = false);
void setProperties(const StorageInMemoryMetadata & metadata, bool only_check = false);
void initPartitionKey();

View File

@ -34,6 +34,12 @@ ReplicatedMergeTreeTableMetadata::ReplicatedMergeTreeTableMetadata(const MergeTr
merging_params_mode = static_cast<int>(data.merging_params.mode);
sign_column = data.merging_params.sign_column;
/// This code may looks strange, but previously we had only one entity: PRIMARY KEY (or ORDER BY, it doesn't matter)
/// Now we have two different entities ORDER BY and it's optional prefix -- PRIMARY KEY.
/// In most cases user doesn't specify PRIMARY KEY and semantically it's equal to ORDER BY.
/// So rules in zookeeper metadata is following:
/// - When we have only ORDER BY, than store it in "primary key:" row of /metadata
/// - When we have both, than store PRIMARY KEY in "primary key:" row and ORDER BY in "sorting key:" row of /metadata
if (!data.primary_key_ast)
primary_key = formattedAST(MergeTreeData::extractKeyExpressionList(data.order_by_ast));
else

View File

@ -12,6 +12,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@ -573,6 +574,7 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ASTPtr primary_key_ast;
ASTPtr sample_by_ast;
ASTPtr ttl_table_ast;
ASTPtr settings_ast;
IndicesDescription indices_description;
ConstraintsDescription constraints_description;
@ -599,12 +601,16 @@ static StoragePtr create(const StorageFactory::Arguments & args)
if (args.storage_def->ttl_table)
ttl_table_ast = args.storage_def->ttl_table->ptr();
if (args.query.columns_list && args.query.columns_list->indices)
for (const auto & index : args.query.columns_list->indices->children)
indices_description.indices.push_back(
std::dynamic_pointer_cast<ASTIndexDeclaration>(index->clone()));
storage_settings->loadFromQuery(*args.storage_def);
if (args.storage_def->settings)
settings_ast = args.storage_def->settings->ptr();
}
else
{
@ -633,18 +639,30 @@ static StoragePtr create(const StorageFactory::Arguments & args)
ErrorCodes::BAD_ARGUMENTS);
}
if (!args.attach && !indices_description.empty() && !args.local_context.getSettingsRef().allow_experimental_data_skipping_indices)
throw Exception("You must set the setting `allow_experimental_data_skipping_indices` to 1 " \
"before using data skipping indices.", ErrorCodes::BAD_ARGUMENTS);
StorageInMemoryMetadata metadata{
.columns = args.columns,
.indices = indices_description,
.constraints = args.constraints,
.partition_by_ast = partition_by_ast,
.order_by_ast = order_by_ast,
.primary_key_ast = primary_key_ast,
.ttl_for_table_ast = ttl_table_ast,
.sample_by_ast = sample_by_ast,
.settings_ast = settings_ast,
};
if (replicated)
return StorageReplicatedMergeTree::create(
zookeeper_path, replica_name, args.attach, args.database_name, args.table_name, args.relative_data_path,
args.columns, indices_description, args.constraints,
args.context, date_column_name, partition_by_ast, order_by_ast, primary_key_ast,
sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
metadata, args.context, date_column_name, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
else
return StorageMergeTree::create(
args.database_name, args.table_name, args.relative_data_path, args.columns, indices_description,
args.constraints, args.attach, args.context, date_column_name, partition_by_ast, order_by_ast,
primary_key_ast, sample_by_ast, ttl_table_ast, merging_params, std::move(storage_settings),
args.database_name, args.table_name, args.relative_data_path, metadata, args.attach, args.context,
date_column_name, merging_params, std::move(storage_settings),
args.has_force_restore_data_flag);
}

View File

@ -699,23 +699,35 @@ void StorageBuffer::flushThread()
} while (!shutdown_event.tryWait(1000));
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageBuffer::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlterIsPossible(params, context.getSettingsRef());
const String database_name_ = getDatabaseName();
const String table_name_ = getTableName();
/// So that no blocks of the old structure remain.
optimize({} /*query*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/, context);
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name_)->alterTable(context, table_name_, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(database_name_)->alterTable(context, table_name_, metadata);
setColumns(std::move(metadata.columns));
}

View File

@ -94,9 +94,10 @@ public:
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;
/// The structure of the subordinate table is not checked and does not change.
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// The structure of the subordinate table is not checked and does not change.
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
~StorageBuffer() override;

View File

@ -393,20 +393,33 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const Context & c
}
void StorageDistributed::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
void StorageDistributed::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN
&& command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN
&& command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception("Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageDistributed::alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlterIsPossible(params, context.getSettingsRef());
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
setColumns(std::move(metadata.columns));
}

View File

@ -84,10 +84,12 @@ public:
void rename(const String & new_path_to_table_data, const String & new_database_name, const String & new_table_name, TableStructureWriteLockHolder &) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// in the sub-tables, you need to manually add and delete columns
/// the structure of the sub-table is not checked
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void startup() override;
void shutdown() override;

View File

@ -0,0 +1,38 @@
#pragma once
#include <Storages/ColumnsDescription.h>
#include <Storages/IndicesDescription.h>
#include <Storages/ConstraintsDescription.h>
#include <Parsers/IAST_fwd.h>
namespace DB
{
/// Structure represent table metadata stored in memory.
/// Only one storage engine support all fields -- MergeTree.
/// Complete table AST can be recreated from this struct.
struct StorageInMemoryMetadata
{
/// Columns of table with their names, types,
/// defaults, comments, etc. All table engines have columns.
ColumnsDescription columns;
/// Table indices. Currently supported for MergeTree only.
IndicesDescription indices;
/// Table constraints. Currently supported for MergeTree only.
ConstraintsDescription constraints;
/// PARTITION BY expression. Currently supported for MergeTree only.
ASTPtr partition_by_ast = nullptr;
/// ORDER BY expression. Required field for all MergeTree tables
/// even in old syntax MergeTree(partition_key, order_by, ...)
ASTPtr order_by_ast = nullptr;
/// PRIMARY KEY expression. If absent, than equal to order_by_ast.
ASTPtr primary_key_ast = nullptr;
/// TTL expression for whole table. Supported for MergeTree only.
ASTPtr ttl_for_table_ast = nullptr;
/// SAMPLE BY expression. Supported for MergeTree only.
ASTPtr sample_by_ast = nullptr;
/// SETTINGS expression. Supported for MergeTree, Buffer and Kafka.
ASTPtr settings_ast = nullptr;
};
}

View File

@ -413,17 +413,27 @@ DatabaseTablesIteratorPtr StorageMerge::getDatabaseIterator(const Context & cont
}
void StorageMerge::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageMerge::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(database_name)->alterTable(context, table_name, new_columns, new_indices, new_constraints, {});
setColumns(new_columns);
StorageInMemoryMetadata storage_metadata = getInMemoryMetadata();
params.apply(storage_metadata);
context.getDatabase(database_name)->alterTable(context, table_name, storage_metadata);
setColumns(storage_metadata.columns);
}
Block StorageMerge::getQueryHeader(

View File

@ -48,10 +48,12 @@ public:
database_name = new_database_name;
}
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
/// you need to add and remove columns in the sub-tables manually
/// the structure of sub-tables is not checked
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
bool mayBenefitFromIndexForIn(const ASTPtr & left_in_operand, const Context & query_context) const override;

View File

@ -56,27 +56,27 @@ StorageMergeTree::StorageMergeTree(
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
bool attach,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> storage_settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_, relative_data_path_,
columns_, indices_, constraints_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
std::move(storage_settings_), false, attach),
reader(*this), writer(*this),
merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
: MergeTreeData(
database_name_,
table_name_,
relative_data_path_,
metadata,
context_,
date_column_name,
merging_params_,
std::move(storage_settings_),
false,
attach)
, reader(*this)
, writer(*this)
, merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads())
{
loadDataParts(has_force_restore_data_flag);
@ -252,47 +252,21 @@ void StorageMergeTree::alter(
lockNewDataStructureExclusively(table_lock_holder, context.getCurrentQueryId());
checkAlter(params, context);
checkAlterIsPossible(params, context.getSettingsRef());
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
/// Modifier for storage AST in /metadata/storage_db/storage.sql
IDatabase::ASTModifier storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
if (new_order_by_ast.get() != order_by_ast.get())
storage_ast.set(storage_ast.order_by, new_order_by_ast);
if (new_primary_key_ast.get() != primary_key_ast.get())
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
if (!new_changes.empty())
{
auto settings_modifier = getSettingsModifier(new_changes);
settings_modifier(ast);
}
};
params.apply(metadata);
/// Update metdata in memory
auto update_metadata = [&]()
auto update_metadata = [&metadata, &table_lock_holder, this]()
{
changeSettings(new_changes, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
changeSettings(metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(metadata);
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
};
/// This alter can be performed at metadata level only
@ -300,7 +274,7 @@ void StorageMergeTree::alter(
{
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
update_metadata();
}
@ -308,16 +282,16 @@ void StorageMergeTree::alter(
{
/// NOTE: Here, as in ReplicatedMergeTree, you can do ALTER which does not block the writing of data for a long time.
/// Also block moves, because they can replace part with old state
/// Also block moves, because they can replace part with old state.
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto moves_blocked = parts_mover.moves_blocker.cancel();
auto transactions = prepareAlterTransactions(new_columns, new_indices, context);
auto transactions = prepareAlterTransactions(metadata.columns, metadata.indices, context);
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, storage_modifier);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
update_metadata();
@ -930,25 +904,18 @@ void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, c
std::vector<AlterDataPartTransactionPtr> transactions;
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr ignored_order_by_ast;
ASTPtr ignored_primary_key_ast;
ASTPtr ignored_ttl_table_ast;
SettingsChanges ignored_settings_changes;
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast,
ignored_primary_key_ast, ignored_ttl_table_ast, ignored_settings_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
auto columns_for_parts = new_columns.getAllPhysical();
auto columns_for_parts = metadata.columns.getAllPhysical();
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (transaction->isValid())
transactions.push_back(std::move(transaction));

View File

@ -67,7 +67,7 @@ public:
void drop(TableStructureWriteLockHolder &) override;
void truncate(const ASTPtr &, const Context &, TableStructureWriteLockHolder &) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void alter(const AlterCommands & commands, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkTableCanBeDropped() const override;
@ -161,17 +161,10 @@ protected:
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
bool attach,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_, /// nullptr, if sampling is not supported.
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag);

View File

@ -30,6 +30,19 @@ void registerStorageNull(StorageFactory & factory)
});
}
void StorageNull::checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */)
{
for (const auto & command : commands)
{
if (command.type != AlterCommand::Type::ADD_COLUMN && command.type != AlterCommand::Type::MODIFY_COLUMN
&& command.type != AlterCommand::Type::DROP_COLUMN && command.type != AlterCommand::Type::COMMENT_COLUMN)
throw Exception(
"Alter of type '" + alterTypeToString(command.type) + "' is not supported by storage " + getName(),
ErrorCodes::NOT_IMPLEMENTED);
}
}
void StorageNull::alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder)
{
@ -38,12 +51,10 @@ void StorageNull::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndices();
ConstraintsDescription new_constraints = getConstraints();
params.applyForColumnsOnly(new_columns);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, new_columns, new_indices, new_constraints, {});
setColumns(std::move(new_columns));
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
context.getDatabase(current_database_name)->alterTable(context, current_table_name, metadata);
setColumns(std::move(metadata.columns));
}
}

View File

@ -44,8 +44,9 @@ public:
database_name = new_database_name;
}
void alter(
const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
void checkAlterIsPossible(const AlterCommands & commands, const Settings & /* settings */) override;
void alter(const AlterCommands & params, const Context & context, TableStructureWriteLockHolder & table_lock_holder) override;
private:
String table_name;

View File

@ -33,6 +33,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTCheckQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <IO/ReadBufferFromString.h>
#include <IO/Operators.h>
@ -194,25 +195,16 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
const String & database_name_,
const String & table_name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & ttl_table_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag)
: MergeTreeData(database_name_, table_name_, relative_data_path_,
columns_, indices_, constraints_,
context_, date_column_name, partition_by_ast_, order_by_ast_, primary_key_ast_,
sample_by_ast_, ttl_table_ast_, merging_params_,
std::move(settings_), true, attach,
: MergeTreeData(database_name_, table_name_, relative_data_path_, metadata,
context_, date_column_name, merging_params_, std::move(settings_), true, attach,
[this] (const std::string & name) { enqueuePartForCheck(name); }),
zookeeper_path(global_context.getMacros()->expand(zookeeper_path_, database_name_, table_name_)),
replica_name(global_context.getMacros()->expand(replica_name_, database_name_, table_name_)),
reader(*this), writer(*this), merger_mutator(*this, global_context.getBackgroundPool().getNumberOfThreads()),
@ -496,12 +488,10 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_columns, const ReplicatedMergeTreeTableMetadata::Diff & metadata_diff)
{
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_order_by_ast = order_by_ast;
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr new_ttl_table_ast = ttl_table_ast;
IDatabase::ASTModifier storage_modifier;
StorageInMemoryMetadata metadata = getInMemoryMetadata();
if (new_columns != metadata.columns)
metadata.columns = new_columns;
if (!metadata_diff.empty())
{
if (metadata_diff.sorting_key_changed)
@ -510,59 +500,41 @@ void StorageReplicatedMergeTree::setTableStructure(ColumnsDescription new_column
auto new_sorting_key_expr_list = parseQuery(parser, metadata_diff.new_sorting_key, 0);
if (new_sorting_key_expr_list->children.size() == 1)
new_order_by_ast = new_sorting_key_expr_list->children[0];
metadata.order_by_ast = new_sorting_key_expr_list->children[0];
else
{
auto tuple = makeASTFunction("tuple");
tuple->arguments->children = new_sorting_key_expr_list->children;
new_order_by_ast = tuple;
metadata.order_by_ast = tuple;
}
if (!primary_key_ast)
{
/// Primary and sorting key become independent after this ALTER so we have to
/// save the old ORDER BY expression as the new primary key.
new_primary_key_ast = order_by_ast->clone();
metadata.primary_key_ast = order_by_ast->clone();
}
}
if (metadata_diff.skip_indices_changed)
new_indices = IndicesDescription::parse(metadata_diff.new_skip_indices);
metadata.indices = IndicesDescription::parse(metadata_diff.new_skip_indices);
if (metadata_diff.constraints_changed)
new_constraints = ConstraintsDescription::parse(metadata_diff.new_constraints);
metadata.constraints = ConstraintsDescription::parse(metadata_diff.new_constraints);
if (metadata_diff.ttl_table_changed)
{
ParserTTLExpressionList parser;
new_ttl_table_ast = parseQuery(parser, metadata_diff.new_ttl_table, 0);
ParserExpression parser;
metadata.ttl_for_table_ast = parseQuery(parser, metadata_diff.new_ttl_table, 0);
}
storage_modifier = [&](IAST & ast)
{
auto & storage_ast = ast.as<ASTStorage &>();
if (!storage_ast.order_by)
throw Exception(
"ALTER MODIFY ORDER BY of default-partitioned tables is not supported",
ErrorCodes::LOGICAL_ERROR);
if (new_primary_key_ast.get() != primary_key_ast.get())
storage_ast.set(storage_ast.primary_key, new_primary_key_ast);
if (new_ttl_table_ast.get() != ttl_table_ast.get())
storage_ast.set(storage_ast.ttl_table, new_ttl_table_ast);
storage_ast.set(storage_ast.order_by, new_order_by_ast);
};
}
global_context.getDatabase(database_name)->alterTable(global_context, table_name, new_columns, new_indices, new_constraints, storage_modifier);
global_context.getDatabase(database_name)->alterTable(global_context, table_name, metadata);
/// Even if the primary/sorting keys didn't change we must reinitialize it
/// because primary key column types might have changed.
setProperties(new_order_by_ast, new_primary_key_ast, new_columns, new_indices, new_constraints);
setTTLExpressions(new_columns.getColumnTTLs(), new_ttl_table_ast);
setProperties(metadata);
setTTLExpressions(new_columns.getColumnTTLs(), metadata.ttl_for_table_ast);
}
@ -1550,18 +1522,12 @@ void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogE
alter_command.index_name = entry.index_name;
}
auto new_columns = getColumns();
auto new_indices = getIndices();
auto new_constraints = getConstraints();
ASTPtr ignored_order_by_ast;
ASTPtr ignored_primary_key_ast;
ASTPtr ignored_ttl_table_ast;
SettingsChanges ignored_changes;
alter_command.apply(new_columns, new_indices, new_constraints, ignored_order_by_ast, ignored_primary_key_ast, ignored_ttl_table_ast, ignored_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
size_t modified_parts = 0;
auto parts = getDataParts();
auto columns_for_parts = new_columns.getAllPhysical();
auto columns_for_parts = metadata.columns.getAllPhysical();
/// Check there are no merges in range again
/// TODO: Currently, there are no guarantees that a merge covering entry_part_info will happen during the execution.
@ -1581,7 +1547,7 @@ void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogE
LOG_DEBUG(log, "Clearing index " << alter_command.index_name << " in part " << part->name);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, new_indices.indices, false, transaction);
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (!transaction->isValid())
continue;
@ -3232,6 +3198,9 @@ void StorageReplicatedMergeTree::alter(
const String current_database_name = getDatabaseName();
const String current_table_name = getTableName();
checkAlterIsPossible(params, query_context.getSettingsRef());
/// We cannot check this alter commands with method isModifyingData()
/// because ReplicatedMergeTree stores both columns and metadata for
/// each replica. So we have to wait AlterThread even with lightweight
@ -3241,14 +3210,12 @@ void StorageReplicatedMergeTree::alter(
/// We don't replicate storage_settings_ptr ALTER. It's local operation.
/// Also we don't upgrade alter lock to table structure lock.
LOG_DEBUG(log, "ALTER storage_settings_ptr only");
SettingsChanges new_changes;
params.applyForSettingsOnly(new_changes);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
changeSettings(new_changes, table_lock_holder);
changeSettings(metadata.settings_ast, table_lock_holder);
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, metadata);
return;
}
@ -3278,6 +3245,13 @@ void StorageReplicatedMergeTree::alter(
int32_t new_version = -1; /// Initialization is to suppress (useless) false positive warning found by cppcheck.
};
auto ast_to_str = [](ASTPtr query) -> String
{
if (!query)
return "";
return queryToString(query);
};
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
@ -3288,33 +3262,25 @@ void StorageReplicatedMergeTree::alter(
if (is_readonly)
throw Exception("Can't ALTER readonly table", ErrorCodes::TABLE_IS_READ_ONLY);
checkAlter(params, query_context);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
ColumnsDescription new_columns = getColumns();
IndicesDescription new_indices = getIndices();
ConstraintsDescription new_constraints = getConstraints();
ASTPtr new_order_by_ast = order_by_ast;
ASTPtr new_primary_key_ast = primary_key_ast;
ASTPtr new_ttl_table_ast = ttl_table_ast;
SettingsChanges new_changes;
params.apply(new_columns, new_indices, new_constraints, new_order_by_ast, new_primary_key_ast, new_ttl_table_ast, new_changes);
String new_columns_str = new_columns.toString();
String new_columns_str = metadata.columns.toString();
if (new_columns_str != getColumns().toString())
changed_nodes.emplace_back(zookeeper_path, "columns", new_columns_str);
ReplicatedMergeTreeTableMetadata new_metadata(*this);
if (new_order_by_ast.get() != order_by_ast.get())
new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(new_order_by_ast));
if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
new_metadata.sorting_key = serializeAST(*extractKeyExpressionList(metadata.order_by_ast));
if (new_ttl_table_ast.get() != ttl_table_ast.get())
new_metadata.ttl_table = serializeAST(*new_ttl_table_ast);
if (ast_to_str(metadata.ttl_for_table_ast) != ast_to_str(ttl_table_ast))
new_metadata.ttl_table = serializeAST(*metadata.ttl_for_table_ast);
String new_indices_str = new_indices.toString();
String new_indices_str = metadata.indices.toString();
if (new_indices_str != getIndices().toString())
new_metadata.skip_indices = new_indices_str;
String new_constraints_str = new_constraints.toString();
String new_constraints_str = metadata.constraints.toString();
if (new_constraints_str != getConstraints().toString())
new_metadata.constraints = new_constraints_str;
@ -3323,16 +3289,11 @@ void StorageReplicatedMergeTree::alter(
changed_nodes.emplace_back(zookeeper_path, "metadata", new_metadata_str);
/// Perform settings update locally
if (!new_changes.empty())
{
IDatabase::ASTModifier settings_modifier = getSettingsModifier(new_changes);
changeSettings(new_changes, table_lock_holder);
global_context.getDatabase(current_database_name)->alterTable(
query_context, current_table_name, getColumns(), getIndices(), getConstraints(), settings_modifier);
}
auto old_metadata = getInMemoryMetadata();
old_metadata.settings_ast = metadata.settings_ast;
changeSettings(metadata.settings_ast, table_lock_holder);
global_context.getDatabase(current_database_name)->alterTable(query_context, current_table_name, old_metadata);
/// Modify shared metadata nodes in ZooKeeper.
Coordination::Requests ops;

View File

@ -545,16 +545,9 @@ protected:
bool attach,
const String & database_name_, const String & name_,
const String & relative_data_path_,
const ColumnsDescription & columns_,
const IndicesDescription & indices_,
const ConstraintsDescription & constraints_,
const StorageInMemoryMetadata & metadata,
Context & context_,
const String & date_column_name,
const ASTPtr & partition_by_ast_,
const ASTPtr & order_by_ast_,
const ASTPtr & primary_key_ast_,
const ASTPtr & sample_by_ast_,
const ASTPtr & table_ttl_ast_,
const MergingParams & merging_params_,
std::unique_ptr<MergeTreeSettings> settings_,
bool has_force_restore_data_flag);

View File

@ -5,7 +5,7 @@ CREATE TABLE log_for_alter (
Data String
) ENGINE = Log();
ALTER TABLE log_for_alter MODIFY SETTING aaa=123; -- { serverError 471 }
ALTER TABLE log_for_alter MODIFY SETTING aaa=123; -- { serverError 48 }
DROP TABLE IF EXISTS log_for_alter;

View File

@ -0,0 +1 @@
SELECT arrayReduce('aggThrow(0.0001)', range(number % 10)) FROM system.numbers; -- { serverError 503 }