More clear logic

This commit is contained in:
alesapin 2020-05-19 13:44:53 +03:00
parent 111c576579
commit 2b37a418f2

View File

@ -1013,11 +1013,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
const auto data_settings = data.getSettings(); const auto data_settings = data.getSettings();
MutationCommands for_interpreter, for_file_renames; MutationCommands for_interpreter, for_file_renames;
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR PART:" << commands_for_part.size());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "TOTAL COLUMNS:" << commands.size());
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames); splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR INTERPRETER:" << for_interpreter.size());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR FILE RENAMES:" << for_file_renames.size());
UInt64 watch_prev_elapsed = 0; UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0); MergeStageProgress stage_progress(1.0);
@ -1030,9 +1026,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
in = interpreter->execute(table_lock_holder); in = interpreter->execute(table_lock_holder);
updated_header = interpreter->getUpdatedHeader(); updated_header = interpreter->getUpdatedHeader();
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "UPDATED HEADER:" << updated_header.dumpStructure());
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "STREAM HEADER:" << in->getHeader().dumpStructure());
} }
auto new_data_part = data.createPart( auto new_data_part = data.createPart(
@ -1262,85 +1255,143 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
MutationCommands & for_interpreter, MutationCommands & for_interpreter,
MutationCommands & for_file_renames) MutationCommands & for_file_renames)
{ {
NameSet removed_columns_from_compact_part;
NameSet already_changed_columns;
bool is_compact_part = isCompactPart(part);
ColumnsDescription part_columns(part->getColumns()); ColumnsDescription part_columns(part->getColumns());
for (const auto & command : commands) if (isCompactPart(part))
{ {
if (command.type == MutationCommand::Type::DELETE NameSet mutated_columns;
|| command.type == MutationCommand::Type::UPDATE for (const auto & command : commands)
|| command.type == MutationCommand::Type::MATERIALIZE_INDEX
|| command.type == MutationCommand::Type::MATERIALIZE_TTL)
{ {
for_interpreter.push_back(command); if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL
for (const auto & [column_name, expr] : command.column_to_update_expression) || command.type == MutationCommand::Type::DELETE || command.type == MutationCommand::Type::UPDATE)
already_changed_columns.emplace(column_name);
}
else if (command.type == MutationCommand::Type::READ_COLUMN && part_columns.has(command.column_name))
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "READ COLUMN:" << command.column_name);
/// If we don't have this column in source part, than we don't
/// need to materialize it
if (part_columns.has(command.column_name))
{ {
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "PART HAS COLUMN:" << command.column_name);
for_interpreter.push_back(command); for_interpreter.push_back(command);
already_changed_columns.emplace(command.column_name); for (const auto & [column_name, expr] : command.column_to_update_expression)
mutated_columns.emplace(column_name);
} }
else else if (part_columns.has(command.column_name))
for_file_renames.push_back(command);
}
else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN && part_columns.has(command.column_name))
{
removed_columns_from_compact_part.emplace(command.column_name);
for_file_renames.push_back(command);
part_columns.remove(command.column_name);
}
else if (command.type == MutationCommand::Type::RENAME_COLUMN && part_columns.has(command.column_name))
{
if (is_compact_part)
{ {
for_interpreter.push_back( if (command.type == MutationCommand::Type::DROP_COLUMN)
{ {
.type = MutationCommand::Type::READ_COLUMN, mutated_columns.emplace(command.column_name);
.column_name = command.rename_to, }
}); else if (command.type == MutationCommand::Type::RENAME_COLUMN)
already_changed_columns.emplace(command.column_name); {
part_columns.rename(command.column_name, command.rename_to); for_interpreter.push_back({
} .type = MutationCommand::Type::READ_COLUMN,
else .column_name = command.rename_to,
{ });
part_columns.rename(command.column_name, command.rename_to); mutated_columns.emplace(command.column_name);
for_file_renames.push_back(command); part_columns.rename(command.column_name, command.rename_to);
}
} }
} }
else if (part_columns.has(command.column_name)) /// If it's compact part than we don't need to actually remove files
{ /// from disk we just don't read dropped columns
for_file_renames.push_back(command);
}
}
if (is_compact_part)
{
/// If it's compact part than we don't need to actually remove files from disk
/// we just don't read dropped columns
for (const auto & column : part->getColumns()) for (const auto & column : part->getColumns())
{ {
if (!removed_columns_from_compact_part.count(column.name) if (!mutated_columns.count(column.name))
&& !already_changed_columns.count(column.name)) for_interpreter.emplace_back(
MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type});
}
}
else
{
for (const auto & command : commands)
{
if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL
|| command.type == MutationCommand::Type::DELETE || command.type == MutationCommand::Type::UPDATE)
{ {
for_interpreter.emplace_back(MutationCommand for_interpreter.push_back(command);
}
/// If we don't have this column in source part, than we don't need
/// to materialize it
else if (part_columns.has(command.column_name))
{
if (command.type == MutationCommand::Type::READ_COLUMN)
{ {
.type = MutationCommand::Type::READ_COLUMN, for_interpreter.push_back(command);
.column_name = column.name, }
.data_type = column.type else if (command.type == MutationCommand::Type::RENAME_COLUMN)
}); {
part_columns.rename(command.column_name, command.rename_to);
for_file_renames.push_back(command);
}
else
{
for_file_renames.push_back(command);
}
} }
} }
} }
//for (const auto & command : commands)
//{
// if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL)
// {
// for_interpreter.push_back(command);
// for (const auto & [column_name, expr] : command.column_to_update_expression)
// already_changed_columns.emplace(column_name);
// }
// if (command.type == MutationCommand::Type::DELETE
// || command.type == MutationCommand::Type::UPDATE
// ||
// || )
// {
// }
// else if (command.type == MutationCommand::Type::READ_COLUMN && )
// {
// if (part_columns.has(command.column_name))
// {
// for_interpreter.push_back(command);
// already_changed_columns.emplace(command.column_name);
// }
// else
// for_file_renames.push_back(command);
// }
// else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN && part_columns.has(command.column_name))
// {
// for_file_renames.push_back(command);
// part_columns.remove(command.column_name);
// }
// else if (command.type == MutationCommand::Type::RENAME_COLUMN && part_columns.has(command.column_name))
// {
// if (is_compact_part)
// {
// for_interpreter.push_back(
// {
// .type = MutationCommand::Type::READ_COLUMN,
// .column_name = command.rename_to,
// });
// already_changed_columns.emplace(command.column_name);
// part_columns.rename(command.column_name, command.rename_to);
// }
// else
// {
// }
// }
// else if (part_columns.has(command.column_name))
// {
// for_file_renames.push_back(command);
// }
//}
//if (is_compact_part)
//{
// for (const auto & column : part->getColumns())
// {
// if (!removed_columns_from_compact_part.count(column.name)
// && !already_changed_columns.count(column.name))
// {
// for_interpreter.emplace_back(MutationCommand
// {
// .type = MutationCommand::Type::READ_COLUMN,
// .column_name = column.name,
// .data_type = column.type
// });
// }
// }
//}
} }
@ -1447,9 +1498,11 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
NamesAndTypesList storage_columns, NamesAndTypesList storage_columns,
const MutationCommands & commands_for_removes) const MutationCommands & commands_for_removes)
{ {
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART START:" << storage_columns.toString()); /// In compact parts we read all columns, because they all stored in a
/// single file
if (isCompactPart(source_part)) if (isCompactPart(source_part))
return updated_header.getNamesAndTypesList(); return updated_header.getNamesAndTypesList();
NameSet removed_columns; NameSet removed_columns;
NameToNameMap renamed_columns; NameToNameMap renamed_columns;
for (const auto & command : commands_for_removes) for (const auto & command : commands_for_removes)
@ -1457,17 +1510,12 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
if (command.type == MutationCommand::DROP_COLUMN) if (command.type == MutationCommand::DROP_COLUMN)
removed_columns.insert(command.column_name); removed_columns.insert(command.column_name);
if (command.type == MutationCommand::RENAME_COLUMN) if (command.type == MutationCommand::RENAME_COLUMN)
{
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "RENAME FROM:" << command.column_name << " TO:" << command.rename_to);
renamed_columns.emplace(command.rename_to, command.column_name); renamed_columns.emplace(command.rename_to, command.column_name);
}
} }
Names source_column_names = source_part->getColumns().getNames(); Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
NameSet columns_that_was_renamed;
for (auto it = storage_columns.begin(); it != storage_columns.end();) for (auto it = storage_columns.begin(); it != storage_columns.end();)
{ {
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "LOOKING AT:" << it->name);
if (updated_header.has(it->name)) if (updated_header.has(it->name))
{ {
auto updated_type = updated_header.getByName(it->name).type; auto updated_type = updated_header.getByName(it->name).type;
@ -1485,12 +1533,10 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
} }
else else
{ {
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "ERASE:" << it->name);
it = storage_columns.erase(it); it = storage_columns.erase(it);
} }
} }
LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART FINISH:" << storage_columns.toString());
return storage_columns; return storage_columns;
} }