Better working on files names

This commit is contained in:
alesapin 2020-01-17 16:54:22 +03:00
parent d39f1a0fce
commit 58b9e73a7a
11 changed files with 170 additions and 62 deletions

View File

@ -165,4 +165,13 @@ bool NamesAndTypesList::contains(const String & name) const
return false;
}
std::optional<NameAndTypePair> NamesAndTypesList::tryGetByName(const std::string & name) const
{
for (const NameAndTypePair & column : *this)
{
if (column.name == name)
return column;
}
return {};
}
}

View File

@ -74,6 +74,8 @@ public:
NamesAndTypesList addTypes(const Names & names) const;
bool contains(const String & name) const;
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
};
}

View File

@ -392,15 +392,14 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
const auto required_columns = syntax_result->requiredSourceColumns();
affected_indices_columns.insert(std::cbegin(required_columns), std::cend(required_columns));
}
else if (command.type == MutationCommand::READ)
else if (command.type == MutationCommand::READ_COLUMN)
{
if (stages.empty() || !stages.back().column_to_updated.empty())
stages.emplace_back(context);
if (stages.size() == 1) /// First stage only supports filtering and can't update columns.
stages.emplace_back(context);
if (command.data_type)
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
stages.back().column_to_updated.emplace(command.column_name, std::make_shared<ASTIdentifier>(command.column_name));
}
else
throw Exception("Unknown mutation command type: " + DB::toString<int>(command.type), ErrorCodes::UNKNOWN_MUTATION_COMMAND);

View File

@ -540,10 +540,26 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(const S
MutationCommand result;
result.type = MutationCommand::Type::READ;
result.column_name = column_name;
result.data_type = data_type;
result.predicate = nullptr;
if (type == MODIFY_COLUMN)
{
result.type = MutationCommand::Type::READ_COLUMN;
result.column_name = column_name;
result.data_type = data_type;
result.predicate = nullptr;
}
else if (type == DROP_COLUMN)
{
result.type = MutationCommand::Type::DROP_COLUMN;
result.column_name = column_name;
result.predicate = nullptr;
}
else if (type == DROP_INDEX)
{
result.type = MutationCommand::Type::DROP_INDEX;
result.column_name = column_name;
result.predicate = nullptr;
}
result.ast = ast->clone();
return result;
}

View File

