More flexible schema

This commit is contained in:
alesapin 2020-01-31 15:25:31 +03:00
parent d0e5243ad8
commit 01ff1c65e2
5 changed files with 153 additions and 79 deletions

View File

@ -66,7 +66,6 @@ struct ReplicatedMergeTreeLogEntryData
void readText(ReadBuffer & in); void readText(ReadBuffer & in);
String toString() const; String toString() const;
/// log-xxx
String znode_name; String znode_name;
Type type = EMPTY; Type type = EMPTY;
@ -108,10 +107,8 @@ struct ReplicatedMergeTreeLogEntryData
std::shared_ptr<ReplaceRangeEntry> replace_range_entry; std::shared_ptr<ReplaceRangeEntry> replace_range_entry;
/// Should alter be processed sychronously, or asynchronously. //TODO(alesap)
size_t alter_sync_mode; int alter_version;
/// Mutation commands for alter if any.
String mutation_commands;
String columns_str; String columns_str;
String metadata_str; String metadata_str;

View File

@ -151,7 +151,12 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
} }
if (entry->type == LogEntry::ALTER_METADATA) if (entry->type == LogEntry::ALTER_METADATA)
{ {
alter_znodes_in_queue.push_back(entry->znode_name); alter_sequence.addMetadataAlter(entry->alter_version);
}
if (entry->type == LogEntry::MUTATE_PART && entry->alter_version != -1)
{
alter_sequence.addDataAlterIfEmpty(entry->alter_version);
} }
} }
@ -226,12 +231,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (entry->type == LogEntry::ALTER_METADATA) if (entry->type == LogEntry::ALTER_METADATA)
{ {
if (alter_znodes_in_queue.front() != entry->znode_name) alter_sequence.finishMetadataAlter(entry->alter_version);
{
/// TODO(alesap) Better
throw Exception("Processed incorrect alter.", ErrorCodes::LOGICAL_ERROR);
}
alter_znodes_in_queue.pop_front();
} }
} }
else else
@ -1025,7 +1025,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
if (entry.type == LogEntry::ALTER_METADATA) if (entry.type == LogEntry::ALTER_METADATA)
{ {
return entry.znode_name == alter_znodes_in_queue.front(); return alter_sequence.canExecuteMetadataAlter(entry.alter_version);
}
if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1)
{
return alter_sequence.canExecuteDataAlter(entry.alter_version);
} }
return true; return true;
@ -1304,6 +1309,8 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{ {
LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")"); LOG_TRACE(log, "Marking mutation " << znode << " done because it is <= mutation_pointer (" << mutation_pointer << ")");
mutation.is_done = true; mutation.is_done = true;
if (mutation.entry->alter_version != -1)
alter_sequence.finishDataAlter(mutation.entry->alter_version);
} }
else if (mutation.parts_to_do == 0) else if (mutation.parts_to_do == 0)
{ {
@ -1340,6 +1347,8 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{ {
LOG_TRACE(log, "Mutation " << entry->znode_name << " is done"); LOG_TRACE(log, "Mutation " << entry->znode_name << " is done");
it->second.is_done = true; it->second.is_done = true;
if (entry->alter_version != -1)
alter_sequence.finishDataAlter(entry->alter_version);
} }
} }
} }
@ -1703,7 +1712,7 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
} }
std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{ {
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as /// Assigning mutations is easier than assigning merges because mutations appear in the same order as
/// the order of their version numbers (see StorageReplicatedMergeTree::mutate). /// the order of their version numbers (see StorageReplicatedMergeTree::mutate).
@ -1726,11 +1735,22 @@ std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersio
return {}; return {};
Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock); Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
Int64 max_version = in_partition->second.rbegin()->first; Int64 max_version = in_partition->second.begin()->first;
int alter_version = -1;
for (auto [mutation_version, mutation_status] : in_partition->second)
{
max_version = mutation_version;
if (mutations_status->entry->alter_version != -1)
{
alter_version = mutations_status->entry->alter_version;
break;
}
}
if (current_version >= max_version) if (current_version >= max_version)
return {}; return {};
return max_version; return std::make_pair(max_version, alter_version);
} }

View File

