mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-17 20:02:05 +00:00
dbms: deduce and check types during alter, alteration of defaulted columns. [#METR-12739]
This commit is contained in:
parent
8a8e5963c5
commit
847b91614a
@ -9,7 +9,6 @@
|
||||
#include <DB/Core/NamesAndTypes.h>
|
||||
#include <DB/Core/Exception.h>
|
||||
#include <DB/Core/ErrorCodes.h>
|
||||
#include <DB/Storages/ColumnDefault.h>
|
||||
#include "ColumnsWithNameAndType.h"
|
||||
|
||||
|
||||
@ -22,6 +21,7 @@ namespace DB
|
||||
*/
|
||||
|
||||
class Context;
|
||||
struct ColumnDefaults;
|
||||
|
||||
class Block
|
||||
{
|
||||
@ -69,9 +69,6 @@ public:
|
||||
void addDefaults(const NamesAndTypesList & required_columns,
|
||||
const ColumnDefaults & column_defaults,
|
||||
const Context & context);
|
||||
void addAllDefaults(const NamesAndTypesList & required_columns,
|
||||
const ColumnDefaults & column_defaults,
|
||||
const Context & context);
|
||||
|
||||
ColumnWithNameAndType & getByPosition(size_t position);
|
||||
const ColumnWithNameAndType & getByPosition(size_t position) const;
|
||||
|
@ -110,8 +110,7 @@ private:
|
||||
else
|
||||
{
|
||||
default_type_column.column->insert(toString(it->second.type));
|
||||
default_expression_column.column->insert(queryToString(
|
||||
setAlias(it->second.expression->clone(), "")));
|
||||
default_expression_column.column->insert(queryToString(it->second.expression));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -41,8 +41,19 @@ struct AlterCommand
|
||||
NamesAndTypesList & materialized_columns,
|
||||
NamesAndTypesList & alias_columns,
|
||||
ColumnDefaults & column_defaults) const;
|
||||
|
||||
AlterCommand() = default;
|
||||
AlterCommand(const Type type, const String & column_name, const DataTypePtr & data_type,
|
||||
const ColumnDefaultType default_type, const ASTPtr & default_expression,
|
||||
const String & after_column = String{})
|
||||
: type{type}, column_name{column_name}, data_type{data_type}, default_type{default_type},
|
||||
default_expression{default_expression}, after_column{after_column}
|
||||
{}
|
||||
};
|
||||
|
||||
class IStorage;
|
||||
class Context;
|
||||
|
||||
class AlterCommands : public std::vector<AlterCommand>
|
||||
{
|
||||
public:
|
||||
@ -50,6 +61,8 @@ public:
|
||||
NamesAndTypesList & materialized_columns,
|
||||
NamesAndTypesList & alias_columns,
|
||||
ColumnDefaults & column_defaults) const;
|
||||
|
||||
void validate(IStorage * table, const Context & context);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Parsers/IAST.h>
|
||||
#include <DB/Parsers/formatAST.h>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace DB
|
||||
@ -56,5 +57,17 @@ namespace DB
|
||||
ASTPtr expression;
|
||||
};
|
||||
|
||||
using ColumnDefaults = std::unordered_map<String, ColumnDefault>;
|
||||
inline bool operator==(const ColumnDefault & lhs, const ColumnDefault & rhs)
|
||||
{
|
||||
return lhs.type == rhs.type && queryToString(lhs.expression) == queryToString(rhs.expression);
|
||||
}
|
||||
|
||||
struct ColumnDefaults : public std::unordered_map<String, ColumnDefault>
|
||||
{
|
||||
using std::unordered_map<String, ColumnDefault>::unordered_map;
|
||||
|
||||
/// @todo implement (de)serialization
|
||||
String toString() const { return {}; }
|
||||
static ColumnDefaults parse(const String & str) { return {}; }
|
||||
};
|
||||
}
|
||||
|
@ -14,6 +14,7 @@
|
||||
#include <DB/Storages/AlterCommands.h>
|
||||
#include <Poco/File.h>
|
||||
#include <Poco/RWLock.h>
|
||||
#include <statdaemons/stdext.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -107,8 +108,8 @@ public:
|
||||
return res;
|
||||
}
|
||||
|
||||
typedef Poco::SharedPtr<Poco::ScopedWriteRWLock> TableStructureWriteLockPtr;
|
||||
typedef Poco::SharedPtr<Poco::ScopedWriteRWLock> TableDataWriteLockPtr;
|
||||
typedef std::unique_ptr<Poco::ScopedWriteRWLock> TableStructureWriteLockPtr;
|
||||
typedef std::unique_ptr<Poco::ScopedWriteRWLock> TableDataWriteLockPtr;
|
||||
typedef std::pair<TableDataWriteLockPtr, TableStructureWriteLockPtr> TableFullWriteLockPtr;
|
||||
|
||||
/** Не дает читать структуру таблицы. Берется для ALTER, RENAME и DROP.
|
||||
@ -124,7 +125,7 @@ public:
|
||||
*/
|
||||
TableDataWriteLockPtr lockDataForAlter()
|
||||
{
|
||||
TableDataWriteLockPtr res = new Poco::ScopedWriteRWLock(data_lock);
|
||||
auto res = stdext::make_unique<Poco::ScopedWriteRWLock>(data_lock);
|
||||
if (is_dropped)
|
||||
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
|
||||
return res;
|
||||
@ -132,7 +133,7 @@ public:
|
||||
|
||||
TableStructureWriteLockPtr lockStructureForAlter()
|
||||
{
|
||||
TableStructureWriteLockPtr res = new Poco::ScopedWriteRWLock(structure_lock);
|
||||
auto res = stdext::make_unique<Poco::ScopedWriteRWLock>(structure_lock);
|
||||
if (is_dropped)
|
||||
throw Exception("Table is dropped", ErrorCodes::TABLE_IS_DROPPED);
|
||||
return res;
|
||||
|
@ -108,6 +108,7 @@ protected:
|
||||
|
||||
if (!reader)
|
||||
{
|
||||
/// @todo resolve missing columns
|
||||
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
|
||||
reader.reset(new MergeTreeReader(path, owned_data_part->name, columns, uncompressed_cache, storage, all_mark_ranges));
|
||||
if (prewhere_actions)
|
||||
|
@ -124,7 +124,7 @@ public:
|
||||
storage.reportBrokenPart(part_name);
|
||||
|
||||
/// Более хорошая диагностика.
|
||||
throw Exception(e.message() + " (while reading from part " + path + " from mark " + toString(from_mark) + " to "
|
||||
throw Exception(e.message() + "\n(while reading from part " + path + " from mark " + toString(from_mark) + " to "
|
||||
+ toString(to_mark) + ")", e.code());
|
||||
}
|
||||
catch (...)
|
||||
@ -195,12 +195,13 @@ public:
|
||||
}
|
||||
|
||||
/// evaluate defaulted columns
|
||||
res.addAllDefaults(columns, storage.column_defaults, storage.context);
|
||||
res.addDefaults(columns, storage.column_defaults, storage.context);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
/// Более хорошая диагностика.
|
||||
throw Exception(e.message() + " (while reading from part " + path + ")", e.code());
|
||||
throw Exception(e.message() + '\n' + e.getStackTrace().toString()
|
||||
+ "\n(while reading from part " + path + ")", e.code());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -5,6 +5,8 @@
|
||||
|
||||
#include <DB/Core/Block.h>
|
||||
|
||||
#include <DB/Storages/ColumnDefault.h>
|
||||
|
||||
#include <DB/Columns/ColumnArray.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
|
||||
@ -49,7 +51,8 @@ void Block::addDefaults(const NamesAndTypesList & required_columns,
|
||||
if (it == column_defaults.end())
|
||||
insertDefault(column.name, column.type);
|
||||
else
|
||||
default_expr_list->children.emplace_back(it->second.expression->clone());
|
||||
default_expr_list->children.emplace_back(
|
||||
setAlias(it->second.expression->clone(), it->first));
|
||||
}
|
||||
|
||||
/// nothing to evaluate
|
||||
@ -67,38 +70,6 @@ void Block::addDefaults(const NamesAndTypesList & required_columns,
|
||||
insert(std::move(column_name_type));
|
||||
}
|
||||
|
||||
void Block::addAllDefaults(const NamesAndTypesList & required_columns,
|
||||
const ColumnDefaults & column_defaults,
|
||||
const Context & context)
|
||||
{
|
||||
ASTPtr default_expr_list{stdext::make_unique<ASTExpressionList>().release()};
|
||||
|
||||
for (const auto & column_default : column_defaults)
|
||||
{
|
||||
if (has(column_default.first))
|
||||
continue;
|
||||
|
||||
/// expressions must be cloned to prevent modification by the ExpressionAnalyzer
|
||||
default_expr_list->children.emplace_back(column_default.second.expression->clone());
|
||||
}
|
||||
|
||||
/// nothing to evaluate
|
||||
if (default_expr_list->children.empty())
|
||||
return;
|
||||
|
||||
/** ExpressionAnalyzer eliminates "unused" columns, in order to ensure their safety
|
||||
* we are going to operate on a copy instead of the original block */
|
||||
Block copy_block{*this};
|
||||
/// evaluate default values for defaulted columns
|
||||
ExpressionAnalyzer{default_expr_list, context, required_columns}.getActions(true)->execute(copy_block);
|
||||
|
||||
/// move evaluated columns to the original block only if required
|
||||
for (auto & column_name_type : required_columns) {
|
||||
if (copy_block.has(column_name_type.name))
|
||||
insert(std::move(copy_block.getByName(column_name_type.name)));
|
||||
}
|
||||
}
|
||||
|
||||
Block & Block::operator= (const Block & other)
|
||||
{
|
||||
data = other.data;
|
||||
|
@ -50,8 +50,12 @@ void InterpreterAlterQuery::execute()
|
||||
throw Exception("Bad PartitionCommand::Type: " + toString(command.type), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
|
||||
}
|
||||
|
||||
if (!alter_commands.empty())
|
||||
table->alter(alter_commands, database_name, table_name, context);
|
||||
if (alter_commands.empty())
|
||||
return;
|
||||
|
||||
alter_commands.validate(table.get(), context);
|
||||
|
||||
table->alter(alter_commands, database_name, table_name, context);
|
||||
}
|
||||
|
||||
void InterpreterAlterQuery::parseAlter(
|
||||
@ -77,7 +81,7 @@ void InterpreterAlterQuery::parseAlter(
|
||||
if (ast_col_decl.default_expression)
|
||||
{
|
||||
command.default_type = columnDefaultTypeFromString(ast_col_decl.default_specifier);
|
||||
command.default_expression = setAlias(ast_col_decl.default_expression, ast_col_decl.name);
|
||||
command.default_expression = ast_col_decl.default_expression;
|
||||
}
|
||||
|
||||
if (params.column)
|
||||
@ -111,7 +115,7 @@ void InterpreterAlterQuery::parseAlter(
|
||||
if (ast_col_decl.default_expression)
|
||||
{
|
||||
command.default_type = columnDefaultTypeFromString(ast_col_decl.default_specifier);
|
||||
command.default_expression = setAlias(ast_col_decl.default_expression, ast_col_decl.name);
|
||||
command.default_expression = ast_col_decl.default_expression;
|
||||
}
|
||||
|
||||
out_alter_commands.push_back(command);
|
||||
|
@ -337,7 +337,7 @@ InterpreterCreateQuery::ColumnsAndDefaults InterpreterCreateQuery::parseColumns(
|
||||
|
||||
defaults.emplace(col_decl_ptr->name, ColumnDefault{
|
||||
columnDefaultTypeFromString(col_decl_ptr->default_specifier),
|
||||
setAlias(col_decl_ptr->default_expression, col_decl_ptr->name)
|
||||
col_decl_ptr->default_expression
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -429,7 +429,7 @@ ASTPtr InterpreterCreateQuery::formatColumns(NamesAndTypesList columns,
|
||||
if (it != std::end(column_defaults))
|
||||
{
|
||||
column_declaration->default_specifier = toString(it->second.type);
|
||||
column_declaration->default_expression = setAlias(it->second.expression->clone(), "");
|
||||
column_declaration->default_expression = it->second.expression->clone();
|
||||
}
|
||||
|
||||
columns_list.children.push_back(column_declaration_ptr);
|
||||
|
@ -1,6 +1,9 @@
|
||||
#include <DB/Storages/AlterCommands.h>
|
||||
#include <DB/Storages/IStorage.h>
|
||||
#include <DB/DataTypes/DataTypeNested.h>
|
||||
#include <DB/DataTypes/DataTypeArray.h>
|
||||
#include <DB/Interpreters/Context.h>
|
||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -54,7 +57,7 @@ namespace DB
|
||||
add_column(materialized_columns);
|
||||
else if (default_type == ColumnDefaultType::Alias)
|
||||
add_column(alias_columns);
|
||||
else
|
||||
else
|
||||
throw Exception{"Unknown ColumnDefaultType value", ErrorCodes::LOGICAL_ERROR};
|
||||
|
||||
if (default_expression)
|
||||
@ -139,7 +142,7 @@ namespace DB
|
||||
auto new_materialized_columns = materialized_columns;
|
||||
auto new_alias_columns = alias_columns;
|
||||
auto new_column_defaults = column_defaults;
|
||||
|
||||
|
||||
for (const AlterCommand & command : *this)
|
||||
command.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
|
||||
|
||||
@ -148,4 +151,153 @@ namespace DB
|
||||
alias_columns = std::move(new_alias_columns);
|
||||
column_defaults = std::move(new_column_defaults);
|
||||
}
|
||||
|
||||
void AlterCommands::validate(IStorage * table, const Context & context)
|
||||
{
|
||||
auto lock = table->lockDataForAlter();
|
||||
|
||||
auto columns = table->getColumnsList();
|
||||
columns.insert(std::end(columns), std::begin(table->alias_columns), std::end(table->alias_columns));
|
||||
auto defaults = table->column_defaults;
|
||||
|
||||
lock.reset();
|
||||
|
||||
std::vector<std::pair<String, AlterCommand *>> defaulted_columns{};
|
||||
|
||||
ASTPtr default_expr_list{new ASTExpressionList};
|
||||
default_expr_list->children.reserve(defaults.size());
|
||||
|
||||
for (AlterCommand & command : *this)
|
||||
{
|
||||
if (command.type == AlterCommand::ADD || command.type == AlterCommand::MODIFY)
|
||||
{
|
||||
if (command.type == AlterCommand::MODIFY)
|
||||
{
|
||||
const auto it = std::find_if(std::begin(columns), std::end(columns),
|
||||
std::bind(AlterCommand::namesEqual, std::cref(command.column_name), std::placeholders::_1));
|
||||
|
||||
if (it == std::end(columns))
|
||||
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to modify.",
|
||||
DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
|
||||
columns.erase(it);
|
||||
defaults.erase(command.column_name);
|
||||
}
|
||||
|
||||
if (command.data_type)
|
||||
columns.emplace_back(command.column_name, command.data_type);
|
||||
|
||||
if (command.default_expression)
|
||||
{
|
||||
if (command.data_type)
|
||||
{
|
||||
const auto & column_name = command.column_name;
|
||||
const auto tmp_column_name = column_name + "_tmp";
|
||||
const auto conversion_function_name = "to" + command.data_type->getName();
|
||||
|
||||
default_expr_list->children.emplace_back(setAlias(
|
||||
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
|
||||
column_name));
|
||||
|
||||
default_expr_list->children.emplace_back(setAlias(command.default_expression->clone(), tmp_column_name));
|
||||
|
||||
defaulted_columns.emplace_back(command.column_name, &command);
|
||||
}
|
||||
else
|
||||
{
|
||||
default_expr_list->children.emplace_back(
|
||||
setAlias(command.default_expression->clone(), command.column_name));
|
||||
|
||||
defaulted_columns.emplace_back(command.column_name, &command);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (command.type == AlterCommand::DROP)
|
||||
{
|
||||
auto found = false;
|
||||
for (auto it = std::begin(columns); it != std::end(columns);)
|
||||
if (AlterCommand::namesEqual(command.column_name, *it))
|
||||
{
|
||||
found = true;
|
||||
it = columns.erase(it);
|
||||
}
|
||||
else
|
||||
++it;
|
||||
|
||||
for (auto it = std::begin(defaults); it != std::end(defaults);)
|
||||
if (AlterCommand::namesEqual(command.column_name, { it->first, nullptr }))
|
||||
it = defaults.erase(it);
|
||||
else
|
||||
++it;
|
||||
|
||||
if (!found)
|
||||
throw Exception("Wrong column name. Cannot find column " + command.column_name + " to drop.",
|
||||
DB::ErrorCodes::ILLEGAL_COLUMN);
|
||||
}
|
||||
}
|
||||
|
||||
/** Existing defaulted columns may require default expression extensions with a type conversion,
|
||||
* therefore we add them to defaulted_columns to allow further processing */
|
||||
for (const auto & col_def : defaults)
|
||||
{
|
||||
const auto & column_name = col_def.first;
|
||||
const auto column_it = std::find_if(columns.begin(), columns.end(), [&] (const NameAndTypePair & name_type) {
|
||||
return AlterCommand::namesEqual(column_name, name_type);
|
||||
});
|
||||
const auto tmp_column_name = column_name + "_tmp";
|
||||
const auto conversion_function_name = "to" + column_it->type->getName();
|
||||
|
||||
default_expr_list->children.emplace_back(setAlias(
|
||||
makeASTFunction(conversion_function_name, ASTPtr{new ASTIdentifier{{}, tmp_column_name}}),
|
||||
column_name));
|
||||
|
||||
default_expr_list->children.emplace_back(setAlias(col_def.second.expression->clone(), tmp_column_name));
|
||||
|
||||
defaulted_columns.emplace_back(column_name, nullptr);
|
||||
}
|
||||
|
||||
const auto actions = ExpressionAnalyzer{default_expr_list, context, columns}.getActions(true);
|
||||
const auto block = actions->getSampleBlock();
|
||||
|
||||
/// set deduced types, modify default expression if necessary
|
||||
for (auto & defaulted_column : defaulted_columns)
|
||||
{
|
||||
const auto & column_name = defaulted_column.first;
|
||||
const auto command_ptr = defaulted_column.second;
|
||||
const auto & column = block.getByName(column_name);
|
||||
|
||||
/// default expression on old column
|
||||
if (!command_ptr)
|
||||
{
|
||||
const auto & tmp_column = block.getByName(column_name + "_tmp");
|
||||
|
||||
// column not specified explicitly in the ALTER query may require default_expression modification
|
||||
if (typeid(*column.type) != typeid(*tmp_column.type))
|
||||
{
|
||||
const auto it = defaults.find(column_name);
|
||||
this->push_back(AlterCommand{
|
||||
AlterCommand::MODIFY, column_name, column.type, it->second.type,
|
||||
makeASTFunction("to" + column.type->getName(), it->second.expression),
|
||||
});
|
||||
}
|
||||
}
|
||||
else if (command_ptr && command_ptr->data_type)
|
||||
{
|
||||
const auto & tmp_column = block.getByName(column_name + "_tmp");
|
||||
|
||||
/// type mismatch between explicitly specified and deduced type, add conversion
|
||||
if (typeid(*column.type) != typeid(*tmp_column.type))
|
||||
{
|
||||
command_ptr->default_expression = makeASTFunction(
|
||||
"to" + column.type->getName(),
|
||||
command_ptr->default_expression->clone());
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
/// just set deduced type
|
||||
command_ptr->data_type = column.type;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -381,7 +381,11 @@ void MergeTreeData::checkAlter(const AlterCommands & params)
|
||||
/// Проверим, что преобразования типов возможны.
|
||||
ExpressionActionsPtr unused_expression;
|
||||
NameToNameMap unused_map;
|
||||
createConvertExpression(nullptr, *columns, new_columns, unused_expression, unused_map);
|
||||
|
||||
/// augment plain columns with materialized columns for convert expression creation
|
||||
new_columns.insert(std::end(new_columns),
|
||||
std::begin(new_materialized_columns), std::end(new_materialized_columns));
|
||||
createConvertExpression(nullptr, getColumnsList(), new_columns, unused_expression, unused_map);
|
||||
}
|
||||
|
||||
void MergeTreeData::createConvertExpression(const DataPartPtr & part, const NamesAndTypesList & old_columns, const NamesAndTypesList & new_columns,
|
||||
|
@ -133,15 +133,18 @@ void StorageMergeTree::alter(const AlterCommands & params, const String & databa
|
||||
auto new_materialized_columns = data.materialized_columns;
|
||||
auto new_alias_columns = data.alias_columns;
|
||||
auto new_column_defaults = data.column_defaults;
|
||||
|
||||
|
||||
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
|
||||
|
||||
auto columns_for_parts = new_columns;
|
||||
columns_for_parts.insert(std::end(columns_for_parts),
|
||||
std::begin(new_materialized_columns), std::end(new_materialized_columns));
|
||||
|
||||
MergeTreeData::DataParts parts = data.getDataParts();
|
||||
std::vector<MergeTreeData::AlterDataPartTransactionPtr> transactions;
|
||||
for (const MergeTreeData::DataPartPtr & part : parts)
|
||||
{
|
||||
auto transaction = data.alterDataPart(part, new_columns);
|
||||
if (transaction)
|
||||
if (auto transaction = data.alterDataPart(part, columns_for_parts))
|
||||
transactions.push_back(std::move(transaction));
|
||||
}
|
||||
|
||||
|
@ -209,6 +209,12 @@ void StorageReplicatedMergeTree::createTableIfNotExists()
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/columns", data.getColumnsListNonMaterialized().toString(),
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/materialized_columns", data.materialized_columns.toString(),
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/alias_columns", data.alias_columns.toString(),
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/column_defaults", data.column_defaults.toString(),
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/log", "",
|
||||
zookeeper->getDefaultACL(), zkutil::CreateMode::Persistent));
|
||||
ops.push_back(new zkutil::Op::Create(zookeeper_path + "/blocks", "",
|
||||
@ -256,13 +262,21 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
|
||||
|
||||
zkutil::Stat stat;
|
||||
auto columns = NamesAndTypesList::parse(zookeeper->get(zookeeper_path + "/columns", &stat), context.getDataTypeFactory());
|
||||
NamesAndTypesList materialized_columns;
|
||||
NamesAndTypesList alias_columns;
|
||||
ColumnDefaults column_defaults;
|
||||
auto materialized_columns = NamesAndTypesList::parse(
|
||||
zookeeper->get(zookeeper_path + "/materialized_columns", &stat), context.getDataTypeFactory());
|
||||
auto alias_columns = NamesAndTypesList::parse(
|
||||
zookeeper->get(zookeeper_path + "/alias_columns", &stat), context.getDataTypeFactory());
|
||||
auto column_defaults = ColumnDefaults::parse(zookeeper->get(zookeeper_path + "/column_defaults", &stat));
|
||||
columns_version = stat.version;
|
||||
if (columns != data.getColumnsListNonMaterialized())
|
||||
if (columns != data.getColumnsListNonMaterialized() ||
|
||||
materialized_columns != data.materialized_columns ||
|
||||
alias_columns != data.alias_columns ||
|
||||
column_defaults != data.column_defaults)
|
||||
{
|
||||
if (allow_alter && (data.getColumnsListNonMaterialized().sizeOfDifference(columns) <= 2 || skip_sanity_checks))
|
||||
if (allow_alter &&
|
||||
(skip_sanity_checks ||
|
||||
data.getColumnsListNonMaterialized().sizeOfDifference(columns) +
|
||||
data.materialized_columns.sizeOfDifference(materialized_columns) <= 2))
|
||||
{
|
||||
LOG_WARNING(log, "Table structure in ZooKeeper is a little different from local table structure. Assuming ALTER.");
|
||||
|
||||
@ -411,6 +425,9 @@ void StorageReplicatedMergeTree::createReplica()
|
||||
}
|
||||
|
||||
zookeeper->create(replica_path + "/columns", data.getColumnsListNonMaterialized().toString(), zkutil::CreateMode::Persistent);
|
||||
zookeeper->create(replica_path + "/materialized_columns", data.materialized_columns.toString(), zkutil::CreateMode::Persistent);
|
||||
zookeeper->create(replica_path + "/alias_columns", data.alias_columns.toString(), zkutil::CreateMode::Persistent);
|
||||
zookeeper->create(replica_path + "/column_defaults", data.column_defaults.toString(), zkutil::CreateMode::Persistent);
|
||||
}
|
||||
|
||||
void StorageReplicatedMergeTree::activateReplica()
|
||||
@ -632,6 +649,15 @@ void StorageReplicatedMergeTree::checkPartAndAddToZooKeeper(const MergeTreeData:
|
||||
ops.push_back(new zkutil::Op::Check(
|
||||
zookeeper_path + "/columns",
|
||||
expected_columns_version));
|
||||
ops.push_back(new zkutil::Op::Check(
|
||||
zookeeper_path + "/materialized_columns",
|
||||
expected_columns_version));
|
||||
ops.push_back(new zkutil::Op::Check(
|
||||
zookeeper_path + "/alias_columns",
|
||||
expected_columns_version));
|
||||
ops.push_back(new zkutil::Op::Check(
|
||||
zookeeper_path + "/column_defaults",
|
||||
expected_columns_version));
|
||||
ops.push_back(new zkutil::Op::Create(
|
||||
replica_path + "/parts/" + part_name,
|
||||
"",
|
||||
@ -1588,10 +1614,15 @@ void StorageReplicatedMergeTree::alterThread()
|
||||
*/
|
||||
|
||||
zkutil::Stat stat;
|
||||
String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
|
||||
const String columns_str = zookeeper->get(zookeeper_path + "/columns", &stat, alter_thread_event);
|
||||
const String materialized_columns_str = zookeeper->get(zookeeper_path + "/materialized_columns",
|
||||
&stat, alter_thread_event);
|
||||
const String alias_columns_str = zookeeper->get(zookeeper_path + "/alias_columns",
|
||||
&stat, alter_thread_event);
|
||||
NamesAndTypesList columns = NamesAndTypesList::parse(columns_str, context.getDataTypeFactory());
|
||||
NamesAndTypesList materialized_columns;
|
||||
NamesAndTypesList alias_columns;
|
||||
NamesAndTypesList materialized_columns = NamesAndTypesList::parse(
|
||||
materialized_columns_str, context.getDataTypeFactory());
|
||||
NamesAndTypesList alias_columns = NamesAndTypesList::parse(alias_columns_str, context.getDataTypeFactory());
|
||||
ColumnDefaults column_defaults;
|
||||
|
||||
bool changed_version = (stat.version != columns_version);
|
||||
@ -1603,24 +1634,45 @@ void StorageReplicatedMergeTree::alterThread()
|
||||
{
|
||||
auto table_lock = lockStructureForAlter();
|
||||
|
||||
if (columns != data.getColumnsListNonMaterialized())
|
||||
const auto columns_changed = columns != data.getColumnsListNonMaterialized();
|
||||
const auto materialized_columns_changed = materialized_columns != data.materialized_columns;
|
||||
const auto alias_columns_changed = alias_columns != data.alias_columns;
|
||||
const auto column_defaults_changed = false;
|
||||
|
||||
if (columns_changed || materialized_columns_changed || alias_columns_changed ||
|
||||
column_defaults_changed)
|
||||
{
|
||||
LOG_INFO(log, "Columns list changed in ZooKeeper. Applying changes locally.");
|
||||
|
||||
InterpreterAlterQuery::updateMetadata(database_name, table_name, columns,
|
||||
materialized_columns, alias_columns, column_defaults, context);
|
||||
|
||||
this->materialized_columns = materialized_columns;
|
||||
this->alias_columns = alias_columns;
|
||||
this->column_defaults = column_defaults;
|
||||
|
||||
data.setColumnsList(columns);
|
||||
data.materialized_columns = std::move(materialized_columns);
|
||||
data.alias_columns = std::move(alias_columns);
|
||||
data.column_defaults = std::move(column_defaults);
|
||||
if (columns_changed)
|
||||
{
|
||||
data.setColumnsList(columns);
|
||||
|
||||
if (unreplicated_data)
|
||||
unreplicated_data->setColumnsList(columns);
|
||||
}
|
||||
|
||||
if (materialized_columns_changed)
|
||||
{
|
||||
this->materialized_columns = materialized_columns;
|
||||
data.materialized_columns = std::move(materialized_columns);
|
||||
}
|
||||
|
||||
if (alias_columns_changed)
|
||||
{
|
||||
this->alias_columns = alias_columns;
|
||||
data.alias_columns = std::move(alias_columns);
|
||||
}
|
||||
|
||||
if (column_defaults_changed)
|
||||
{
|
||||
this->column_defaults = column_defaults;
|
||||
data.column_defaults = std::move(column_defaults);
|
||||
}
|
||||
|
||||
if (unreplicated_data)
|
||||
unreplicated_data->setColumnsList(columns);
|
||||
LOG_INFO(log, "Applied changes to table.");
|
||||
}
|
||||
else
|
||||
@ -1647,12 +1699,14 @@ void StorageReplicatedMergeTree::alterThread()
|
||||
if (!changed_version)
|
||||
parts = data.getDataParts();
|
||||
|
||||
const auto columns_plus_materialized = data.getColumnsList();
|
||||
|
||||
for (const MergeTreeData::DataPartPtr & part : parts)
|
||||
{
|
||||
/// Обновим кусок и запишем результат во временные файлы.
|
||||
/// TODO: Можно пропускать проверку на слишком большие изменения, если в ZooKeeper есть, например,
|
||||
/// нода /flags/force_alter.
|
||||
auto transaction = data.alterDataPart(part, columns);
|
||||
auto transaction = data.alterDataPart(part, columns_plus_materialized);
|
||||
|
||||
if (!transaction)
|
||||
continue;
|
||||
@ -1676,7 +1730,7 @@ void StorageReplicatedMergeTree::alterThread()
|
||||
|
||||
for (const MergeTreeData::DataPartPtr & part : parts)
|
||||
{
|
||||
auto transaction = unreplicated_data->alterDataPart(part, columns);
|
||||
auto transaction = unreplicated_data->alterDataPart(part, columns_plus_materialized);
|
||||
|
||||
if (!transaction)
|
||||
continue;
|
||||
@ -1689,6 +1743,9 @@ void StorageReplicatedMergeTree::alterThread()
|
||||
|
||||
/// Список столбцов для конкретной реплики.
|
||||
zookeeper->set(replica_path + "/columns", columns.toString());
|
||||
zookeeper->set(replica_path + "/materialized_columns", materialized_columns.toString());
|
||||
zookeeper->set(replica_path + "/alias_columns", alias_columns.toString());
|
||||
zookeeper->set(replica_path + "/column_defaults", column_defaults.toString());
|
||||
|
||||
if (changed_version)
|
||||
{
|
||||
@ -2330,6 +2387,9 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
|
||||
NamesAndTypesList new_alias_columns;
|
||||
ColumnDefaults new_column_defaults;
|
||||
String new_columns_str;
|
||||
String new_materialized_columns_str;
|
||||
String new_alias_columns_str;
|
||||
String new_column_defaults_str;
|
||||
int new_columns_version;
|
||||
zkutil::Stat stat;
|
||||
|
||||
@ -2348,9 +2408,15 @@ void StorageReplicatedMergeTree::alter(const AlterCommands & params,
|
||||
params.apply(new_columns, new_materialized_columns, new_alias_columns, new_column_defaults);
|
||||
|
||||
new_columns_str = new_columns.toString();
|
||||
new_materialized_columns_str = new_materialized_columns.toString();
|
||||
new_alias_columns_str = new_alias_columns.toString();
|
||||
new_column_defaults_str = new_column_defaults.toString();
|
||||
|
||||
/// Делаем ALTER.
|
||||
zookeeper->set(zookeeper_path + "/columns", new_columns_str, -1, &stat);
|
||||
zookeeper->set(zookeeper_path + "/materialized_columns", new_materialized_columns_str, -1, &stat);
|
||||
zookeeper->set(zookeeper_path + "/alias_columns", new_alias_columns_str, -1, &stat);
|
||||
zookeeper->set(zookeeper_path + "/column_defaults", new_column_defaults_str, -1, &stat);
|
||||
|
||||
new_columns_version = stat.version;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user