From 974e9a6b7ac5f57634688de1f18678ee761bf44b Mon Sep 17 00:00:00 2001 From: alesapin Date: Tue, 28 Jan 2020 20:15:22 +0300 Subject: [PATCH] Broke things --- .../MergeTree/ReplicatedMergeTreeLogEntry.cpp | 15 ++++--- .../MergeTree/ReplicatedMergeTreeLogEntry.h | 17 ++++---- .../MergeTree/ReplicatedMergeTreeQueue.cpp | 24 +++++++---- .../MergeTree/ReplicatedMergeTreeQueue.h | 4 ++ dbms/src/Storages/MutationCommands.h | 2 +- .../Storages/StorageReplicatedMergeTree.cpp | 41 ++++++++----------- 6 files changed, 57 insertions(+), 46 deletions(-) diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp index f314bacdc53..ce8d873b972 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.cpp @@ -65,10 +65,12 @@ void ReplicatedMergeTreeLogEntryData::writeText(WriteBuffer & out) const << new_part_name; break; - case FINISH_ALTER: /// Just make local /metadata and /columns consistent with global + case ALTER_METADATA: /// Just make local /metadata and /columns consistent with global out << "alter\n"; - out << required_mutation_znode << "\n"; - out << "finish\n"; + out << "sync_mode\n"; + out << alter_sync_mode << "\n"; + out << "mutatation_commands\n"; + out << mutation_commands << '\n'; break; default: @@ -160,8 +162,11 @@ void ReplicatedMergeTreeLogEntryData::readText(ReadBuffer & in) } else if (type_str == "alter") { - type = FINISH_ALTER; - in >> required_mutation_znode >> "\nfinish\n"; + type = ALTER_METADATA; + in >> "sync_mode\n"; + in >> alter_sync_mode >> "\n"; /// writeText() puts '\n' right after the value; consume it before the next label. + in >> "mutatation_commands\n"; + in >> mutation_commands; } //std::cerr << "Read backn\n"; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h index f7744d5a110..0e836ec7c27 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h +++ 
b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeLogEntry.h @@ -37,7 +37,7 @@ struct ReplicatedMergeTreeLogEntryData CLEAR_INDEX, /// Drop specific index from specified partition. REPLACE_RANGE, /// Drop certain range of partitions and replace them by new ones MUTATE_PART, /// Apply one or several mutations to the part. - FINISH_ALTER, /// Apply one or several alter modifications to part + ALTER_METADATA, /// Apply alter modification according to global /metadata and /columns paths }; static String typeToString(Type type) @@ -51,7 +51,7 @@ struct ReplicatedMergeTreeLogEntryData case ReplicatedMergeTreeLogEntryData::CLEAR_INDEX: return "CLEAR_INDEX"; case ReplicatedMergeTreeLogEntryData::REPLACE_RANGE: return "REPLACE_RANGE"; case ReplicatedMergeTreeLogEntryData::MUTATE_PART: return "MUTATE_PART"; - case ReplicatedMergeTreeLogEntryData::FINISH_ALTER: return "FINISH_ALTER"; + case ReplicatedMergeTreeLogEntryData::ALTER_METADATA: return "ALTER_METADATA"; default: throw Exception("Unknown log entry type: " + DB::toString(type), ErrorCodes::LOGICAL_ERROR); } @@ -66,6 +66,7 @@ struct ReplicatedMergeTreeLogEntryData void readText(ReadBuffer & in); String toString() const; + /// log-xxx String znode_name; Type type = EMPTY; @@ -83,15 +84,12 @@ struct ReplicatedMergeTreeLogEntryData String index_name; /// Force filter by TTL in 'OPTIMIZE ... FINAL' query to remove expired values from old parts - /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query. + /// without TTL infos or with outdated TTL infos, e.g. after 'ALTER ... MODIFY TTL' query. bool force_ttl = false; /// For DROP_RANGE, true means that the parts need not be deleted, but moved to the `detached` directory. 
bool detach = false; - /// For ALTER TODO(alesap) - String required_mutation_znode; - /// REPLACE PARTITION FROM command struct ReplaceRangeEntry { @@ -110,12 +108,17 @@ struct ReplicatedMergeTreeLogEntryData std::shared_ptr replace_range_entry; + /// Should alter be processed synchronously or asynchronously. Defaults to 0 so the field is never serialized uninitialized. + size_t alter_sync_mode = 0; + /// Mutation commands for alter if any. + String mutation_commands; + /// Returns a set of parts that will appear after executing the entry + parts to block /// selection of merges. These parts are added to queue.virtual_parts. Strings getVirtualPartNames() const { /// Doesn't produce any part - if (type == FINISH_ALTER) + if (type == ALTER_METADATA) return {}; /// DROP_RANGE does not add a real part, but we must disable merges in that range diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index eef2710ea20..f9622009325 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -149,6 +149,10 @@ void ReplicatedMergeTreeQueue::insertUnlocked( min_unprocessed_insert_time_changed = min_unprocessed_insert_time; } } + if (entry->type == LogEntry::ALTER_METADATA) + { + alter_znodes_in_queue.push_back(entry->znode_name); + } } @@ -219,6 +223,16 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval( current_parts.remove(drop_range_part_name); virtual_parts.remove(drop_range_part_name); } + + if (entry->type == LogEntry::ALTER_METADATA) + { + if (alter_znodes_in_queue.front() != entry->znode_name) + { + /// TODO(alesap) Better + throw Exception("Processed incorrect alter.", ErrorCodes::LOGICAL_ERROR); + } + alter_znodes_in_queue.pop_front(); + } } else { @@ -1009,15 +1023,9 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( } } - if (entry.type == LogEntry::FINISH_ALTER) + if (entry.type == LogEntry::ALTER_METADATA) { - //std::cerr << "Entry finish alter\n"; - if 
(mutations_by_znode.count(entry.required_mutation_znode) && !mutations_by_znode.at(entry.required_mutation_znode).is_done) { - String reason = "Not altering storage because mutation " + entry.required_mutation_znode + " is not ready yet (mutation is beeing processed)."; - LOG_TRACE(log, reason); - out_postpone_reason = reason; - return false; - } + return entry.znode_name == alter_znodes_in_queue.front(); } return true; diff --git a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h index cad23df6f46..299b4ed1c48 100644 --- a/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h +++ b/dbms/src/Storages/MergeTree/ReplicatedMergeTreeQueue.h @@ -77,6 +77,9 @@ private: time_t last_queue_update = 0; + /// This vector is used for sequential execution of alters + std::deque alter_znodes_in_queue; + /// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue). /// Used to block other actions on parts in the range covered by future_parts. using FuturePartsSet = std::map; @@ -161,6 +164,7 @@ private: /// Put a set of (already existing) parts in virtual_parts. void addVirtualParts(const MergeTreeData::DataParts & parts); + /// Insert new entry from log into queue void insertUnlocked( const LogEntryPtr & entry, std::optional & min_unprocessed_insert_time_changed, std::lock_guard & state_lock); diff --git a/dbms/src/Storages/MutationCommands.h b/dbms/src/Storages/MutationCommands.h index 265c5ae1a6f..7f7f31b6315 100644 --- a/dbms/src/Storages/MutationCommands.h +++ b/dbms/src/Storages/MutationCommands.h @@ -46,7 +46,7 @@ struct MutationCommand /// For reads, drops and etc. 
String column_name; - DataTypePtr data_type; + DataTypePtr data_type; /// Maybe empty if we just want to drop column /// If from_zookeeper, than consider more Alter commands as mutation commands static std::optional parse(ASTAlterCommand * command, bool from_zookeeper=false); diff --git a/dbms/src/Storages/StorageReplicatedMergeTree.cpp b/dbms/src/Storages/StorageReplicatedMergeTree.cpp index 323b939e737..574b73cfd74 100644 --- a/dbms/src/Storages/StorageReplicatedMergeTree.cpp +++ b/dbms/src/Storages/StorageReplicatedMergeTree.cpp @@ -988,7 +988,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry) { do_fetch = !tryExecutePartMutation(entry); } - else if (entry.type == LogEntry::FINISH_ALTER) + else if (entry.type == LogEntry::ALTER_METADATA) { executeMetadataAlter(entry); } @@ -3345,6 +3345,7 @@ void StorageReplicatedMergeTree::alter( //std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl; + ReplicatedMergeTreeLogEntryData entry; /// /columns and /metadata nodes std::vector changed_nodes; { @@ -3392,42 +3393,32 @@ void StorageReplicatedMergeTree::alter( for (const auto & node : changed_nodes) ops.emplace_back(zkutil::makeSetRequest(node.shared_path, node.new_value, -1)); + entry.type = LogEntry::ALTER_METADATA; + entry.source_replica = replica_name; + + WriteBufferFromString wb(entry.mutation_commands); + maybe_mutation_commands.writeText(wb); + wb.finalize(); + + entry.create_time = time(nullptr); + + ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); + Coordination::Responses results = getZooKeeper()->multi(ops); for (size_t i = 0; i < changed_nodes.size(); ++i) changed_nodes[i].new_version = dynamic_cast(*results[i]).stat.version; + + String path_created = dynamic_cast(*results.back()).path_created; + entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); } LOG_DEBUG(log, "Updated shared 
metadata nodes in ZooKeeper. Waiting for replicas to apply changes."); table_lock_holder.release(); - ReplicatedMergeTreeLogEntryData entry; - entry.type = LogEntry::FINISH_ALTER; - entry.source_replica = replica_name; - - ////std::cerr << " Columns before mutation:" << getColumns().getAllPhysical().toString() << std::endl; - - entry.new_part_name = ""; - entry.create_time = time(nullptr); - - String path_created = getZooKeeper()->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); - entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1); - - ////std::cerr << "Waiting for replicas\n"; auto unwaited = waitForAllReplicasToProcessLogEntry(entry, false); - ////std::cerr << "Replicas done"; - - if (!maybe_mutation_commands.empty()) - { - ////std::cerr << "We have mutation commands:" << maybe_mutation_commands.size() << std::endl; - Context copy_context = query_context; - copy_context.getSettingsRef().mutations_sync = 2; - ReplicatedMergeTreeMutationEntry mutation_entry = mutateImpl(maybe_mutation_commands, copy_context); - ////std::cerr << "Mutation finished\n"; - } - if (!unwaited.empty()) { throw Exception("Some replicas doesn't finish alter", ErrorCodes::UNFINISHED);