Remove alter transaction!!!

This commit is contained in:
alesapin 2020-03-17 16:49:50 +03:00
parent b0b81bec89
commit 5877a5af42
21 changed files with 107 additions and 696 deletions

View File

@ -89,7 +89,6 @@ BlockIO InterpreterAlterQuery::execute()
if (!partition_commands.empty())
{
partition_commands.validate(*table);
table->alterPartition(query_ptr, partition_commands, context);
}

View File

@ -86,16 +86,18 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && !command_ast->partition)
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR COLUMN column" queries are not supported yet. Use "CLEAR COLUMN column IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_COLUMN;
command.column_name = getIdentifierName(command_ast->column);
command.if_exists = command_ast->if_exists;
if (command_ast->clear_column)
command.clear = true;
if (command_ast->partition)
command.partition = command_ast->partition;
return command;
}
else if (command_ast->type == ASTAlterCommand::MODIFY_COLUMN)
@ -189,7 +191,7 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
else if (command_ast->type == ASTAlterCommand::DROP_CONSTRAINT && !command_ast->partition)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR COLUMN column" queries are not supported yet. Use "CLEAR COLUMN column IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);
throw Exception(R"("ALTER TABLE table DROP CONSTRAINT constraint_name" queries are not supported yet. Use "DROP CONSTRAINT constraint_name IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast->clone();
@ -199,16 +201,18 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_INDEX && !command_ast->partition)
else if (command_ast->type == ASTAlterCommand::DROP_INDEX)
{
if (command_ast->clear_column)
throw Exception(R"("ALTER TABLE table CLEAR INDEX index" queries are not supported yet. Use "CLEAR INDEX index IN PARTITION".)", ErrorCodes::NOT_IMPLEMENTED);
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_INDEX;
command.index_name = command_ast->index->as<ASTIdentifier &>().name;
command.if_exists = command_ast->if_exists;
if (command_ast->clear_index)
command.clear = true;
if (command_ast->partition)
command.partition = command_ast->partition;
return command;
}
@ -263,7 +267,9 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata) const
}
else if (type == DROP_COLUMN)
{
metadata.columns.remove(column_name);
/// Otherwise just clear data on disk
if (!clear && !partition)
metadata.columns.remove(column_name);
}
else if (type == MODIFY_COLUMN)
{
@ -354,23 +360,25 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata) const
}
else if (type == DROP_INDEX)
{
auto erase_it = std::find_if(
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
});
if (erase_it == metadata.indices.indices.end())
if (!partition && !clear)
{
if (if_exists)
return;
throw Exception("Wrong index name. Cannot find index " + backQuote(index_name) + " to drop.",
ErrorCodes::BAD_ARGUMENTS);
}
auto erase_it = std::find_if(
metadata.indices.indices.begin(),
metadata.indices.indices.end(),
[this](const ASTPtr & index_ast)
{
return index_ast->as<ASTIndexDeclaration &>().name == index_name;
});
metadata.indices.indices.erase(erase_it);
if (erase_it == metadata.indices.indices.end())
{
if (if_exists)
return;
throw Exception("Wrong index name. Cannot find index " + backQuote(index_name) + " to drop.", ErrorCodes::BAD_ARGUMENTS);
}
metadata.indices.indices.erase(erase_it);
}
}
else if (type == ADD_CONSTRAINT)
{
@ -515,7 +523,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
if (ignore)
return false;
if (type == DROP_COLUMN)
if (type == DROP_COLUMN || type == DROP_INDEX)
return true;
if (type != MODIFY_COLUMN || data_type == nullptr)
@ -564,12 +572,21 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
{
result.type = MutationCommand::Type::DROP_COLUMN;
result.column_name = column_name;
if (clear)
result.clear = true;
if (partition)
result.partition = partition;
result.predicate = nullptr;
}
else if (type == DROP_INDEX)
{
result.type = MutationCommand::Type::DROP_INDEX;
result.column_name = column_name;
result.column_name = index_name;
if (clear)
result.clear = true;
if (partition)
result.partition = partition;
result.predicate = nullptr;
}

View File

@ -41,8 +41,8 @@ struct AlterCommand
String column_name;
/// For DROP COLUMN ... FROM PARTITION
String partition_name;
/// For DROP/CLEAR COLUMN/INDEX ... IN PARTITION
ASTPtr partition;
/// For ADD and MODIFY, a new column type.
DataTypePtr data_type = nullptr;
@ -84,6 +84,9 @@ struct AlterCommand
/// indicates that this command should not be applied, for example in case of if_exists=true and column doesn't exist.
bool ignore = false;
/// Clear columns or index (don't drop from metadata)
bool clear = false;
/// For ADD and MODIFY
CompressionCodecPtr codec = nullptr;

View File

@ -44,7 +44,8 @@ using SettingsChanges = std::vector<SettingChange>;
class AlterCommands;
class MutationCommands;
class PartitionCommands;
struct PartitionCommand;
using PartitionCommands = std::vector<PartitionCommand>;
class IProcessor;
using ProcessorPtr = std::shared_ptr<IProcessor>;

View File

@ -1674,173 +1674,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::createPart(
return createPart(name, type, part_info, disk, relative_path);
}
/// This code is not used anymore in StorageReplicatedMergeTree
/// soon it will be removed from StorageMergeTree as well
/// TODO(alesap)
void MergeTreeData::alterDataPart(
const NamesAndTypesList & new_columns,
const IndicesASTs & new_indices,
bool skip_sanity_checks,
AlterDataPartTransactionPtr & transaction)
{
const auto settings = getSettings();
const auto & part = transaction->getDataPart();
auto res = analyzeAlterConversions(part->getColumns(), new_columns, getIndices().indices, new_indices);
NamesAndTypesList additional_columns;
transaction->rename_map = part->createRenameMapForAlter(res, part->getColumns());
if (!transaction->rename_map.empty())
{
WriteBufferFromOwnString out;
out << "Will ";
bool first = true;
for (const auto & [from, to] : transaction->rename_map)
{
if (!first)
out << ", ";
first = false;
if (to.empty())
out << "remove " << from;
else
out << "rename " << from << " to " << to;
}
out << " in part " << part->name;
LOG_DEBUG(log, out.str());
}
size_t num_files_to_modify = transaction->rename_map.size();
size_t num_files_to_remove = 0;
for (const auto & from_to : transaction->rename_map)
if (from_to.second.empty())
++num_files_to_remove;
if (!skip_sanity_checks
&& (num_files_to_modify > settings->max_files_to_modify_in_alter_columns
|| num_files_to_remove > settings->max_files_to_remove_in_alter_columns))
{
transaction->clear();
const bool forbidden_because_of_modify = num_files_to_modify > settings->max_files_to_modify_in_alter_columns;
std::stringstream exception_message;
exception_message
<< "Suspiciously many ("
<< (forbidden_because_of_modify ? num_files_to_modify : num_files_to_remove)
<< ") files (";
bool first = true;
for (const auto & from_to : transaction->rename_map)
{
if (!first)
exception_message << ", ";
if (forbidden_because_of_modify)
{
exception_message << "from " << backQuote(from_to.first) << " to " << backQuote(from_to.second);
first = false;
}
else if (from_to.second.empty())
{
exception_message << backQuote(from_to.first);
first = false;
}
}
exception_message
<< ") need to be "
<< (forbidden_because_of_modify ? "modified" : "removed")
<< " in part " << part->name << " of table at " << part->getFullPath() << ". Aborting just in case."
<< " If it is not an error, you could increase merge_tree/"
<< (forbidden_because_of_modify ? "max_files_to_modify_in_alter_columns" : "max_files_to_remove_in_alter_columns")
<< " parameter in configuration file (current value: "
<< (forbidden_because_of_modify ? settings->max_files_to_modify_in_alter_columns : settings->max_files_to_remove_in_alter_columns)
<< ")";
throw Exception(exception_message.str(), ErrorCodes::TABLE_DIFFERS_TOO_MUCH);
}
DataPart::Checksums add_checksums;
if (transaction->rename_map.empty() && !res.force_update_metadata)
{
transaction->clear();
return;
}
/// Apply the expression and write the result to temporary files.
if (res.expression)
{
BlockInputStreamPtr part_in = std::make_shared<MergeTreeSequentialBlockInputStream>(
*this, part, res.expression->getRequiredColumns(), false, /* take_column_types_from_storage = */ false);
auto compression_codec = global_context.chooseCompressionCodec(
part->bytes_on_disk,
static_cast<double>(part->bytes_on_disk) / this->getTotalActiveSizeInBytes());
ExpressionBlockInputStream in(part_in, res.expression);
/** Don't write offsets for arrays, because ALTER never change them
* (MODIFY COLUMN could only change types of elements but never modify array sizes).
* Also note that they does not participate in 'rename_map'.
* Also note, that for columns, that are parts of Nested,
* temporary column name ('converting_column_name') created in 'createConvertExpression' method
* will have old name of shared offsets for arrays.
*/
MergedColumnOnlyOutputStream out(
part,
in.getHeader(),
true /* sync */,
compression_codec,
true /* skip_offsets */,
/// Don't recalc indices because indices alter is restricted
std::vector<MergeTreeIndexPtr>{},
nullptr /* offset_columns */,
part->index_granularity,
&part->index_granularity_info,
true /* is_writing_temp_files */);
in.readPrefix();
out.writePrefix();
while (Block b = in.read())
out.write(b);
in.readSuffix();
/// Ugly but will be removed soon (TODO alesap)
MergeTreeData::MutableDataPartPtr mutable_part = std::const_pointer_cast<IMergeTreeDataPart>(part);
add_checksums = out.writeSuffixAndGetChecksums(mutable_part, mutable_part->checksums);
}
/// Update the checksums.
DataPart::Checksums new_checksums = part->checksums;
for (const auto & it : transaction->rename_map)
{
if (it.second.empty())
new_checksums.files.erase(it.first);
else
new_checksums.files[it.second] = add_checksums.files[it.first];
}
/// Write the checksums to the temporary file.
if (!part->checksums.empty())
{
transaction->new_checksums = new_checksums;
WriteBufferFromFile checksums_file(part->getFullPath() + "checksums.txt.tmp", 4096);
new_checksums.write(checksums_file);
transaction->rename_map["checksums.txt.tmp"] = "checksums.txt";
}
/// Write the new column list to the temporary file.
{
transaction->new_columns = new_columns.filter(part->getColumns().getNames());
WriteBufferFromFile columns_file(part->getFullPath() + "columns.txt.tmp", 4096);
transaction->new_columns.writeText(columns_file);
transaction->rename_map["columns.txt.tmp"] = "columns.txt";
}
}
void MergeTreeData::changeSettings(
const ASTPtr & new_settings,
TableStructureWriteLockHolder & /* table_lock_holder */)
@ -1892,114 +1725,6 @@ void MergeTreeData::freezeAll(const String & with_name, const Context & context,
}
bool MergeTreeData::AlterDataPartTransaction::isValid() const
{
return valid && data_part;
}
void MergeTreeData::AlterDataPartTransaction::clear()
{
valid = false;
}
void MergeTreeData::AlterDataPartTransaction::commit()
{
if (!isValid())
return;
if (!data_part)
return;
try
{
std::unique_lock<std::shared_mutex> lock(data_part->columns_lock);
String path = data_part->getFullPath();
/// NOTE: checking that a file exists before renaming or deleting it
/// is justified by the fact that, when converting an ordinary column
/// to a nullable column, new files are created which did not exist
/// before, i.e. they do not have older versions.
/// 1) Rename the old files.
for (const auto & from_to : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name};
if (file.exists())
file.renameTo(path + name + ".tmp2");
}
/// 2) Move new files in the place of old and update the metadata in memory.
for (const auto & from_to : rename_map)
{
if (!from_to.second.empty())
Poco::File{path + from_to.first}.renameTo(path + from_to.second);
}
auto & mutable_part = const_cast<DataPart &>(*data_part);
mutable_part.checksums = new_checksums;
mutable_part.setColumns(new_columns);
/// 3) Delete the old files and drop required columns (DROP COLUMN)
for (const auto & from_to : rename_map)
{
String name = from_to.second.empty() ? from_to.first : from_to.second;
Poco::File file{path + name + ".tmp2"};
if (file.exists())
file.remove();
}
mutable_part.bytes_on_disk = new_checksums.getTotalSizeOnDisk();
/// TODO: we can skip resetting caches when the column is added.
data_part->storage.global_context.dropCaches();
clear();
}
catch (...)
{
/// Don't delete temporary files in the destructor in case something went wrong.
clear();
throw;
}
}
MergeTreeData::AlterDataPartTransaction::~AlterDataPartTransaction()
{
if (!isValid())
return;
if (!data_part)
return;
try
{
LOG_WARNING(data_part->storage.log, "Aborting ALTER of part " << data_part->relative_path);
String path = data_part->getFullPath();
for (const auto & from_to : rename_map)
{
if (!from_to.second.empty())
{
try
{
Poco::File file(path + from_to.first);
if (file.exists())
file.remove();
}
catch (Poco::Exception & e)
{
LOG_WARNING(data_part->storage.log, "Can't remove " << path + from_to.first << ": " << e.displayText());
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergeTreeData::PartsTemporaryRename::addPart(const String & old_name, const String & new_name)
{
old_and_new_names.push_back({old_name, new_name});

View File

@ -227,43 +227,6 @@ public:
void clear() { precommitted_parts.clear(); }
};
/// An object that stores the names of temporary files created in the part directory during ALTER of its
/// columns.
class AlterDataPartTransaction : private boost::noncopyable
{
public:
/// Renames temporary files, finishing the ALTER of the part.
void commit();
/// If commit() was not called, deletes temporary files, canceling the ALTER.
~AlterDataPartTransaction();
const String & getPartName() const { return data_part->name; }
/// Review the changes before the commit.
const NamesAndTypesList & getNewColumns() const { return new_columns; }
const DataPart::Checksums & getNewChecksums() const { return new_checksums; }
AlterDataPartTransaction(DataPartPtr data_part_) : data_part(data_part_) {}
const DataPartPtr & getDataPart() const { return data_part; }
bool isValid() const;
private:
friend class MergeTreeData;
void clear();
bool valid = true;
DataPartPtr data_part;
DataPart::Checksums new_checksums;
NamesAndTypesList new_columns;
/// If the value is an empty string, the file is not temporary, and it must be deleted.
NameToNameMap rename_map;
};
using AlterDataPartTransactionPtr = std::unique_ptr<AlterDataPartTransaction>;
struct PartsTemporaryRename : private boost::noncopyable
{
PartsTemporaryRename(
@ -557,16 +520,6 @@ public:
/// If something is wrong, throws an exception.
void checkAlterIsPossible(const AlterCommands & commands, const Settings & settings) override;
/// Performs ALTER of the data part, writes the result to temporary files.
/// Returns an object allowing to rename temporary files to permanent files.
/// If the number of affected columns is suspiciously high and skip_sanity_checks is false, throws an exception.
/// If no data transformations are necessary, returns nullptr.
void alterDataPart(
const NamesAndTypesList & new_columns,
const IndicesASTs & new_indices,
bool skip_sanity_checks,
AlterDataPartTransactionPtr& transaction);
/// Change MergeTreeSettings
void changeSettings(
const ASTPtr & new_settings,

View File

@ -1000,7 +1000,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
//LOG_DEBUG(log, "All columns:" << all_columns.toString());
LOG_DEBUG(log, "All columns:" << all_columns.toString());
//LOG_DEBUG(log, "Commands for interpreter:" << for_interpreter.size() << " commands for renames:" << for_file_renames.size());
if (!for_interpreter.empty())
@ -1023,7 +1023,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// It shouldn't be changed by mutation.
new_data_part->index_granularity_info = source_part->index_granularity_info;
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns));
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns, for_file_renames));
String new_part_tmp_path = new_data_part->getFullPath();
@ -1179,7 +1179,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in->readPrefix();
out.writePrefix();
////LOG_DEBUG(log, "PREFIX READED");
Block block;
while (check_not_cancelled() && (block = in->read()))
{
@ -1220,7 +1219,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// Write a file with a description of columns.
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);
new_data_part->getColumns().writeText(out_columns);
} /// close
} /// close fd
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
@ -1454,7 +1453,6 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
//LOG_DEBUG(log, "Collect to skip:" << stream_name);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
};
@ -1473,8 +1471,17 @@ NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const
MergeTreeData::DataPartPtr source_part,
const Block & updated_header,
NamesAndTypesList all_columns,
const MutationCommands & commands_for_removes) const
{
NameSet removed_columns;
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name);
}
Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = all_columns.begin(); it != all_columns.end();)
@ -1486,7 +1493,7 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
it->type = updated_type;
++it;
}
else if (source_columns_name_set.count(it->name))
else if (source_columns_name_set.count(it->name) && !removed_columns.count(it->name))
{
++it;
}

