Debugging

alesapin 2020-02-05 14:18:11 +03:00
parent c83387fbbc
commit f8f615dfdb
9 changed files with 253 additions and 71 deletions

View File

@ -87,7 +87,7 @@ private:
/// Acquires block number locks in all partitions.
class EphemeralLocksInAllPartitions
class EphemeralLocksInAllPartitions : private boost::noncopyable
{
public:
EphemeralLocksInAllPartitions(
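Making the lock holder derive from boost::noncopyable guarantees that the ephemeral ZooKeeper lock nodes are owned by exactly one object and released exactly once. A minimal sketch of the same RAII pattern, with hypothetical names rather than the real ClickHouse types:

#include <boost/noncopyable.hpp>
#include <string>
#include <vector>

/// Sketch: owns a set of ephemeral lock paths and releases them once, in the destructor.
class EphemeralLockHolderSketch : private boost::noncopyable
{
public:
    explicit EphemeralLockHolderSketch(std::vector<std::string> lock_paths_)
        : lock_paths(std::move(lock_paths_)) {}

    ~EphemeralLockHolderSketch()
    {
        for (const auto & path : lock_paths)
            releaseLock(path); /// hypothetical cleanup, stands in for removing the ZooKeeper node
    }

private:
    static void releaseLock(const std::string & /*path*/) {}

    std::vector<std::string> lock_paths;
};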

View File

@ -255,6 +255,8 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
log_entry.toString(),
zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion()));
/// Deletes the information that the block number is used for writing.
block_number_lock->getUnlockOps(ops);
@ -308,6 +310,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
transaction.commit();
storage.merge_selecting_task->schedule();
LOG_DEBUG(log, "COMMITED INSERT WITH VERSION:" << storage.getMetadataVersion() << " of part " << part_name);
/// Lock nodes have been already deleted, do not delete them in destructor
block_number_lock->assumeUnlocked();
}
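The unlock requests produced by getUnlockOps() go into the same ZooKeeper multi-request that commits the part, so the block number lock disappears atomically with the commit; assumeUnlocked() then prevents the destructor from trying to delete the already removed nodes. A rough sketch of that contract, using stand-in types rather than the real EphemeralLock / Coordination classes:

#include <string>
#include <vector>

/// Sketch of the lock contract; `ops` stands in for Coordination::Requests.
struct BlockNumberLockSketch
{
    std::string path = "/block_numbers/<partition>/block-0000000042"; /// illustrative path
    bool unlocked = false;

    /// Append the request that removes the lock node to the transaction being built.
    void getUnlockOps(std::vector<std::string> & ops) { ops.push_back("remove " + path); }

    /// The caller committed the transaction above, so the node is already gone.
    void assumeUnlocked() { unlocked = true; }

    ~BlockNumberLockSketch()
    {
        if (!unlocked)
            removeNodeBestEffort(path); /// hypothetical fallback cleanup on failure paths
    }

    static void removeNodeBestEffort(const std::string & /*path*/) {}
};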

View File

@ -24,6 +24,7 @@ ReplicatedMergeTreeQueue::ReplicatedMergeTreeQueue(StorageReplicatedMergeTree &
, format_version(storage.format_version)
, current_parts(format_version)
, virtual_parts(format_version)
, alter_sequence(&Logger::get(storage_.getStorageID().table_name))
{}
@ -151,7 +152,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
if (entry->type == LogEntry::ALTER_METADATA)
{
//std::cerr << "INSERT AlTER:" << entry->alter_version << std::endl;
LOG_DEBUG(log, "ADDING METADATA ENTRY WITH ALTER VERSION:" << entry->alter_version);
alter_sequence.addMetadataAlter(entry->alter_version, state_lock);
}
@ -162,6 +163,7 @@ void ReplicatedMergeTreeQueue::insertUnlocked(
}
if (entry->type == LogEntry::MUTATE_PART && entry->alter_version != -1)
{
//LOG_DEBUG(log, "ADDING DATA ENTRY WITH ALTER VERSION:" << entry->alter_version);
//std::cerr << "INSERT MUTATE PART:" << entry->alter_version << std::endl;
alter_sequence.addDataAlterIfEmpty(entry->alter_version, state_lock);
}
@ -217,12 +219,17 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
Strings replaced_parts;
current_parts.add(virtual_part_name, &replaced_parts);
virtual_parts.add(virtual_part_name, &replaced_parts);
LOG_DEBUG(log, "Replaced parts size for new part:" << virtual_part_name << " is " << replaced_parts.size());
/// Each part from `replaced_parts` should become Obsolete as a result of executing the entry.
/// So it is one less part to mutate for each mutation with block number greater than part_info.getDataVersion()
for (const String & replaced_part_name : replaced_parts)
{
LOG_DEBUG(log, "REMOVING REPLACED PART FROM MUTATIONS:" << replaced_part_name);
updateMutationsPartsToDo(replaced_part_name, /* add = */ false);
}
}
String drop_range_part_name;
if (entry->type == LogEntry::DROP_RANGE)
@ -240,6 +247,12 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
//std::cerr << "Alter have mutation:" << entry->have_mutation << std::endl;
alter_sequence.finishMetadataAlter(entry->alter_version, entry->have_mutation, state_lock);
LOG_DEBUG(log, "FIN ALTER FOR PART with ALTER VERSION:" << entry->alter_version);
}
if (entry->type == LogEntry::MUTATE_PART)
{
LOG_DEBUG(log, "FIN MUTATION FOR PART:" << entry->source_parts[0] << " with ALTER VERSION:" << entry->alter_version);
}
}
else
@ -248,6 +261,7 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
{
/// Because execution of the entry is unsuccessful, `virtual_part_name` will never appear
/// so we won't need to mutate it.
LOG_DEBUG(log, "REMOVING PART FROM MUTATIONS:" << virtual_part_name);
updateMutationsPartsToDo(virtual_part_name, /* add = */ false);
}
}
@ -256,20 +270,41 @@ void ReplicatedMergeTreeQueue::updateStateOnQueueEntryRemoval(
void ReplicatedMergeTreeQueue::updateMutationsPartsToDo(const String & part_name, bool add)
{
LOG_DEBUG(log, "Updating mutations parts to do with flag " << add << " and part " << part_name);
auto part_info = MergeTreePartInfo::fromPartName(part_name, format_version);
auto in_partition = mutations_by_partition.find(part_info.partition_id);
if (in_partition == mutations_by_partition.end())
{
LOG_DEBUG(log, "Not found partition in mutations for part:" << part_name);
return;
}
bool some_mutations_are_probably_done = false;
auto from_it = in_partition->second.upper_bound(part_info.getDataVersion());
if (from_it != in_partition->second.end())
{
LOG_DEBUG(log, "FIRST MUTATION FOR PART "<<part_name<<" IS:" << from_it->second->entry->znode_name);
}
else
{
LOG_DEBUG(log, "NO MUTATIONS FOUND FOR PART:" << part_name << " maximum block number is " << in_partition->second.rbegin()->first);
}
for (auto it = from_it; it != in_partition->second.end(); ++it)
{
MutationStatus & status = *it->second;
if (!add)
LOG_DEBUG(log, "DECREMENTING parts to do for mutation:" << status.entry->znode_name << " because of part " << part_name);
else
LOG_DEBUG(log, "INCREMENTING parts to do for mutation:" << status.entry->znode_name << " because of part " << part_name);
status.parts_to_do += (add ? +1 : -1);
LOG_DEBUG(log, "Parts left:" << status.parts_to_do << " for mutation " << status.entry->znode_name << " after " << part_name);
if (status.parts_to_do <= 0)
{
LOG_DEBUG(log, "ALL PARTS ARE LEFT FOR MUTATION:" << status.entry->znode_name << " because of part " << part_name);
some_mutations_are_probably_done = true;
}
if (!add && !status.latest_failed_part.empty() && part_info.contains(status.latest_failed_part_info))
{
@ -672,7 +707,13 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
}
/// Initialize `mutation.parts_to_do`. First we need to mutate all parts in `current_parts`.
mutation.parts_to_do += getPartNamesToMutate(*entry, current_parts).size();
auto cur_parts = getPartNamesToMutate(*entry, current_parts);
mutation.parts_to_do += cur_parts.size();
String parts;
for (auto & part : cur_parts)
{
parts += part + ",";
}
/// And next we would need to mutate all parts with getDataVersion() greater than
/// mutation block number that would appear as a result of executing the queue.
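The counting rule that parts_to_do implements: a mutation whose block number for partition P is B still has to rewrite every part of P whose data version is below B. A self-contained sketch of that rule, using a simplified part structure instead of the real MergeTreePartInfo:

#include <cstdint>
#include <map>
#include <string>
#include <vector>

struct PartSketch
{
    std::string partition_id;
    int64_t data_version; /// stands in for MergeTreePartInfo::getDataVersion()
};

/// Count the parts a mutation still has to rewrite, given its per-partition block numbers.
size_t countPartsToMutate(const std::vector<PartSketch> & parts,
                          const std::map<std::string, int64_t> & mutation_block_numbers)
{
    size_t count = 0;
    for (const auto & part : parts)
    {
        auto it = mutation_block_numbers.find(part.partition_id);
        if (it != mutation_block_numbers.end() && part.data_version < it->second)
            ++count;
    }
    return count;
}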
@ -683,9 +724,17 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
auto part_info = MergeTreePartInfo::fromPartName(produced_part_name, format_version);
auto it = entry->block_numbers.find(part_info.partition_id);
if (it != entry->block_numbers.end() && it->second > part_info.getDataVersion())
{
++mutation.parts_to_do;
parts += produced_part_name + ",";
}
}
}
LOG_DEBUG(
log,
"MUTATION ENTRY " << entry->znode_name << " blocks " << entry->block_numbers.begin()->second << " alter version "
<< entry->alter_version << " parts to do " << mutation.parts_to_do << " parts:" << parts);
if (mutation.parts_to_do == 0)
some_mutations_are_probably_done = true;
@ -1034,18 +1083,29 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
}
if (entry.type == LogEntry::ALTER_METADATA)
{
//std::cerr << "Should we execute alter:";
{ //std::cerr << "Should we execute alter:";
//std::cerr << alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) << std::endl;
return alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock);
if (!alter_sequence.canExecuteMetadataAlter(entry.alter_version, state_lock) || queue.front()->znode_name != entry.znode_name)
{
out_postpone_reason = "Cannot execute alter metadata with version: " + std::to_string(entry.alter_version)
+ " because current head is " + std::to_string(alter_sequence.queue.front().alter_version)
+ " with state: " + std::to_string(alter_sequence.queue.front().state);
return false;
}
}
if (entry.type == LogEntry::MUTATE_PART && entry.alter_version != -1)
{
//std::cerr << "Should we execute mutation:";
//std::cerr << alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock) << std::endl;
return alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock);
if (!alter_sequence.canExecuteDataAlter(entry.alter_version, state_lock))
{
out_postpone_reason = "Cannot execute alter data with version: " + std::to_string(entry.alter_version)
+ " because current head is " + std::to_string(alter_sequence.queue.front().alter_version)
+ " with state: " + std::to_string(alter_sequence.queue.front().state);
return false;
}
}
return true;
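The change above replaces a bare boolean return with an explicit postponement reason, which is what ends up in the postpone_reason column of system.replication_queue. The shape of that pattern, sketched with a hypothetical check:

#include <string>

/// Sketch: return false and explain why, rather than just returning a bare bool.
bool shouldExecuteSketch(int entry_alter_version, int head_alter_version, std::string & out_postpone_reason)
{
    if (entry_alter_version != head_alter_version)
    {
        out_postpone_reason = "Cannot execute alter with version " + std::to_string(entry_alter_version)
            + " because current head is " + std::to_string(head_alter_version);
        return false;
    }
    return true;
}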
@ -1362,10 +1422,13 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
LOG_TRACE(log, "Mutation " << entry->znode_name << " is done");
it->second.is_done = true;
if (entry->alter_version != -1)
{
LOG_TRACE(log, "Finishing data alter with version " << entry->alter_version << " for entry " << entry->znode_name);
alter_sequence.finishDataAlter(entry->alter_version, lock);
}
}
}
}
return candidates.size() != finished.size();
}
@ -1467,7 +1530,7 @@ std::vector<MergeTreeMutationStatus> ReplicatedMergeTreeQueue::getMutationsStatu
{
const MutationStatus & status = pair.second;
const ReplicatedMergeTreeMutationEntry & entry = *status.entry;
const Names parts_to_mutate = getPartNamesToMutate(entry, current_parts);
const Names parts_to_mutate = getPartNamesToMutate(entry, virtual_parts);
for (const MutationCommand & command : entry.commands)
{
@ -1735,24 +1798,35 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
/// the part (checked by querying queue.virtual_parts), we can confidently assign a mutation to
/// version X for this part.
LOG_DEBUG(queue.log, "LOOKING for desired mutation version for part:" << part->name);
if (last_quorum_parts.find(part->name) != last_quorum_parts.end()
|| part->name == inprogress_quorum_part)
{
LOG_DEBUG(queue.log, "PART " << part->name <<" NAME NOT IN QUORUM");
return {};
}
std::lock_guard lock(queue.state_mutex);
if (queue.virtual_parts.getContainingPart(part->info) != part->name)
{
LOG_DEBUG(queue.log, "VIRTUAL PARTS HAVE CONTAINING PART " << part->name);
return {};
}
auto in_partition = queue.mutations_by_partition.find(part->info.partition_id);
if (in_partition == queue.mutations_by_partition.end())
{
LOG_DEBUG(queue.log, "NO PARTITION FOR MUTATION FOR PART " << part->name);
return {};
}
Int64 current_version = queue.getCurrentMutationVersionImpl(part->info.partition_id, part->info.getDataVersion(), lock);
Int64 max_version = in_partition->second.rbegin()->first;
if (current_version >= max_version)
{
LOG_DEBUG(queue.log, "PART VERSION FOR " << part->name << " IS BIGGER THAN MAX");
//std::cerr << "But current version is:" << current_version << std::endl;
return {};
}
@ -1772,6 +1846,12 @@ std::optional<std::pair<Int64, int>> ReplicatedMergeTreeMergePredicate::getDesir
}
}
//std::cerr << "FOUND alter version:" << alter_version << " and mutation znode name:" << version << std::endl;
if (current_version >= max_version)
{
LOG_DEBUG(queue.log, "PART VERSION FOR " << part->name << " IS BIGGER THAN MAX AFTER ALTER");
return {};
}
return std::make_pair(max_version, alter_version);
}
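The logic above reduces to: compare the part's current mutation version with the largest mutation block number registered for its partition, and only schedule a new mutation when the latter is strictly greater. A compact sketch of that rule (a std::map keyed by block number stands in for mutations_by_partition):

#include <cstdint>
#include <map>
#include <optional>
#include <string>

/// Returns the target mutation version for a part, or nullopt if it is already up to date.
std::optional<int64_t> desiredMutationVersionSketch(
    int64_t current_version,
    const std::map<int64_t, std::string> & mutations_in_partition) /// block number -> mutation znode name
{
    if (mutations_in_partition.empty())
        return std::nullopt;

    int64_t max_version = mutations_in_partition.rbegin()->first;
    if (current_version >= max_version)
        return std::nullopt;

    return max_version;
}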

View File

@ -78,9 +78,6 @@ private:
time_t last_queue_update = 0;
/// This vector is used for sequential execution of alters
std::deque<String> alter_znodes_in_queue;
/// parts that will appear as a result of actions performed right now by background threads (these actions are not in the queue).
/// Used to block other actions on parts in the range covered by future_parts.
using FuturePartsSet = std::map<String, LogEntryPtr>;

View File

@ -3,6 +3,7 @@
#include <deque>
#include <Common/Exception.h>
#include <IO/ReadHelpers.h>
#include <common/logger_useful.h>
namespace DB
{
@ -19,6 +20,7 @@ private:
{
APPLY_METADATA_CHANGES,
APPLY_DATA_CHANGES,
DATA_CHANGES_NOT_NEEDED,
};
struct AlterInQueue
@ -32,9 +34,14 @@ private:
{
}
};
Poco::Logger * log;
public:
AlterSequence(Poco::Logger * log_)
: log(log_)
{
}
std::deque<AlterInQueue> queue;
bool empty() const {
@ -43,6 +50,10 @@ public:
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
if (!queue.empty() && queue.front().alter_version > alter_version)
{
throw Exception("Alter not in order " + std::to_string(alter_version), ErrorCodes::LOGICAL_ERROR);
}
queue.emplace_back(alter_version, AlterState::APPLY_METADATA_CHANGES);
}
@ -62,20 +73,32 @@ public:
void finishMetadataAlter(int alter_version, bool have_data_alter, std::unique_lock <std::mutex> & /*state_lock*/)
{
if (queue.empty())
{
throw Exception("Queue shouldn't be empty on metadata alter", ErrorCodes::LOGICAL_ERROR);
}
LOG_DEBUG(
log,
"FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND HAVE DATA ALTER: " << have_data_alter
<< " QUEUE HEAD:" << queue.front().alter_version << " state:" << queue.front().state);
if (queue.front().alter_version != alter_version)
throw Exception("Finished metadata alter with version " + std::to_string(alter_version) + " but current alter in queue is " + std::to_string(queue.front().alter_version), ErrorCodes::LOGICAL_ERROR);
if (have_data_alter)
if (have_data_alter && queue.front().state == AlterState::APPLY_METADATA_CHANGES)
{
LOG_DEBUG(
log,
"FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND SWITCHING QUEUE STATE");
//std::cerr << "Switching head state:" << AlterState::APPLY_DATA_CHANGES << std::endl;
queue.front().state = AlterState::APPLY_DATA_CHANGES;
}
else
{
LOG_DEBUG(log, "FINISHING METADATA ALTER WITH VERSION:" << alter_version << " AND DOING POP");
//std::cerr << "JUST POP FRONT\n";
queue.pop_front();
}
@ -83,37 +106,70 @@ public:
void finishDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
{
/// The queue can be empty after loading an already finished mutation without moving the mutation pointer.
if (queue.empty())
{
LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE EMPTY");
return;
}
//std::cerr << "Finishing data alter:" << alter_version << std::endl;
if (queue.front().alter_version != alter_version)
throw Exception(
"Finished data alter with version " + std::to_string(alter_version) + " but current alter in queue is "
+ std::to_string(queue.front().alter_version),
ErrorCodes::LOGICAL_ERROR);
{
for (auto & state : queue)
{
if (state.alter_version == alter_version)
{
LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT HEAD IS NOT SAME SO MAKE DATA_CHANGED_NOT_NEEDED");
state.state = AlterState::DATA_CHANGES_NOT_NEEDED;
return;
}
}
}
//if (queue.front().alter_version != alter_version)
//{
// LOG_DEBUG(log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE VERSION IS " << queue.front().alter_version << " state " << queue.front().state);
// throw Exception(
// "Finished data alter with version " + std::to_string(alter_version) + " but current alter in queue is "
// + std::to_string(queue.front().alter_version),
// ErrorCodes::LOGICAL_ERROR);
//}
if (queue.front().state != AlterState::APPLY_DATA_CHANGES)
{
throw Exception(
"Finished data alter but current alter should perform metadata alter",
ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(
log, "FINISHING DATA ALTER WITH VERSION:" << alter_version << " BUT STATE IS METADATA");
queue.front().state = AlterState::DATA_CHANGES_NOT_NEEDED;
return;
}
LOG_DEBUG(
log,
"FINISHING DATA ALTER WITH VERSION:" << alter_version << " QUEUE VERSION IS " << queue.front().alter_version << " STATE "
<< queue.front().state);
queue.pop_front();
}
bool canExecuteMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue.empty())
throw Exception("QUEUE EMPTY ON METADATA", ErrorCodes::LOGICAL_ERROR);
LOG_DEBUG(log, "CHECK METADATADATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version);
//std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl;
return queue.front().alter_version == alter_version;
}
bool canExecuteDataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const
{
if (queue.empty())
throw Exception("QUEUE EMPTY ON DATA", ErrorCodes::LOGICAL_ERROR);
//std::cerr << "Alter queue front:" << queue.front().alter_version << " state:" << queue.front().state << std::endl;
//std::cerr << "CAn execute:" << alter_version << std::endl;
//std::cerr << "FRont version:" << queue.front().alter_version << std::endl;
//std::cerr << "State:" << queue.front().state << std::endl;
LOG_DEBUG(log, "CHECK DATA ALTER WITH VERSION:" << alter_version << " BUT QUEUE HEAD IS " << queue.front().alter_version << " state:" << queue.front().state);
return queue.front().alter_version == alter_version && queue.front().state == AlterState::APPLY_DATA_CHANGES;
}
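Taken together, these methods form a small per-alter state machine: an entry starts in APPLY_METADATA_CHANGES, switches to APPLY_DATA_CHANGES once the metadata part is applied (if the alter carries a mutation), and is popped when the data part finishes; DATA_CHANGES_NOT_NEEDED covers the out-of-order case where the mutation is observed as finished before the entry reaches the head. A simplified, lock-free model of that lifecycle, not the real class (which also takes the queue's state lock and logs or throws on inconsistencies):

#include <deque>

/// Simplified model of the alter sequencing above.
struct AlterSequenceModel
{
    enum State { APPLY_METADATA_CHANGES, APPLY_DATA_CHANGES, DATA_CHANGES_NOT_NEEDED };
    struct Entry { int alter_version; State state; };

    std::deque<Entry> queue;

    void addMetadataAlter(int v) { queue.push_back({v, APPLY_METADATA_CHANGES}); }

    bool canExecuteMetadataAlter(int v) const { return !queue.empty() && queue.front().alter_version == v; }

    void finishMetadataAlter(int v, bool have_data_alter)
    {
        if (queue.empty() || queue.front().alter_version != v)
            return;
        if (have_data_alter && queue.front().state == APPLY_METADATA_CHANGES)
            queue.front().state = APPLY_DATA_CHANGES; /// wait for the mutation before popping
        else
            queue.pop_front();
    }

    bool canExecuteDataAlter(int v) const
    {
        return !queue.empty() && queue.front().alter_version == v && queue.front().state == APPLY_DATA_CHANGES;
    }

    void finishDataAlter(int v)
    {
        if (queue.empty())
            return;
        if (queue.front().alter_version == v && queue.front().state == APPLY_DATA_CHANGES)
            queue.pop_front();
        else
            for (auto & e : queue) /// finished out of order: remember that no data changes remain
                if (e.alter_version == v)
                    e.state = DATA_CHANGES_NOT_NEEDED;
    }
};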

View File

@ -444,6 +444,7 @@ MergeTreeData::DataPart::Checksums checkDataPart(
const MergeTreeIndices & indices,
std::function<bool()> is_cancelled)
{
try
{
return checkDataPart(
data_part->getFullPath(),
data_part->index_granularity,
@ -452,6 +453,11 @@ MergeTreeData::DataPart::Checksums checkDataPart(
primary_key_data_types,
indices,
is_cancelled);
}
catch (...)
{
tryLogCurrentException("PartChecker");
std::terminate();
}
}

View File

@ -300,6 +300,10 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
createNewZooKeeperNodes();
Coordination::Stat metadata_stat;
current_zookeeper->get(zookeeper_path + "/columns", &metadata_stat);
metadata_version = metadata_stat.version;
other_replicas_fixed_granularity = checkFixedGranualrityInZookeeper();
}
@ -772,8 +776,16 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (part_name.empty())
part_name = part->name;
try
{
check(part->columns);
//int expected_columns_version = columns_version;
}
catch (...)
{
LOG_DEBUG(log, "EXCEPTION ADDING PART:" << part_name << " VERSION:" << getMetadataVersion());
throw;
}
auto local_part_header = ReplicatedMergeTreePartHeader::fromColumnsAndChecksums(
part->columns, part->checksums);
@ -3246,8 +3258,9 @@ bool StorageReplicatedMergeTree::executeMetadataAlter(const StorageReplicatedMer
LOG_INFO(log, "Metadata changed in ZooKeeper. Applying changes locally.");
setTableStructure(std::move(columns_from_entry), metadata_diff);
metadata_version = entry.alter_version;
LOG_INFO(log, "Applied changes to the metadata of the table.");
LOG_INFO(log, "Applied changes to the metadata of the table. Setting metadata version:" << metadata_version);
}
//////std::cerr << "Recalculating columns sizes\n";
@ -3313,23 +3326,17 @@ void StorageReplicatedMergeTree::alter(
Coordination::Stat metadata_stat;
String metadata_in_zk = zookeeper->get(zookeeper_path + "/metadata", &metadata_stat);
Coordination::Stat columns_stat;
String columns_in_zk = zookeeper->get(zookeeper_path + "/columns", &columns_stat);
StorageInMemoryMetadata metadata = getMetadataFromSharedZookeeper(metadata_in_zk, columns_in_zk);
params.validate(metadata, query_context);
StorageInMemoryMetadata metadata = getInMemoryMetadata();
params.apply(metadata);
String path_prefix = zookeeper_path + "/alters/alter-";
String path = zookeeper->create(path_prefix, "", zkutil::CreateMode::EphemeralSequential);
////std::cerr << "Path for alter:" << path << std::endl;
int alter_version = parse<int>(path.c_str() + path_prefix.size(), path.size() - path_prefix.size());
//int alter_version = columns_stat.version;
//std::cerr << "Alter version:" << alter_version << std::endl;
String new_columns_str = metadata.columns.toString();
if (new_columns_str != getColumns().toString())
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, columns_stat.version));
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/columns", new_columns_str, metadata_version));
ReplicatedMergeTreeTableMetadata new_metadata(*this);
if (ast_to_str(metadata.order_by_ast) != ast_to_str(order_by_ast))
@ -3347,14 +3354,8 @@ void StorageReplicatedMergeTree::alter(
new_metadata.constraints = new_constraints_str;
String new_metadata_str = new_metadata.toString();
if (new_metadata_str != ReplicatedMergeTreeTableMetadata(*this).toString())
ops.emplace_back(zkutil::makeSetRequest(zookeeper_path + "/metadata", new_metadata_str, metadata_stat.version));
if (ops.empty())
{
//std::cerr << "Alter doesn't change anything\n";
return;
}
/// Perform settings update locally
auto old_metadata = getInMemoryMetadata();
old_metadata.settings_ast = metadata.settings_ast;
@ -3365,7 +3366,7 @@ void StorageReplicatedMergeTree::alter(
entry.source_replica = replica_name;
entry.metadata_str = new_metadata_str;
entry.columns_str = new_columns_str;
entry.alter_version = alter_version;
entry.alter_version = metadata_version + 1;
entry.create_time = time(nullptr);
if (!maybe_mutation_commands.empty())
@ -3377,9 +3378,42 @@ void StorageReplicatedMergeTree::alter(
bool have_mutation = false;
std::optional<EphemeralLocksInAllPartitions> lock_holder;
size_t partitions_count = 0;
if (!maybe_mutation_commands.empty())
{
prepareMutationEntry(zookeeper, maybe_mutation_commands, ops, alter_version);
String mutations_path = zookeeper_path + "/mutations";
ReplicatedMergeTreeMutationEntry mutation_entry;
mutation_entry.source_replica = replica_name;
mutation_entry.commands = maybe_mutation_commands;
mutation_entry.alter_version = metadata_version + 1;
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
lock_holder.emplace(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
Coordination::Stat block_numbers_version;
zookeeper->get(zookeeper_path + "/block_numbers", &block_numbers_version);
for (const auto & lock : lock_holder->getLocks())
{
Coordination::Stat partition_stat;
mutation_entry.block_numbers[lock.partition_id] = lock.number;
zookeeper->get(zookeeper_path + "/block_numbers/" + lock.partition_id, &partition_stat);
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_stat.version));
partitions_count++;
}
mutation_entry.create_time = time(nullptr);
/// We have to be sure that no inserts happened in the meantime.
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers", block_numbers_version.version));
ops.emplace_back(zkutil::makeSetRequest(mutations_path, String(), mutations_stat.version));
ops.emplace_back(
zkutil::makeCreateRequest(mutations_path + "/", mutation_entry.toString(), zkutil::CreateMode::PersistentSequential));
have_mutation = true;
}
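The block above folds the mutation entry into the same ZooKeeper transaction as the metadata change: the version checks on /block_numbers and on each partition's block-number node make the whole multi-request fail if a concurrent insert allocated a new block number in the meantime, so either everything commits or nothing does. A hedged sketch of that check-then-commit shape, reusing the zkutil request helpers that appear above (error handling and the per-partition checks omitted; commitGuarded is a hypothetical name):

#include <Common/ZooKeeper/ZooKeeper.h>
#include <string>

void commitGuarded(zkutil::ZooKeeperPtr zookeeper, const std::string & zookeeper_path, const std::string & payload)
{
    Coordination::Requests ops;

    /// Read the current version of the guard node (here /block_numbers)...
    Coordination::Stat guard_stat;
    zookeeper->get(zookeeper_path + "/block_numbers", &guard_stat);

    /// ...and require that it is unchanged at the moment the transaction is applied,
    /// i.e. no concurrent insert allocated a new block number in between.
    ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers", guard_stat.version));

    /// The actual change is committed only if the check above still holds.
    ops.emplace_back(zkutil::makeCreateRequest(
        zookeeper_path + "/mutations/", payload, zkutil::CreateMode::PersistentSequential));

    zookeeper->multi(ops); /// throws on a failed check, so the caller can re-read and retry
}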
@ -3396,7 +3430,7 @@ void StorageReplicatedMergeTree::alter(
{
//std::cerr << "In have mutation\n";
//std::cerr << "INDEX:" << results.size() - 2 << std::endl;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 3]).path_created;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 4 - partitions_count]).path_created;
//std::cerr << "Alter path:" << alter_path << std::endl;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);
@ -3422,8 +3456,12 @@ void StorageReplicatedMergeTree::alter(
table_lock_holder.release();
std::vector<String> unwaited;
//std::cerr << "Started wait for alter\n";
auto unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
if (query_context.getSettingsRef().replication_alter_partitions_sync == 2)
unwaited = waitForAllReplicasToProcessLogEntry(entry, false);
else if (query_context.getSettingsRef().replication_alter_partitions_sync == 1)
waitForReplicaToProcessLogEntry(replica_name, entry);
//std::cerr << "FInished wait for alter\n";
if (!unwaited.empty())

View File

@ -180,6 +180,10 @@ public:
/// Checks ability to use granularity
bool canUseAdaptiveGranularity() const override;
int getMetadataVersion() const {
return metadata_version;
}
private:
/// Get a sequential consistent view of current parts.
@ -255,6 +259,7 @@ private:
/// Limiting parallel fetches per one table
std::atomic_uint current_table_fetches {0};
int metadata_version = 0;
/// Threads.
/// A task that keeps track of the updates in the logs of all replicas and loads them into the queue.

View File

@ -3,23 +3,24 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
REPLICAS=5
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key"
done
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)"
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
done
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_$i"
done
@ -30,7 +31,7 @@ INITIAL_SUM=`$CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alte
function garbage_alter_thread()
{
while true; do
REPLICA=$(($RANDOM % 2 + 1))
REPLICA=$(($RANDOM % 5 + 1))
$CLICKHOUSE_CLIENT -n --query "ALTER TABLE concurrent_alter_mt_$REPLICA ADD COLUMN h String DEFAULT '0'; ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN h UInt64; ALTER TABLE concurrent_alter_mt_$REPLICA DROP COLUMN h;";
done
}
@ -43,7 +44,7 @@ function correct_alter_thread()
{
TYPES=(Float64 String UInt8 UInt32)
while true; do
REPLICA=$(($RANDOM % 2 + 1))
REPLICA=$(($RANDOM % 5 + 1))
TYPE=${TYPES[$RANDOM % ${#TYPES[@]} ]}
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_$REPLICA MODIFY COLUMN value1 $TYPE SETTINGS replication_alter_partitions_sync=0"; # additionaly we don't wait anything for more heavy concurrency
sleep 0.$RANDOM
@ -58,7 +59,7 @@ function insert_thread()
VALUES=(7.0 7 '7')
while true; do
REPLICA=$(($RANDOM % 2 + 1))
REPLICA=$(($RANDOM % 5 + 1))
VALUE=${VALUES[$RANDOM % ${#VALUES[@]} ]}
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$REPLICA VALUES($RANDOM, $VALUE, toString($VALUE))"
sleep 0.$RANDOM
@ -71,26 +72,21 @@ export -f garbage_alter_thread;
export -f correct_alter_thread;
export -f insert_thread;
TIMEOUT=10
TIMEOUT=30
timeout $TIMEOUT bash -c garbage_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c garbage_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c garbage_alter_thread 2> /dev/null &
#timeout $TIMEOUT bash -c garbage_alter_thread 2> /dev/null &
#timeout $TIMEOUT bash -c garbage_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
#timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
#timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
# We don't want too many parts, just several alters per second
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
#timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
#timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
wait
@ -99,11 +95,12 @@ echo "Finishing alters"
# This alter will finish all previous
$CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String"
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
done
for i in {1..2}; do
for i in `seq $REPLICAS`; do
$CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done