@ -982,14 +982,19 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
Poco::File(new_part_tmp_path).createDirectories();
BlockInputStreamPtr in = nullptr;
Block updated_header;
std::optional<MutationsInterpreter> interpreter;
if(!std::all_of(commands_for_part.begin(), commands_for_part.end(), [](const auto & cmd) { return cmd.type == MutationCommand::Type::READ && cmd.data_type == nullptr;}))
{
MutationsInterpreter mutations_interpreter(storage_from_source_part, commands_for_part, context_for_reading, true);
in = mutations_interpreter.execute(table_lock_holder);
updated_header = mutations_interpreter.getUpdatedHeader();
}
std::vector<MutationCommand> for_interpreter;
std::vector<MutationCommand> for_file_renames;
splitMutationCommands(source_part, commands_for_part, for_interpreter, for_file_renames);
if (!for_interpreter.empty())
{
interpreter.emplace(storage_from_source_part, for_interpreter, context_for_reading, true);
in = interpreter->execute(table_lock_holder);
updated_header = interpreter->getUpdatedHeader();
}
NamesAndTypesList all_columns = data.getColumns().getAllPhysical();
const auto data_settings = data.getSettings();
@ -1107,43 +1112,38 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
}
std::unordered_set<String> removed_columns;
/// TODO(alesap) better
for (const auto & part_column : source_part->columns)
std::unordered_set<String> remove_files;
/// Remove old indices
for (const auto & command : for_file_renames)
{
bool found = false;
for (const auto & all_column : all_columns)
if (command.type == MutationCommand::Type::DROP_INDEX)
{
if (part_column.name == all_column.name)
{
found = true;
break;
}
remove_files.emplace("skp_idx_" + command.column_name + ".idx");
remove_files.emplace("skp_idx_" + command.column_name + mrk_extension);
}
if (!found)
else if (command.type == MutationCommand::Type::DROP_COLUMN)
{
std::cerr << "REMOVING COLUMN:" << part_column.name << std::endl;
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path)
{
String stream_name = IDataType::getFileNameForStream(part_column.name, substream_path);
IDataType::StreamCallback callback = [&](const IDataType::SubstreamPath & substream_path) {
String stream_name = IDataType::getFileNameForStream(command.column_name, substream_path);
/// Delete files if they are no longer shared with another column.
if (--stream_counts[stream_name] == 0)
{
removed_columns.insert(stream_name + ".bin");
removed_columns.insert(stream_name + mrk_extension);
remove_files.emplace(stream_name + ".bin");
remove_files.emplace(stream_name + mrk_extension);
}
};
IDataType::SubstreamPath stream_path;
part_column.type->enumerateStreams(callback, stream_path);
auto column = source_part->columns.tryGetByName(command.column_name);
if (column)
column->type->enumerateStreams(callback, stream_path);
}
}
Poco::DirectoryIterator dir_end;
for (Poco::DirectoryIterator dir_it(source_part->getFullPath()); dir_it != dir_end; ++dir_it)
{
if (files_to_skip.count(dir_it.name()) || removed_columns.count(dir_it.name()))
if (files_to_skip.count(dir_it.name()) || remove_files.count(dir_it.name()))
continue;
Poco::Path destination(new_part_tmp_path);
@ -1198,7 +1198,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
std::cerr << "Updated header empty\n";
}
for (const String & removed_file : removed_columns)
for (const String & removed_file : remove_files)
if (new_data_part->checksums.files.count(removed_file))
new_data_part->checksums.files.erase(removed_file);
@ -1340,4 +1340,34 @@ size_t MergeTreeDataMergerMutator::estimateNeededDiskSpace(const MergeTreeData::
return static_cast<size_t>(res * DISK_USAGE_COEFFICIENT_TO_RESERVE);
}
/// Distributes mutation commands between two output lists:
///  - `for_interpreter` receives DELETE, UPDATE and MATERIALIZE_INDEX commands, and also
///    READ_COLUMN commands whose column is present in `part`;
///  - `for_file_renames` receives every other command.
/// Commands are appended in their original order; the output vectors are not cleared beforehand.
void MergeTreeDataMergerMutator::splitMutationCommands(
    MergeTreeData::DataPartPtr part,
    const std::vector<MutationCommand> & commands,
    std::vector<MutationCommand> & for_interpreter,
    std::vector<MutationCommand> & for_file_renames)
{
    for (const auto & command : commands)
    {
        const bool always_interpreted = command.type == MutationCommand::Type::DELETE
            || command.type == MutationCommand::Type::UPDATE
            || command.type == MutationCommand::Type::MATERIALIZE_INDEX;

        /// A READ_COLUMN command has to be materialized only if the source part
        /// actually has the column; otherwise it goes to the file-level list.
        const bool needs_interpreter = always_interpreted
            || (command.type == MutationCommand::Type::READ_COLUMN
                && part->columns.contains(command.column_name));

        auto & destination = needs_interpreter ? for_interpreter : for_file_renames;
        destination.push_back(command);
    }
}
}

View File

@ -114,6 +114,11 @@ public:
const MergeTreeData::DataPartsVector & parts,
MergeTreeData::Transaction * out_transaction = nullptr);
void splitMutationCommands(
MergeTreeData::DataPartPtr part,
const std::vector<MutationCommand> & commands,
std::vector<MutationCommand> & for_interpreter,
std::vector<MutationCommand> & for_file_renames);
/// The approximate amount of disk space needed for merge or mutation. With a surplus.
static size_t estimateNeededDiskSpace(const MergeTreeData::DataPartsVector & source_parts);

View File