View File

@ -154,7 +154,11 @@ private:
NameSet collectFilesToSkip(const Block & updated_header, const std::set<MergeTreeIndexPtr> & indices_to_recalc, const String & mrk_extension) const;
/// Get the columns list of the resulting part in the same order as all_columns.
NamesAndTypesList getColumnsForNewDataPart(MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const;
NamesAndTypesList getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part,
const Block & updated_header,
NamesAndTypesList all_columns,
const MutationCommands & commands_for_removes) const;
bool shouldExecuteTTL(const Names & columns, const MutationCommands & commands) const;

View File

@ -46,6 +46,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
out << new_part_name;
break;
/// NOTE: Deprecated.
case CLEAR_COLUMN:
out << "clear_column\n"
<< escape << column_name
@ -53,6 +54,7 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
<< new_part_name;
break;
/// NOTE: Deprecated.
case CLEAR_INDEX:
out << "clear_index\n"
<< escape << index_name
@ -155,12 +157,12 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
detach = type_str == "detach";
in >> new_part_name;
}
else if (type_str == "clear_column")
else if (type_str == "clear_column") /// NOTE: Deprecated.
{
type = CLEAR_COLUMN;
in >> escape >> column_name >> "\nfrom\n" >> new_part_name;
}
else if (type_str == "clear_index")
else if (type_str == "clear_index") /// NOTE: Deprecated.
{
type = CLEAR_INDEX;
in >> escape >> index_name >> "\nfrom\n" >> new_part_name;

View File

@ -32,8 +32,8 @@ struct ReplicatedMergeTreeLogEntryData
GET_PART, /// Get the part from another replica.
MERGE_PARTS, /// Merge the parts.
DROP_RANGE, /// Delete the parts in the specified partition in the specified number range.
CLEAR_COLUMN, /// Drop specific column from specified partition.
CLEAR_INDEX, /// Drop specific index from specified partition.
CLEAR_COLUMN, /// NOTE: Deprecated. Drop specific column from specified partition.
CLEAR_INDEX, /// NOTE: Deprecated. Drop specific index from specified partition.
REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
MUTATE_PART, /// Apply one or several mutations to the part.
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths

View File

@ -10,6 +10,8 @@
#include <Common/typeid_cast.h>
#include <Common/quoteString.h>
#include <DataTypes/DataTypeFactory.h>
#include <Parsers/queryToString.h>
#include <common/logger_useful.h>
namespace DB
@ -21,7 +23,7 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool from_zookeeper)
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool parse_alter_commands)
{
if (command->type == ASTAlterCommand::DELETE)
{
@ -57,7 +59,7 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.index_name = command->index->as<ASTIdentifier &>().name;
return res;
}
else if (from_zookeeper && command->type == ASTAlterCommand::MODIFY_COLUMN)
else if (parse_alter_commands && command->type == ASTAlterCommand::MODIFY_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
@ -67,20 +69,30 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.data_type = DataTypeFactory::instance().get(ast_col_decl.type);
return res;
}
else if (from_zookeeper && command->type == ASTAlterCommand::DROP_COLUMN)
else if (parse_alter_commands && command->type == ASTAlterCommand::DROP_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::DROP_COLUMN;
res.column_name = getIdentifierName(command->column);
if (command->partition)
res.partition = command->partition;
if (command->clear_column)
res.clear = true;
return res;
}
else if (from_zookeeper && command->type == ASTAlterCommand::DROP_INDEX)
else if (parse_alter_commands && command->type == ASTAlterCommand::DROP_INDEX)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::DROP_INDEX;
res.column_name = command->index->as<ASTIdentifier &>().name;
if (command->partition)
res.partition = command->partition;
if (command->clear_index)
res.clear = true;
return res;
}
else if (command->type == ASTAlterCommand::MATERIALIZE_TTL)
{

View File

@ -50,8 +50,11 @@ struct MutationCommand
String column_name;
DataTypePtr data_type; /// Maybe empty if we just want to drop column
/// If from_zookeeper, than consider more Alter commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool from_zookeeper=false);
/// We need just clear column, not drop from metadata.
bool clear = false;
/// If parse_alter_commands, than consider more Alter commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool parse_alter_commands=false);
};
/// Multiple mutation commands, possible from different ALTER queries

