Broke things

This commit is contained in:
alesapin 2020-01-28 20:15:22 +03:00
parent 736711d21c
commit 974e9a6b7a
6 changed files with 57 additions and 46 deletions

View File

@ -65,10 +65,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const
<< new_part_name;
break;
case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global
case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global
out << "alter\n";
out << required_mutation_znode << "\n";
out << "finish\n";
out << "sync_mode\n";
out << alter_sync_mode << "\n";
out << "mutatation_commands\n";
out << mutation_commands << '\n';
break;
default:
@ -160,8 +162,11 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in)
}
else if (type_str == "alter")
{
type = FINISH_ALTER;
in >> required_mutation_znode >> "\nfinish\n";
type = ALTER_METADATA;
in >> "sync_mode\n";
in >> alter_sync_mode;
in >> "mutatation_commands\n";
in >> mutation_commands;
}
//std::cerr << "Read backn\n";

View File

@ -37,7 +37,7 @@ struct ReplicatedMergeTreeLogEntryData
CLEAR_INDEX, /// Drop specific index from specified partition.
REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones
MUTATE_PART, /// Apply one or several mutations to the part.
FINISH_ALTER, /// Apply one or several alter modifications to part
ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths
};
static String typeToString(Type type)
@ -51,7 +51,7 @@ struct ReplicatedMergeTreeLogEntryData
case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX";
case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE";
case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART";
case ReplicatedMergeTreeLogEntryData::FINISH_ALTER: return "FINISH_ALTER";
case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA";
default:
throw Exception("Unknown log entry type: " + DB::toString<int>(type), ErrorCodes::LOGICAL_ERROR);
}
@ -66,6 +66,7 @@ struct ReplicatedMergeTreeLogEntryData
void readText(ReadBuffer & in);
String toString() const;
/// log-xxx
String znode_name;
Type type = EMPTY;
@ -83,15 +84,12 @@ struct ReplicatedMergeTreeLogEntryData
String index_name;
/// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts
/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
/// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query.
bool force_ttl = false;
/// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory.
bool detach = false;
/// For ALTER TODO(alesap)
String required_mutation_znode;
/// REPLACE PARTITION FROM command
struct ReplaceRangeEntry
{
@ -110,12 +108,17 @@ struct ReplicatedMergeTreeLogEntryData
std::shared_ptr<ReplaceRangeEntry> replace_range_entry;
/// Should alter be processed synchronously or asynchronously.
size_t alter_sync_mode;
/// Mutation commands for alter if any.
String mutation_commands;
/// Returns a set of parts that will appear after executing the entry + parts to block
/// selection of merges. These parts are added to queue.virtual_parts.
Strings getVirtualPartNames() const
{
/// Doesn't produce any part
if (type == FINISH_ALTER)
if (type == ALTER_METADATA)
return {};
/// DROP_RANGE does not add a real part, but we must disable merges in that range

View File

@ -149,6 +149,10 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
min_unprocessed_insert_time_changed = min_unprocessed_insert_time;
}
}
if (entry->type == LogEntry::ALTER_METADATA)
{
alter_znodes_in_queue.push_back(entry->znode_name);
}
}
@ -219,6 +223,16 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
current_parts.remove(drop_range_part_name);
virtual_parts.remove(drop_range_part_name);
}
if (entry->type == LogEntry::ALTER_METADATA)
{
if (alter_znodes_in_queue.front() != entry->znode_name)
{
/// TODO(alesap) Better
throw Exception("Processed incorrect alter.", ErrorCodes::LOGICAL_ERROR);
}
alter_znodes_in_queue.pop_front();
}
}
else
{
@ -1009,15 +1023,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
}
if (entry.type == LogEntry::FINISH_ALTER)
if (entry.type == LogEntry::ALTER_METADATA)
{
//std::cerr << "Entry finish alter\n";
if (mutations_by_znode.count(entry.required_mutation_znode) && !mutations_by_znode.at(entry.required_mutation_znode).is_done) {
String reason = "Not altering storage because mutation " + entry.required_mutation_znode + " is not ready yet (mutation is beeing processed).";
LOG_TRACE(log, reason);
out_postpone_reason = reason;
return false;
}
return entry.znode_name == alter_znodes_in_queue.front();
}
return true;

View File

@ -77,6 +77,9 @@ private:
time_t last_queue_update = 0;
/// This queue of log entry znode names is used for sequential execution of alters
std::deque<String> alter_znodes_in_queue;
/// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue).
/// Used to block other actions on parts in the range covered by future_parts.
using FuturePartsSet = std::map<String, LogEntryPtr>;
@ -161,6 +164,7 @@ private:
/// Put a set of (already existing) parts in virtual_parts.
void addVirtualParts(const MergeTreeData::DataParts & parts);
/// Insert new entry from log into queue
void insertUnlocked(
const LogEntryPtr & entry, std::optional<time_t> & min_unprocessed_insert_time_changed,
std::lock_guard<std::mutex> & state_lock);

View File

@ -46,7 +46,7 @@ struct MutationCommand
/// For reads, drops and etc.
String column_name;
DataTypePtr data_type;
DataTypePtr data_type; /// Maybe empty if we just want to drop column
/// If from_zookeeper, then consider more Alter commands as mutation commands
static std::optional<MutationCommand> parse(ASTAlterCommand * command, bool from_zookeeper=false);

View File

@ -988,7 +988,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
{
do_fetch = !tryExecutePartMutation(entry);
}
else if (entry.type == LogEntry::FINISH_ALTER)
else if (entry.type == LogEntry::ALTER_METADATA)
{
executeMetadataAlter(entry);
}
@ -3345,6 +3345,7 @@ void StorageReplicatedMergeTree::alter(
//std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl;
ReplicatedMergeTreeLogEntryData entry;
/// /columns and /metadata nodes
std::vector<ChangedNode> changed_nodes;
{
@ -3392,42 +3393,32 @@ void StorageReplicatedMergeTree::alter(
for (const auto & node : changed_nodes)
ops.emplace_back(zkutil::makeSetRequest(node.shared_path, node.new_value, -1));
entry.type = LogEntry::ALTER_METADATA;
entry.source_replica = replica_name;
WriteBufferFromString wb(entry.mutation_commands);
maybe_mutation_commands.writeText(wb);
wb.finalize();
entry.create_time = time(nullptr);
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses results = getZooKeeper()->multi(ops);
for (size_t i = 0; i < changed_nodes.size(); ++i)
changed_nodes[i].new_version = dynamic_cast<const Coordination::SetResponse &>(*results[i]).stat.version;
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
}
LOG_DEBUG(log, "Updated shared metadata nodes in ZooKeeper. Waiting for replicas to apply changes.");
table_lock_holder.release();
ReplicatedMergeTreeLogEntryData entry;
entry.type = LogEntry::FINISH_ALTER;
entry.source_replica = replica_name;
////std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl;
entry.new_part_name = "";
entry.create_time = time(nullptr);
String path_created = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
////std::cerr << "Waiting for replicas\n";
auto unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
////std::cerr << "Replicas done";
if (!maybe_mutation_commands.empty())
{
////std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl;
Context copy_context = query_context;
copy_context.getSettingsRef().mutations_sync = 2;
ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, copy_context);
////std::cerr << "Mutation finished\n";
}
if (!unwaited.empty())
{
throw Exception("Some replicas doesn't finish alter", ErrorCodes::UNFINISHED);