@ -21,7 +21,7 @@ namespace ErrorCodes
extern const int MULTIPLE_ASSIGNMENTS_TO_COLUMN;
}
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool parse_modify)
std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command, bool from_zookeeper)
{
if (command->type == ASTAlterCommand::DELETE)
{
@ -57,24 +57,32 @@ std::optional<MutationCommand> MutationCommand::parse(ASTAlterCommand * command,
res.index_name = command->index->as<ASTIdentifier &>().name;
return res;
}
else if (parse_modify && command->type == ASTAlterCommand::MODIFY_COLUMN)
else if (from_zookeeper && command->type == ASTAlterCommand::MODIFY_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::READ;
res.type = MutationCommand::Type::READ_COLUMN;
const auto & ast_col_decl = command->col_decl->as<ASTColumnDeclaration &>();
res.column_name = ast_col_decl.name;
res.data_type = DataTypeFactory::instance().get(ast_col_decl.type);
return res;
}
else if (parse_modify && command->type == ASTAlterCommand::DROP_COLUMN)
else if (from_zookeeper && command->type == ASTAlterCommand::DROP_COLUMN)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::READ;
res.type = MutationCommand::Type::DROP_COLUMN;
res.column_name = getIdentifierName(command->column);
return res;
}
else if (from_zookeeper && command->type == ASTAlterCommand::DROP_INDEX)
{
MutationCommand res;
res.ast = command->ptr();
res.type = MutationCommand::Type::DROP_INDEX;
res.column_name = command->index->as<ASTIdentifier &>().name;
return res;
}
else
return {};
}

View File

@ -27,7 +27,9 @@ struct MutationCommand
DELETE,
UPDATE,
MATERIALIZE_INDEX,
READ
READ_COLUMN,
DROP_COLUMN,
DROP_INDEX,
};
Type type = EMPTY;
@ -42,11 +44,12 @@ struct MutationCommand
String index_name;
ASTPtr partition;
/// For cast
/// For reads, drops, etc.
String column_name;
DataTypePtr data_type;
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool parse_modify=false);
/// If from_zookeeper, then consider more ALTER commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool from_zookeeper=false);
};
/// Multiple mutation commands, possible from different ALTER queries

View File