View File

@ -92,28 +92,6 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.with_name = command_ast->with_name;
return res;
}
else if (command_ast->type == ASTAlterCommand::DROP_COLUMN && command_ast->partition)
{
if (!command_ast->clear_column)
throw Exception("Can't DROP COLUMN from partition. It is possible only to CLEAR COLUMN in partition", ErrorCodes::BAD_ARGUMENTS);
PartitionCommand res;
res.type = CLEAR_COLUMN;
res.partition = command_ast->partition;
res.column_name = getIdentifierName(command_ast->column);
return res;
}
else if (command_ast->type == ASTAlterCommand::DROP_INDEX && command_ast->partition)
{
if (!command_ast->clear_index)
throw Exception("Can't DROP INDEX from partition. It is possible only to CLEAR INDEX in partition", ErrorCodes::BAD_ARGUMENTS);
PartitionCommand res;
res.type = CLEAR_INDEX;
res.partition = command_ast->partition;
res.index_name = getIdentifierName(command_ast->index);
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_ALL)
{
PartitionCommand command;
@ -124,32 +102,4 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
else
return {};
}
void PartitionCommands::validate(const IStorage & table)
{
for (const PartitionCommand & command : *this)
{
if (command.type == PartitionCommand::CLEAR_COLUMN)
{
String column_name = command.column_name.safeGet<String>();
if (!table.getColumns().hasPhysical(column_name))
{
throw Exception("Wrong column name. Cannot find column " + column_name + " to clear it from partition",
DB::ErrorCodes::ILLEGAL_COLUMN);
}
}
else if (command.type == PartitionCommand::CLEAR_INDEX)
{
String index_name = command.index_name.safeGet<String>();
if (!table.getIndices().has(index_name))
{
throw Exception("Wrong index name. Cannot find index " + index_name + " to clear it from partition",
DB::ErrorCodes::BAD_ARGUMENTS);
}
}
}
}
}

