Something working

This commit is contained in:
alesapin 2020-02-10 19:55:09 +03:00
parent 2a866043aa
commit 260a4687f0
4 changed files with 88 additions and 124 deletions

View File

@ -256,6 +256,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion()));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/alter_intention_counter", "", -1));
/// Deletes the information that the block number is used for writing.
block_number_lock->getUnlockOps(ops);

View File

@ -167,7 +167,6 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version << " FOR PART:" << entry->source_parts[0] << " to " << entry->getVirtualPartNames()[0]);
//LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version);
//std::cerr << "INSERT MUTATE PART:" << entry->alter_version << std::endl;
alter_sequence.addDataAlterIfEmpty(entry->alter_version, state_lock);
}
}
@ -246,10 +245,10 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
if (entry->type == LogEntry::ALTER_METADATA)
{
LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version);
//std::cerr << "Alter have mutation:" << entry->have_mutation << std::endl;
alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version);
}
if (entry->type == LogEntry::MUTATE_PART)
{
@ -726,6 +725,9 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
if (mutation.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
if (entry->alter_version != -1)
alter_sequence.addMutationForAlter(entry->alter_version, entry->block_numbers, state_lock);
}
}
@ -987,6 +989,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
MergeTreeData & data,
std::lock_guard<std::mutex> & state_lock) const
{
if (entry.type == LogEntry::GET_PART)
{
if (!alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
return false;
}
if (entry.type == LogEntry::MERGE_PARTS
|| entry.type == LogEntry::GET_PART
|| entry.type == LogEntry::MUTATE_PART)
@ -1074,20 +1082,23 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{ //std::cerr << "Should we execute alter:";
LOG_DEBUG(log, "Should we execute alter entry:" << entry.toString());
LOG_DEBUG(log, "Should we execute alter entry:" << entry.znode_name << "\n"<< entry.toString());
LOG_DEBUG(log, "We are in front:" << (entry.znode_name == *entries_in_queue.begin()));
for (auto & log_entry : entries_in_queue)
{
LOG_DEBUG(log, "LogEntry:" << log_entry);
}
for (auto & log_entry : queue)
{
LOG_DEBUG(log, "LogEntryData:" << log_entry->znode_name << "\n" << log_entry->toString());
}
//std::cerr << alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) << std::endl;
if (!alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) || *entries_in_queue.begin() != entry.znode_name)
if (*entries_in_queue.begin() != entry.znode_name || !alter_sequence.canExecuteMetaAlter(entry.alter_version, state_lock))
{
LOG_DEBUG(log, "No we shouldn't");
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because current head is " + std::to_string(alter_sequence.queue.front().alter_version)
+ " with state: " + std::to_string(alter_sequence.queue.front().state);
out_postpone_reason
= "Cannot execute alter metadata with because head smallest node is " + *entries_in_queue.begin() + " but we are " + entry.znode_name;
return false;
}
else
@ -1101,13 +1112,11 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
//std::cerr << "Should we execute mutation:";
//std::cerr << alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock) << std::endl;
LOG_DEBUG(log, "Should we execute mutation entry:" << entry.toString());
LOG_DEBUG(log, "Should we execute mutation entry:" << entry.znode_name << "\n" << entry.toString());
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
{
LOG_DEBUG(log, "NOOOO");
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because current head is " + std::to_string(alter_sequence.queue.front().alter_version)
+ " with state: " + std::to_string(alter_sequence.queue.front().state);
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version);
return false;
}
else

View File

@ -4,6 +4,7 @@
#include <Common/Exception.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
namespace DB
{
@ -16,22 +17,16 @@ namespace ErrorCodes
class AlterSequence
{
private:
enum AlterState
{
APPLY_METADATA_CHANGES,
APPLY_DATA_CHANGES,
DATA_CHANGES_NOT_NEEDED,
};
struct AlterInQueue
{
int alter_version;
AlterState state;
std::map<String, Int64> block_numbers;
bool metadata_finished = false;
AlterInQueue(int alter_version_, AlterState state_)
: alter_version(alter_version_)
, state(state_)
AlterInQueue() = default;
AlterInQueue(const std::map<String, Int64> & block_numbers_, bool metadata_finished_)
: block_numbers(block_numbers_)
, metadata_finished(metadata_finished_)
{
}
};
@ -43,65 +38,60 @@ public:
: log(log_)
{
}
std::deque<AlterInQueue> queue;
std::map<int, AlterInQueue> queue_state;
bool empty() const {
return queue.empty();
return queue_state.empty();
}
void addMutationForAlter(int alter_version, const std::map<String, Int64> & block_numbers, std::lock_guard<std::mutex> & /*state_lock*/)
{
LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version);
queue_state.emplace(alter_version, AlterInQueue(block_numbers, true));
}
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue.empty() && queue.front().alter_version > alter_version)
{
throw Exception("Alter not in order " + std::to_string(alter_version), ErrorCodes::LOGICAL_ERROR);
}
queue.emplace_back(alter_version, AlterState::APPLY_METADATA_CHANGES);
LOG_DEBUG(log, "Adding meta with alter version:" << alter_version);
if (!queue_state.count(alter_version))
queue_state.emplace(alter_version, AlterInQueue({}, false));
else
queue_state[alter_version].metadata_finished = false;
}
void addDataAlterIfEmpty(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
bool canExecuteGetEntry(const String & part_name, MergeTreeDataFormatVersion format_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue.empty())
queue.emplace_back(alter_version, AlterState::APPLY_DATA_CHANGES);
if (empty())
return true;
MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (queue_state.begin()->second.block_numbers.count(info.partition_id))
return info.getDataVersion() < queue_state.begin()->second.block_numbers.at(info.partition_id);
return true;
//if (queue.front().alter_version != alter_version)
//{
// throw Exception(
// "Alter head has another version number "
// + std::to_string(queue.front().alter_version) + " than ours " + std::to_string(alter_version),
// ErrorCodes::LOGICAL_ERROR);
//}
}
void finishMetadataAlter(int alter_version, bool have_data_alter, std::unique_lock <std::mutex> & /*state_lock*/)
{
if (queue.empty())
if (queue_state.empty())
{
throw Exception("Queue shouldn't be empty on metadata alter", ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(
log,
"FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND HAVE DATA ALTER: " << have_data_alter
<< " QUEUE HEAD:" << queue.front().alter_version << " state:" << queue.front().state);
if (queue.front().alter_version != alter_version)
throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue.front().alter_version), ErrorCodes::LOGICAL_ERROR);
if (have_data_alter && queue.front().state == AlterState::APPLY_METADATA_CHANGES)
if (queue_state.begin()->first != alter_version)
{
LOG_DEBUG(
log,
"FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND SWITCHING QUEUE STATE");
LOG_DEBUG(log, "Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first));
throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue_state.begin()->first), ErrorCodes::LOGICAL_ERROR);
}
//std::cerr << "Switching head state:" << AlterState::APPLY_DATA_CHANGES << std::endl;
queue.front().state = AlterState::APPLY_DATA_CHANGES;
if (!have_data_alter)
{
queue_state.erase(alter_version);
}
else
{
LOG_DEBUG(log, "FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND DOING POP");
//std::cerr << "JUST POP FRONT\n";
queue.pop_front();
queue_state[alter_version].metadata_finished = true;
}
}
@ -109,69 +99,29 @@ public:
{
/// queue can be empty after load of finished mutation without move of mutation pointer
if (queue.empty())
if (queue_state.empty())
{
LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY");
return;
}
//std::cerr << "Finishing data alter:" << alter_version << std::endl;
if (queue.front().alter_version != alter_version)
{
for (auto & state : queue)
{
if (state.alter_version == alter_version)
{
LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT HEAD IS NOT SAME SO MAKE DATA_CHANGED_NOT_NEEDED");
state.state = AlterState::DATA_CHANGES_NOT_NEEDED;
return;
}
}
}
//if (queue.front().alter_version != alter_version)
//{
// LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE VERSION IS " << queue.front().alter_version << " state " << queue.front().state);
// throw Exception(
// "Finished data alter with version " + std::to_string(alter_version) + " but current alter in queue is "
// + std::to_string(queue.front().alter_version),
// ErrorCodes::LOGICAL_ERROR);
//}
if (queue.front().state != AlterState::APPLY_DATA_CHANGES)
{
LOG_DEBUG(
log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT STATE IS METADATA");
queue.front().state = AlterState::DATA_CHANGES_NOT_NEEDED;
return;
}
LOG_DEBUG(
log,
"FINISHING DATA ALTER WITH VERSION:" << alter_version << " QUEUE VERSION IS " << queue.front().alter_version << " STATE "
<< queue.front().state);
queue.pop_front();
}
bool canExecuteMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue.empty())
throw Exception("QUEUE EMPTY ON METADATA", ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "CHECK METADATADATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version);
//std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl;
return queue.front().alter_version == alter_version;
LOG_DEBUG(log, "FINISH DATA ALTER: " << alter_version);
queue_state.erase(alter_version);
}
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue.empty())
throw Exception("QUEUE EMPTY ON DATA", ErrorCodes::LOGICAL_ERROR);
//std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl;
//std::cerr << "CAn execute:" << alter_version << std::endl;
//std::cerr << "FRont version:" << queue.front().alter_version << std::endl;
//std::cerr << "State:" << queue.front().state << std::endl;
LOG_DEBUG(log, "CHECK DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version << " state:" << queue.front().state);
return queue.front().alter_version == alter_version && queue.front().state == AlterState::APPLY_DATA_CHANGES;
LOG_DEBUG(log, "Can execute data alter:" << alter_version);
for (auto [key, value] : queue_state)
{
LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished);
}
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
return queue_state.empty() || queue_state.begin()->first == alter_version;
}
};