@ -979,7 +979,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
}
else if (entry.type == LogEntry::FINISH_ALTER)
{
tryFinishAlter(entry);
executeMetadataAlter(entry);
}
else
{
@ -1156,7 +1156,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
}
bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree::LogEntry & /*entry*/)
bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMergeTree::LogEntry & /*entry*/)
{
std::cerr << "Trying to finish alter\n";
auto zookeeper = getZooKeeper();
@ -1179,15 +1179,21 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree
const bool changed_columns_version = (columns_version_zk != this->columns_version);
const bool changed_metadata_version = (metadata_version_zk != this->metadata_version);
std::cerr << "Versions changed: columns" << changed_columns_version << " metadata:" << changed_metadata_version << std::endl;
std::cerr << "Versions changed: columns:" << changed_columns_version << " metadata:" << changed_metadata_version << std::endl;
if (!(changed_columns_version || changed_metadata_version))
{
std::cerr << "Nothing changed\n";
return true;
}
std::cerr << "Receiving metadata from zookeeper\n";
auto columns_in_zk = ColumnsDescription::parse(columns_str);
auto metadata_in_zk = ReplicatedMergeTreeTableMetadata::parse(metadata_str);
auto metadata_diff = ReplicatedMergeTreeTableMetadata(*this).checkAndFindDiff(metadata_in_zk, /* allow_alter = */ true);
std::cerr << "Metadata received\n";
MergeTreeData::DataParts parts;
/// If metadata nodes have changed, we will update table structure locally.
@ -1213,16 +1219,29 @@ bool StorageReplicatedMergeTree::tryFinishAlter(const StorageReplicatedMergeTree
LOG_INFO(log, "Applied changes to the metadata of the table.");
}
this->columns_version = columns_version_zk;
this->metadata_version = metadata_version_zk;
std::cerr << "Columns version before:" << columns_version << std::endl;
std::cerr << "Columns version after:" << columns_version_zk << std::endl;
columns_version = columns_version_zk;
metadata_version = metadata_version_zk;
std::cerr << "Recalculating columns sizes\n";
recalculateColumnSizes();
/// Update metadata ZK nodes for a specific replica.
if (changed_columns_version)
{
zookeeper->set(replica_path + "/columns", columns_str);
}
else
{
}
if (changed_metadata_version)
{
zookeeper->set(replica_path + "/metadata", metadata_str);
}
std::cerr << "Nodes in zk updated\n";
}
std::cerr << "Done\n";
return true;
}
@ -3380,7 +3399,7 @@ void StorageReplicatedMergeTree::alter(
entry.type = LogEntry::FINISH_ALTER;
entry.source_replica = replica_name;
std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl;
//std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl;
entry.new_part_name = "";
entry.create_time = time(nullptr);
@ -3388,14 +3407,24 @@ void StorageReplicatedMergeTree::alter(
String path_created = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
waitForAllReplicasToProcessLogEntry(entry);
std::cerr << "Waiting for replicas\n";
auto unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
std::cerr << "Replicas done";
if (!maybe_mutation_commands.empty())
{
std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl;
ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, query_context);
Context copy_context = query_context;
copy_context.getSettingsRef().mutations_sync = 2;
ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, copy_context);
std::cerr << "Mutation finished\n";
}
if (!unwaited.empty())
{
throw Exception("Some replicas doesn't finish alter", ErrorCodes::UNFINISHED);
}
}
void StorageReplicatedMergeTree::alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context)
@ -3782,21 +3811,27 @@ StorageReplicatedMergeTree::allocateBlockNumber(
}
void StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
/// Waits for replicas listed under "<zookeeper_path>/replicas" to process the given log entry.
/// A replica is waited for if `wait_for_non_active` is true or if its "is_active" node exists;
/// all other replicas are skipped. Returns the names of the skipped replicas.
Strings StorageReplicatedMergeTree::waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active)
{
    LOG_DEBUG(log, "Waiting for all replicas to process " << entry.znode_name);

    auto zookeeper = getZooKeeper();
    Strings replicas = zookeeper->getChildren(zookeeper_path + "/replicas");

    Strings skipped_replicas;
    for (const String & replica : replicas)
    {
        /// The activity check is evaluated only when inactive replicas are not waited for.
        const bool should_wait = wait_for_non_active
            || zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active");

        if (!should_wait)
        {
            skipped_replicas.push_back(replica);
            continue;
        }

        waitForReplicaToProcessLogEntry(replica, entry);
    }

    LOG_DEBUG(log, "Finished waiting for all replicas to process " << entry.znode_name);
    return skipped_replicas;
}
@ -4352,7 +4387,7 @@ void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const
mutateImpl(commands, query_context);
}
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & /*query_context*/)
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands, const Context & query_context)
{
/// Overview of the mutation algorithm.
///
@ -4457,16 +4492,17 @@ ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const Mu
}
/// we have to wait
//if (query_context.getSettingsRef().mutations_sync != 0)
//{
if (query_context.getSettingsRef().mutations_sync != 0)
{
Strings replicas;
//if (query_context.getSettingsRef().mutations_sync == ) /// wait for all replicas
replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
//else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself
// replicas.push_back(replica_path);
if (query_context.getSettingsRef().mutations_sync == 2) /// wait for all replicas
replicas = getZooKeeper()->getChildren(zookeeper_path + "/replicas");
else if (query_context.getSettingsRef().mutations_sync == 1) /// just wait for ourself
replicas.push_back(replica_path);
std::cerr << "Waiting for mutation on replicas:" << replicas.size() << std::endl;
waitMutationToFinishOnReplicas(replicas, entry.znode_name);
//}
}
return entry;
}

View File

@ -380,7 +380,7 @@ private:
/// Do the merge or recommend to make the fetch instead of the merge
bool tryExecuteMerge(const LogEntry & entry);
bool tryFinishAlter(const LogEntry & entry);
bool executeMetadataAlter(const LogEntry & entry);
bool tryExecutePartMutation(const LogEntry & entry);
@ -489,7 +489,7 @@ private:
* Because it effectively waits for other thread that usually has to also acquire a lock to proceed and this yields deadlock.
* TODO: There are wrong usages of this method that are not fixed yet.
*/
void waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
Strings waitForAllReplicasToProcessLogEntry(const ReplicatedMergeTreeLogEntryData & entry, bool wait_for_non_active = true);
/** Wait until the specified replica executes the specified action from the log.
* NOTE: See comment about locks above.

View File

@ -69,4 +69,4 @@ ALTER TABLE nested_alter DROP COLUMN `n.d`;
SELECT * FROM nested_alter;
--DROP TABLE nested_alter;
DROP TABLE nested_alter;