support alter partition

Alexander Tokmakov 2021-11-17 21:14:14 +03:00
parent ce2f692bb4
commit 0a4647f927
17 changed files with 391 additions and 71 deletions


@@ -44,6 +44,13 @@ void VersionMetadata::lockMaxTID(const TransactionID & tid, const String & error
     bool locked = maxtid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value);
     if (!locked)
     {
+        if (tid == Tx::PrehistoricTID && expected_max_lock_value == Tx::PrehistoricTID.getHash())
+        {
+            /// Don't need to lock part for queries without transaction
+            //FIXME Transactions: why is it possible?
+            return;
+        }
         throw Exception(ErrorCodes::SERIALIZATION_ERROR, "Serialization error: "
                         "Transaction {} tried to remove data part, "
                         "but it's locked ({}) by another transaction {} which is currently removing this part. {}",


@@ -172,10 +172,15 @@ static void logQuery(const String & query, ContextPtr context, bool internal)
         if (!comment.empty())
             comment = fmt::format(" (comment: {})", comment);

-        LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}){} {}",
+        String transaction_info;
+        if (auto txn = context->getCurrentTransaction())
+            transaction_info = fmt::format(" (TID: {}, TIDH: {})", txn->tid, txn->tid.getHash());
+
+        LOG_DEBUG(&Poco::Logger::get("executeQuery"), "(from {}{}{}){}{} {}",
             client_info.current_address.toString(),
             (current_user != "default" ? ", user: " + current_user : ""),
             (!initial_query_id.empty() && current_query_id != initial_query_id ? ", initial_query_id: " + initial_query_id : std::string()),
+            transaction_info,
             comment,
             joinLines(query));


@@ -538,11 +538,6 @@ public:
     /// Similar to above but checks for DETACH. It's only used for DICTIONARIES.
     virtual void checkTableCanBeDetached() const {}

-    /// Checks that Partition could be dropped right now
-    /// Otherwise - throws an exception with detailed information.
-    /// We do not use mutex because it is not very important that the size could change during the operation.
-    virtual void checkPartitionCanBeDropped(const ASTPtr & /*partition*/) {}
-
     /// Returns true if Storage may store some data on disk.
     /// NOTE: may not be equivalent to !getDataPaths().empty()
     virtual bool storesDataOnDisk() const { return false; }


@@ -226,6 +226,7 @@ public:
       */
     enum class State
     {
+        /// TODO Transactions: rename Committed to Active, because it becomes confusing
         Temporary,       /// the part is generating now, it is not in data_parts list
         PreCommitted,    /// the part is in data_parts, but not used for SELECTs
        Committed,       /// active data part, used by current and upcoming SELECTs


@@ -3142,9 +3142,23 @@ MergeTreeData::DataPartPtr MergeTreeData::getActiveContainingPart(const String &
     return getActiveContainingPart(part_info);
 }

-MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(MergeTreeData::DataPartState state, const String & partition_id) const
+MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const
 {
-    DataPartStateAndPartitionID state_with_partition{state, partition_id};
+    if (const auto * txn = local_context->getCurrentTransaction().get())
+    {
+        DataPartStateAndPartitionID active_parts{MergeTreeDataPartState::Committed, partition_id};
+        DataPartStateAndPartitionID outdated_parts{MergeTreeDataPartState::Outdated, partition_id};
+        DataPartsVector res;
+        {
+            auto lock = lockParts();
+            res.insert(res.end(), data_parts_by_state_and_info.lower_bound(active_parts), data_parts_by_state_and_info.upper_bound(active_parts));
+            res.insert(res.end(), data_parts_by_state_and_info.lower_bound(outdated_parts), data_parts_by_state_and_info.upper_bound(outdated_parts));
+        }
+        filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
+        return res;
+    }
+
+    DataPartStateAndPartitionID state_with_partition{MergeTreeDataPartState::Committed, partition_id};

     auto lock = lockParts();
     return DataPartsVector(
@@ -3152,19 +3166,37 @@ MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartition(Merg
         data_parts_by_state_and_info.upper_bound(state_with_partition));
 }

-MergeTreeData::DataPartsVector MergeTreeData::getDataPartsVectorInPartitions(MergeTreeData::DataPartState state, const std::unordered_set<String> & partition_ids) const
+MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const
 {
-    auto lock = lockParts();
+    auto txn = local_context->getCurrentTransaction();
     DataPartsVector res;
+    {
+        auto lock = lockParts();
     for (const auto & partition_id : partition_ids)
     {
-        DataPartStateAndPartitionID state_with_partition{state, partition_id};
+        DataPartStateAndPartitionID active_parts{MergeTreeDataPartState::Committed, partition_id};
         insertAtEnd(
             res,
             DataPartsVector(
-                data_parts_by_state_and_info.lower_bound(state_with_partition),
-                data_parts_by_state_and_info.upper_bound(state_with_partition)));
+                data_parts_by_state_and_info.lower_bound(active_parts),
+                data_parts_by_state_and_info.upper_bound(active_parts)));
+
+        if (txn)
+        {
+            DataPartStateAndPartitionID outdated_parts{MergeTreeDataPartState::Outdated, partition_id};
+            insertAtEnd(
+                res,
+                DataPartsVector(
+                    data_parts_by_state_and_info.lower_bound(outdated_parts),
+                    data_parts_by_state_and_info.upper_bound(outdated_parts)));
+        }
     }
+    }
+
+    if (txn)
+        filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
+
     return res;
 }
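Both lookups above rely on the same trick: parts live in a container ordered by (state, partition, part info), so every (state, partition) pair maps to a contiguous range that lower_bound/upper_bound can slice out without scanning all parts. Under a transaction the Outdated range must be scanned too, because a part that is Outdated in the latest state may still be visible to an older snapshot. A rough illustration of the indexing scheme with a plain std::set (key layout hypothetical, far simpler than the real data_parts_by_state_and_info index):

#include <set>
#include <string>
#include <tuple>
#include <vector>

enum class PartState { Committed, Outdated };

struct PartKey
{
    PartState state;
    std::string partition_id;
    std::string part_name; /// distinguishes parts within one partition

    bool operator<(const PartKey & rhs) const
    {
        return std::tie(state, partition_id, part_name)
             < std::tie(rhs.state, rhs.partition_id, rhs.part_name);
    }
};

/// All parts with the given (state, partition_id) form one contiguous range.
/// The "\xFF" sentinel assumes part names never start with byte 0xFF.
std::vector<PartKey> partsInPartition(const std::set<PartKey> & parts, PartState state, const std::string & partition_id)
{
    PartKey lo{state, partition_id, ""};
    PartKey hi{state, partition_id, "\xFF"};
    return {parts.lower_bound(lo), parts.upper_bound(hi)};
}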
@@ -3295,10 +3327,10 @@ void MergeTreeData::checkAlterPartitionIsPossible(
     }
 }

-void MergeTreeData::checkPartitionCanBeDropped(const ASTPtr & partition)
+void MergeTreeData::checkPartitionCanBeDropped(const ASTPtr & partition, ContextPtr local_context)
 {
-    const String partition_id = getPartitionIDFromQuery(partition, getContext());
-    auto parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    const String partition_id = getPartitionIDFromQuery(partition, local_context);
+    auto parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id);

     UInt64 partition_size = 0;
@@ -3337,7 +3369,7 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
             throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
     }
     else
-        parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+        parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);

     auto disk = getStoragePolicy()->getDiskByName(name);
     if (!disk)
@@ -3382,7 +3414,7 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
             throw Exception("Part " + partition_id + " is not exists or not active", ErrorCodes::NO_SUCH_DATA_PART);
     }
     else
-        parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+        parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);

     auto volume = getStoragePolicy()->getVolumeByName(name);
     if (!volume)
@@ -3454,7 +3486,7 @@ Pipe MergeTreeData::alterPartition(
                 }
                 else
                 {
-                    checkPartitionCanBeDropped(command.partition);
+                    checkPartitionCanBeDropped(command.partition, query_context);
                     dropPartition(command.partition, command.detach, query_context);
                 }
             }
@@ -3503,7 +3535,7 @@ Pipe MergeTreeData::alterPartition(
             case PartitionCommand::REPLACE_PARTITION:
             {
                 if (command.replace)
-                    checkPartitionCanBeDropped(command.partition);
+                    checkPartitionCanBeDropped(command.partition, query_context);
                 String from_database = query_context->resolveDatabase(command.from_database);
                 auto from_storage = DatabaseCatalog::instance().getTable({from_database, command.from_table}, query_context);
                 replacePartitionFrom(from_storage, command.partition, command.replace, query_context);
@@ -3564,7 +3596,7 @@ BackupEntries MergeTreeData::backup(const ASTs & partitions, ContextPtr local_co
     if (partitions.empty())
         data_parts = getDataPartsVector();
     else
-        data_parts = getDataPartsVectorInPartitions(MergeTreeDataPartState::Committed, getPartitionIDsFromQuery(partitions, local_context));
+        data_parts = getVisibleDataPartsVectorInPartitions(local_context, getPartitionIDsFromQuery(partitions, local_context));

     return backupDataParts(data_parts);
 }
@@ -3771,26 +3803,54 @@ String MergeTreeData::getPartitionIDFromQuery(const ASTPtr & ast, ContextPtr loc
 }

-DataPartsVector MergeTreeData::getDataPartsVector(ContextPtr local_context) const
+DataPartsVector MergeTreeData::getVisibleDataPartsVector(ContextPtr local_context) const
 {
-    return getVisibleDataPartsVector(local_context->getCurrentTransaction());
+    DataPartsVector res;
+    if (const auto * txn = local_context->getCurrentTransaction().get())
+    {
+        res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated});
+        filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
+    }
+    else
+    {
+        res = getDataPartsVector();
+    }
+    return res;
 }

 MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const
 {
-    if (!txn)
-        return getDataPartsVector();
+    DataPartsVector res;
+    if (txn)
+    {
+        res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated});
+        filterVisibleDataParts(res, txn->getSnapshot(), txn->tid);
+    }
+    else
+    {
+        res = getDataPartsVector();
+    }
+    return res;
+}

-    DataPartsVector maybe_visible_parts = getDataPartsVector({DataPartState::PreCommitted, DataPartState::Committed, DataPartState::Outdated});
+MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const
+{
+    auto res = getDataPartsVector({DataPartState::Committed, DataPartState::Outdated});
+    filterVisibleDataParts(res, snapshot_version, current_tid);
+    return res;
+}
+
+void MergeTreeData::filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const
+{
     if (maybe_visible_parts.empty())
-        return maybe_visible_parts;
+        return;

     auto it = maybe_visible_parts.begin();
     auto it_last = maybe_visible_parts.end() - 1;
     String visible_parts_str;
     while (it <= it_last)
     {
-        if ((*it)->versions.isVisible(*txn))
+        if ((*it)->versions.isVisible(snapshot_version, current_tid))
         {
             visible_parts_str += (*it)->name;
             visible_parts_str += " ";
@@ -3804,9 +3864,8 @@ MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const Me
     }

     size_t new_size = it_last - maybe_visible_parts.begin() + 1;
-    LOG_TRACE(log, "Got {} parts visible for {}: {}", new_size, txn->tid, visible_parts_str);
+    LOG_TEST(log, "Got {} parts visible in snapshot {} (TID {}): {}", new_size, snapshot_version, current_tid, visible_parts_str);
     maybe_visible_parts.resize(new_size);
-    return maybe_visible_parts;
 }
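The filterVisibleDataParts logic extracted here is the standard MVCC visibility rule: a part exists for a snapshot if its creation was committed at or before the snapshot and its removal was not, and a transaction always sees its own changes. A simplified sketch of that rule over a flat list (types and fields hypothetical; the real check also handles in-flight CSN lookups, and compacts the vector in place with two iterators rather than std::remove_if):

#include <algorithm>
#include <cstdint>
#include <string>
#include <vector>

using CSN = uint64_t; /// commit sequence number (monotonic commit timestamp)
using TID = uint64_t; /// transaction id, 0 = none

struct PartVersion
{
    std::string name;
    CSN created_csn = 0;   /// 0 = creation not committed yet
    CSN removed_csn = 0;   /// 0 = not removed (or removal not committed)
    TID created_by_tid = 0;
    TID removed_by_tid = 0;
};

/// Visible if creation was committed at or before the snapshot and removal was not;
/// the current transaction additionally sees its own uncommitted creations
/// and hides its own uncommitted removals.
bool isVisible(const PartVersion & p, CSN snapshot, TID current_tid)
{
    bool created = (p.created_csn != 0 && p.created_csn <= snapshot)
                || (current_tid != 0 && p.created_by_tid == current_tid);
    bool removed = (p.removed_csn != 0 && p.removed_csn <= snapshot)
                || (current_tid != 0 && p.removed_by_tid == current_tid);
    return created && !removed;
}

void filterVisible(std::vector<PartVersion> & parts, CSN snapshot, TID current_tid)
{
    parts.erase(
        std::remove_if(parts.begin(), parts.end(),
            [&](const PartVersion & p) { return !isVisible(p, snapshot, current_tid); }),
        parts.end());
}

Passing a zero/empty TID, as StorageMergeTree::selectPartsToMerge does below with Tx::EmptyTID, disables the own-transaction exception and checks pure snapshot visibility.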
@@ -4238,7 +4297,7 @@ MergeTreeData::DataParts MergeTreeData::getDataParts(const DataPartStates & affo
     return res;
 }

-MergeTreeData::DataParts MergeTreeData::getDataParts() const
+MergeTreeData::DataParts MergeTreeData::getDataPartsForInternalUsage() const
 {
     return getDataParts({DataPartState::Committed});
 }
@@ -4879,7 +4938,7 @@ bool MergeTreeData::getQueryProcessingStageWithAggregateProjection(
             max_added_blocks = std::make_shared<PartitionIdToMaxBlock>(replicated->getMaxAddedBlocks());
     }

-    auto parts = getDataPartsVector(query_context);
+    auto parts = getVisibleDataPartsVector(query_context);

     // If minmax_count_projection is a valid candidate, check its completeness.
     if (minmax_conut_projection_candidate)
@@ -5237,7 +5296,7 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
     const String shadow_path = "shadow/";

     /// Acquire a snapshot of active data parts to prevent removing while doing backup.
-    const auto data_parts = getDataParts();
+    const auto data_parts = getVisibleDataPartsVector(local_context);

     String backup_name = (!with_name.empty() ? escapeForFileName(with_name) : toString(increment));
     String backup_path = fs::path(shadow_path) / backup_name / "";


@@ -425,6 +425,7 @@ public:
     Int64 getMaxBlockNumber() const;

     /// Returns a copy of the list so that the caller shouldn't worry about locks.
     DataParts getDataParts(const DataPartStates & affordable_states) const;
@@ -436,24 +437,18 @@ public:
     /// Returns absolutely all parts (and snapshot of their states)
     DataPartsVector getAllDataPartsVector(DataPartStateVector * out_states = nullptr, bool require_projection_parts = false) const;

-    /// Returns all detached parts
-    DetachedPartsInfo getDetachedParts() const;
-
-    void validateDetachedPartName(const String & name) const;
-
-    void dropDetached(const ASTPtr & partition, bool part, ContextPtr context);
-
-    MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
-            ContextPtr context, PartsTemporaryRename & renamed_parts);
-
-    /// Returns Committed parts
-    DataParts getDataParts() const;
+    /// Returns parts in Committed state (NOT in terms of transactions, should be used carefully)
+    DataParts getDataPartsForInternalUsage() const;
     DataPartsVector getDataPartsVector() const;
-    DataPartsVector getDataPartsVector(ContextPtr local_context) const;
-    DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const;

-    /// Returns a committed part with the given name or a part containing it. If there is no such part, returns nullptr.
+    void filterVisibleDataParts(DataPartsVector & maybe_visible_parts, Snapshot snapshot_version, TransactionID current_tid) const;
+
+    /// Returns parts that are visible with the current snapshot
+    DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const;
+    DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const;
+    DataPartsVector getVisibleDataPartsVector(Snapshot snapshot_version, TransactionID current_tid) const;
+
+    /// Returns a part in Committed state with the given name or a part containing it. If there is no such part, returns nullptr.
     DataPartPtr getActiveContainingPart(const String & part_name) const;
     DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info) const;
     DataPartPtr getActiveContainingPart(const MergeTreePartInfo & part_info, DataPartState state, DataPartsLock & lock) const;
@@ -463,8 +458,8 @@ public:
     void swapActivePart(MergeTreeData::DataPartPtr part_copy);

     /// Returns all parts in specified partition
-    DataPartsVector getDataPartsVectorInPartition(DataPartState state, const String & partition_id) const;
-    DataPartsVector getDataPartsVectorInPartitions(DataPartState state, const std::unordered_set<String> & partition_ids) const;
+    DataPartsVector getVisibleDataPartsVectorInPartition(ContextPtr local_context, const String & partition_id) const;
+    DataPartsVector getVisibleDataPartsVectorInPartitions(ContextPtr local_context, const std::unordered_set<String> & partition_ids) const;

     /// Returns the part with the given name and state or nullptr if no such part.
     DataPartPtr getPartIfExists(const String & part_name, const DataPartStates & valid_states);
@@ -484,6 +479,18 @@ public:
     /// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
     std::optional<Int64> getMinPartDataVersion() const;

+    /// Returns all detached parts
+    DetachedPartsInfo getDetachedParts() const;
+
+    void validateDetachedPartName(const String & name) const;
+
+    void dropDetached(const ASTPtr & partition, bool part, ContextPtr context);
+
+    MutableDataPartsVector tryLoadPartsToAttach(const ASTPtr & partition, bool attach_part,
+            ContextPtr context, PartsTemporaryRename & renamed_parts);
+
     /// If the table contains too many active parts, sleep for a while to give them time to merge.
     /// If until is non-null, wake up from the sleep earlier if the event happened.
     void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;
@@ -656,7 +663,10 @@ public:
     /// Moves partition to specified Volume
     void movePartitionToVolume(const ASTPtr & partition, const String & name, bool moving_part, ContextPtr context);

-    void checkPartitionCanBeDropped(const ASTPtr & partition) override;
+    /// Checks that Partition could be dropped right now
+    /// Otherwise - throws an exception with detailed information.
+    /// We do not use mutex because it is not very important that the size could change during the operation.
+    void checkPartitionCanBeDropped(const ASTPtr & partition, ContextPtr local_context);

     void checkPartCanBeDropped(const String & part_name);


@@ -395,7 +395,7 @@ MergeTreeData::DataPartsVector MergeTreeDataMergerMutator::selectAllPartsFromPar
 {
     MergeTreeData::DataPartsVector parts_from_partition;

-    MergeTreeData::DataParts data_parts = data.getDataParts();
+    MergeTreeData::DataParts data_parts = data.getDataPartsForInternalUsage();

     for (const auto & current_part : data_parts)
     {


@@ -130,7 +130,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
         return std::make_unique<QueryPlan>();

     const auto & settings = context->getSettingsRef();
-    auto parts = data.getDataPartsVector(context);
+    auto parts = data.getVisibleDataPartsVector(context);

     if (!query_info.projection)
     {


@@ -87,7 +87,7 @@ StorageMergeTree::StorageMergeTree(
 {
     loadDataParts(has_force_restore_data_flag);

-    if (!attach && !getDataParts().empty())
+    if (!attach && !getDataPartsForInternalUsage().empty())
         throw Exception("Data directory for table already containing data parts - probably it was unclean DROP table or manual intervention. You must either clear directory by hand or use ATTACH TABLE instead of CREATE TABLE if you need to use that parts.", ErrorCodes::INCORRECT_DATA);

     increment.set(getMaxBlockNumber());
@@ -258,7 +258,7 @@ void StorageMergeTree::truncate(const ASTPtr &, const StorageMetadataPtr &, Cont
     /// This protects against "revival" of data for a removed partition after completion of merge.
     auto merge_blocker = stopMergesAndWait();

-    auto parts_to_remove = getDataPartsVector();
+    auto parts_to_remove = getVisibleDataPartsVector(local_context);
     removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true);

     LOG_INFO(log, "Removed {} parts.", parts_to_remove.size());
@@ -713,9 +713,9 @@ std::shared_ptr<MergeMutateSelectedEntry> StorageMergeTree::selectPartsToMerge(
         {
             /// Cannot merge parts if some of them is not visible in current snapshot
             /// TODO We can use simplified visibility rules (without CSN lookup) here
-            if (left && !left->versions.isVisible(*tx))
+            if (left && !left->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID))
                 return false;
-            if (right && !right->versions.isVisible(*tx))
+            if (right && !right->versions.isVisible(tx->getSnapshot(), Tx::EmptyTID))
                 return false;
         }
@@ -1288,7 +1288,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, Cont
         /// This protects against "revival" of data for a removed partition after completion of merge.
         auto merge_blocker = stopMergesAndWait();

         String partition_id = getPartitionIDFromQuery(partition, local_context);
-        parts_to_remove = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+        parts_to_remove = getVisibleDataPartsVectorInPartition(local_context, partition_id);
         /// TODO should we throw an exception if parts_to_remove is empty?
         removePartsFromWorkingSet(local_context->getCurrentTransaction().get(), parts_to_remove, true);
@@ -1370,7 +1370,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
     MergeTreeData & src_data = checkStructureAndGetMergeTreeData(source_table, source_metadata_snapshot, my_metadata_snapshot);
     String partition_id = getPartitionIDFromQuery(partition, local_context);

-    DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
     MutableDataPartsVector dst_parts;

     static const String TMP_PREFIX = "tmp_replace_from_";
@@ -1455,7 +1455,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
     MergeTreeData & src_data = dest_table_storage->checkStructureAndGetMergeTreeData(*this, metadata_snapshot, dest_metadata_snapshot);
     String partition_id = getPartitionIDFromQuery(partition, local_context);

-    DataPartsVector src_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    DataPartsVector src_parts = src_data.getVisibleDataPartsVectorInPartition(local_context, partition_id);
     MutableDataPartsVector dst_parts;

     static const String TMP_PREFIX = "tmp_move_from_";
@@ -1535,7 +1535,7 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
     if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
     {
         String partition_id = getPartitionIDFromQuery(check_query.partition, local_context);
-        data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+        data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
     }
     else
         data_parts = getDataPartsVector();


@@ -145,7 +145,6 @@ public:
     CheckResults checkData(const ASTPtr & query , ContextPtr context) override { return getNested()->checkData(query, context); }

     void checkTableCanBeDropped() const override { getNested()->checkTableCanBeDropped(); }
-    void checkPartitionCanBeDropped(const ASTPtr & partition) override { getNested()->checkPartitionCanBeDropped(partition); }
     bool storesDataOnDisk() const override { return getNested()->storesDataOnDisk(); }
     Strings getDataPaths() const override { return getNested()->getDataPaths(); }
     StoragePolicyPtr getStoragePolicy() const override { return getNested()->getStoragePolicy(); }


@@ -405,7 +405,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
     if (!attach)
     {
-        if (!getDataParts().empty())
+        if (!getDataPartsForInternalUsage().empty())
             throw Exception("Data directory for table already contains data parts"
                 " - probably it was unclean DROP table or manual intervention."
                 " You must either clear directory by hand or use ATTACH TABLE"
@@ -2452,7 +2452,7 @@ void StorageReplicatedMergeTree::cloneReplica(const String & source_replica, Coo
     tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_from_zk);

-    auto local_active_parts = getDataParts();
+    auto local_active_parts = getDataPartsForInternalUsage();

     DataPartsVector parts_to_remove_from_working_set;
@@ -4187,7 +4187,7 @@ ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock StorageReplicatedMerg
 {
     ReplicatedMergeTreeQuorumAddedParts::PartitionIdToMaxBlock max_added_blocks;

-    for (const auto & data_part : getDataParts())
+    for (const auto & data_part : getDataPartsForInternalUsage())
     {
         max_added_blocks[data_part->info.partition_id]
             = std::max(max_added_blocks[data_part->info.partition_id], data_part->info.max_block);
@@ -4293,6 +4293,7 @@ void StorageReplicatedMergeTree::foreachCommittedParts(Func && func, bool select
         max_added_blocks = getMaxAddedBlocks();

     auto lock = lockParts();
+    /// TODO Transactions: should we count visible parts only?
     for (const auto & part : getDataPartsStateRange(DataPartState::Committed))
     {
         if (part->isEmpty())
@@ -6246,7 +6247,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
     String partition_id = getPartitionIDFromQuery(partition, query_context);

     /// NOTE: Some covered parts may be missing in src_all_parts if corresponding log entries are not executed yet.
-    DataPartsVector src_all_parts = src_data.getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+    DataPartsVector src_all_parts = src_data.getVisibleDataPartsVectorInPartition(query_context, partition_id);

     LOG_DEBUG(log, "Cloning {} parts", src_all_parts.size());
@@ -7068,7 +7069,7 @@ CheckResults StorageReplicatedMergeTree::checkData(const ASTPtr & query, Context
     if (const auto & check_query = query->as<ASTCheckQuery &>(); check_query.partition)
     {
         String partition_id = getPartitionIDFromQuery(check_query.partition, local_context);
-        data_parts = getDataPartsVectorInPartition(MergeTreeDataPartState::Committed, partition_id);
+        data_parts = getVisibleDataPartsVectorInPartition(local_context, partition_id);
     }
     else
         data_parts = getDataPartsVector();


@@ -0,0 +1,6 @@
1 1
2 1
3 1
4 1
1
10 100


@@ -0,0 +1,113 @@
#!/usr/bin/env bash
# Tags: long
# shellcheck disable=SC2015
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
set -e
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS src";
$CLICKHOUSE_CLIENT --query "DROP TABLE IF EXISTS dst";
$CLICKHOUSE_CLIENT --query "CREATE TABLE src (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0";
$CLICKHOUSE_CLIENT --query "CREATE TABLE dst (n UInt64, type UInt8) ENGINE=MergeTree ORDER BY type SETTINGS old_parts_lifetime=0";
function thread_insert()
{
set -e
trap "exit 0" INT
while true; do
action="ROLLBACK"
if (( RANDOM % 2 )); then
action="COMMIT"
fi
val=$RANDOM
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES ($val, 1);
INSERT INTO src VALUES ($val, 2);
COMMIT;"
sleep 0.$RANDOM;
done
}
# NOTE
# ALTER PARTITION query stops merges,
# but serialization error is still possible if some merge was assigned (and committed) between BEGIN and ALTER.
function thread_partition_src_to_dst()
{
set -e
count=0
sum=0
for i in {1..20}; do
out=$(
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
INSERT INTO src VALUES ($i, 3);
INSERT INTO dst SELECT * FROM src;
ALTER TABLE src DROP PARTITION ID 'all';
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=3) != ($count + 1, $sum + $i)) FORMAT Null;
COMMIT;" 2>&1) ||:
echo "$out" | grep -Fv "SERIALIZATION_ERROR" | grep -F "Received from " && $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type" ||:
echo "$out" | grep -Fa "SERIALIZATION_ERROR" >/dev/null || count=$((count+1)) && sum=$((sum+i))
done
}
function thread_partition_dst_to_src()
{
set -e
for i in {1..20}; do
action="ROLLBACK"
if (( i % 2 )); then
action="COMMIT"
fi
$CLICKHOUSE_CLIENT --multiquery --query "
SYSTEM STOP MERGES dst;
BEGIN TRANSACTION;
INSERT INTO dst VALUES ($i, 4);
INSERT INTO src SELECT * FROM dst;
ALTER TABLE dst DROP PARTITION ID 'all';
SYSTEM START MERGES dst;
SELECT throwIf((SELECT (count(), sum(n)) FROM merge(currentDatabase(), '') WHERE type=4) != (toUInt8($i/2 + 1), (select sum(number) from numbers(1, $i) where number % 2 or number=$i))) FORMAT Null;
$action;" || $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type"
done
}
function thread_select()
{
set -e
trap "exit 0" INT
while true; do
$CLICKHOUSE_CLIENT --multiquery --query "
BEGIN TRANSACTION;
-- no duplicates
SELECT type, throwIf(count(n) != countDistinct(n)) FROM src GROUP BY type FORMAT Null;
SELECT type, throwIf(count(n) != countDistinct(n)) FROM dst GROUP BY type FORMAT Null;
-- rows inserted by thread_insert moved together
SELECT _table, throwIf(arraySort(groupArrayIf(n, type=1)) != arraySort(groupArrayIf(n, type=2))) FROM merge(currentDatabase(), '') GROUP BY _table FORMAT Null;
COMMIT;" || $CLICKHOUSE_CLIENT -q "SELECT _table, type, arraySort(groupArray(n)) FROM merge(currentDatabase(), '') GROUP BY _table, type ORDER BY _table, type"
done
}
thread_insert & PID_1=$!
thread_select & PID_2=$!
thread_partition_src_to_dst & PID_3=$!
thread_partition_dst_to_src & PID_4=$!
wait $PID_3 && wait $PID_4
kill -INT $PID_1
kill -INT $PID_2
wait
$CLICKHOUSE_CLIENT -q "SELECT type, count(n) = countDistinct(n) FROM merge(currentDatabase(), '') GROUP BY type ORDER BY type"
$CLICKHOUSE_CLIENT -q "SELECT DISTINCT arraySort(groupArrayIf(n, type=1)) = arraySort(groupArrayIf(n, type=2)) FROM merge(currentDatabase(), '') GROUP BY _table ORDER BY _table"
$CLICKHOUSE_CLIENT -q "SELECT count(n), sum(n) FROM merge(currentDatabase(), '') WHERE type=4"
$CLICKHOUSE_CLIENT --query "DROP TABLE src";
$CLICKHOUSE_CLIENT --query "DROP TABLE dst";


@@ -0,0 +1,30 @@
1 1
2 3
3 2
3 4
4 3
5 3
5 5
6 3
6 5
6 6
7 8
8 3
8 5
8 7
8 9
SERIALIZATION_ERROR
INVALID_TRANSACTION
9 8
10 8
11 8
11 11
11 12
12 8
12 8
12 11
12 12


@@ -0,0 +1,70 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# shellcheck source=./transactions.lib
. "$CURDIR"/transactions.lib
$CLICKHOUSE_CLIENT -q "drop table if exists mt"
$CLICKHOUSE_CLIENT -q "create table mt (n int) engine=MergeTree order by n"
tx 1 "begin transaction"
tx 1 "insert into mt values (1)"
tx 2 "begin transaction"
tx 2 "insert into mt values (2)"
tx 1 "select 1, n from mt order by n"
tx 1 "alter table mt drop partition id 'all'"
tx 2 "insert into mt values (4)"
tx 1 "insert into mt values (3)"
tx 1 "select 2, n from mt order by n"
tx 2 "select 3, n from mt order by n"
tx 2 "alter table mt drop partition id 'all'"
tx 2 "insert into mt values (5)"
tx 1 "select 4, n from mt order by n"
tx 2 "commit"
tx 1 "commit"
echo ''
$CLICKHOUSE_CLIENT -q "select 5, n from mt order by n"
echo ''
tx 4 "begin transaction"
tx 4 "insert into mt values (6)"
tx 3 "begin transaction"
tx 3 "insert into mt values (7)"
tx 4 "select 6, n from mt order by n"
tx 4 "alter table mt drop partition id 'all'"
tx 3 "insert into mt values (9)"
tx 4 "insert into mt values (8)"
tx 4 "select 7, n from mt order by n"
tx 3 "select 8, n from mt order by n"
tx 3 "alter table mt drop partition id 'all'" | grep -Eo "SERIALIZATION_ERROR" | uniq
tx 3 "insert into mt values (10)" | grep -Eo "INVALID_TRANSACTION" | uniq
tx 4 "select 9, n from mt order by n"
tx 3 "rollback"
tx 4 "commit"
echo ''
$CLICKHOUSE_CLIENT -q "select 10, n from mt order by n"
echo ''
$CLICKHOUSE_CLIENT -q "drop table if exists another_mt"
$CLICKHOUSE_CLIENT -q "create table another_mt (n int) engine=MergeTree order by n"
tx 5 "begin transaction"
tx 5 "insert into another_mt values (11)"
tx 6 "begin transaction"
tx 6 "insert into mt values (12)"
tx 6 "insert into another_mt values (13)"
tx 5 "alter table another_mt move partition id 'all' to table mt"
tx 6 "alter table another_mt replace partition id 'all' from mt"
tx 5 "alter table another_mt attach partition id 'all' from mt"
tx 5 "commit"
tx 6 "commit"
$CLICKHOUSE_CLIENT -q "select 11, n from mt order by n"
$CLICKHOUSE_CLIENT -q "select 12, n from another_mt order by n"
$CLICKHOUSE_CLIENT -q "drop table another_mt"
$CLICKHOUSE_CLIENT -q "drop table mt"


@@ -22,6 +22,7 @@ $CLICKHOUSE_CLIENT --query "INSERT INTO src VALUES (0, 0)"
 # some transactions will fail due to constraint
 function thread_insert_commit()
 {
+    set -e
     for i in {1..100}; do
         $CLICKHOUSE_CLIENT --multiquery --query "
         BEGIN TRANSACTION;
@@ -34,6 +35,7 @@ function thread_insert_commit()

 function thread_insert_rollback()
 {
+    set -e
     for _ in {1..100}; do
         $CLICKHOUSE_CLIENT --multiquery --query "
         BEGIN TRANSACTION;
@@ -46,11 +48,17 @@ function thread_insert_rollback()
 # make merges more aggressive
 function thread_optimize()
 {
+    set -e
     trap "exit 0" INT
     while true; do
         optimize_query="OPTIMIZE TABLE src"
+        partition_id=$(( RANDOM % 2 ))
         if (( RANDOM % 2 )); then
             optimize_query="OPTIMIZE TABLE dst"
+            partition_id="all"
+        fi
+        if (( RANDOM % 2 )); then
+            optimize_query="$optimize_query PARTITION ID '$partition_id'"
         fi
         if (( RANDOM % 2 )); then
             optimize_query="$optimize_query FINAL"
@@ -71,6 +79,7 @@ function thread_optimize()

 function thread_select()
 {
+    set -e
     trap "exit 0" INT
     while true; do
         $CLICKHOUSE_CLIENT --multiquery --query "
@@ -86,6 +95,7 @@ function thread_select()

 function thread_select_insert()
 {
+    set -e
     trap "exit 0" INT
     while true; do
         $CLICKHOUSE_CLIENT --multiquery --query "


@ -0,0 +1,14 @@
#!/usr/bin/env bash
function tx()
{
tx_num=$1
query=$2
url_without_session="https://${CLICKHOUSE_HOST}:${CLICKHOUSE_PORT_HTTPS}/?"
session="${CLICKHOUSE_TEST_ZOOKEEPER_PREFIX}_$tx_num"
url="${url_without_session}session_id=$session&database=$CLICKHOUSE_DATABASE"
${CLICKHOUSE_CURL} -m 30 -sSk "$url" --data "$query"
}