From 92648955c4b2d69319a849d03f81c79037d094ff Mon Sep 17 00:00:00 2001 From: alesapin Date: Wed, 15 Jan 2020 16:47:00 +0300 Subject: [PATCH] Almost working drop --- .../src/Interpreters/MutationsInterpreter.cpp | 1 - .../MergeTree/MergeTreeDataMergerMutator.cpp | 45 ++++++++++++------- .../Storages/MergeTree/MergeTreeReader.cpp | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 3 +- 4 files changed, 32 insertions(+), 19 deletions(-) diff --git a/dbms/src/Interpreters/MutationsInterpreter.cpp b/dbms/src/Interpreters/MutationsInterpreter.cpp index 5ae7a8996d4..5cfcd3f0504 100644 --- a/dbms/src/Interpreters/MutationsInterpreter.cpp +++ b/dbms/src/Interpreters/MutationsInterpreter.cpp @@ -399,7 +399,6 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run) if (stages.size() == 1) /// First stage only supports filtering and can't update columns. stages.emplace_back(context); - /// TODO(alesap) if (command.data_type) stages.back().column_to_updated.emplace(command.column_name, std::make_shared(command.column_name)); } diff --git a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 6e8756b1c81..50387bb4c32 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -980,20 +980,24 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor static_cast(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes()); Poco::File(new_part_tmp_path).createDirectories(); + BlockInputStreamPtr in = nullptr; + Block updated_header; + + if(!std::all_of(commands_for_part.begin(), commands_for_part.end(), [](const auto & cmd) { return cmd.type == MutationCommand::Type::READ && cmd.data_type == nullptr;})) + { + MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true); + in = mutations_interpreter.execute(table_lock_holder); + updated_header = mutations_interpreter.getUpdatedHeader(); + } - MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true); - auto in = mutations_interpreter.execute(table_lock_holder); - const auto & updated_header = mutations_interpreter.getUpdatedHeader(); NamesAndTypesList all_columns = data.getColumns().getAllPhysical(); const auto data_settings = data.getSettings(); - Block in_header = in->getHeader(); - std::cerr << "Mutations header:" << in_header.dumpStructure() << std::endl; - UInt64 watch_prev_elapsed = 0; MergeStageProgress stage_progress(1.0); - in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); + if (in) + in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress)); if (updated_header.columns() == all_columns.size()) { @@ -1033,7 +1037,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor /// Checks if columns used in skipping indexes modified. std::set indices_to_recalc; ASTPtr indices_recalc_expr_list = std::make_shared(); - for (const auto & col : in_header.getNames()) + for (const auto & col : updated_header.getNames()) { for (size_t i = 0; i < data.skip_indices.size(); ++i) { @@ -1052,8 +1056,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor if (!indices_to_recalc.empty()) { - auto indices_recalc_syntax = SyntaxAnalyzer(context, {}).analyze( - indices_recalc_expr_list, in_header.getNamesAndTypesList()); + auto indices_recalc_syntax + = SyntaxAnalyzer(context, {}).analyze(indices_recalc_expr_list, updated_header.getNamesAndTypesList()); auto indices_recalc_expr = ExpressionAnalyzer( indices_recalc_expr_list, indices_recalc_syntax, context).getActions(false); @@ -1091,6 +1095,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor files_to_skip.insert(index->getFileName() + mrk_extension); } + std::unordered_set removed_columns; /// TODO(alesap) better for (const auto & part_column : source_part->columns) { @@ -1106,11 +1111,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor if (!found) { std::cerr << "REMOVING COLUMN:" << part_column.name << std::endl; + IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) { String stream_name = IDataType::getFileNameForStream(part_column.name, substream_path); - files_to_skip.insert(stream_name + ".bin"); - files_to_skip.insert(stream_name + mrk_extension); + removed_columns.insert(stream_name + ".bin"); + removed_columns.insert(stream_name + mrk_extension); }; IDataType::SubstreamPath stream_path; @@ -1133,8 +1139,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor merge_entry->columns_written = all_columns.size() - updated_header.columns(); new_data_part->checksums = source_part->checksums; - if (updated_header.columns() != 0) + if (in) { + std::cerr << "Updated header:" << updated_header.dumpStructure() << std::endl; IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets; MergedColumnOnlyOutputStream out( data, @@ -1157,6 +1164,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor { out.write(block); + std::cerr << "Block readed:" << block.dumpStructure() << std::endl; + std::cerr << "Block rows:" << block.rows() << std::endl; merge_entry->rows_written += block.rows(); merge_entry->bytes_written_uncompressed += block.bytes(); } @@ -1167,10 +1176,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor new_data_part->checksums.add(std::move(changed_checksums)); } + else + { + std::cerr << "Updated header empty\n"; + } - for (const String & file_to_skip : files_to_skip) - if (new_data_part->checksums.files.count(file_to_skip)) - new_data_part->checksums.files.erase(file_to_skip); + for (const String & removed_file : removed_columns) + if (new_data_part->checksums.files.count(removed_file)) + new_data_part->checksums.files.erase(removed_file); { /// Write file with checksums. diff --git a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp index f3dcf3adafc..45e7a6e8642 100644 --- a/dbms/src/Storages/MergeTree/MergeTreeReader.cpp +++ b/dbms/src/Storages/MergeTree/MergeTreeReader.cpp @@ -473,7 +473,7 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns) //std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl; DB::performRequiredConversions(copy_block, columns, storage.global_context); - std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl; + //std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl; /// Move columns from block. name_and_type = columns.begin(); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 15318aecefb..fbc0e9458ab 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -3394,6 +3394,7 @@ void StorageReplicatedMergeTree::alter( { std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl; ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, query_context); + std::cerr << "Mutation finished\n"; } } @@ -4465,7 +4466,7 @@ ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const Mu // replicas.push_back(replica_path); waitMutationToFinishOnReplicas(replicas, entry.znode_name); - //} + //} return entry; }