From 2b37a418f2df083ce5b2bba5d0b96fa9fcb564fc Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 19 May 2020 13:44:53 +0300 Subject: [PATCH] More clear logic --- .../MergeTree/MergeTreeDataMergerMutator.cpp | 198 +++++++++++------- 1 file changed, 122 insertions(+), 76 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 6610629c159..aa09d6ddf9e 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -1013,11 +1013,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor const auto data_settings = data.getSettings(); MutationCommands for_interpreter, for_file_renames; - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR PART:" << commands_for_part.size()); - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "TOTAL COLUMNS:" << commands.size()); splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames); - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR INTERPRETER:" << for_interpreter.size()); - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COMMANDS FOR FILE RENAMES:" << for_file_renames.size()); UInt64 watch_prev_elapsed = 0; MergeStageProgress stage_progress(1.0); @@ -1030,9 +1026,6 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor in = interpreter->execute(table_lock_holder); updated_header = interpreter->getUpdatedHeader(); in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); - - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "UPDATED HEADER:" << updated_header.dumpStructure()); - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "STREAM HEADER:" << in->getHeader().dumpStructure()); } auto new_data_part = data.createPart( @@ -1262,85 +1255,143 @@ void MergeTreeDataMergerMutator::splitMutationCommands( MutationCommands & for_interpreter, MutationCommands & for_file_renames) { - NameSet removed_columns_from_compact_part; - NameSet already_changed_columns; - bool is_compact_part = isCompactPart(part); ColumnsDescription part_columns(part->getColumns()); - for (const auto & command : commands) + if (isCompactPart(part)) { - if (command.type == MutationCommand::Type::DELETE - || command.type == MutationCommand::Type::UPDATE - || command.type == MutationCommand::Type::MATERIALIZE_INDEX - || command.type == MutationCommand::Type::MATERIALIZE_TTL) + NameSet mutated_columns; + for (const auto & command : commands) { - for_interpreter.push_back(command); - for (const auto & [column_name, expr] : command.column_to_update_expression) - already_changed_columns.emplace(column_name); - } - else if (command.type == MutationCommand::Type::READ_COLUMN && part_columns.has(command.column_name)) - { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "READ COLUMN:" << command.column_name); - /// If we don't have this column in source part, than we don't - /// need to materialize it - if (part_columns.has(command.column_name)) + if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL + || command.type == MutationCommand::Type::DELETE || command.type == MutationCommand::Type::UPDATE) { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "PART HAS COLUMN:" << command.column_name); for_interpreter.push_back(command); - already_changed_columns.emplace(command.column_name); + for (const auto & [column_name, expr] : command.column_to_update_expression) + mutated_columns.emplace(column_name); } - else - for_file_renames.push_back(command); - - } - else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN && part_columns.has(command.column_name)) - { - removed_columns_from_compact_part.emplace(command.column_name); - for_file_renames.push_back(command); - part_columns.remove(command.column_name); - } - else if (command.type == MutationCommand::Type::RENAME_COLUMN && part_columns.has(command.column_name)) - { - if (is_compact_part) + else if (part_columns.has(command.column_name)) { - for_interpreter.push_back( + if (command.type == MutationCommand::Type::DROP_COLUMN) { - .type = MutationCommand::Type::READ_COLUMN, - .column_name = command.rename_to, - }); - already_changed_columns.emplace(command.column_name); - part_columns.rename(command.column_name, command.rename_to); - } - else - { - part_columns.rename(command.column_name, command.rename_to); - for_file_renames.push_back(command); + mutated_columns.emplace(command.column_name); + } + else if (command.type == MutationCommand::Type::RENAME_COLUMN) + { + for_interpreter.push_back({ + .type = MutationCommand::Type::READ_COLUMN, + .column_name = command.rename_to, + }); + mutated_columns.emplace(command.column_name); + part_columns.rename(command.column_name, command.rename_to); + } } } - else if (part_columns.has(command.column_name)) - { - for_file_renames.push_back(command); - } - } - - if (is_compact_part) - { - /// If it's compact part than we don't need to actually remove files from disk - /// we just don't read dropped columns + /// If it's compact part than we don't need to actually remove files + /// from disk we just don't read dropped columns for (const auto & column : part->getColumns()) { - if (!removed_columns_from_compact_part.count(column.name) - && !already_changed_columns.count(column.name)) + if (!mutated_columns.count(column.name)) + for_interpreter.emplace_back( + MutationCommand{.type = MutationCommand::Type::READ_COLUMN, .column_name = column.name, .data_type = column.type}); + } + } + else + { + for (const auto & command : commands) + { + if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL + || command.type == MutationCommand::Type::DELETE || command.type == MutationCommand::Type::UPDATE) { - for_interpreter.emplace_back(MutationCommand + for_interpreter.push_back(command); + } + /// If we don't have this column in source part, than we don't need + /// to materialize it + else if (part_columns.has(command.column_name)) + { + if (command.type == MutationCommand::Type::READ_COLUMN) { - .type = MutationCommand::Type::READ_COLUMN, - .column_name = column.name, - .data_type = column.type - }); + for_interpreter.push_back(command); + } + else if (command.type == MutationCommand::Type::RENAME_COLUMN) + { + part_columns.rename(command.column_name, command.rename_to); + for_file_renames.push_back(command); + } + else + { + for_file_renames.push_back(command); + } } } } + //for (const auto & command : commands) + //{ + // if (command.type == MutationCommand::Type::MATERIALIZE_INDEX || command.type == MutationCommand::Type::MATERIALIZE_TTL) + // { + // for_interpreter.push_back(command); + // for (const auto & [column_name, expr] : command.column_to_update_expression) + // already_changed_columns.emplace(column_name); + // } + // if (command.type == MutationCommand::Type::DELETE + // || command.type == MutationCommand::Type::UPDATE + // || + // || ) + // { + // } + // else if (command.type == MutationCommand::Type::READ_COLUMN && ) + // { + // if (part_columns.has(command.column_name)) + // { + // for_interpreter.push_back(command); + // already_changed_columns.emplace(command.column_name); + // } + // else + // for_file_renames.push_back(command); + + // } + // else if (is_compact_part && command.type == MutationCommand::Type::DROP_COLUMN && part_columns.has(command.column_name)) + // { + // for_file_renames.push_back(command); + // part_columns.remove(command.column_name); + // } + // else if (command.type == MutationCommand::Type::RENAME_COLUMN && part_columns.has(command.column_name)) + // { + // if (is_compact_part) + // { + // for_interpreter.push_back( + // { + // .type = MutationCommand::Type::READ_COLUMN, + // .column_name = command.rename_to, + // }); + // already_changed_columns.emplace(command.column_name); + // part_columns.rename(command.column_name, command.rename_to); + // } + // else + // { + // } + // } + // else if (part_columns.has(command.column_name)) + // { + // for_file_renames.push_back(command); + // } + //} + + //if (is_compact_part) + //{ + // for (const auto & column : part->getColumns()) + // { + // if (!removed_columns_from_compact_part.count(column.name) + // && !already_changed_columns.count(column.name)) + // { + // for_interpreter.emplace_back(MutationCommand + // { + // .type = MutationCommand::Type::READ_COLUMN, + // .column_name = column.name, + // .data_type = column.type + // }); + // } + // } + //} } @@ -1447,9 +1498,11 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( NamesAndTypesList storage_columns, const MutationCommands & commands_for_removes) { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART START:" << storage_columns.toString()); + /// In compact parts we read all columns, because they all stored in a + /// single file if (isCompactPart(source_part)) return updated_header.getNamesAndTypesList(); + NameSet removed_columns; NameToNameMap renamed_columns; for (const auto & command : commands_for_removes) @@ -1457,17 +1510,12 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( if (command.type == MutationCommand::DROP_COLUMN) removed_columns.insert(command.column_name); if (command.type == MutationCommand::RENAME_COLUMN) - { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "RENAME FROM:" << command.column_name << " TO:" << command.rename_to); renamed_columns.emplace(command.rename_to, command.column_name); - } } Names source_column_names = source_part->getColumns().getNames(); NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end()); - NameSet columns_that_was_renamed; for (auto it = storage_columns.begin(); it != storage_columns.end();) { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "LOOKING AT:" << it->name); if (updated_header.has(it->name)) { auto updated_type = updated_header.getByName(it->name).type; @@ -1485,12 +1533,10 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( } else { - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "ERASE:" << it->name); it = storage_columns.erase(it); } } - LOG_DEBUG(&Poco::Logger::get("DEBUG"), "COLUMNS FOr NEW PART FINISH:" << storage_columns.toString()); return storage_columns; }