Seems to work

This commit is contained in:
alesapin 2020-02-11 17:54:46 +03:00
parent 260a4687f0
commit 9e0d4b0bd4
6 changed files with 189 additions and 12 deletions

View File

@ -256,7 +256,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(zkutil::ZooKeeperPtr & zoo
zkutil::CreateMode::PersistentSequential));
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/columns", storage.getMetadataVersion()));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/alter_intention_counter", "", -1));
ops.emplace_back(zkutil::makeSetRequest(storage.zookeeper_path + "/block_numbers/" + part->info.partition_id, "", -1));
/// Deletes the information that the block number is used for writing.
block_number_lock->getUnlockOps(ops);

View File

@ -287,6 +287,8 @@ void ReplicatedMergeTreeQueue::removePartFromMutations(const String & part_name)
for (auto it = from_it; it != in_partition->second.end(); ++it)
{
MutationStatus & status = *it->second;
LOG_DEBUG(log, "Removing part name:" << part_name << " from mutation:" << status.entry->znode_name);
status.parts_to_do.removePartAndCoveredParts(part_name);
if (status.parts_to_do.size() == 0)
some_mutations_are_probably_done = true;
@ -695,6 +697,7 @@ void ReplicatedMergeTreeQueue::updateMutations(zkutil::ZooKeeperPtr zookeeper, C
for (const ReplicatedMergeTreeMutationEntryPtr & entry : new_mutations)
{
LOG_DEBUG(log, "PROCESSING MUTATION:" << entry->znode_name);
auto & mutation = mutations_by_znode.emplace(entry->znode_name, MutationStatus(entry, format_version))
.first->second;
@ -972,6 +975,9 @@ bool ReplicatedMergeTreeQueue::addFuturePartIfNotCoveredByThem(const String & pa
{
std::lock_guard lock(state_mutex);
if (!alter_sequence.canExecuteGetEntry(part_name, format_version, lock))
return false;
if (isNotCoveredByFuturePartsImpl(part_name, reject_reason, lock))
{
CurrentlyExecuting::setActualPartName(entry, part_name, *this);
@ -991,7 +997,10 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
{
if (entry.type == LogEntry::GET_PART)
{
if (!alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
if (!entry.actual_new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.actual_new_part_name, format_version, state_lock))
return false;
if (!entry.new_part_name.empty() && !alter_sequence.canExecuteGetEntry(entry.new_part_name, format_version, state_lock))
return false;
}

View File

@ -47,7 +47,10 @@ public:
void addMutationForAlter(int alter_version, const std::map<String, Int64> & block_numbers, std::lock_guard<std::mutex> & /*state_lock*/)
{
LOG_DEBUG(log, "Adding mutation with alter version:" << alter_version);
queue_state.emplace(alter_version, AlterInQueue(block_numbers, true));
if (queue_state.count(alter_version))
queue_state[alter_version].block_numbers = block_numbers;
else
queue_state.emplace(alter_version, AlterInQueue(block_numbers, true));
}
void addMetadataAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/)
@ -65,8 +68,19 @@ public:
return true;
MergeTreePartInfo info = MergeTreePartInfo::fromPartName(part_name, format_version);
if (queue_state.begin()->second.block_numbers.count(info.partition_id))
return info.getDataVersion() < queue_state.begin()->second.block_numbers.at(info.partition_id);
LOG_DEBUG(log, "Checking can fetch:" << part_name);
for (const auto & [block_number, state] : queue_state)
{
LOG_DEBUG(log, "Looking at block:" << block_number << " with part name:" << part_name);
if (state.block_numbers.count(info.partition_id))
{
LOG_DEBUG(log, "Block number:" << block_number << " has part name " << part_name << " version " << state.block_numbers.at(info.partition_id));
return info.getDataVersion() < state.block_numbers.at(info.partition_id);
}
}
LOG_DEBUG(log, "Nobody has block number for part " << part_name);
return true;
}
@ -117,6 +131,8 @@ public:
{
LOG_DEBUG(log, "Key:" << key << " is metadata finished:" << value.metadata_finished);
}
if (alter_version < queue_state.begin()->first)
return true;
return queue_state.at(alter_version).metadata_finished;
}
bool canExecuteMetaAlter(int alter_version, std::lock_guard<std::mutex> & /*state_lock*/) const

View File

@ -2265,6 +2265,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
LOG_DEBUG(log, "I'm not leader, I don't want to assign anything");
return;
}
LOG_DEBUG(log, "Merge selecting started");
const auto storage_settings_ptr = getSettings();
const bool deduplicate = false; /// TODO: read deduplicate option from table config
@ -2304,6 +2305,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
if (max_source_parts_size_for_merge > 0 &&
merger_mutator.selectPartsToMerge(future_merged_part, false, max_source_parts_size_for_merge, merge_pred))
{
LOG_DEBUG(log, "ASSIGNING MERGE");
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts,
future_merged_part.name, deduplicate, force_ttl);
}
@ -2337,6 +2339,10 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
}
}
}
else
{
LOG_DEBUG(log, "TOO MANY MUTATIONS IN QUEUE");
}
}
}
catch (...)
@ -3395,6 +3401,7 @@ void StorageReplicatedMergeTree::alter(
bool have_mutation = false;
std::optional<EphemeralLocksInAllPartitions> lock_holder;
size_t partitions_count = 0;
if (!maybe_mutation_commands.empty())
{
String mutations_path = zookeeper_path + "/mutations";
@ -3406,17 +3413,36 @@ void StorageReplicatedMergeTree::alter(
Coordination::Stat mutations_stat;
zookeeper->get(mutations_path, &mutations_stat);
Coordination::Stat intention_counter_stat;
zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat);
//Coordination::Stat intention_counter_stat;
//zookeeper->get(zookeeper_path + "/alter_intention_counter", &intention_counter_stat);
Strings partitions = zookeeper->getChildren(zookeeper_path + "/block_numbers");
std::vector<std::future<Coordination::GetResponse>> partition_futures;
for (const String & partition : partitions)
partition_futures.push_back(zookeeper->asyncGet(zookeeper_path + "/block_numbers/" + partition));
std::unordered_map<String, int> partition_versions;
for (size_t i = 0; i < partition_futures.size(); ++i)
{
auto stat = partition_futures[i].get().stat;
auto partition = partitions[i];
partition_versions[partition] = stat.version;
LOG_DEBUG(log, "Partition version:" << partition << " stat version " << stat.version);
}
lock_holder.emplace(
zookeeper_path + "/block_numbers", "block-", zookeeper_path + "/temp", *zookeeper);
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version));
for (const auto & lock : lock_holder->getLocks())
{
mutation_entry.block_numbers[lock.partition_id] = lock.number;
LOG_DEBUG(log, "ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1);
//ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/alter_intention_counter", intention_counter_stat.version));
ops.emplace_back(zkutil::makeCheckRequest(zookeeper_path + "/block_numbers/" + lock.partition_id, partition_versions[lock.partition_id]));
partitions_count++;
LOG_DEBUG(
log,
"ALLOCATED:" << lock.number << " FOR VERSION:" << metadata_version + 1
<< " Partition version:" << partition_versions[lock.partition_id] + 1 << " for partition " << lock.partition_id);
}
mutation_entry.create_time = time(nullptr);
@ -3431,7 +3457,7 @@ void StorageReplicatedMergeTree::alter(
Coordination::Responses results;
int32_t rc = zookeeper->tryMulti(ops, results);
LOG_DEBUG(log, "ALTER REQUESTED TO" << entry.alter_version);
LOG_DEBUG(log, "ALTER REQUESTED TO:" << entry.alter_version);
//std::cerr << "Results size:" << results.size() << std::endl;
//std::cerr << "Have mutation:" << have_mutation << std::endl;
@ -3443,7 +3469,7 @@ void StorageReplicatedMergeTree::alter(
{
//std::cerr << "In have mutation\n";
//std::cerr << "INDEX:" << results.size() - 2 << std::endl;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 4]).path_created;
String alter_path = dynamic_cast<const Coordination::CreateResponse &>(*results[results.size() - 3 - partitions_count]).path_created;
//std::cerr << "Alter path:" << alter_path << std::endl;
entry.znode_name = alter_path.substr(alter_path.find_last_of('/') + 1);

View File

@ -0,0 +1,17 @@
1725
1725
1725
1725
1725
Starting alters
Finishing alters
1
0
1
0
1
0
1
0
1
0

View File

@ -0,0 +1,109 @@
#!/usr/bin/env bash
# Stress test: concurrent ALTER ... MODIFY COLUMN and INSERT queries against
# several replicas of one ReplicatedMergeTree table.
# This preamble creates the replicas, loads the initial rows and records the
# checksum that the verification at the end of the script compares against.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# Quoted so that a checkout path containing whitespace still resolves.
. "$CURDIR/../shell_config.sh"

# Number of replicas; replica i is the local table concurrent_alter_mt_$i.
# NOTE(review): $CLICKHOUSE_CLIENT is left unquoted throughout, presumably
# because shell_config.sh may define it with extra arguments -- confirm.
REPLICAS=5

# Start from a clean state in case an earlier run left the tables behind.
for i in $(seq $REPLICAS); do
    $CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done

# Every table uses the same ZooKeeper path with its own replica name, so the
# tables are replicas of each other.
for i in $(seq $REPLICAS); do
    $CLICKHOUSE_CLIENT --query "CREATE TABLE concurrent_alter_mt_$i (key UInt64, value1 UInt64, value2 String) ENGINE = ReplicatedMergeTree('/clickhouse/tables/concurrent_alter_mt', '$i') ORDER BY key SETTINGS max_replicated_mutations_in_queue=1000, number_of_free_entries_in_pool_to_execute_mutation=0"
done

# Initial data: two inserts covering numbers 0..9 and 10..49 (50 rows in total).
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10)"
$CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_1 SELECT number, number + 10, toString(number) from numbers(10, 40)"

# Wait until every replica has processed its replication queue ...
for i in $(seq $REPLICAS); do
    $CLICKHOUSE_CLIENT --query "SYSTEM SYNC REPLICA concurrent_alter_mt_$i"
done

# ... and print the per-replica sum so the reference file can check that all
# replicas hold the same data before the concurrent phase starts.
for i in $(seq $REPLICAS); do
    $CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_$i"
done

# Baseline that the final per-replica sums are compared against.
INITIAL_SUM=$($CLICKHOUSE_CLIENT --query "SELECT SUM(value1) FROM concurrent_alter_mt_1")
# Garbage load generator with conflicting alters: on a randomly chosen replica
# it adds a column, changes its type and drops it again, all in a single
# multi-query client call. Its only purpose is to additionally load the
# alters "queue".
# Loops forever; the caller has to bound its lifetime (e.g. with `timeout`).
function garbage_alter_thread()
{
    # The function is exported and meant to be run through `bash -c`, where
    # only exported variables are visible, so fall back to the script's
    # replica count of 5 when REPLICAS is not set.
    local replicas=${REPLICAS:-5}
    local replica
    while true; do
        replica=$(($RANDOM % replicas + 1))
        $CLICKHOUSE_CLIENT -n --query "ALTER TABLE concurrent_alter_mt_$replica ADD COLUMN h String DEFAULT '0'; ALTER TABLE concurrent_alter_mt_$replica MODIFY COLUMN h UInt64; ALTER TABLE concurrent_alter_mt_$replica DROP COLUMN h;"
    done
}
# These alters mostly require not only a metadata change but also a conversion
# of the data. They are all compatible with each other, so they can be
# executed concurrently.
# Loops forever; the caller has to bound its lifetime (e.g. with `timeout`).
function correct_alter_thread()
{
    local types=(Float64 String UInt8 UInt32)
    # The function is run through `bash -c`, where only exported variables are
    # visible, so fall back to the script's replica count of 5 when REPLICAS
    # is not set.
    local replicas=${REPLICAS:-5}
    local replica column_type
    while true; do
        replica=$(($RANDOM % replicas + 1))
        column_type=${types[$RANDOM % ${#types[@]}]}
        # replication_alter_partitions_sync=0: additionally, we don't wait for
        # anything, to make the concurrency heavier.
        $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_$replica MODIFY COLUMN value1 $column_type SETTINGS replication_alter_partitions_sync=0"
        # Pause for a random fraction of a second between alters.
        sleep 0.$RANDOM
    done
}
# This thread adds some data to the table. After everything has finished we
# can check that all our data has the same type on every replica.
# Insert queries will sometimes fail because of wrong types; callers are
# expected to tolerate that.
# Loops forever; the caller has to bound its lifetime (e.g. with `timeout`).
function insert_thread()
{
    # NOTE(review): the shell strips the quotes from '7', so this array holds
    # 7.0, 7 and 7 -- no quoted SQL string literal ever reaches the query.
    # Kept as is to preserve the test data; confirm whether "'7'" was intended.
    local values=(7.0 7 '7')
    # The function is run through `bash -c`, where only exported variables are
    # visible, so fall back to the script's replica count of 5 when REPLICAS
    # is not set.
    local replicas=${REPLICAS:-5}
    local replica value
    while true; do
        replica=$(($RANDOM % replicas + 1))
        value=${values[$RANDOM % ${#values[@]}]}
        # The key is a fresh random number; value2 mirrors value1 as a string.
        $CLICKHOUSE_CLIENT --query "INSERT INTO concurrent_alter_mt_$replica VALUES($RANDOM, $value, toString($value))"
        # Pause for a random fraction of a second between inserts.
        sleep 0.$RANDOM
    done
}
# Concurrent phase: run the alter and insert workers in parallel for a fixed
# time, then force one final synchronous alter and verify every replica.
echo "Starting alters"

# The workers run in child `bash -c` processes, so the functions have to be
# exported. garbage_alter_thread is exported but not launched below.
export -f garbage_alter_thread;
export -f correct_alter_thread;
export -f insert_thread;

# Lifetime of every worker, in seconds.
TIMEOUT=120

# 3 alter workers and 9 insert workers; their stderr is discarded because
# failing queries are expected under concurrent type changes.
for _ in $(seq 3); do
    timeout $TIMEOUT bash -c correct_alter_thread 2> /dev/null &
done

for _ in $(seq 9); do
    timeout $TIMEOUT bash -c insert_thread 2> /dev/null &
done

# Block until every worker has been stopped by its timeout.
wait

echo "Finishing alters"

# This alter will finish all previous ones (per the original author's note).
# It is retried until it succeeds, since it can fail while earlier alters are
# still in flight.
until $CLICKHOUSE_CLIENT --query "ALTER TABLE concurrent_alter_mt_1 MODIFY COLUMN value1 String SETTINGS replication_alter_partitions_sync=2" 2>/dev/null; do
    :
done

for i in $(seq $REPLICAS); do
    # Rows were only added, so the sum must have grown on every replica.
    $CLICKHOUSE_CLIENT --query "SELECT SUM(toUInt64(value1)) > $INITIAL_SUM FROM concurrent_alter_mt_$i"
    # All mutations have to be done. The check is limited to this replica's
    # table so that unfinished mutations of unrelated tables on the same
    # server cannot fail the test.
    $CLICKHOUSE_CLIENT --query "SELECT COUNT() FROM system.mutations WHERE is_done=0 AND database = currentDatabase() AND table = 'concurrent_alter_mt_$i'"
    # NOTE(review): cleanup is left disabled as in the original, presumably to
    # allow inspecting the tables after a failure -- re-enable before merging.
    #$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS concurrent_alter_mt_$i"
done