View File

@ -397,6 +397,7 @@ void StorageReplicatedMergeTree::createNewZooKeeperNodes()
zookeeper->createIfNotExists(zookeeper_path + "/quorum", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/last_part", String());
zookeeper->createIfNotExists(zookeeper_path + "/quorum/failed_parts", String());
zookeeper->createIfNotExists(zookeeper_path + "/alter_intention_counter", String());
/// Tracking lag of replicas.
zookeeper->createIfNotExists(replica_path + "/min_unprocessed_insert_time", String());
@ -3290,7 +3291,7 @@ void StorageReplicatedMergeTree::alter(
{
assertNotReadonly();
LOG_DEBUG(log, "Doing ALTER");
LOG_DEBUG(log, "Doing ALTER FROM " << metadata_version);
auto maybe_mutation_commands = params.getMutationCommands(getInMemoryMetadata());
auto table_id = getStorageID();
@ -3394,7 +3395,6 @@ void StorageReplicatedMergeTree::alter(
bool have_mutation = false;
std::optional<EphemeralLocksInAllPartitions> lock_holder;
size_t partitions_count = 0;
if (!maybe_mutation_commands.empty())
{
String mutations_path = zookeeper_path + "/mutations";
@ -3406,24 +3406,20 @@ void StorageReplicatedMergeTree::alter(
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
Coordination::Stat intention_counter_stat;
zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat);
lock_holder.emplace(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
Coordination::Stat block_numbers_version;
zookeeper->get(zookeeper_path + "/block_numbers", &block_numbers_version);
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version));
for (const auto & lock : lock_holder->getLocks())
{
Coordination::Stat partition_stat;
mutation_entry.block_numbers[lock.partition_id] = lock.number;
zookeeper->get(zookeeper_path + "/block_numbers/" + lock.partition_id, &partition_stat);
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_stat.version));
partitions_count++;
LOG_DEBUG(log, "ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1);
}
mutation_entry.create_time = time(nullptr);
/// We have to be sure, that no inserts happened
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers", block_numbers_version.version));
ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
ops.emplace_back(
@ -3435,6 +3431,8 @@ void StorageReplicatedMergeTree::alter(
Coordination::Responses results;
int32_t rc = zookeeper->tryMulti(ops, results);
LOG_DEBUG(log, "ALTER REQUESTED TO" << entry.alter_version);
//std::cerr << "Results size:" << results.size() << std::endl;
//std::cerr << "Have mutation:" << have_mutation << std::endl;
@ -3445,7 +3443,7 @@ void StorageReplicatedMergeTree::alter(
{
//std::cerr << "In have mutation\n";
//std::cerr << "INDEX:" << results.size() - 2 << std::endl;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 4 - partitions_count]).path_created;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 4]).path_created;
//std::cerr << "Alter path:" << alter_path << std::endl;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
@ -3474,7 +3472,11 @@ void StorageReplicatedMergeTree::alter(
std::vector<String> unwaited;
//std::cerr << "Started wait for alter\n";
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
{
LOG_DEBUG(log, "Start waiting for metadata alter");
unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
LOG_DEBUG(log, "Finished waiting for metadata alter");
}
else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
//std::cerr << "FInished wait for alter\n";
@ -3487,7 +3489,9 @@ void StorageReplicatedMergeTree::alter(
if (mutation_znode)
{
//std::cerr << "Started wait for mutation:" << *mutation_znode << std::endl;
LOG_DEBUG(log, "Start waiting for mutation");
waitMutation(*mutation_znode, query_context.getSettingsRef().replication_alter_partitions_sync);
LOG_DEBUG(log, "Finished waiting for mutation");
//std::cerr << "FInished wait for mutation\n";
}
}