@ -9,6 +9,7 @@
#include <Storages/MergeTree/MergeTreeData.h> #include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h> #include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h> #include <Storages/MergeTree/ReplicatedMergeTreeQuorumAddedParts.h>
#include <Storages/MergeTree/ReplicatedQueueAlterState.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h> #include <Core/BackgroundSchedulePool.h>
@ -126,10 +127,10 @@ private:
/// Znode ID of the latest mutation that is done. /// Znode ID of the latest mutation that is done.
String mutation_pointer; String mutation_pointer;
/// Provides only one simultaneous call to pullLogsToQueue. /// Provides only one simultaneous call to pullLogsToQueue.
std::mutex pull_logs_to_queue_mutex; std::mutex pull_logs_to_queue_mutex;
AlterSequence alter_sequence;
/// List of subscribers /// List of subscribers
/// A subscriber callback is called when an entry queue is deleted /// A subscriber callback is called when an entry queue is deleted
@ -374,6 +375,11 @@ public:
void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const; void getInsertTimes(time_t & out_min_unprocessed_insert_time, time_t & out_max_processed_insert_time) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const; std::vector<MergeTreeMutationStatus> getMutationsStatus() const;
String getLatestMutation() const
{
return mutations_by_znode.rbegin()->first;
}
}; };
class ReplicatedMergeTreeMergePredicate class ReplicatedMergeTreeMergePredicate
@ -390,7 +396,7 @@ public:
/// Return nonempty optional if the part can and should be mutated. /// Return nonempty optional if the part can and should be mutated.
/// Returned mutation version number is always the biggest possible. /// Returned mutation version number is always the biggest possible.
std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const; std::optional<std::pair<Int64, int>> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;
bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const; bool isMutationFinished(const ReplicatedMergeTreeMutationEntry & mutation) const;

View File