View File

@ -20,8 +20,6 @@ struct PartitionCommand
{
ATTACH_PARTITION,
MOVE_PARTITION,
CLEAR_COLUMN,
CLEAR_INDEX,
DROP_PARTITION,
DROP_DETACHED_PARTITION,
FETCH_PARTITION,
@ -33,8 +31,6 @@ struct PartitionCommand
Type type;
ASTPtr partition;
Field column_name;
Field index_name;
/// true for DETACH PARTITION.
bool detach = false;
@ -71,11 +67,7 @@ struct PartitionCommand
static std::optional<PartitionCommand> parse(const ASTAlterCommand * command);
};
class PartitionCommands : public std::vector<PartitionCommand>
{
public:
void validate(const IStorage & table);
};
using PartitionCommands = std::vector<PartitionCommand>;
}

View File

@ -890,57 +890,6 @@ void StorageMergeTree::clearOldMutations(bool truncate)
}
}
void StorageMergeTree::clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context)
{
/// Asks to complete merges and moves and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge.
auto merge_blocker = merger_mutator.merges_blocker.cancel();
auto move_blocker = parts_mover.moves_blocker.cancel();
/// We don't change table structure, only data in some parts, parts are locked inside alterDataPart() function
auto lock_read_structure = lockStructureForShare(false, context.getCurrentQueryId());
String partition_id = getPartitionIDFromQuery(partition, context);
auto parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
std::vector<AlterDataPartTransactionPtr> transactions;
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
auto columns_for_parts = metadata.columns.getAllPhysical();
for (const auto & part : parts)
{
if (part->info.partition_id != partition_id)
throw Exception("Unexpected partition ID " + part->info.partition_id + ". This is a bug.", ErrorCodes::LOGICAL_ERROR);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (transaction->isValid())
transactions.push_back(std::move(transaction));
if (alter_command.type == AlterCommand::DROP_COLUMN)
LOG_DEBUG(log, "Removing column " << alter_command.column_name << " from part " << part->name);
else if (alter_command.type == AlterCommand::DROP_INDEX)
LOG_DEBUG(log, "Removing index " << alter_command.index_name << " from part " << part->name);
}
if (transactions.empty())
return;
for (auto & transaction : transactions)
{
transaction->commit();
transaction.reset();
}
/// Recalculate columns size (not only for the modified column)
recalculateColumnSizes();
}
bool StorageMergeTree::optimize(
const ASTPtr & /*query*/, const ASTPtr & partition, bool final, bool deduplicate, const Context & context)
{
@ -1054,24 +1003,6 @@ void StorageMergeTree::alterPartition(const ASTPtr & query, const PartitionComma
}
break;
case PartitionCommand::CLEAR_COLUMN:
{
AlterCommand alter_command;
alter_command.type = AlterCommand::DROP_COLUMN;
alter_command.column_name = get<String>(command.column_name);
clearColumnOrIndexInPartition(command.partition, alter_command, context);
}
break;
case PartitionCommand::CLEAR_INDEX:
{
AlterCommand alter_command;
alter_command.type = AlterCommand::DROP_INDEX;
alter_command.index_name = get<String>(command.index_name);
clearColumnOrIndexInPartition(command.partition, alter_command, context);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, context.getCurrentQueryId());

View File

@ -132,7 +132,6 @@ private:
// Partition helpers
void dropPartition(const ASTPtr & partition, bool detach, const Context & context);
void clearColumnOrIndexInPartition(const ASTPtr & partition, const AlterCommand & alter_command, const Context & context);
void attachPartition(const ASTPtr & partition, bool part, const Context & context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & context);
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, const Context & context);

