Almost working drop

alesapin 2020-01-15 16:47:00 +03:00
parent 2abf4bbc95
commit 92648955c4
4 changed files with 32 additions and 19 deletions

View File

@ -399,7 +399,6 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
/// TODO(alesap)
if (command.data_type)
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
}
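
For context on the check above: a column is registered in column_to_updated, and therefore rewritten, only when the mutation command carries a new data type; a plain drop arrives as a READ-style command with no data_type and adds nothing to the stage. A minimal standalone sketch of that distinction, using a simplified stand-in for MutationCommand (the struct and the needsColumnRewrite helper are illustrative, not the ClickHouse types):

#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Simplified stand-in for MutationCommand: only the fields the check above relies on.
struct FakeMutationCommand
{
    enum class Type { READ, UPDATE };
    Type type = Type::READ;
    std::string column_name;
    std::shared_ptr<std::string> data_type; // null means no type change, i.e. a plain drop/read
};

// A column only needs to be rewritten when the command updates it or changes its type.
static bool needsColumnRewrite(const FakeMutationCommand & cmd)
{
    return cmd.type == FakeMutationCommand::Type::UPDATE || cmd.data_type != nullptr;
}

int main()
{
    std::vector<FakeMutationCommand> commands = {
        {FakeMutationCommand::Type::READ, "dropped_col", nullptr},
        {FakeMutationCommand::Type::READ, "retyped_col", std::make_shared<std::string>("UInt64")},
    };
    for (const auto & cmd : commands)
        std::cout << cmd.column_name << (needsColumnRewrite(cmd) ? " -> rewrite" : " -> untouched") << '\n';
}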

View File

@ -980,20 +980,24 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
static_cast<double>(source_part->bytes_on_disk) / data.getTotalActiveSizeInBytes());
Poco::File(new_part_tmp_path).createDirectories();
BlockInputStreamPtr in = nullptr;
Block updated_header;
if (!std::all_of(commands_for_part.begin(), commands_for_part.end(), [](const auto & cmd) { return cmd.type == MutationCommand::Type::READ && cmd.data_type == nullptr; }))
{
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
in = mutations_interpreter.execute(table_lock_holder);
updated_header = mutations_interpreter.getUpdatedHeader();
}
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
auto in = mutations_interpreter.execute(table_lock_holder);
const auto & updated_header = mutations_interpreter.getUpdatedHeader();
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
Block in_header = in->getHeader();
std::cerr << "Mutations header:" << in_header.dumpStructure() << std::endl;
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
if (in)
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
if (updated_header.columns() == all_columns.size())
{
@ -1033,7 +1037,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
/// Checks if columns used in skipping indexes modified.
std::set<MergeTreeIndexPtr> indices_to_recalc;
ASTPtr indices_recalc_expr_list = std::make_shared<ASTExpressionList>();
for (const auto & col : in_header.getNames())
for (const auto & col : updated_header.getNames())
{
for (size_t i = 0; i < data.skip_indices.size(); ++i)
{
@ -1052,8 +1056,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (!indices_to_recalc.empty())
{
auto indices_recalc_syntax = SyntaxAnalyzer(context, {}).analyze(
indices_recalc_expr_list, in_header.getNamesAndTypesList());
auto indices_recalc_syntax
= SyntaxAnalyzer(context, {}).analyze(indices_recalc_expr_list, updated_header.getNamesAndTypesList());
auto indices_recalc_expr = ExpressionAnalyzer(
indices_recalc_expr_list,
indices_recalc_syntax, context).getActions(false);
@ -1091,6 +1095,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
files_to_skip.insert(index->getFileName() + mrk_extension);
}
std::unordered_set<String> removed_columns;
/// TODO(alesap) better
for (const auto & part_column : source_part->columns)
{
@ -1106,11 +1111,12 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
if (!found)
{
std::cerr << "REMOVING COLUMN:" << part_column.name << std::endl;
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(part_column.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
removed_columns.insert(stream_name + ".bin");
removed_columns.insert(stream_name + mrk_extension);
};
IDataType::SubstreamPath stream_path;
@ -1133,8 +1139,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
merge_entry->columns_written = all_columns.size() - updated_header.columns();
new_data_part->checksums = source_part->checksums;
if (updated_header.columns() != 0)
if (in)
{
std::cerr << "Updated header:" << updated_header.dumpStructure() << std::endl;
IMergedBlockOutputStream::WrittenOffsetColumns unused_written_offsets;
MergedColumnOnlyOutputStream out(
data,
@ -1157,6 +1164,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
{
out.write(block);
std::cerr << "Block readed:" << block.dumpStructure() << std::endl;
std::cerr << "Block rows:" << block.rows() << std::endl;
merge_entry->rows_written += block.rows();
merge_entry->bytes_written_uncompressed += block.bytes();
}
@ -1167,10 +1176,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->checksums.add(std::move(changed_checksums));
}
else
{
std::cerr << "Updated header empty\n";
}
for (const String & file_to_skip : files_to_skip)
if (new_data_part->checksums.files.count(file_to_skip))
new_data_part->checksums.files.erase(file_to_skip);
for (const String & removed_file : removed_columns)
if (new_data_part->checksums.files.count(removed_file))
new_data_part->checksums.files.erase(removed_file);
{
/// Write file with checksums.
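
The drop handling above works at the file level: for every column that no longer appears in the updated header, the per-substream callback records the column's .bin and mark files so they are skipped when hard-linking the part and erased from its checksums. A minimal standalone sketch of that bookkeeping, with a streamNamesFor/collectDroppedFiles pair standing in for the IDataType substream enumeration (both helpers are hypothetical, not the ClickHouse API):

#include <iostream>
#include <set>
#include <string>
#include <vector>

// Hypothetical stand-in for enumerating a column's substreams: a flat column has a single
// stream named after the column; nested types would contribute one entry per substream.
static std::vector<std::string> streamNamesFor(const std::string & column_name)
{
    return {column_name};
}

// Collect every on-disk file a dropped column contributes: one .bin and one mark file per stream.
static std::set<std::string> collectDroppedFiles(
    const std::vector<std::string> & dropped_columns, const std::string & mrk_extension)
{
    std::set<std::string> files;
    for (const auto & column : dropped_columns)
    {
        for (const auto & stream : streamNamesFor(column))
        {
            files.insert(stream + ".bin");
            files.insert(stream + mrk_extension);
        }
    }
    return files;
}

int main()
{
    for (const auto & file : collectDroppedFiles({"old_col"}, ".mrk"))
        std::cout << "erase from checksums: " << file << '\n';
}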

View File

@ -473,7 +473,7 @@ void MergeTreeReader::performRequiredConversions(Columns & res_columns)
//std::cerr << "Copy block: " << copy_block.dumpStructure() << std::endl;
DB::performRequiredConversions(copy_block, columns, storage.global_context);
std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl;
//std::cerr << "Result copy block: " << copy_block.dumpStructure() << std::endl;
/// Move columns from block.
name_and_type = columns.begin();

View File

@ -3394,6 +3394,7 @@ void StorageReplicatedMergeTree::alter(
{
std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl;
ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, query_context);
std::cerr << "Mutation finished\n";
}
}
@ -4465,7 +4466,7 @@ ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const Mu
// replicas.push_back(replica_path);
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
//}
//}
return entry;
}
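
The alter path above submits the mutation entry and then blocks in waitMutationToFinishOnReplicas until every replica has applied it. A minimal sketch of that kind of wait loop, with a hypothetical is_done predicate in place of the ZooKeeper state checks the real code performs:

#include <chrono>
#include <functional>
#include <iostream>
#include <string>
#include <thread>
#include <vector>

// Hypothetical predicate: the real code inspects each replica's mutation state in ZooKeeper.
using MutationDonePredicate = std::function<bool(const std::string & replica, const std::string & mutation_id)>;

// Poll every replica until it reports the mutation as finished.
static void waitMutationOnReplicas(
    const std::vector<std::string> & replicas, const std::string & mutation_id, const MutationDonePredicate & is_done)
{
    for (const auto & replica : replicas)
    {
        while (!is_done(replica, mutation_id))
            std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << replica << " finished mutation " << mutation_id << '\n';
    }
}

int main()
{
    /// Trivial usage: a predicate that reports success immediately.
    waitMutationOnReplicas({"replica_1", "replica_2"}, "0000000042",
        [](const std::string &, const std::string &) { return true; });
}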