@ -404,6 +404,8 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
/// ALTERs of the metadata node. /// ALTERs of the metadata node.
zookeeper->createIfNotExists(replica_path + "/metadata", String()); zookeeper->createIfNotExists(replica_path + "/metadata", String());
zookeeper->createIfNotExists(zookeeper_path + "/alters", String());
} }
@ -469,6 +471,7 @@ void StorageReplicatedMergeTree::checkTableStructure(bool skip_sanity_checks, bo
auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat)); auto columns_from_zk = ColumnsDescription::parse(zookeeper->get(zookeeper_path + "/columns", &columns_stat));
//columns_version = columns_stat.version; //columns_version = columns_stat.version;
/// TODO(alesap) remove this trash
const ColumnsDescription & old_columns = getColumns(); const ColumnsDescription & old_columns = getColumns();
if (columns_from_zk != old_columns || !metadata_diff.empty()) if (columns_from_zk != old_columns || !metadata_diff.empty())
{ {
@ -2296,11 +2299,11 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (part->bytes_on_disk > max_source_part_size_for_mutation) if (part->bytes_on_disk > max_source_part_size_for_mutation)
continue; continue;
std::optional<Int64> desired_mutation_version = merge_pred.getDesiredMutationVersion(part); std::optional<std::pair<Int64, int>> desired_mutation_version = merge_pred.getDesiredMutationVersion(part);
if (!desired_mutation_version) if (!desired_mutation_version)
continue; continue;
if (createLogEntryToMutatePart(*part, *desired_mutation_version)) if (createLogEntryToMutatePart(*part, desired_mutation_version->first, desired_mutation_version->second))
{ {
success = true; success = true;
break; break;
@ -2401,7 +2404,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMergeParts(
} }
bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version) bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version, int alter_version)
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
@ -2431,6 +2434,7 @@ bool StorageReplicatedMergeTree::createLogEntryToMutatePart(const MergeTreeDataP
entry.source_parts.push_back(part.name); entry.source_parts.push_back(part.name);
entry.new_part_name = new_part_name; entry.new_part_name = new_part_name;
entry.create_time = time(nullptr); entry.create_time = time(nullptr);
entry.alter_version = alter_version;
zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential); zookeeper->create(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential);
return true; return true;
@ -3245,18 +3249,57 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
std::cerr << "Nodes in zk updated\n"; std::cerr << "Nodes in zk updated\n";
} }
if (!entry.mutation_commands.empty()) if (!entry.mutation_commands.empty())
{ {
MutationCommands commands; MutationCommands commands;
ReadBufferFromString in(entry.mutation_commands); ReadBufferFromString in(entry.mutation_commands);
commands.readText(in); commands.readText(in);
if (is_leader)
String mutation_znode;
while (true)
{ {
auto mutation_entry = mutateImpl(commands); Coordination::Requests requests;
waitMutation(mutation_entry, entry.alter_sync_mode); requests.emplace_back(zkutil::makeCreateRequest(
zookeeper_path + "/alters" + entry.znode_name, String(), zkutil::CreateMode::PersistentSequential));
prepareMutationEntry(zookeeper, commands, requests);
std::cerr << replica_name << " - " << "REqeusts size:" << requests.size() << std::endl;
Coordination::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
if (rc == Coordination::ZOK)
{
const String & path_created = dynamic_cast<const Coordination::CreateResponse *>(responses.back().get())->path_created;
mutation_znode = path_created.substr(path_created.find_last_of('/') + 1);
std::cerr << "Created mutation\n";
LOG_TRACE(log, "Created mutation with ID " << mutation_znode);
break;
}
else if (rc == Coordination::ZNODEEXISTS)
{
queue.updateMutations(zookeeper);
mutation_znode = queue.getLatestMutation();
std::cerr << replica_name << " - " << "Found mutation in queue:" << mutation_znode;
LOG_TRACE(log, "Already have mutation with ID " << mutation_znode);
break;
}
else if (rc != Coordination::ZBADVERSION)
{
throw Coordination::Exception("Cannot create mutation for alter", rc);
}
std::cerr << "LOOOPING\n";
} }
waitMutation(mutation_znode, entry.alter_sync_mode);
} }
return true; return true;
} }
@ -3296,6 +3339,8 @@ void StorageReplicatedMergeTree::alter(
return queryToString(query); return queryToString(query);
}; };
auto zookeeper = getZooKeeper();
//std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl; //std::cerr << " Columns preparation to alter:" << getColumns().getAllPhysical().toString() << std::endl;
ReplicatedMergeTreeLogEntryData entry; ReplicatedMergeTreeLogEntryData entry;
@ -3355,6 +3400,11 @@ void StorageReplicatedMergeTree::alter(
ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential)); ops.emplace_back(zkutil::makeCreateRequest(zookeeper_path + "/log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
if (!entry.mutation_commands.empty())
{
prepareMutationEntry(zookeeper, entry.mutation_commands, ops);
}
Coordination::Responses results = getZooKeeper()->multi(ops); Coordination::Responses results = getZooKeeper()->multi(ops);
String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created; String path_created = dynamic_cast<const Coordination::CreateResponse &>(*results.back()).path_created;
@ -4338,11 +4388,15 @@ void StorageReplicatedMergeTree::fetchPartition(const ASTPtr & partition, const
void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context) void StorageReplicatedMergeTree::mutate(const MutationCommands & commands, const Context & query_context)
{ {
auto entry = mutateImpl(commands); auto zookeeper = getZooKeeper();
waitMutation(entry, query_context.getSettingsRef().mutations_sync); Coordination::Requests requests;
ReplicatedMergeTreeMutationEntry entry = prepareMutationEntry(zookeeper, commands, requests);
mutateImpl(zookeeper, requests, entry);
waitMutation(entry.znode_name, query_context.getSettingsRef().mutations_sync);
} }
void StorageReplicatedMergeTree::waitMutation(const ReplicatedMergeTreeMutationEntry & entry, size_t mutations_sync) const void StorageReplicatedMergeTree::waitMutation(const String & znode_name, size_t mutations_sync) const
{ {
auto zookeeper = getZooKeeper(); auto zookeeper = getZooKeeper();
/// we have to wait /// we have to wait
@ -4354,11 +4408,39 @@ void StorageReplicatedMergeTree::waitMutation(const ReplicatedMergeTreeMutationE
else if (mutations_sync == 1) /// just wait for ourself else if (mutations_sync == 1) /// just wait for ourself
replicas.push_back(replica_name); replicas.push_back(replica_name);
waitMutationToFinishOnReplicas(replicas, entry.znode_name); waitMutationToFinishOnReplicas(replicas, znode_name);
} }
} }
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const MutationCommands & commands)
ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::prepareMutationEntry(
zkutil::ZooKeeperPtr zookeeper, const MutationCommands & commands, Coordination::Requests & requests) const
{
String mutations_path = zookeeper_path + "/mutations";
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
EphemeralLocksInAllPartitions block_number_locks(zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : block_number_locks.getLocks())
entry.block_numbers[lock.partition_id] = lock.number;
entry.create_time = time(nullptr);
requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
return entry;
}
void StorageReplicatedMergeTree::mutateImpl(
zkutil::ZooKeeperPtr zookeeper,
const Coordination::Requests & requests,
ReplicatedMergeTreeMutationEntry & entry)
{ {
/// Overview of the mutation algorithm. /// Overview of the mutation algorithm.
/// ///
@ -4413,57 +4495,25 @@ ReplicatedMergeTreeMutationEntry StorageReplicatedMergeTree::mutateImpl(const Mu
/// After all needed parts are mutated (i.e. all active parts have the mutation version greater than /// After all needed parts are mutated (i.e. all active parts have the mutation version greater than
/// the version of this mutation), the mutation is considered done and can be deleted. /// the version of this mutation), the mutation is considered done and can be deleted.
ReplicatedMergeTreeMutationEntry entry;
entry.source_replica = replica_name;
entry.commands = commands;
String mutations_path = zookeeper_path + "/mutations"; Coordination::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
/// Update the mutations_path node when creating the mutation and check its version to ensure that if (rc == Coordination::ZOK)
/// nodes for mutations are created in the same order as the corresponding block numbers.
/// Should work well if the number of concurrent mutation requests is small.
while (true)
{ {
auto zookeeper = getZooKeeper(); const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
Coordination::Stat mutations_stat; entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
zookeeper->get(mutations_path, &mutations_stat); LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
EphemeralLocksInAllPartitions block_number_locks(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
for (const auto & lock : block_number_locks.getLocks())
entry.block_numbers[lock.partition_id] = lock.number;
entry.create_time = time(nullptr);
Coordination::Requests requests;
requests.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
requests.emplace_back(zkutil::makeCreateRequest(
mutations_path + "/", entry.toString(), zkutil::CreateMode::PersistentSequential));
Coordination::Responses responses;
int32_t rc = zookeeper->tryMulti(requests, responses);
if (rc == Coordination::ZOK)
{
const String & path_created =
dynamic_cast<const Coordination::CreateResponse *>(responses[1].get())->path_created;
entry.znode_name = path_created.substr(path_created.find_last_of('/') + 1);
LOG_TRACE(log, "Created mutation with ID " << entry.znode_name);
break;
}
else if (rc == Coordination::ZBADVERSION)
{
LOG_TRACE(log, "Version conflict when trying to create a mutation node, retrying...");
continue;
}
else
throw Coordination::Exception("Unable to create a mutation znode", rc);
} }
else if (rc == Coordination::ZBADVERSION)
/// This error mean, that parallel mutation is created in mutations queue (/mutations) right now.
return entry; /// (NOTE: concurrent mutations execution is OK, but here we have case with concurrent mutation intention from client)
/// We can retry this error by ourself, but mutations is not designed for highly concurrent execution.
/// So, if client sure that he do what he want, than he should retry.
throw Coordination::Exception("Parallel mutation is creating right now. Client should retry.", rc);
else
throw Coordination::Exception("Unable to create a mutation znode", rc);
} }
std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const std::vector<MergeTreeMutationStatus> StorageReplicatedMergeTree::getMutationsStatus() const

View File

@ -109,8 +109,9 @@ public:
void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override; void alterPartition(const ASTPtr & query, const PartitionCommands & commands, const Context & query_context) override;
void mutate(const MutationCommands & commands, const Context & context) override; void mutate(const MutationCommands & commands, const Context & context) override;
ReplicatedMergeTreeMutationEntry mutateImpl(const MutationCommands & commands); ReplicatedMergeTreeMutationEntry prepareMutationEntry(zkutil::ZooKeeperPtr zk, const MutationCommands & commands, Coordination::Requests & requests) const;
void waitMutation(const ReplicatedMergeTreeMutationEntry & entry, size_t mutation_sync) const; void mutateImpl(zkutil::ZooKeeperPtr zookeeper, const Coordination::Requests & requests, ReplicatedMergeTreeMutationEntry & entry);
void waitMutation(const String & znode_name, size_t mutation_sync) const;
std::vector<MergeTreeMutationStatus> getMutationsStatus() const override; std::vector<MergeTreeMutationStatus> getMutationsStatus() const override;
CancellationCode killMutation(const String & mutation_id) override; CancellationCode killMutation(const String & mutation_id) override;
@ -438,7 +439,7 @@ private:
bool force_ttl, bool force_ttl,
ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr); ReplicatedMergeTreeLogEntryData * out_log_entry = nullptr);
bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version); bool createLogEntryToMutatePart(const MergeTreeDataPart & part, Int64 mutation_version, int alter_version);
/// Exchange parts. /// Exchange parts.