View File

@ -884,12 +884,6 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
return true;
}
if (entry.type == LogEntry::CLEAR_COLUMN || entry.type == LogEntry::CLEAR_INDEX)
{
executeClearColumnOrIndexInPartition(entry);
return true;
}
if (entry.type == LogEntry::REPLACE_RANGE)
{
executeReplaceRange(entry);
@ -1474,75 +1468,6 @@ void StorageReplicatedMergeTree::executeDropRange(const LogEntry & entry)
}
void StorageReplicatedMergeTree::executeClearColumnOrIndexInPartition(const LogEntry & entry)
{
LOG_INFO(log, "Clear column " << entry.column_name << " in parts inside " << entry.new_part_name << " range");
auto entry_part_info = MergeTreePartInfo::fromPartName(entry.new_part_name, format_version);
/// We don't change table structure, only data in some parts
/// To disable reading from these parts, we will sequentially acquire write lock for each part inside alterDataPart()
/// If we will lock the whole table here, a deadlock can occur. For example, if use use Buffer table (CLICKHOUSE-3238)
auto lock_read_structure = lockStructureForShare(false, RWLockImpl::NO_QUERY);
auto zookeeper = getZooKeeper();
AlterCommand alter_command;
if (entry.type == LogEntry::CLEAR_COLUMN)
{
alter_command.type = AlterCommand::DROP_COLUMN;
alter_command.column_name = entry.column_name;
}
else if (entry.type == LogEntry::CLEAR_INDEX)
{
alter_command.type = AlterCommand::DROP_INDEX;
alter_command.index_name = entry.index_name;
}
StorageInMemoryMetadata metadata = getInMemoryMetadata();
alter_command.apply(metadata);
size_t modified_parts = 0;
auto parts = getDataParts();
auto columns_for_parts = metadata.columns.getAllPhysical();
/// Check there are no merges in range again
/// TODO: Currently, there are no guarantees that a merge covering entry_part_info will happen during the execution.
/// To solve this problem we could add read/write flags for each part in future_parts
/// and make more sophisticated checks for merges in shouldExecuteLogEntry().
/// But this feature will be useless when the mutation feature is implemented.
queue.checkThereAreNoConflictsInRange(entry_part_info, entry);
for (const auto & part : parts)
{
if (!entry_part_info.contains(part->info))
continue;
if (entry.type == LogEntry::CLEAR_COLUMN)
LOG_DEBUG(log, "Clearing column " << alter_command.column_name << " in part " << part->name);
else if (entry.type == LogEntry::CLEAR_INDEX)
LOG_DEBUG(log, "Clearing index " << alter_command.index_name << " in part " << part->name);
MergeTreeData::AlterDataPartTransactionPtr transaction(new MergeTreeData::AlterDataPartTransaction(part));
alterDataPart(columns_for_parts, metadata.indices.indices, false, transaction);
if (!transaction->isValid())
continue;
updatePartHeaderInZooKeeperAndCommit(zookeeper, *transaction);
++modified_parts;
}
if (entry.type == LogEntry::CLEAR_COLUMN)
LOG_DEBUG(log, "Cleared column " << entry.column_name << " in " << modified_parts << " parts");
else if (entry.type == LogEntry::CLEAR_INDEX)
LOG_DEBUG(log, "Cleared index " << entry.index_name << " in " << modified_parts << " parts");
/// Recalculate columns size (not only for the modified column)
recalculateColumnSizes();
}
bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
{
Stopwatch watch;
@ -3499,24 +3424,6 @@ void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const Part
}
break;
case PartitionCommand::CLEAR_COLUMN:
{
LogEntry entry;
entry.type = LogEntry::CLEAR_COLUMN;
entry.column_name = command.column_name.safeGet<String>();
clearColumnOrIndexInPartition(command.partition, std::move(entry), query_context);
}
break;
case PartitionCommand::CLEAR_INDEX:
{
LogEntry entry;
entry.type = LogEntry::CLEAR_INDEX;
entry.index_name = command.index_name.safeGet<String>();
clearColumnOrIndexInPartition(command.partition, std::move(entry), query_context);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockStructureForShare(false, query_context.getCurrentQueryId());
@ -3579,40 +3486,6 @@ bool StorageReplicatedMergeTree::getFakePartCoveringAllPartsInPartition(const St
}
void StorageReplicatedMergeTree::clearColumnOrIndexInPartition(
const ASTPtr & partition, LogEntry && entry, const Context & query_context)
{
assertNotReadonly();
/// We don't block merges, so anyone can manage this task (not only leader)
String partition_id = getPartitionIDFromQuery(partition, query_context);
MergeTreePartInfo drop_range_info;
if (!getFakePartCoveringAllPartsInPartition(partition_id, drop_range_info))
{
LOG_INFO(log, "Will not clear partition " << partition_id << ", it is empty.");
return;
}
/// We allocated new block number for this part, so new merges can't merge clearing parts with new ones
entry.new_part_name = getPartNamePossiblyFake(format_version, drop_range_info);
entry.create_time = time(nullptr);
String log_znode_path = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = log_znode_path.substr(log_znode_path.find_last_of('/') + 1);
/// If necessary, wait until the operation is performed on itself or on all replicas.
if (query_context.getSettingsRef().replication_alter_partitions_sync != 0)
{
if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
else
waitForAllReplicasToProcessLogEntry(entry);
}
}
void StorageReplicatedMergeTree::dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context)
{
assertNotReadonly();
@ -5272,59 +5145,6 @@ void StorageReplicatedMergeTree::getCommitPartOps(
}
}
void StorageReplicatedMergeTree::updatePartHeaderInZooKeeperAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
AlterDataPartTransaction & transaction)
{
String part_path = replica_path + "/parts/" + transaction.getPartName();
const auto storage_settings_ptr = getSettings();
bool need_delete_columns_and_checksums_nodes = false;
try
{
if (storage_settings_ptr->use_minimalistic_part_header_in_zookeeper)
{
auto part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
transaction.getNewColumns(), transaction.getNewChecksums());
Coordination::Stat stat;
zookeeper->set(part_path, part_header.toString(), -1, &stat);
need_delete_columns_and_checksums_nodes = stat.numChildren > 0;
}
else
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeSetRequest(
part_path, String(), -1));
ops.emplace_back(zkutil::makeSetRequest(
part_path + "/columns", transaction.getNewColumns().toString(), -1));
ops.emplace_back(zkutil::makeSetRequest(
part_path + "/checksums", getChecksumsForZooKeeper(transaction.getNewChecksums()), -1));
zookeeper->multi(ops);
}
}
catch (const Coordination::Exception & e)
{
/// The part does not exist in ZK. We will add to queue for verification - maybe the part is superfluous, and it must be removed locally.
if (e.code == Coordination::ZNONODE)
enqueuePartForCheck(transaction.getPartName());
throw;
}
/// Apply file changes.
transaction.commit();
/// Legacy <part_path>/columns and <part_path>/checksums znodes are not needed anymore and can be deleted.
if (need_delete_columns_and_checksums_nodes)
{
Coordination::Requests ops;
ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/columns", -1));
ops.emplace_back(zkutil::makeRemoveRequest(part_path + "/checksums", -1));
zookeeper->multi(ops);
}
}
ReplicatedMergeTreeAddress StorageReplicatedMergeTree::getReplicatedMergeTreeAddress() const
{
auto host_port = global_context.getInterserverIOAddress();

View File

@ -334,11 +334,6 @@ private:
void getCommitPartOps(Coordination::Requests & ops, MutableDataPartPtr & part, const String & block_id_path = "") const;
/// Updates info about part columns and checksums in ZooKeeper and commits transaction if successful.
void updatePartHeaderInZooKeeperAndCommit(
const zkutil::ZooKeeperPtr & zookeeper,
AlterDataPartTransaction & transaction);
/// Adds actions to `ops` that remove a part from ZooKeeper.
/// Set has_children to true for "old-style" parts (those with /columns and /checksums child znodes).
void removePartFromZooKeeper(const String & part_name, Coordination::Requests & ops, bool has_children);
@ -382,8 +377,6 @@ private:
/// If fetch was not successful, clears entry.actual_new_part_name.
bool executeFetch(LogEntry & entry);
void executeClearColumnOrIndexInPartition(const LogEntry & entry);
bool executeReplaceRange(const LogEntry & entry);
/** Updates the queue.
@ -518,7 +511,6 @@ private:
std::optional<Cluster::Address> findClusterAddress(const ReplicatedMergeTreeAddress & leader_address) const;
// Partition helpers
void clearColumnOrIndexInPartition(const ASTPtr & partition, LogEntry && entry, const Context & query_context);
void dropPartition(const ASTPtr & query, const ASTPtr & partition, bool detach, const Context & query_context);
void attachPartition(const ASTPtr & partition, bool part, const Context & query_context);
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, const Context & query_context);

View File

@ -10,7 +10,7 @@ $ch "DROP TABLE IF EXISTS clear_column2"
$ch "CREATE TABLE clear_column1 (d Date, i Int64, s String) ENGINE = ReplicatedMergeTree('/clickhouse/test/tables/clear_column', '1', d, d, 8192)"
$ch "CREATE TABLE clear_column2 (d Date, i Int64, s String) ENGINE = ReplicatedMergeTree('/clickhouse/test/tables/clear_column', '2', d, d, 8192)"
$ch "ALTER TABLE clear_column1 CLEAR COLUMN VasyaUnexistingColumn IN PARTITION '200001'" 1>/dev/null 2>/dev/null
$ch "ALTER TABLE clear_column1 CLEAR COLUMN VasyaUnexistingColumn IN PARTITION '200001'" --replication_alter_partitions_sync=2 1>/dev/null 2>/dev/null
rc=$?
if [ $rc -eq 0 ]; then
echo "An unexisisting column was ALTERed. Code: $rc"
@ -21,12 +21,12 @@ set -e
$ch "INSERT INTO clear_column1 VALUES ('2000-01-01', 1, 'a'), ('2000-02-01', 2, 'b')"
$ch "INSERT INTO clear_column1 VALUES ('2000-01-01', 3, 'c'), ('2000-02-01', 4, 'd')"
for i in `seq 3`; do
for i in `seq 10`; do
$ch "INSERT INTO clear_column1 VALUES ('2000-02-01', 0, ''), ('2000-02-01', 0, '')" & # insert into the same partition
$ch "ALTER TABLE clear_column1 CLEAR COLUMN i IN PARTITION '200001'" --replication_alter_partitions_sync=2 &
$ch "ALTER TABLE clear_column1 CLEAR COLUMN s IN PARTITION '200001'" --replication_alter_partitions_sync=2 &
$ch "ALTER TABLE clear_column1 CLEAR COLUMN i IN PARTITION '200002'" --replication_alter_partitions_sync=2 &
$ch "ALTER TABLE clear_column1 CLEAR COLUMN s IN PARTITION '200002'" --replication_alter_partitions_sync=2 &
$ch "ALTER TABLE clear_column1 CLEAR COLUMN i IN PARTITION '200001'" --replication_alter_partitions_sync=2
$ch "ALTER TABLE clear_column1 CLEAR COLUMN s IN PARTITION '200001'" --replication_alter_partitions_sync=2
$ch "ALTER TABLE clear_column1 CLEAR COLUMN i IN PARTITION '200002'" --replication_alter_partitions_sync=2
$ch "ALTER TABLE clear_column1 CLEAR COLUMN s IN PARTITION '200002'" --replication_alter_partitions_sync=2
$ch "INSERT INTO clear_column1 VALUES ('2000-03-01', 3, 'c'), ('2000-03-01', 4, 'd')" & # insert into other partition
done
wait

View File

@ -1,5 +1,7 @@
SELECT '===Ordinary case===';
SET replication_alter_partitions_sync = 2;
DROP TABLE IF EXISTS clear_column;
CREATE TABLE clear_column (d Date, num Int64, str String) ENGINE = MergeTree(d, d, 8192);

View File

@ -35,8 +35,7 @@ $CLICKHOUSE_CLIENT --query="INSERT INTO test.minmax_idx VALUES
$CLICKHOUSE_CLIENT --query="SELECT count() FROM test.minmax_idx WHERE i64 = 2;"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM test.minmax_idx WHERE i64 = 2 FORMAT JSON" | grep "rows_read"
$CLICKHOUSE_CLIENT --query="ALTER TABLE test.minmax_idx CLEAR INDEX idx IN PARTITION 1;"
sleep 0.5
$CLICKHOUSE_CLIENT --query="ALTER TABLE test.minmax_idx CLEAR INDEX idx IN PARTITION 1;" --replication_alter_partitions_sync=2
$CLICKHOUSE_CLIENT --query="SELECT count() FROM test.minmax_idx WHERE i64 = 2;"
$CLICKHOUSE_CLIENT --query="SELECT count() FROM test.minmax_idx WHERE i64 = 2 FORMAT JSON" | grep "rows_read"