address the review comments

This commit is contained in:
Sema Checherinda 2022-09-14 15:04:24 +02:00
parent 8b87c3c251
commit 15012c7070
11 changed files with 409 additions and 179 deletions

View File

@ -210,22 +210,11 @@ BlockIO InterpreterDropQuery::executeToTableImpl(ContextPtr context_, ASTDropQue
table->checkTableCanBeDropped(); table->checkTableCanBeDropped();
ActionLock merges_blocker;
TableLockHolder table_shared_lock;
TableExclusiveLockHolder table_excl_lock; TableExclusiveLockHolder table_excl_lock;
/// We don't need any lock for ReplicatedMergeTree /// We don't need any lock for ReplicatedMergeTree and for simple MergeTree
if (!table->supportsReplication())
{
/// And for simple MergeTree we can stop merges before acquiring the lock
merges_blocker = table->getActionLock(ActionLocks::PartsMerge);
/// For the rest of tables types exclusive lock is needed /// For the rest of tables types exclusive lock is needed
auto merge_tree = std::dynamic_pointer_cast<MergeTreeData>(table); if (!table->supportsReplication() && !std::dynamic_pointer_cast<MergeTreeData>(table))
if (merge_tree)
table_shared_lock = table->lockForShare(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
else
table_excl_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout); table_excl_lock = table->lockExclusively(context_->getCurrentQueryId(), context_->getSettingsRef().lock_acquire_timeout);
}
auto metadata_snapshot = table->getInMemoryMetadataPtr(); auto metadata_snapshot = table->getInMemoryMetadataPtr();
/// Drop table data, don't touch metadata /// Drop table data, don't touch metadata
@ -481,13 +470,10 @@ bool InterpreterDropQuery::supportsTransactions() const
auto & drop = query_ptr->as<ASTDropQuery &>(); auto & drop = query_ptr->as<ASTDropQuery &>();
if (drop.cluster.empty() return drop.cluster.empty()
&& !drop.temporary && !drop.temporary
&& drop.kind == ASTDropQuery::Kind::Truncate && drop.kind == ASTDropQuery::Kind::Truncate
&& drop.table) && drop.table;
return true;
return false;
} }
} }

View File

