First working version

This commit is contained in:
alesapin 2020-03-30 15:51:05 +03:00
parent 6812b1df5f
commit de0754ef0d
7 changed files with 99 additions and 16 deletions

View File

@ -533,7 +533,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
if (ignore)
return false;
if (type == DROP_COLUMN || type == DROP_INDEX)
if (type == DROP_COLUMN || type == DROP_INDEX || type == RENAME_COLUMN)
return true;
if (type != MODIFY_COLUMN || data_type == nullptr)
@ -599,6 +599,12 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
result.predicate = nullptr;
}
else if (type == RENAME_COLUMN)
{
result.type = MutationCommand::Type::RENAME_COLUMN;
result.column_name = column_name;
result.rename_to = rename_to;
}
result.ast = ast->clone();
return result;

View File

@ -32,6 +32,8 @@ IMergeTreeReader::IMergeTreeReader(const MergeTreeData::DataPartPtr & data_part_
, all_mark_ranges(all_mark_ranges_)
, alter_conversions(storage.getAlterConversionsForPart(data_part))
{
LOG_DEBUG(&Poco::Logger::get("IMergeTreeReader"), "Columns to read:" << columns_.toString());
LOG_DEBUG(&Poco::Logger::get("IMergeTreeReader"), "Columns in part:" << data_part_->getColumns().toString());
}
/// Destructor is defaulted here, outside of the class body.
IMergeTreeReader::~IMergeTreeReader() = default;

View File

@ -469,6 +469,7 @@ void MergeTreeData::setProperties(const StorageInMemoryMetadata & metadata, bool
if (!only_check)
{
LOG_DEBUG(log, "SETTING UP COLUMNS:" << metadata.columns.toString());
setColumns(std::move(metadata.columns));
order_by_ast = metadata.order_by_ast;
@ -3595,8 +3596,13 @@ MergeTreeData::AlterConversions MergeTreeData::getAlterConversionsForPart(const
AlterConversions result{};
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
result.rename_map[command.column_name] = command.rename_to;
LOG_DEBUG(log, "Add to rename map:" << command.column_name);
}
}
return result;
}

View File

@ -31,6 +31,8 @@
#include <iomanip>
#include <boost/algorithm/string/replace.hpp>
namespace ProfileEvents
{
extern const Event MergedRows;
@ -988,6 +990,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "COMMANDS FOR INTERPRETER:" << for_interpreter.size());
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "COMMANDS FOR RENAMES:" << for_file_renames.size());
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
@ -1010,6 +1015,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// It shouldn't be changed by mutation.
new_data_part->index_granularity_info = source_part->index_granularity_info;
new_data_part->setColumns(getColumnsForNewDataPart(source_part, updated_header, all_columns, for_file_renames));
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "New data part columns:" << new_data_part->getColumns().toString());
new_data_part->partition.assign(source_part->partition);
auto disk = new_data_part->disk;
@ -1056,19 +1062,34 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
auto indices_to_recalc = getIndicesToRecalculate(in, storage_from_source_part, updated_header.getNamesAndTypesList(), context);
NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension);
NameSet files_to_remove = collectFilesToRemove(source_part, for_file_renames, mrk_extension);
NameToNameMap files_to_rename = collectFilesForRenames(source_part, for_file_renames, mrk_extension);
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "FILES RENAME MAP:" << files_to_rename.size());
if (need_remove_expired_values)
files_to_skip.insert("ttl.txt");
/// Create hardlinks for unchanged files
for (auto it = disk->iterateDirectory(source_part->getFullRelativePath()); it->isValid(); it->next())
{
if (files_to_skip.count(it->name()) || files_to_remove.count(it->name()))
if (files_to_skip.count(it->name()))
continue;
String destination = new_part_tmp_path + "/" + it->name();
String destination = new_part_tmp_path + "/";
auto rename_it = files_to_rename.find(it->name());
if (rename_it != files_to_rename.end())
{
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "RENAME IT FOUND:" << rename_it->first << " to " << rename_it->second);
if (rename_it->second.empty())
continue;
destination += rename_it->second;
}
else
{
destination += it->name();
}
LOG_DEBUG(&Poco::Logger::get("MergerMutator"), "HARDLINKING FROM:" << it->path() << " TO " << destination);
disk->createHardLink(it->path(), destination);
}
@ -1090,9 +1111,19 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
need_remove_expired_values);
}
for (const String & removed_file : files_to_remove)
if (new_data_part->checksums.files.count(removed_file))
new_data_part->checksums.files.erase(removed_file);
for (const auto & [rename_from, rename_to] : files_to_rename)
{
if (rename_to.empty() && new_data_part->checksums.files.count(rename_from))
{
new_data_part->checksums.files.erase(rename_from);
}
else if (new_data_part->checksums.files.count(rename_from))
{
new_data_part->checksums.files[rename_to] = new_data_part->checksums.files[rename_from];
new_data_part->checksums.files.erase(rename_from);
}
}
finalizeMutatedPart(source_part, new_data_part, need_remove_expired_values);
}
@ -1262,7 +1293,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
}
NameSet MergeTreeDataMergerMutator::collectFilesToRemove(
NameToNameMap MergeTreeDataMergerMutator::collectFilesForRenames(
MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension)
{
/// Collect counts for streams shared between different columns. For example, Nested columns share the stream with array sizes.
@ -1277,14 +1308,14 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove(
{});
}
NameSet remove_files;
NameToNameMap rename_map;
/// Remove old indices
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
remove_files.emplace("skp_idx_" + command.column_name + ".idx");
remove_files.emplace("skp_idx_" + command.column_name + mrk_extension);
rename_map.emplace("skp_idx_" + command.column_name + ".idx", "");
rename_map.emplace("skp_idx_" + command.column_name + mrk_extension, "");
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
@ -1294,8 +1325,8 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove(
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
rename_map.emplace(stream_name + ".bin", "");
rename_map.emplace(stream_name + mrk_extension, "");
}
};
@ -1304,9 +1335,25 @@ NameSet MergeTreeDataMergerMutator::collectFilesToRemove(
if (column)
column->type->enumerateStreams(callback, stream_path);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN)
{
LOG_DEBUG(&Poco::Logger::get("collectFilesForRenames"), "Has mutation command");
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_from = IDataType::getFileNameForStream(command.column_name, substream_path);
String stream_to = boost::replace_first_copy(stream_from, command.column_name, command.rename_to);
rename_map.emplace(stream_from + ".bin", stream_to + ".bin");
rename_map.emplace(stream_from + mrk_extension, stream_to + mrk_extension);
};
IDataType::SubstreamPath stream_path;
auto column = source_part->getColumns().tryGetByName(command.column_name);
if (column)
column->type->enumerateStreams(callback, stream_path);
}
}
return remove_files;
return rename_map;
}
NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
@ -1344,15 +1391,19 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
const MutationCommands & commands_for_removes)
{
NameSet removed_columns;
NameToNameMap renamed_columns;
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name);
if (command.type == MutationCommand::RENAME_COLUMN)
renamed_columns.emplace(command.rename_to, command.column_name);
}
Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = all_columns.begin(); it != all_columns.end();)
{
LOG_DEBUG(&Poco::Logger::get("getColumnsForNewDataPart"), "Looking at column:" << it->name);
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
@ -1364,6 +1415,10 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
{
++it;
}
else if (renamed_columns.count(it->name) && source_columns_name_set.count(renamed_columns[it->name]))
{
++it;
}
else
it = all_columns.erase(it);
}

