less pedantic checks in operations, rely on existed tx functionality

This commit is contained in:
Sema Checherinda 2022-09-29 01:46:21 +02:00
parent a4d1adf945
commit c976b28104
11 changed files with 361 additions and 235 deletions

View File

@ -168,6 +168,7 @@ namespace ErrorCodes
extern const int INCORRECT_QUERY;
extern const int CANNOT_RESTORE_TABLE;
extern const int ZERO_COPY_REPLICATION_ERROR;
extern const int SERIALIZATION_ERROR;
}
@ -1693,15 +1694,6 @@ scope_guard MergeTreeData::getTemporaryPartDirectoryHolder(const String & part_d
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
}
std::optional<scope_guard> MergeTreeData::tryGetTemporaryPartDirectoryHolder(const String & part_dir_name)
{
bool inserted = temporary_parts.add(part_dir_name);
if (!inserted)
return {};
return [this, part_dir_name]() { temporary_parts.remove(part_dir_name); };
}
MergeTreeData::MutableDataPartPtr MergeTreeData::preparePartForRemoval(const DataPartPtr & part)
{
auto state = part->getState();
@ -1767,6 +1759,8 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
const DataPartPtr & part = *it;
/// Do not remove outdated part if it may be visible for some transaction
LOG_TRACE(log, "grabOldParts: check {}.", part->name);
if (!part->version.canBeRemoved())
{
skipped_parts.push_back(part->info);
@ -2140,7 +2134,7 @@ size_t MergeTreeData::clearEmptyParts()
if (part->getState() != DataPartState::Active)
continue;
DataPartsVector covered_parts = getCoveredOutdatedParts(part->info, lock);
DataPartsVector covered_parts = getCoveredOutdatedParts(part, lock);
if (!covered_parts.empty())
continue;
}
@ -2924,14 +2918,13 @@ MergeTreeData::PartsTemporaryRename::~PartsTemporaryRename()
}
}
void MergeTreeData::getPartHierarchy(
MergeTreeData::PartHierarchy MergeTreeData::getPartHierarchy(
const MergeTreePartInfo & part_info,
const String & part_name,
DataPartState state,
DataPartPtr & out_covering_part,
DataPartsVector & covered_part,
DataPartsLock & /* data_parts_lock */) const
{
PartHierarchy result;
/// Parts contained in the part are consecutive in data_parts, intersecting the insertion place for the part itself.
auto it_middle = data_parts_by_state_and_info.lower_bound(DataPartStateAndInfo{state, part_info});
auto committed_parts_range = getDataPartsStateRange(state);
@ -2946,13 +2939,15 @@ void MergeTreeData::getPartHierarchy(
{
if ((*prev)->info.contains(part_info))
{
out_covering_part = *prev;
return;
result.covering_part = *prev;
return result;
}
if (!part_info.isDisjoint((*prev)->info))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects previous part {}. It is a bug.",
part_name, (*prev)->getNameWithState());
{
result.intersected_part = *prev;
return result;
}
break;
}
@ -2965,19 +2960,24 @@ void MergeTreeData::getPartHierarchy(
while (end != committed_parts_range.end())
{
if ((*end)->info == part_info)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", (*end)->getNameWithState());
{
result.duplicate_part = *end;
return result;
}
if (!part_info.contains((*end)->info))
{
if ((*end)->info.contains(part_info))
{
out_covering_part = *end;
return;
result.covering_part = *end;
return result;
}
if (!part_info.isDisjoint((*end)->info))
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
part_name, (*end)->getNameWithState());
{
result.intersected_part = *end;
return result;
}
break;
}
@ -2985,17 +2985,17 @@ void MergeTreeData::getPartHierarchy(
++end;
}
covered_part = DataPartsVector{begin, end};
result.covered_parts = DataPartsVector{begin, end};
return result;
}
MergeTreeData::DataPartsVector MergeTreeData::getCoveredOutdatedParts(
const MergeTreePartInfo & part_info,
const DataPartPtr & part,
DataPartsLock & data_parts_lock) const
{
DataPartPtr covering_part;
DataPartsVector covered_parts;
getPartHierarchy(part_info, part_info.getPartName(), DataPartState::Outdated, covering_part, covered_parts, data_parts_lock);
return covered_parts;
assert(part->getState() == DataPartState::Active);
PartHierarchy hierarchy = getPartHierarchy(part->info, DataPartState::Outdated, data_parts_lock);
return hierarchy.covered_parts;
}
MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
@ -3004,32 +3004,22 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
DataPartPtr & out_covering_part,
DataPartsLock & data_parts_lock) const
{
DataPartsVector covered_parts;
getPartHierarchy(new_part_info, new_part_name, DataPartState::Active, out_covering_part, covered_parts, data_parts_lock);
return covered_parts;
PartHierarchy hierarchy = getPartHierarchy(new_part_info, DataPartState::Active, data_parts_lock);
if (hierarchy.intersected_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} intersects next part {}. It is a bug.",
new_part_name, hierarchy.intersected_part->getNameWithState());
if (hierarchy.duplicate_part)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected duplicate part {}. It is a bug.", hierarchy.duplicate_part->getNameWithState());
out_covering_part = std::move(hierarchy.covering_part);
return std::move(hierarchy.covered_parts);
}
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock)
void MergeTreeData::checkPartPartition(MutableDataPartPtr & part, DataPartsLock & lock) const
{
DataPartsVector covered_parts;
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
return false;
if (!covered_parts.empty())
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
return true;
}
void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const
{
part->assertState({DataPartState::Temporary});
if (DataPartPtr existing_part_in_partition = getAnyPartInPartition(part->info.partition_id, lock))
{
if (part->partition.value != existing_part_in_partition->partition.value)
@ -3038,15 +3028,23 @@ void MergeTreeData::checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPa
+ existing_part_in_partition->name + ", newly added part: " + part->name,
ErrorCodes::CORRUPTED_DATA);
}
}
if (auto it_duplicate = data_parts_by_info.find(part->info); it_duplicate != data_parts_by_info.end())
void MergeTreeData::checkPartDuplicate(MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & /*lock*/) const
{
auto it_duplicate = data_parts_by_info.find(part->info);
if (it_duplicate != data_parts_by_info.end())
{
String message = "Part " + (*it_duplicate)->getNameWithState() + " already exists";
if ((*it_duplicate)->checkState({DataPartState::Outdated, DataPartState::Deleting}))
throw Exception(message + ", but it will be deleted soon", ErrorCodes::PART_IS_TEMPORARILY_LOCKED);
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
if (transaction.txn)
throw Exception(message, ErrorCodes::SERIALIZATION_ERROR);
else
throw Exception(message, ErrorCodes::DUPLICATE_DATA_PART);
}
}
@ -3074,36 +3072,45 @@ bool MergeTreeData::renameTempPartAndReplaceImpl(
DataPartsLock & lock,
DataPartsVector * out_covered_parts)
{
LOG_TRACE(log, "Renaming temporary part {} to {}.", part->getDataPartStorage().getPartDirectory(), part->name);
LOG_TRACE(log, "Renaming temporary part {} to {} with tid {}.", part->getDataPartStorage().getPartDirectory(), part->name, out_transaction.getTID());
if (&out_transaction.data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
ErrorCodes::LOGICAL_ERROR);
if (part->hasLightweightDelete())
has_lightweight_delete_parts.store(true);
part->assertState({DataPartState::Temporary});
checkPartPartition(part, lock);
checkPartDuplicate(part, out_transaction, lock);
checkPartCanBeAddedToTable(part, lock);
PartHierarchy hierarchy = getPartHierarchy(part->info, DataPartState::Active, lock);
DataPartPtr covering_part;
DataPartsVector covered_parts = getActivePartsToReplace(part->info, part->name, covering_part, lock);
if (covering_part)
if (hierarchy.intersected_part)
{
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part->name, covering_part->getNameWithState());
String message = fmt::format("Part {} intersects next part {}", part->name, hierarchy.intersected_part->getNameWithState());
if (part->isEmpty() || hierarchy.intersected_part->isEmpty())
throw Exception(message, ErrorCodes::SERIALIZATION_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, message + " It is a bug.");
}
if (hierarchy.covering_part)
{
LOG_WARNING(log, "Tried to add obsolete part {} covered by {}", part->name, hierarchy.covering_part->getNameWithState());
return false;
}
if (part->hasLightweightDelete())
has_lightweight_delete_parts.store(true);
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
preparePartForCommit(part, out_transaction);
if (out_covered_parts)
{
out_covered_parts->reserve(out_covered_parts->size() + covered_parts.size());
for (DataPartPtr & covered_part : covered_parts)
out_covered_parts->emplace_back(std::move(covered_part));
out_covered_parts->reserve(out_covered_parts->size() + hierarchy.covered_parts.size());
std::move(hierarchy.covered_parts.begin(), hierarchy.covered_parts.end(), std::back_inserter(*out_covered_parts));
}
return true;
@ -3128,6 +3135,23 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
return covered_parts;
}
bool MergeTreeData::renameTempPartAndAdd(
MutableDataPartPtr & part,
Transaction & out_transaction,
DataPartsLock & lock)
{
DataPartsVector covered_parts;
if (!renameTempPartAndReplaceImpl(part, out_transaction, lock, &covered_parts))
return false;
if (!covered_parts.empty())
throw Exception("Added part " + part->name + " covers " + toString(covered_parts.size())
+ " existing part(s) (including " + covered_parts[0]->name + ")", ErrorCodes::LOGICAL_ERROR);
return true;
}
void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const MergeTreeData::DataPartsVector & remove, bool clear_without_timeout, DataPartsLock & acquired_lock)
{
if (txn)
@ -5213,6 +5237,13 @@ void MergeTreeData::Transaction::rollbackPartsToTemporaryState()
clear();
}
TransactionID MergeTreeData::Transaction::getTID() const
{
if (txn)
return txn->tid;
return Tx::PrehistoricTID;
}
void MergeTreeData::Transaction::addPart(MutableDataPartPtr & part)
{
precommitted_parts.insert(part);
@ -5223,11 +5254,14 @@ void MergeTreeData::Transaction::rollback()
if (!isEmpty())
{
WriteBufferFromOwnString buf;
buf << " Removing parts:";
buf << "Removing parts:";
for (const auto & part : precommitted_parts)
buf << " " << part->getDataPartStorage().getPartDirectory();
buf << ".";
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
LOG_DEBUG(data.log, "Undoing transaction {}. {}", getTID(), buf.str());
for (const auto & part : precommitted_parts)
part->version.creation_csn.store(Tx::RolledBackCSN);
auto lock = data.lockParts();

View File

@ -220,6 +220,9 @@ public:
using DataPartsLock = std::unique_lock<std::mutex>;
DataPartsLock lockParts() const { return DataPartsLock(data_parts_mutex); }
using OperationDataPartsLock = std::unique_lock<std::mutex>;
OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }
MergeTreeDataPartType choosePartType(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartType choosePartTypeOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
@ -271,6 +274,8 @@ public:
}
}
TransactionID getTID() const;
private:
friend class MergeTreeData;
@ -1029,7 +1034,6 @@ public:
/// Returns an object that protects temporary directory from cleanup
scope_guard getTemporaryPartDirectoryHolder(const String & part_dir_name);
std::optional<scope_guard> tryGetTemporaryPartDirectoryHolder(const String & part_dir_name);
protected:
friend class IMergeTreeDataPart;
@ -1112,6 +1116,10 @@ protected:
DataPartsIndexes::index<TagByInfo>::type & data_parts_by_info;
DataPartsIndexes::index<TagByStateAndInfo>::type & data_parts_by_state_and_info;
/// Mutex for critical sections which alter set of parts
/// It is like truncate, drop/detach partition
mutable std::mutex operation_with_data_parts_mutex;
/// Current description of columns of data type Object.
/// It changes only when set of parts is changed and is
/// protected by @data_parts_mutex.
@ -1222,15 +1230,20 @@ protected:
DataPartsLock & data_parts_lock) const;
DataPartsVector getCoveredOutdatedParts(
const MergeTreePartInfo & part_info,
const DataPartPtr & part,
DataPartsLock & data_parts_lock) const;
void getPartHierarchy(
struct PartHierarchy
{
DataPartPtr duplicate_part;
DataPartPtr covering_part;
DataPartsVector covered_parts;
DataPartPtr intersected_part;
};
PartHierarchy getPartHierarchy(
const MergeTreePartInfo & part_info,
const String & part_name,
DataPartState state,
DataPartPtr & out_covering_part,
DataPartsVector & covered_part,
DataPartsLock & /* data_parts_lock */) const;
/// Checks whether the column is in the primary key, possibly wrapped in a chain of functions with single argument.
@ -1302,8 +1315,9 @@ protected:
static void incrementMergedPartsProfileEvent(MergeTreeDataPartType type);
private:
/// Checking that candidate part doesn't break invariants: correct partition and doesn't exist already
void checkPartCanBeAddedToTable(MutableDataPartPtr & part, DataPartsLock & lock) const;
/// Checking that candidate part doesn't break invariants: correct partition
void checkPartPartition(MutableDataPartPtr & part, DataPartsLock & lock) const;
void checkPartDuplicate(MutableDataPartPtr & part, Transaction & transaction, DataPartsLock & lock) const;
/// Preparing itself to be committed in memory: fill some fields inside part, add it to data_parts_indexes
/// in precommitted state and to transaction

View File

@ -60,7 +60,7 @@ struct MergeTreePartInfo
/// True if contains rhs (this part is obtained by merging rhs with some other parts or mutating rhs)
bool contains(const MergeTreePartInfo & rhs) const
{
/// Containing part may have equal level iff block numbers are equal (unless level is MAX_LEVEL)
/// Containing part may have equal level if block numbers are equal (unless level is MAX_LEVEL)
/// (e.g. all_0_5_2 does not contain all_0_4_2, but all_0_5_3 or all_0_4_2_9 do)
bool strictly_contains_block_range = (min_block == rhs.min_block && max_block == rhs.max_block) || level > rhs.level
|| level == MAX_LEVEL || level == LEGACY_MAX_LEVEL;

View File

@ -53,7 +53,6 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int ABORTED;
extern const int SERIALIZATION_ERROR;
}
namespace ActionLocks
@ -1528,59 +1527,32 @@ StorageMergeTree::MutableDataPartsVector createNewEmptyDataParts(MergeTreeData &
void captureTmpDirName(MergeTreeData & data, FutureNewEmptyParts & new_parts)
{
for (auto & new_part : new_parts)
{
auto maybe_guard = data.tryGetTemporaryPartDirectoryHolder(new_part.getDirName());
if (!maybe_guard)
throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Temporary part {} already added", new_part.getDirName());
new_part.tmp_dir_guard = std::move(*maybe_guard);
}
new_part.tmp_dir_guard = data.getTemporaryPartDirectoryHolder(new_part.getDirName());
}
void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector & old_parts, MutableDataPartsVector & new_parts, Transaction & transaction)
void StorageMergeTree::coverPartsWithEmptyParts(const DataPartsVector &, MutableDataPartsVector & new_parts, Transaction & transaction)
{
auto part_lock = lockParts();
/// Check that all old parts are Active still
{
DataPartsVector changed_parts;
for (const auto & part: old_parts)
if (part->getState() != DataPartState::Active)
changed_parts.push_back(part);
if (!changed_parts.empty())
throw Exception(ErrorCodes::SERIALIZATION_ERROR,
"Race with concurrent query that modifies parts. {} parts have changed the status, first is {}. Try again later.",
changed_parts.size(), changed_parts.front()->getNameWithState());
}
DataPartsVector covered_parts;
for (auto & part: new_parts)
{
bool no_covering_parts = renameTempPartAndReplaceUnlocked(part, transaction, part_lock, &covered_parts);
bool has_covering_parts = !no_covering_parts;
if (has_covering_parts)
throw Exception("Part " + part->name + " has covering part. This is a bug.", ErrorCodes::LOGICAL_ERROR);
DataPartsVector covered_parts_by_one_part = renameTempPartAndReplace(part, transaction);
if (covered_parts_by_one_part.size() > 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} expected to cover not more then 1 part. {} covered parts have found. This is a bug.",
part->name, covered_parts_by_one_part.size());
std::move(covered_parts_by_one_part.begin(), covered_parts_by_one_part.end(), std::back_inserter(covered_parts));
}
if (covered_parts.size() != old_parts.size())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Going to delete {} (first is {}) parts instead of {} (first is {}) by creating new {} (first is {}) parts. This is a bug.",
covered_parts.size(), covered_parts.front()->getNameWithState(),
old_parts.size(), old_parts.front()->getNameWithState(),
new_parts.size(), new_parts.front()->getNameWithState()
);
LOG_INFO(log, "Remove {} parts by covering them with empty {} parts. With txn {}.",
covered_parts.size(), new_parts.size(), transaction.getTID());
LOG_INFO(log, "Remove {} parts by covering them with empty {} parts.", covered_parts.size(), new_parts.size());
/// Do commit at the same locked scope where new parts was checked
transaction.commit(&part_lock);
transaction.commit();
/// Remove covered parts without waiting for old_parts_lifetime seconds.
for (auto & covered_part: covered_parts)
covered_part->remove_time.store(0, std::memory_order_relaxed);
for (auto & part: covered_parts)
part->remove_time.store(0, std::memory_order_relaxed);
if (deduplication_log)
for (const auto & part: covered_parts)
@ -1598,11 +1570,13 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
auto parts = getVisibleDataPartsVector(query_context);
auto new_parts = initCoverageWithNewEmptyParts(parts, format_version);
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}",
LOG_TEST(log, "Made {} empty parts in order to cover {} parts. Empty parts: {}, covered parts: {}, ",
new_parts.size(), parts.size(),
fmt::join(getPartsNames(new_parts), ", "), fmt::join(getPartsNames(parts), ", "));
@ -1635,6 +1609,8 @@ void StorageMergeTree::dropPart(const String & part_name, bool detach, ContextPt
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
auto part = getPartIfExists(part_name, {MergeTreeDataPartState::Active});
if (!part)
throw Exception("Part " + part_name + " not found, won't try to drop it.", ErrorCodes::NO_SUCH_DATA_PART);
@ -1685,6 +1661,8 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
auto txn = query_context->getCurrentTransaction();
MergeTreeData::Transaction transaction(*this, txn.get());
{
auto operation_data_parts_lock = lockOperationsWithParts();
DataPartsVector parts;
{
if (partition_ast && partition_ast->all)

View File

@ -49,14 +49,14 @@ function thread_partition_src_to_dst()
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=3) != ($count + 1, $sum + $i)) FORMAT Null;
COMMIT;" 2>&1) ||:
echo "$out" | grep -Fv "SERIALIZATION_ERROR" | grep -F "Received from " && $CLICKHOUSE_CLIENT --multiquery --query "
echo "$out" | grep -Fve "SERIALIZATION_ERROR" -Fve "PART_IS_TEMPORARILY_LOCKED" | grep -F "Received from " && $CLICKHOUSE_CLIENT --multiquery --query "
begin transaction;
set transaction snapshot 3;
select $i, 'src', type, n, _part from src order by type, n;
select $i, 'dst', type, n, _part from dst order by type, n;
rollback" ||:
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || count=$((count+1))
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || sum=$((sum+i))
echo "$out" | grep -Fae "SERIALIZATION_ERROR" -Fae "PART_IS_TEMPORARILY_LOCKED" >/dev/null || count=$((count+1))
echo "$out" | grep -Fae "SERIALIZATION_ERROR" -Fae "PART_IS_TEMPORARILY_LOCKED" >/dev/null || sum=$((sum+i))
done
}