@ -2116,12 +2116,11 @@ size_t MergeTreeData::clearEmptyParts()
if (!part->version.getCreationTID().isPrehistoric() && !part->version.isVisible(TransactionLog::instance().getLatestSnapshot())) if (!part->version.getCreationTID().isPrehistoric() && !part->version.isVisible(TransactionLog::instance().getLatestSnapshot()))
continue; continue;
/// Don't drop empty parts that cover other parts
/// Otherwise covered parts resurrect
{ {
auto lock = lockParts(); auto lock = lockParts();
DataPartsVector covered_parts = getCoveredOutdatedParts(part->info, lock); DataPartsVector covered_parts = getCoveredOutdatedParts(part->info, lock);
// Don't drop empty parts that cover other parts
// Otherwise covered parts resurrect
if (!covered_parts.empty()) if (!covered_parts.empty())
continue; continue;
} }
@ -2130,7 +2129,6 @@ size_t MergeTreeData::clearEmptyParts()
dropPartNoWaitNoThrow(part->name); dropPartNoWaitNoThrow(part->name);
++cleared_count; ++cleared_count;
} }
return cleared_count; return cleared_count;
} }
@ -2906,61 +2904,17 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
} }
} }
MergeTreeData::DataPartsVector MergeTreeData::getCoveredOutdatedParts( void MergeTreeData::getPartHierarchy(
const MergeTreePartInfo & part_info, const MergeTreePartInfo & part_info,
DataPartsLock & /* data_parts_lock */) const const String & part_name,
{ DataPartState state,
auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{DataPartState::Outdated, part_info});
auto outdated_parts_range = getDataPartsStateRange(DataPartState::Outdated);
/// Go to the left.
DataPartIteratorByStateAndInfo begin = it_middle;
while (begin != outdated_parts_range.begin())
{
auto prev = std::prev(begin);
if (!part_info.contains((*prev)->info))
{
if ((*prev)->info.contains(part_info))
return {};
break;
}
begin = prev;
}
/// Go to the right.
DataPartIteratorByStateAndInfo end = it_middle;
while (end != outdated_parts_range.end())
{
if ((*end)->info == part_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState());
if (!part_info.contains((*end)->info))
{
if ((*end)->info.contains(part_info))
return {};
break;
}
++end;
}
return DataPartsVector{begin, end};
}
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
const MergeTreePartInfo & new_part_info,
const String & new_part_name,
DataPartPtr & out_covering_part, DataPartPtr & out_covering_part,
DataPartsVector & covered_part,
DataPartsLock & /* data_parts_lock */) const DataPartsLock & /* data_parts_lock */) const
{ {
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself. /// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{DataPartState::Active, new_part_info}); auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{state, part_info});
auto committed_parts_range = getDataPartsStateRange(DataPartState::Active); auto committed_parts_range = getDataPartsStateRange(state);
/// Go to the left. /// Go to the left.
DataPartIteratorByStateAndInfo begin = it_middle; DataPartIteratorByStateAndInfo begin = it_middle;
@ -2968,17 +2922,17 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
{ {
auto prev = std::prev(begin); auto prev = std::prev(begin);
if (!new_part_info.contains((*prev)->info)) if (!part_info.contains((*prev)->info))
{ {
if ((*prev)->info.contains(new_part_info)) if ((*prev)->info.contains(part_info))
{ {
out_covering_part = *prev; out_covering_part = *prev;
return {}; return;
} }
if (!new_part_info.isDisjoint((*prev)->info)) if (!part_info.isDisjoint((*prev)->info))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.", throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.",
new_part_name, (*prev)->getNameWithState()); part_name, (*prev)->getNameWithState());
break; break;
} }
@ -2990,20 +2944,20 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
DataPartIteratorByStateAndInfo end = it_middle; DataPartIteratorByStateAndInfo end = it_middle;
while (end != committed_parts_range.end()) while (end != committed_parts_range.end())
{ {
if ((*end)->info == new_part_info) if ((*end)->info == part_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState()); throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState());
if (!new_part_info.contains((*end)->info)) if (!part_info.contains((*end)->info))
{ {
if ((*end)->info.contains(new_part_info)) if ((*end)->info.contains(part_info))
{ {
out_covering_part = *end; out_covering_part = *end;
return {}; return;
} }
if (!new_part_info.isDisjoint((*end)->info)) if (!part_info.isDisjoint((*end)->info))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.", throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
new_part_name, (*end)->getNameWithState()); part_name, (*end)->getNameWithState());
break; break;
} }
@ -3011,9 +2965,29 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
++end; ++end;
} }
return DataPartsVector{begin, end}; covered_part = DataPartsVector{begin, end};
} }
MergeTreeData::DataPartsVector MergeTreeData::getCoveredOutdatedParts(
const MergeTreePartInfo & part_info,
DataPartsLock & data_parts_lock) const
{
DataPartPtr covering_part;
DataPartsVector covered_parts;
getPartHierarchy(part_info, part_info.getPartName(), DataPartState::Outdated, covering_part, covered_parts, data_parts_lock);
return covered_parts;
}
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
const MergeTreePartInfo & new_part_info,
const String & new_part_name,
DataPartPtr & out_covering_part,
DataPartsLock & data_parts_lock) const
{
DataPartsVector covered_parts;
getPartHierarchy(new_part_info, new_part_name, DataPartState::Active, out_covering_part, covered_parts, data_parts_lock);
return covered_parts;
}
bool MergeTreeData::renameTempPartAndAdd( bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part, MutableDataPartPtr & part,

View File

@ -1224,6 +1224,14 @@ protected:
const MergeTreePartInfo & part_info, const MergeTreePartInfo & part_info,
DataPartsLock & data_parts_lock) const; DataPartsLock & data_parts_lock) const;
void getPartHierarchy(
const MergeTreePartInfo & part_info,
const String & part_name,
DataPartState state,
DataPartPtr & out_covering_part,
DataPartsVector & covered_part,
DataPartsLock & /* data_parts_lock */) const;
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument. /// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node, const StorageMetadataPtr & metadata_snapshot) const; bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node, const StorageMetadataPtr & metadata_snapshot) const;