View File

@ -147,7 +147,7 @@ private:
/// Apply commands to source_part, i.e. remove or rename some columns in source_part,
/// and return a map of files that have to be renamed in the filesystem and checksums.
/// An empty new name means that the file has to be removed.
static NameSet collectFilesToRemove(MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension);
static NameToNameMap collectFilesForRenames(MergeTreeData::DataPartPtr source_part, const MutationCommands & commands_for_removes, const String & mrk_extension);
/// Files that we need neither to remove nor to hardlink, for example columns.txt and checksums.txt,
/// because we will generate new versions of them after we perform the mutation.

View File

@ -183,6 +183,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
for (const String & name : column_names_to_return)
{
LOG_DEBUG(log, "Column name to return:" << name);
if (name == "_part")
{
part_column_queried = true;
@ -209,6 +210,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
NamesAndTypesList available_real_columns = data.getColumns().getAllPhysical();
LOG_DEBUG(log, "Available columns:" << available_real_columns.toString());
/// If there are only virtual columns in the query, you must request at least one non-virtual one.
if (real_column_names.empty())
real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));

View File

@ -213,11 +213,14 @@ void StorageMergeTree::alter(
StorageInMemoryMetadata metadata = getInMemoryMetadata();
auto maybe_mutation_commands = commands.getMutationCommands(metadata);
LOG_DEBUG(log, "Applying commands");
commands.apply(metadata);
LOG_DEBUG(log, "Commands applied");
/// This alter can be performed at metadata level only
if (commands.isSettingsAlter())
{
LOG_DEBUG(log, "Settings alter");
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
changeSettings(metadata.settings_ast, table_lock_holder);
@ -226,15 +229,18 @@ void StorageMergeTree::alter(
}
else
{
LOG_DEBUG(log, "Not settings alter");
lockStructureExclusively(table_lock_holder, context.getCurrentQueryId());
changeSettings(metadata.settings_ast, table_lock_holder);
/// Reinitialize primary key because primary key column types might have changed.
setProperties(metadata);
LOG_DEBUG(log, "Metadata setup");
setTTLExpressions(metadata.columns.getColumnTTLs(), metadata.ttl_for_table_ast);
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(context, table_id.table_name, metadata);
LOG_DEBUG(log, "Data on disk changed");
/// We release all locks except alter_lock, which allows
/// alter queries to be executed sequentially
@ -683,10 +689,16 @@ bool StorageMergeTree::tryMutatePart()
MutationCommands commands_for_size_validation;
for (const auto & command : it->second.commands)
{
if (command.type != MutationCommand::Type::DROP_COLUMN && command.type != MutationCommand::Type::DROP_INDEX)
if (command.type != MutationCommand::Type::DROP_COLUMN
&& command.type != MutationCommand::Type::DROP_INDEX
&& command.type != MutationCommand::Type::RENAME_COLUMN)
{
commands_for_size_validation.push_back(command);
}
else
{
commands_size += command.ast->size();
}
}
if (!commands_for_size_validation.empty())