Slightly refactor merger mutator

This commit is contained in:
alesapin 2020-02-17 18:44:13 +03:00
parent b59fd72f34
commit 6a02b99faf
10 changed files with 149 additions and 109 deletions

View File

@ -73,8 +73,10 @@ public:
/// Unlike `filter`, returns columns in the order in which they go in `names`.
NamesAndTypesList addTypes(const Names & names) const;
/// Check that column contains in list
bool contains(const String & name) const;
/// Try to get column by name, return empty optional if column not found
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
};

View File

@ -1,7 +1,7 @@
#include <Common/typeid_cast.h>
#include <Functions/FunctionHelpers.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <DataStreams/AddingDefaultsBlockInputStream.h>
#include <Columns/ColumnsNumber.h>

View File

@ -1,6 +1,6 @@
#include <DataStreams/TTLBlockInputStream.h>
#include <DataTypes/DataTypeDate.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/SyntaxAnalyzer.h>
#include <Interpreters/ExpressionAnalyzer.h>

View File

@ -4,7 +4,7 @@
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeArray.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Core/Block.h>
#include <Storages/ColumnDefault.h>

View File

@ -1,4 +1,4 @@
#include "evaluateMissingDefaults.h"
#include "inplaceBlockConversions.h"
#include <Core/Block.h>
#include <Parsers/queryToString.h>

View File

@ -12,6 +12,8 @@ class Context;
class NamesAndTypesList;
struct ColumnDefault;
/// Adds missing defaults to block according to required_columns
/// using column_defaults map
void evaluateMissingDefaults(Block & block,
const NamesAndTypesList & required_columns,
const std::unordered_map<std::string, ColumnDefault> & column_defaults,

View File

@ -54,6 +54,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ABORTED;
extern const int UNKNOWN_MUTATION_COMMAND;
}
@ -984,25 +985,25 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
Block updated_header;
std::optional<MutationsInterpreter> interpreter;
std::vector<MutationCommand> for_interpreter;
std::vector<MutationCommand> for_file_renames;
std::vector<MutationCommand> for_interpreter, for_file_renames;
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
if (!for_interpreter.empty())
{
interpreter.emplace(storage_from_source_part, for_interpreter, context_for_reading, true);
in = interpreter->execute(table_lock_holder);
updated_header = interpreter->getUpdatedHeader();
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
}
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
/// Don't change granularity type while mutating subset of columns
String mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
UInt64 watch_prev_elapsed = 0;
MergeStageProgress stage_progress(1.0);
if (in)
in->setProgressCallback(MergeProgressCallback(merge_entry, watch_prev_elapsed, stage_progress));
if (updated_header.columns() == all_columns.size())
{
@ -1076,76 +1077,14 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
std::make_shared<ExpressionBlockInputStream>(in, indices_recalc_expr));
}
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
/// Don't change granularity type while mutating subset of columns
auto mrk_extension = source_part->index_granularity_info.is_adaptive ? getAdaptiveMrkExtension() : getNonAdaptiveMrkExtension();
/// Skip updated files
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
};
IDataType::SubstreamPath stream_path;
entry.type->enumerateStreams(callback, stream_path);
}
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std::map<String, size_t> stream_counts;
for (const NameAndTypePair & column : source_part->columns)
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
{});
}
std::unordered_set<String> remove_files;
/// Remove old indices
for (const auto & command : for_file_renames)
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
remove_files.emplace("skp_idx_" + command.column_name + ".idx");
remove_files.emplace("skp_idx_" + command.column_name + mrk_extension);
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
}
};
IDataType::SubstreamPath stream_path;
auto column = source_part->columns.tryGetByName(command.column_name);
if (column)
column->type->enumerateStreams(callback, stream_path);
}
}
NameSet files_to_skip = collectFilesToSkip(updated_header, indices_to_recalc, mrk_extension);
NameSet files_to_remove = collectFilesToRemove(source_part, for_file_renames, mrk_extension);
Poco::DirectoryIterator dir_end;
/// Create hardlinks for unchanged files
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
if (files_to_skip.count(dir_it.name()) || remove_files.count(dir_it.name()))
if (files_to_skip.count(dir_it.name()) || files_to_remove.count(dir_it.name()))
continue;
Poco::Path destination(new_part_tmp_path);
@ -1192,41 +1131,22 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
new_data_part->checksums.add(std::move(changed_checksums));
}
for (const String & removed_file : remove_files)
for (const String & removed_file : files_to_remove)
if (new_data_part->checksums.files.count(removed_file))
new_data_part->checksums.files.erase(removed_file);
{
/// Write file with checksums.
WriteBufferFromFile out_checksums(new_part_tmp_path + "checksums.txt", 4096);
new_data_part->checksums.write(out_checksums);
}
} /// close fd
/// Write the columns list of the resulting part in the same order as all_columns.
new_data_part->columns = all_columns;
Names source_column_names = source_part->columns.getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = new_data_part->columns.begin(); it != new_data_part->columns.end();)
{
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
if (updated_type != it->type)
it->type = updated_type;
++it;
}
else if (source_columns_name_set.count(it->name))
{
++it;
}
else
it = new_data_part->columns.erase(it);
}
new_data_part->columns = getColumnsForNewDataPart(source_part, updated_header, all_columns);
{
/// Write a file with a description of columns.
WriteBufferFromFile out_columns(new_part_tmp_path + "columns.txt", 4096);
new_data_part->columns.writeText(out_columns);
}
} /// close
new_data_part->rows_count = source_part->rows_count;
new_data_part->index_granularity = source_part->index_granularity;
@ -1338,7 +1258,7 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
MergeTreeData::DataPartPtr part,
const std::vector<MutationCommand> & commands,
std::vector<MutationCommand> & for_interpreter,
std::vector<MutationCommand> & for_file_renames)
std::vector<MutationCommand> & for_file_renames) const
{
for (const auto & command : commands)
{
@ -1364,4 +1284,103 @@ void MergeTreeDataMergerMutator::splitMutationCommands(
}
}
NameSet MergeTreeDataMergerMutator::collectFilesToRemove(
MergeTreeData::DataPartPtr source_part, const std::vector<MutationCommand> & commands_for_removes, const String & mrk_extension) const
{
/// Collect counts for shared streams of different columns. As an example, Nested columns have shared stream with array sizes.
std::map<String, size_t> stream_counts;
for (const NameAndTypePair & column : source_part->columns)
{
column.type->enumerateStreams(
[&](const IDataType::SubstreamPath & substream_path)
{
++stream_counts[IDataType::getFileNameForStream(column.name, substream_path)];
},
{});
}
NameSet remove_files;
/// Remove old indices
for (const auto & command : commands_for_removes)
{
if (command.type == MutationCommand::Type::DROP_INDEX)
{
remove_files.emplace("skp_idx_" + command.column_name + ".idx");
remove_files.emplace("skp_idx_" + command.column_name + mrk_extension);
}
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
}
};
IDataType::SubstreamPath stream_path;
auto column = source_part->columns.tryGetByName(command.column_name);
if (column)
column->type->enumerateStreams(callback, stream_path);
}
}
return remove_files;
}
NameSet MergeTreeDataMergerMutator::collectFilesToSkip(
const Block & updated_header, const std::set<MergeTreeIndexPtr> & indices_to_recalc, const String & mrk_extension) const
{
NameSet files_to_skip = {"checksums.txt", "columns.txt"};
/// Skip updated files
for (const auto & entry : updated_header)
{
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) {
String stream_name = IDataType::getFileNameForStream(entry.name, substream_path);
files_to_skip.insert(stream_name + ".bin");
files_to_skip.insert(stream_name + mrk_extension);
};
IDataType::SubstreamPath stream_path;
entry.type->enumerateStreams(callback, stream_path);
}
for (const auto & index : indices_to_recalc)
{
files_to_skip.insert(index->getFileName() + ".idx");
files_to_skip.insert(index->getFileName() + mrk_extension);
}
return files_to_skip;
}
NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const
{
Names source_column_names = source_part->columns.getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
for (auto it = all_columns.begin(); it != all_columns.end();)
{
if (updated_header.has(it->name))
{
auto updated_type = updated_header.getByName(it->name).type;
if (updated_type != it->type)
it->type = updated_type;
++it;
}
else if (source_columns_name_set.count(it->name))
{
++it;
}
else
it = all_columns.erase(it);
}
return all_columns;
}
}