View File

@ -53,7 +53,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY; extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART; extern const int NO_SUCH_DATA_PART;
extern const int ABORTED; extern const int ABORTED;
extern const int RACE_PARTS_MODIFICATION; extern const int PART_IS_TEMPORARILY_LOCKED;
} }
namespace ActionLocks namespace ActionLocks
@ -1432,6 +1432,32 @@ void StorageMergeTree::dropPartNoWaitNoThrow(const String & part_name)
/// Else nothing to do, part was removed in some different way /// Else nothing to do, part was removed in some different way
} }
using RangesWithContinuousBlocks = std::vector<DataPartsVector>;
RangesWithContinuousBlocks groupByRangesWithContinuousBlocks(DataPartsVector parts)
{
RangesWithContinuousBlocks result;
std::sort(parts.begin(), parts.end(), MergeTreeData::LessDataPart());
for (auto part: parts)
{
if (result.empty())
{
result.push_back({part});
continue;
}
auto last_part_in_prev_range = result.back().back();
if (last_part_in_prev_range->info.max_block+1 == part->info.min_block)
result.back().push_back(part);
else
result.push_back({part});
}
return result;
}
struct PartitionInfo struct PartitionInfo
{ {
MergeTreePartition partition{}; MergeTreePartition partition{};
@ -1450,17 +1476,17 @@ PartitionInfos collectPartitionInfos(const DataPartsVector & parts)
for (const auto & part: parts) for (const auto & part: parts)
{ {
const MergeTreePartInfo & part_info = part->info; const MergeTreePartInfo & part_info = part->info;
auto partID = part_info.partition_id; auto partition_id = part_info.partition_id;
if (partition_infos.contains(partID)) if (partition_infos.contains(partition_id))
{ {
auto & partition_info = partition_infos[partID]; auto & partition_info = partition_infos[partition_id];
partition_info.min_block = std::min(partition_info.min_block, part_info.min_block); partition_info.min_block = std::min(partition_info.min_block, part_info.min_block);
partition_info.max_block = std::max(partition_info.max_block, part_info.max_block); partition_info.max_block = std::max(partition_info.max_block, part_info.max_block);
partition_info.max_level = std::max(partition_info.max_level, part_info.level); partition_info.max_level = std::max(partition_info.max_level, part_info.level);
partition_info.max_mutation = std::max(partition_info.max_mutation, part_info.mutation); partition_info.max_mutation = std::max(partition_info.max_mutation, part_info.mutation);
} }
else else
partition_infos[partID] = {part->partition, part_info.min_block, part_info.max_block, part_info.level, part_info.mutation}; partition_infos[partition_id] = {part->partition, part_info.min_block, part_info.max_block, part_info.level, part_info.mutation};
} }
return partition_infos; return partition_infos;
@ -1470,22 +1496,31 @@ MergeTreeData::MutableDataPartsVector StorageMergeTree::makeCoveringEmptyTmpPart
{ {
MutableDataPartsVector new_parts; MutableDataPartsVector new_parts;
auto partitions = collectPartitionInfos(parts); /// Parts could have gaps in continuous block numeration of parts due to concurrent insertions.
/// This gaps must not be covered with empty parts otherwise parts from concurrent insertions miss.
/// Here the parts are split on ranges without block gaps.
RangesWithContinuousBlocks continuous_ranges = groupByRangesWithContinuousBlocks(parts);
for (auto& range: continuous_ranges)
{
/// Inside each range new empty part could cover parts from min_block to max_block with respect of partitions
auto partitions = collectPartitionInfos(range);
for (const auto & it: partitions) for (const auto & it: partitions)
{ {
const auto & partID = it.first; const auto & partition_id = it.first;
const auto & info = it.second; const auto & info = it.second;
auto part_info = MergeTreePartInfo(partID, info.min_block, info.max_block, info.max_level+1, info.max_mutation); auto part_info = MergeTreePartInfo(partition_id, info.min_block, info.max_block, info.max_level+1, info.max_mutation);
auto data_part = createEmptyPart(part_info, info.partition, part_info.getPartName(), txn); auto data_part = createEmptyPart(part_info, info.partition, part_info.getPartName(), txn);
new_parts.push_back(data_part); new_parts.push_back(data_part);
} }
}
auto get_part_names = [](auto & parts_) -> Strings auto get_part_names = [] (auto & parts_) -> Strings
{ {
Strings part_names; Strings part_names;
for (const auto & p : parts_) for (const auto & p : parts_)
part_names.push_back(p->name); part_names.push_back(p->getNameWithState());
return part_names; return part_names;
}; };
@ -1509,7 +1544,7 @@ void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_part
changed_parts.push_back(part); changed_parts.push_back(part);
if (!changed_parts.empty()) if (!changed_parts.empty())
throw Exception(ErrorCodes::RACE_PARTS_MODIFICATION, throw Exception(ErrorCodes::PART_IS_TEMPORARILY_LOCKED,
"Race with concurrent query that modifies parts. {} parts have changed the status, first is {}. Try again later.", "Race with concurrent query that modifies parts. {} parts have changed the status, first is {}. Try again later.",
changed_parts.size(), changed_parts.front()->getNameWithState()); changed_parts.size(), changed_parts.front()->getNameWithState());
} }
@ -1550,8 +1585,6 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
Stopwatch watch; Stopwatch watch;
/// It's important to create it outside of lock scope because
/// otherwise it can lock parts in destructor and deadlock is possible.
auto txn = query_context->getCurrentTransaction(); auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get()); MergeTreeData::Transaction transaction(*this, txn.get());
{ {
@ -1563,7 +1596,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
PartLog::addNewParts(query_context, new_parts, watch.elapsed()); PartLog::addNewParts(query_context, new_parts, watch.elapsed());
LOG_INFO(log, "Truncated {} parts by replacing them with new empty {} parts.", LOG_INFO(log, "Truncated table with {} parts by replacing them with new empty {} parts.",
parts.size(), new_parts.size()); parts.size(), new_parts.size());
} }
@ -1596,18 +1629,17 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
part->makeCloneInDetached("", metadata_snapshot); part->makeCloneInDetached("", metadata_snapshot);
} }
{
MutableDataPartsVector new_parts = makeCoveringEmptyTmpParts({part}, txn); MutableDataPartsVector new_parts = makeCoveringEmptyTmpParts({part}, txn);
coverPartsWithEmptyParts({part}, new_parts, transaction); coverPartsWithEmptyParts({part}, new_parts, transaction);
PartLog::addNewParts(query_context, new_parts, watch.elapsed()); PartLog::addNewParts(query_context, new_parts, watch.elapsed());
if (detach) auto op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "Detached {} part by replacing it with new empty {} part.", LOG_INFO(log, "{} {} part by replacing it with new empty {} part.",
part->name, new_parts[0]->name); op, part->name, new_parts[0]->name);
else }
LOG_INFO(log, "Dropped {} part by replacing it with new empty {} part.",
part->name, new_parts[0]->name);
} }
/// Old part objects is needed to be destroyed before clearing them from filesystem. /// Old part objects is needed to be destroyed before clearing them from filesystem.
@ -1615,9 +1647,9 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
clearOldPartsFromFilesystem(); clearOldPartsFromFilesystem();
} }
void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, ContextPtr query_context) void StorageMergeTree::dropPartition(const ASTPtr & query, bool detach, ContextPtr query_context)
{ {
const auto * partition_ast = partition->as<ASTPartition>(); const auto * partition_ast = query->as<ASTPartition>();
/// Asks to complete merges and does not allow them to start. /// Asks to complete merges and does not allow them to start.
/// This protects against "revival" of data for a removed partition after completion of merge. /// This protects against "revival" of data for a removed partition after completion of merge.
@ -1631,13 +1663,15 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
MergeTreeData::Transaction transaction(*this, txn.get()); MergeTreeData::Transaction transaction(*this, txn.get());
{ {
DataPartsVector parts; DataPartsVector parts;
{
if (partition_ast && partition_ast->all) if (partition_ast && partition_ast->all)
parts = getVisibleDataPartsVector(query_context); parts = getVisibleDataPartsVector(query_context);
else else
{ {
String partition_id = getPartitionIDFromQuery(partition, query_context); String partition_id = getPartitionIDFromQuery(query, query_context);
parts = getVisibleDataPartsVectorInPartition(query_context, partition_id); parts = getVisibleDataPartsVectorInPartition(query_context, partition_id);
} }
}
if (detach) if (detach)
for (auto part: parts) for (auto part: parts)
@ -1653,12 +1687,9 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
PartLog::addNewParts(query_context, new_parts, watch.elapsed()); PartLog::addNewParts(query_context, new_parts, watch.elapsed());
if (detach) auto op = detach ? "Detached" : "Dropped";
LOG_INFO(log, "Detached {} parts by replacing them with new empty {} parts.", LOG_INFO(log, "{} partition with {} parts by replacing them with new empty {} parts",
parts.size(), new_parts.size()); op, parts.size(), new_parts.size());
else
LOG_INFO(log, "Dropped {} parts by replacing them with new empty {} parts.",
parts.size(), new_parts.size());
} }
/// Old parts are needed to be destroyed before clearing them from filesystem. /// Old parts are needed to be destroyed before clearing them from filesystem.

View File

@ -21,13 +21,6 @@ node = cluster.add_instance(
) )
@pytest.fixture(scope="module")
def list_all_test_in_module(request):
items = request.session.items
all_tests_names = [item.name for item in items]
return all_tests_names
@pytest.fixture(scope="module", autouse=True) @pytest.fixture(scope="module", autouse=True)
def start_cluster(): def start_cluster():
try: try:
@ -145,9 +138,9 @@ def test_query_is_lock_free(lock_free_query, exclusive_table):
PERMANENT_QUERIES = { PERMANENT_QUERIES = {
"truncate": ("TRUNCATE TABLE {table};", 0), "truncate": ("TRUNCATE TABLE {table};", 0),
"detach partition all": ("ALTER TABLE {table} DETACH PARTITION ALL;", 0), "detach-partition-all": ("ALTER TABLE {table} DETACH PARTITION ALL;", 0),
"detach part": ("ALTER TABLE {table} DETACH PARTITION '20221001';", 49), "detach-part": ("ALTER TABLE {table} DETACH PARTITION '20221001';", 49),
"drop part": ("ALTER TABLE {table} DROP PART '20220901_1_1_0';", 49), "drop-part": ("ALTER TABLE {table} DROP PART '20220901_1_1_0';", 49),
} }

View File

@ -28,8 +28,9 @@
4 1 Commit 1 1 1 0 4 1 Commit 1 1 1 0
5 1 Begin 1 1 1 1 5 1 Begin 1 1 1 1
5 1 AddPart 1 1 1 1 all_5_5_0 5 1 AddPart 1 1 1 1 all_5_5_0
5 1 AddPart 1 1 1 1 all_1_5_1 5 1 AddPart 1 1 1 1 all_1_1_1
5 1 LockPart 1 1 1 1 all_1_1_0 5 1 LockPart 1 1 1 1 all_1_1_0
5 1 AddPart 1 1 1 1 all_3_5_1
5 1 LockPart 1 1 1 1 all_3_3_0 5 1 LockPart 1 1 1 1 all_3_3_0
5 1 LockPart 1 1 1 1 all_4_4_0 5 1 LockPart 1 1 1 1 all_4_4_0
5 1 LockPart 1 1 1 1 all_5_5_0 5 1 LockPart 1 1 1 1 all_5_5_0

View File

@ -0,0 +1,87 @@
#!/usr/bin/env bash
set -euo pipefail
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
${CLICKHOUSE_CLIENT} -q 'DROP TABLE IF EXISTS table_with_single_pk'
${CLICKHOUSE_CLIENT} -q '
CREATE TABLE table_with_single_pk
(
key UInt8,
value String
)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime = 1
'
${CLICKHOUSE_CLIENT} -q 'INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000)'
# Check NewPart
${CLICKHOUSE_CLIENT} -q 'SYSTEM FLUSH LOGS'
${CLICKHOUSE_CLIENT} -q "
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'NewPart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail')"
# Now let's check RemovePart
${CLICKHOUSE_CLIENT} -q 'TRUNCATE TABLE table_with_single_pk'
# Wait until parts are removed
function get_inactive_parts_count() {
table_name=$1
${CLICKHOUSE_CLIENT} -q "
SELECT
count()
FROM
system.parts
WHERE
table = 'table_with_single_pk'
AND
active = 0
AND
database = '${CLICKHOUSE_DATABASE}'
"
}
function wait_table_inactive_parts_are_gone() {
table_name=$1
while true
do
count=$(get_inactive_parts_count $table_name)
if [[ count -gt 0 ]]
then
sleep 1
else
break
fi
done
}
export -f get_inactive_parts_count
export -f wait_table_inactive_parts_are_gone
timeout 60 bash -c 'wait_table_inactive_parts_are_gone table_with_single_pk'
${CLICKHOUSE_CLIENT} -q 'SYSTEM FLUSH LOGS;'
${CLICKHOUSE_CLIENT} -q "
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'RemovePart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail')"
${CLICKHOUSE_CLIENT} -q 'DROP TABLE table_with_single_pk'