View File

@ -1,7 +1,7 @@
concurrent_drop_after
tx71 3
tx11 3
concurrent_drop_before
tx71 3
tx21 3
UNKNOWN_TABLE
concurrent_insert
2
@ -20,22 +20,9 @@ all_1_1_0 1
all_2_2_1 0
all_3_3_0 1
read_from_snapshot
tx61 3
tx61 3
tx62 0
tx61 3
0
concurrent_delete
tx41 41 3
tx41 41 3
SERIALIZATION_ERROR
tx42 42 1
2
4
concurrent_delete_rollback
tx51 3
tx51 3
tx52 1
tx52 0
tx51 3
0
concurrent_drop_part_after
@ -50,3 +37,15 @@ NewPart all_2_2_0
NewPart all_2_2_1
NewPart all_3_3_0
NewPart all_3_3_1
concurrent_truncate_notx_after
tx71 3
tx71 0
0
concurrent_truncate_notx_before
tx81 3
NO_SUCH_DATA_PART
INVALID_TRANSACTION
INVALID_TRANSACTION
0
concurrent_rollback_truncate
3

View File

@ -30,11 +30,11 @@ function concurrent_drop_after()
reset_table
tx 71 "begin transaction"
tx 71 "select count() from tt"
tx 71 "truncate table tt"
$CLICKHOUSE_CLIENT -q "drop table tt"
tx 71 "commit"
tx 11 "begin transaction"
tx 11 "select count() from tt"
tx 11 "truncate table tt"
$CLICKHOUSE_CLIENT -q "drop table tt"
tx 11 "commit"
}
concurrent_drop_after
@ -45,11 +45,11 @@ function concurrent_drop_before()
reset_table
tx 71 "begin transaction"
tx 71 "select count() from tt"
tx 21 "begin transaction"
tx 21 "select count() from tt"
$CLICKHOUSE_CLIENT -q "drop table tt"
tx 71 "truncate table tt" | grep -Eo "UNKNOWN_TABLE" | uniq
tx 71 "rollback"
tx 21 "truncate table tt" | grep -Eo "UNKNOWN_TABLE" | uniq
tx 21 "rollback"
}
concurrent_drop_before
@ -60,14 +60,14 @@ function concurrent_insert()
reset_table
tx 1 "begin transaction"
tx 2 "begin transaction"
tx 1 "insert into tt values (1)" # inserts all_4_4_0
tx 2 "insert into tt values (2)" # inserts all_5_5_0
tx 1 "insert into tt values (3)" # inserts all_6_6_0
tx 1 "truncate table tt" # creates all_1_4_1 all_6_6_1
tx 1 "commit"
tx 2 "commit"
tx 31 "begin transaction"
tx 32 "begin transaction"
tx 31 "insert into tt values (1)" # inserts all_4_4_0
tx 32 "insert into tt values (2)" # inserts all_5_5_0
tx 31 "insert into tt values (3)" # inserts all_6_6_0
tx 31 "truncate table tt" # creates all_1_4_1 all_6_6_1
tx 31 "commit"
tx 32 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
@ -83,12 +83,12 @@ function concurrent_drop_part_before()
reset_table
tx 11 "begin transaction"
tx 22 "begin transaction"
tx 22 "alter table tt drop part 'all_2_2_0'"
tx 11 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 11 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 22 "commit"
tx 41 "begin transaction"
tx 42 "begin transaction"
tx 42 "alter table tt drop part 'all_2_2_0'"
tx 41 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 41 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 42 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
@ -106,76 +106,21 @@ function read_from_snapshot()
reset_table
tx 61 "begin transaction"
tx 61 "select count() from tt"
tx 62 "begin transaction"
tx 62 "truncate table tt"
tx 61 "select count() from tt"
tx 62 "select count() from tt"
tx 62 "commit"
tx 61 "select count() from tt"
tx 61 "commit"
tx 51 "begin transaction"
tx 51 "select count() from tt"
tx 52 "begin transaction"
tx 52 "truncate table tt"
tx 51 "select count() from tt"
tx 52 "select count() from tt"
tx 52 "commit"
tx 51 "select count() from tt"
tx 51 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
read_from_snapshot
function concurrent_delete()
{
echo "concurrent_delete"
reset_table
# test runs mutation, merges have to be enabled
$CLICKHOUSE_CLIENT -q "system start merges $table"
# make parts name predictable
$CLICKHOUSE_CLIENT -q "optimize table $table final"
tx 41 "begin transaction"
tx 41 "select 41, count() from tt"
tx 42 "begin transaction"
tx 42 "alter table tt delete where n%2=1"
tx 41 "select 41, count() from tt"
tx 41 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 42 "select 42, count() from tt"
tx 41 "rollback"
tx 42 "insert into tt values (4)"
tx 42 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
}
concurrent_delete
function concurrent_delete_rollback()
{
echo "concurrent_delete_rollback"
reset_table
# test runs mutation, merges have to be enabled
$CLICKHOUSE_CLIENT -q "system start merges $table"
# make parts name predictable
$CLICKHOUSE_CLIENT -q "optimize table $table final"
tx 51 "begin transaction"
tx 51 "select count() from tt"
tx 52 "begin transaction"
tx 52 "alter table tt delete where n%2=1"
tx 51 "select count() from tt"
tx 52 "select count() from tt"
tx 51 "select count() from tt"
tx 52 "rollback"
tx 51 "truncate table tt"
tx 51 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_delete_rollback
function concurrent_drop_part_after()
{
@ -183,12 +128,12 @@ function concurrent_drop_part_after()
reset_table drop_part_after_table
tx 31 "begin transaction"
tx 32 "begin transaction"
tx 31 "truncate table drop_part_after_table"
tx 32 "alter table drop_part_after_table drop part 'all_2_2_0'" | grep -Eo "NO_SUCH_DATA_PART" | uniq
tx 31 "commit"
tx 32 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 61 "begin transaction"
tx 62 "begin transaction"
tx 61 "truncate table drop_part_after_table"
tx 62 "alter table drop_part_after_table drop part 'all_2_2_0'" | grep -Eo "NO_SUCH_DATA_PART" | uniq
tx 61 "commit"
tx 62 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
$CLICKHOUSE_CLIENT -q "select n from drop_part_after_table order by n"
$CLICKHOUSE_CLIENT -q "select name, rows from system.parts
@ -201,3 +146,59 @@ function concurrent_drop_part_after()
}
concurrent_drop_part_after
function concurrent_truncate_notx_after()
{
echo "concurrent_truncate_notx_after"
reset_table
tx 71 "begin transaction"
tx 71 "select count() from tt"
tx 71 "alter table tt drop part 'all_2_2_0'"
$CLICKHOUSE_CLIENT -q "truncate table tt"
# return 0, since truncate was out of transaction
# it would be better if exception raised
tx 71 "select count() from tt"
tx 71 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_truncate_notx_after
function concurrent_truncate_notx_before()
{
echo "concurrent_truncate_notx_before"
reset_table
tx 81 "begin transaction"
tx 81 "select count() from tt"
$CLICKHOUSE_CLIENT -q "truncate table tt"
tx 81 "alter table tt drop part 'all_2_2_0'" | grep -Eo "NO_SUCH_DATA_PART" | uniq
tx 81 "select count() from tt" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 81 "commit" | grep -Eo "INVALID_TRANSACTION" | uniq
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_truncate_notx_before
function concurrent_rollback_truncate()
{
echo "concurrent_rollback_truncate"
reset_table
tx 91 "begin transaction"
tx 92 "begin transaction"
tx 91 "truncate table tt"
tx_async 91 "rollback"
tx 92 "truncate table tt" | grep -vwe PART_IS_TEMPORARILY_LOCKED || true
tx 92 "rollback"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_rollback_truncate

View File

@ -0,0 +1,17 @@
concurrent_delete_before
tx11 41 3
tx11 41 3
SERIALIZATION_ERROR
tx12 42 1
2
4
concurrent_delete_after
tx21 111 3
tx22 112 3
UNFINISHED
concurrent_delete_rollback
tx31 3
tx31 3
tx32 1
tx31 3
0

View File

@ -0,0 +1,84 @@
#!/usr/bin/env bash
# Tags: no-fasttest, no-replicated-database, no-ordinary-database
set -e -o pipefail
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./transactions.lib
. "$CURDIR"/transactions.lib
function reset_table()
{
table=${1:-"tt"}
$CLICKHOUSE_CLIENT -q "drop table if exists $table"
$CLICKHOUSE_CLIENT -q "create table $table (n int) engine=MergeTree order by tuple()"
$CLICKHOUSE_CLIENT -q "insert into $table values (1), (2), (3)" # inserts all_1_1_0
}
function concurrent_delete_before()
{
echo "concurrent_delete_before"
reset_table tt
tx 11 "begin transaction"
tx 11 "select 41, count() from tt"
tx 12 "begin transaction"
tx 12 "alter table tt delete where n%2=1"
tx 11 "select 41, count() from tt"
tx 11 "truncate table tt" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 12 "select 42, count() from tt"
tx 11 "rollback"
tx 12 "insert into tt values (4)"
tx 12 "commit"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
}
concurrent_delete_before
function concurrent_delete_after()
{
echo "concurrent_delete_after"
reset_table tt
tx 21 "begin transaction"
tx 22 "begin transaction"
tx 21 "select 111, count() from tt"
tx 21 "truncate table tt"
tx 22 "select 112, count() from tt"
tx 22 "alter table tt delete where n%2=1" | grep -Eo "UNFINISHED" | uniq
tx 21 "commit"
tx 22 "rollback"
$CLICKHOUSE_CLIENT -q "select n from tt order by n"
}
concurrent_delete_after
function concurrent_delete_rollback()
{
echo "concurrent_delete_rollback"
reset_table tt
tx 31 "begin transaction"
tx 31 "select count() from tt"
tx 32 "begin transaction"
tx 32 "alter table tt delete where n%2=1"
tx 31 "select count() from tt"
tx 32 "select count() from tt"
tx 31 "select count() from tt"
tx 32 "rollback"
tx 31 "truncate table tt"
tx 31 "commit"
$CLICKHOUSE_CLIENT -q "select count() from tt"
}
concurrent_delete_rollback

View File

@ -5,7 +5,6 @@ init state
2_3_3_0 InMemory 10 1
drop part 0
20
0_1_1_1 Compact 0 0
1_2_2_0 InMemory 10 1
2_3_3_0 InMemory 10 1
detach table

View File

@ -22,7 +22,7 @@ ALTER TABLE table_in_memory DROP PARTITION 0;
SELECT count() FROM table_in_memory;
SELECT name, part_type, rows, active from system.parts
WHERE table='table_in_memory' AND database=currentDatabase();
WHERE table='table_in_memory' AND database=currentDatabase() AND active;
SELECT 'detach table';
DETACH TABLE table_in_memory;