View File

@ -114,11 +114,6 @@ public:
const MergeTreeData::DataPartsVector & parts,
MergeTreeData::Transaction * out_transaction = nullptr);
void splitMutationCommands(
MergeTreeData::DataPartPtr part,
const std::vector<MutationCommand> & commands,
std::vector<MutationCommand> & for_interpreter,
std::vector<MutationCommand> & for_file_renames);
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);
@ -128,6 +123,29 @@ private:
*/
MergeTreeData::DataPartsVector selectAllPartsFromPartition(const String & partition_id);
/** Split mutation commands into two parts:
* First part should be executed by mutations interpreter.
* Other is just simple drop/renames, so they can be executed without interpreter.
*/
void splitMutationCommands(
MergeTreeData::DataPartPtr part,
const std::vector<MutationCommand> & commands,
std::vector<MutationCommand> & for_interpreter,
std::vector<MutationCommand> & for_file_renames) const;
/// Apply commands to source_part i.e. remove some columns in source_part
/// and return set of files, that have to be removed from filesystem and checksums
NameSet collectFilesToRemove(MergeTreeData::DataPartPtr source_part, const std::vector<MutationCommand> & commands_for_removes, const String & mrk_extension) const;
/// Files, that we don't need to remove and don't need to hardlink, for example columns.txt and checksums.txt.
/// Because we will generate new versions of them after we perform mutation.
NameSet collectFilesToSkip(const Block & updated_header, const std::set<MergeTreeIndexPtr> & indices_to_recalc, const String & mrk_extension) const;
/// Get the columns list of the resulting part in the same order as all_columns.
NamesAndTypesList getColumnsForNewDataPart(MergeTreeData::DataPartPtr source_part, const Block & updated_header, NamesAndTypesList all_columns) const;
public :
/** Is used to cancel all merges and mutations. On cancel() call all currently running actions will throw exception soon.
* All new attempts to start a merge or mutation will throw an exception until all 'LockHolder' objects will be destroyed.

View File

@ -2,7 +2,7 @@
#include <DataTypes/DataTypeArray.h>
#include <Common/escapeForFileName.h>
#include <Columns/ColumnArray.h>
#include <Interpreters/evaluateMissingDefaults.h>
#include <Interpreters/inplaceBlockConversions.h>
#include <Storages/MergeTree/MergeTreeReader.h>
#include <Common/typeid_cast.h>

View File

@ -3369,7 +3369,6 @@ void StorageReplicatedMergeTree::alter(
if (rc == Coordination::ZOK)
{
queue.pullLogsToQueue(zookeeper);
if (alter_entry->have_mutation)
{
/// Record in replication /log