View File

@ -1,38 +0,0 @@
DROP TABLE IF EXISTS table_with_single_pk;
CREATE TABLE table_with_single_pk
(
key UInt8,
value String
)
ENGINE = MergeTree
ORDER BY key
SETTINGS old_parts_lifetime = 1;
INSERT INTO table_with_single_pk SELECT number, toString(number % 10) FROM numbers(1000000);
-- Check NewPart
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'NewPart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail');
-- Now let's check RemovePart
TRUNCATE TABLE table_with_single_pk;
SELECT sleepEachRow(1) from numbers(3);
SYSTEM FLUSH LOGS;
WITH (
SELECT (event_time, event_time_microseconds)
FROM system.part_log
WHERE table = 'table_with_single_pk' AND database = currentDatabase() AND event_type = 'RemovePart'
ORDER BY event_time DESC
LIMIT 1
) AS time
SELECT if(dateDiff('second', toDateTime(time.2), toDateTime(time.1)) = 0, 'ok', 'fail');
DROP TABLE table_with_single_pk;

View File

@ -0,0 +1,36 @@
concurrent_insert
2
all_1_4_1 0
all_5_5_0 1
all_6_6_1 0
concurrent_drop_part_before
PART_IS_TEMPORARILY_LOCKED
INVALID_TRANSACTION
1
3
all_1_1_0 1
all_2_2_1 0
all_3_3_0 1
concurrent_drop_part_after
NO_SUCH_DATA_PART
INVALID_TRANSACTION
all_1_3_1 0
concurrent_delete
tx41 41 3
tx41 41 3
PART_IS_TEMPORARILY_LOCKED
tx42 42 1
2
4
concurrent_delete_rollback
tx51 3
tx51 3
tx52 1
tx51 3
0
read_from_snapshot
tx61 3
tx61 3
tx62 0
tx61 3
0

View File

@ -0,0 +1,155 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-replicated-database, no-ordinary-database
set -e -o pipefail
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./transactions.lib
. "$CURDIR"/transactions.lib
function reset_table()
{
table=${1:-"tt"}
$CLICKHOUSE_CLIENT -q "drop table if exists $table"
$CLICKHOUSE_CLIENT -q "create table $table (n int) engine=MergeTree order by tuple()"
$CLICKHOUSE_CLIENT -q "insert into $table values (1)" # inserts all_1_1_0
$CLICKHOUSE_CLIENT -q "insert into $table values (2)" # inserts all_2_2_0
$CLICKHOUSE_CLIENT -q "insert into $table values (3)" # inserts all_3_3_0
}
function concurrent_insert()
{
echo "concurrent_insert"
reset_table
tx 1 "begin transaction"
tx 2 "begin transaction"
tx 1 "insert into tt values (1)" # inserts all_4_4_0
tx 2 "insert into tt values (2)" # inserts all_5_5_0
tx 1 "insert into tt values (3)" # inserts all_6_6_0
tx 1 "truncate table tt" # creates all_1_4_1 all_6_6_1
tx 1 "commit"
tx 2 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
where table='tt' and database=currentDatabase() and active
order by name"
}
concurrent_insert
function concurrent_drop_part_before()
{
echo "concurrent_drop_part_before"
reset_table
tx 11 "begin transaction"
tx 22 "begin transaction"
tx 22 "alter table tt drop part 'all_2_2_0'"
tx 11 "truncate table tt" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 11 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 22 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
where table='tt' and database=currentDatabase() and active
order by name"
}
concurrent_drop_part_before
function concurrent_drop_part_after()
{
echo "concurrent_drop_part_after"
reset_table drop_part_after_table
tx 31 "begin transaction"
tx 32 "begin transaction"
tx 31 "truncate table drop_part_after_table"
tx 32 "alter table drop_part_after_table drop part 'all_2_2_0'" | grep -Eo "NO_SUCH_DATA_PART" | uniq
tx 31 "commit"
tx 32 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
$CLICKHOUSE_CLIENT -q "select n from drop_part_after_table order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
where table='drop_part_after_table' and database=currentDatabase() and active
order by name"
$CLICKHOUSE_CLIENT -q "select event_type, part_name from system.part_log
where table='drop_part_after_table' and database=currentDatabase()
order by part_name"
}
concurrent_drop_part_after
function concurrent_delete()
{
echo "concurrent_delete"
reset_table
tx 41 "begin transaction"
tx 41 "select 41, count() from tt"
tx 42 "begin transaction"
tx 42 "alter table tt delete where n%2=1"
tx 41 "select 41, count() from tt"
tx 41 "truncate table tt" | grep -Eo "PART_IS_TEMPORARILY_LOCKED" | uniq
tx 42 "select 42, count() from tt"
tx 41 "rollback"
tx 42 "insert into tt values (4)"
tx 42 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
}
concurrent_delete
function concurrent_delete_rollback()
{
echo "concurrent_delete_rollback"
reset_table
tx 51 "begin transaction"
tx 51 "select count() from tt"
tx 52 "begin transaction"
tx 52 "alter table tt delete where n%2=1"
tx 51 "select count() from tt"
tx 52 "select count() from tt"
tx 51 "select count() from tt"
tx 52 "rollback"
tx 51 "truncate table tt"
tx 51 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_delete_rollback
function read_from_snapshot()
{
echo "read_from_snapshot"
reset_table
tx 61 "begin transaction"
tx 61 "select count() from tt"
tx 62 "begin transaction"
tx 62 "truncate table tt"
tx 61 "select count() from tt"
tx 62 "select count() from tt"
tx 62 "commit"
tx 61 "select count() from tt"
tx 61 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
read_from_snapshot