mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-22 01:30:51 +00:00
parts cleanup, fixes
This commit is contained in:
parent
9915bd0a8b
commit
881889ef22
@ -247,7 +247,6 @@ function configure
|
||||
cp -a "$FASTTEST_SOURCE/programs/server/"{config,users}.xml "$FASTTEST_DATA"
|
||||
"$FASTTEST_SOURCE/tests/config/install.sh" "$FASTTEST_DATA" "$FASTTEST_DATA/client-config"
|
||||
cp -a "$FASTTEST_SOURCE/programs/server/config.d/log_to_console.xml" "$FASTTEST_DATA/config.d"
|
||||
cp -a "$FASTTEST_SOURCE/tests/config/config.d/transactions.xml" "$FASTTEST_DATA/config.d"
|
||||
# doesn't support SSL
|
||||
rm -f "$FASTTEST_DATA/config.d/secure_ports.xml"
|
||||
}
|
||||
|
@ -32,7 +32,7 @@ TIDHash TransactionID::getHash() const
|
||||
return hash.get64();
|
||||
}
|
||||
|
||||
/// It can be used fro introspection purposes only
|
||||
/// It can be used for introspection purposes only
|
||||
TransactionID VersionMetadata::getMaxTID() const
|
||||
{
|
||||
TIDHash max_lock = maxtid_lock.load();
|
||||
@ -40,6 +40,8 @@ TransactionID VersionMetadata::getMaxTID() const
|
||||
{
|
||||
if (auto txn = TransactionLog::instance().tryGetRunningTransaction(max_lock))
|
||||
return txn->tid;
|
||||
if (max_lock == Tx::PrehistoricTID.getHash())
|
||||
return Tx::PrehistoricTID;
|
||||
}
|
||||
|
||||
if (maxcsn.load(std::memory_order_relaxed))
|
||||
@ -53,6 +55,7 @@ TransactionID VersionMetadata::getMaxTID() const
|
||||
|
||||
void VersionMetadata::lockMaxTID(const TransactionID & tid, const String & error_context)
|
||||
{
|
||||
assert(tid);
|
||||
TIDHash max_lock_value = tid.getHash();
|
||||
TIDHash expected_max_lock_value = 0;
|
||||
bool locked = maxtid_lock.compare_exchange_strong(expected_max_lock_value, max_lock_value);
|
||||
@ -69,6 +72,7 @@ void VersionMetadata::lockMaxTID(const TransactionID & tid, const String & error
|
||||
|
||||
void VersionMetadata::unlockMaxTID(const TransactionID & tid)
|
||||
{
|
||||
assert(tid);
|
||||
TIDHash max_lock_value = tid.getHash();
|
||||
TIDHash locked_by = maxtid_lock.load();
|
||||
|
||||
@ -88,6 +92,11 @@ void VersionMetadata::unlockMaxTID(const TransactionID & tid)
|
||||
throw_cannot_unlock();
|
||||
}
|
||||
|
||||
bool VersionMetadata::isMaxTIDLocked() const
|
||||
{
|
||||
return maxtid_lock.load() != 0;
|
||||
}
|
||||
|
||||
void VersionMetadata::setMinTID(const TransactionID & tid)
|
||||
{
|
||||
/// TODO Transactions: initialize it in constructor on part creation and remove this method
|
||||
@ -98,7 +107,11 @@ void VersionMetadata::setMinTID(const TransactionID & tid)
|
||||
|
||||
bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
|
||||
{
|
||||
Snapshot snapshot_version = txn.getSnapshot();
|
||||
return isVisible(txn.getSnapshot(), txn.tid);
|
||||
}
|
||||
|
||||
bool VersionMetadata::isVisible(Snapshot snapshot_version, TransactionID current_tid)
|
||||
{
|
||||
assert(mintid);
|
||||
CSN min = mincsn.load(std::memory_order_relaxed);
|
||||
TIDHash max_lock = maxtid_lock.load(std::memory_order_relaxed);
|
||||
@ -120,7 +133,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
|
||||
return false;
|
||||
if (max && max <= snapshot_version)
|
||||
return false;
|
||||
if (max_lock && max_lock == txn.tid.getHash())
|
||||
if (current_tid && max_lock && max_lock == current_tid.getHash())
|
||||
return false;
|
||||
|
||||
/// Otherwise, part is definitely visible if:
|
||||
@ -131,7 +144,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
|
||||
return true;
|
||||
if (min && min <= snapshot_version && max && snapshot_version < max)
|
||||
return true;
|
||||
if (mintid == txn.tid)
|
||||
if (current_tid && mintid == current_tid)
|
||||
return true;
|
||||
|
||||
/// End of fast path.
|
||||
@ -140,7 +153,7 @@ bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)
|
||||
/// It means that some transaction is creating/removing the part right now or has done it recently
|
||||
/// and we don't know if it was already committed ot not.
|
||||
assert(!had_mincsn || (had_maxtid && !had_maxcsn));
|
||||
assert(mintid != txn.tid && max_lock != txn.tid.getHash());
|
||||
assert(!current_tid || (mintid != current_tid && max_lock != current_tid.getHash()));
|
||||
|
||||
/// Before doing CSN lookup, let's check some extra conditions.
|
||||
/// If snapshot_version <= some_tid.start_csn, then changes of transaction with some_tid
|
||||
|
@ -71,6 +71,7 @@ struct VersionMetadata
|
||||
std::atomic<CSN> maxcsn = Tx::UnknownCSN;
|
||||
|
||||
bool isVisible(const MergeTreeTransaction & txn);
|
||||
bool isVisible(Snapshot snapshot_version, TransactionID current_tid = Tx::EmptyTID);
|
||||
|
||||
TransactionID getMinTID() const { return mintid; }
|
||||
TransactionID getMaxTID() const;
|
||||
@ -78,6 +79,8 @@ struct VersionMetadata
|
||||
void lockMaxTID(const TransactionID & tid, const String & error_context = {});
|
||||
void unlockMaxTID(const TransactionID & tid);
|
||||
|
||||
bool isMaxTIDLocked() const;
|
||||
|
||||
/// It can be called only from MergeTreeTransaction or on server startup
|
||||
void setMinTID(const TransactionID & tid);
|
||||
};
|
||||
|
@ -22,16 +22,16 @@ MergeTreeTransaction::State MergeTreeTransaction::getState() const
|
||||
return COMMITTED;
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::addNewPart(const DataPartPtr & new_part, MergeTreeTransaction * txn)
|
||||
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn)
|
||||
{
|
||||
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
||||
|
||||
new_part->versions.setMinTID(tid);
|
||||
if (txn)
|
||||
txn->addNewPart(new_part);
|
||||
txn->addNewPart(storage, new_part);
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::removeOldPart(const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
|
||||
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn)
|
||||
{
|
||||
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
||||
String error_context = fmt::format("Table: {}, part name: {}",
|
||||
@ -39,16 +39,16 @@ void MergeTreeTransaction::removeOldPart(const DataPartPtr & part_to_remove, Mer
|
||||
part_to_remove->name);
|
||||
part_to_remove->versions.lockMaxTID(tid, error_context);
|
||||
if (txn)
|
||||
txn->removeOldPart(part_to_remove);
|
||||
txn->removeOldPart(storage, part_to_remove);
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::addNewPartAndRemoveCovered(const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
|
||||
void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn)
|
||||
{
|
||||
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
|
||||
|
||||
new_part->versions.setMinTID(tid);
|
||||
if (txn)
|
||||
txn->addNewPart(new_part);
|
||||
txn->addNewPart(storage, new_part);
|
||||
|
||||
String error_context = fmt::format("Table: {}, covering part name: {}",
|
||||
new_part->storage.getStorageID().getNameForLogs(),
|
||||
@ -58,19 +58,21 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const DataPartPtr & new_pa
|
||||
{
|
||||
covered->versions.lockMaxTID(tid, fmt::format(error_context, covered->name));
|
||||
if (txn)
|
||||
txn->removeOldPart(covered);
|
||||
txn->removeOldPart(storage, covered);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::addNewPart(const DataPartPtr & new_part)
|
||||
void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPartPtr & new_part)
|
||||
{
|
||||
assert(csn == Tx::UnknownCSN);
|
||||
storages.insert(storage);
|
||||
creating_parts.push_back(new_part);
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::removeOldPart(const DataPartPtr & part_to_remove)
|
||||
void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove)
|
||||
{
|
||||
assert(csn == Tx::UnknownCSN);
|
||||
storages.insert(storage);
|
||||
removing_parts.push_back(part_to_remove);
|
||||
}
|
||||
|
||||
@ -100,8 +102,19 @@ void MergeTreeTransaction::rollback() noexcept
|
||||
csn = Tx::RolledBackCSN;
|
||||
for (const auto & part : creating_parts)
|
||||
part->versions.mincsn.store(Tx::RolledBackCSN);
|
||||
|
||||
for (const auto & part : removing_parts)
|
||||
part->versions.unlockMaxTID(tid);
|
||||
|
||||
/// FIXME const_cast
|
||||
for (const auto & part : creating_parts)
|
||||
const_cast<MergeTreeData &>(part->storage).removePartsFromWorkingSet({part}, true);
|
||||
|
||||
for (const auto & part : removing_parts)
|
||||
if (part->versions.getMinTID() != tid)
|
||||
const_cast<MergeTreeData &>(part->storage).restoreAndActivatePart(part);
|
||||
|
||||
/// FIXME seems like session holds shared_ptr to Transaction and transaction holds shared_ptr to parts preventing cleanup
|
||||
}
|
||||
|
||||
void MergeTreeTransaction::onException()
|
||||
|
@ -1,6 +1,10 @@
|
||||
#pragma once
|
||||
#include <Common/TransactionMetadata.h>
|
||||
#include <boost/noncopyable.hpp>
|
||||
#include <Storages/IStorage_fwd.h>
|
||||
|
||||
#include <list>
|
||||
#include <unordered_set>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -28,12 +32,12 @@ public:
|
||||
MergeTreeTransaction() = delete;
|
||||
MergeTreeTransaction(Snapshot snapshot_, LocalTID local_tid_, UUID host_id);
|
||||
|
||||
void addNewPart(const DataPartPtr & new_part);
|
||||
void removeOldPart(const DataPartPtr & part_to_remove);
|
||||
void addNewPart(const StoragePtr & storage, const DataPartPtr & new_part);
|
||||
void removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove);
|
||||
|
||||
static void addNewPart(const DataPartPtr & new_part, MergeTreeTransaction * txn);
|
||||
static void removeOldPart(const DataPartPtr & part_to_remove, MergeTreeTransaction * txn);
|
||||
static void addNewPartAndRemoveCovered(const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn);
|
||||
static void addNewPart(const StoragePtr & storage, const DataPartPtr & new_part, MergeTreeTransaction * txn);
|
||||
static void removeOldPart(const StoragePtr & storage, const DataPartPtr & part_to_remove, MergeTreeTransaction * txn);
|
||||
static void addNewPartAndRemoveCovered(const StoragePtr & storage, const DataPartPtr & new_part, const DataPartsVector & covered_parts, MergeTreeTransaction * txn);
|
||||
|
||||
bool isReadOnly() const;
|
||||
|
||||
@ -48,10 +52,14 @@ private:
|
||||
|
||||
Snapshot snapshot;
|
||||
|
||||
std::unordered_set<StoragePtr> storages;
|
||||
DataPartsVector creating_parts;
|
||||
DataPartsVector removing_parts;
|
||||
|
||||
CSN csn;
|
||||
|
||||
/// FIXME it's ugly
|
||||
std::list<Snapshot>::iterator snapshot_in_use_it;
|
||||
};
|
||||
|
||||
using MergeTreeTransactionPtr = std::shared_ptr<MergeTreeTransaction>;
|
||||
|
@ -32,14 +32,16 @@ Snapshot TransactionLog::getLatestSnapshot() const
|
||||
|
||||
MergeTreeTransactionPtr TransactionLog::beginTransaction()
|
||||
{
|
||||
Snapshot snapshot = latest_snapshot.load();
|
||||
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
|
||||
auto txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, UUIDHelpers::Nil);
|
||||
MergeTreeTransactionPtr txn;
|
||||
{
|
||||
std::lock_guard lock{running_list_mutex};
|
||||
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second; /// Commit point
|
||||
Snapshot snapshot = latest_snapshot.load();
|
||||
LocalTID ltid = 1 + local_tid_counter.fetch_add(1);
|
||||
txn = std::make_shared<MergeTreeTransaction>(snapshot, ltid, UUIDHelpers::Nil);
|
||||
bool inserted = running_list.try_emplace(txn->tid.getHash(), txn).second;
|
||||
if (!inserted)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} exists", txn->tid.getHash(), txn->tid);
|
||||
txn->snapshot_in_use_it = snapshots_in_use.insert(snapshots_in_use.end(), snapshot);
|
||||
}
|
||||
LOG_TRACE(log, "Beginning transaction {}", txn->tid);
|
||||
return txn;
|
||||
@ -76,6 +78,7 @@ CSN TransactionLog::commitTransaction(const MergeTreeTransactionPtr & txn)
|
||||
bool removed = running_list.erase(txn->tid.getHash());
|
||||
if (!removed)
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "I's a bug: TID {} {} doesn't exist", txn->tid.getHash(), txn->tid);
|
||||
snapshots_in_use.erase(txn->snapshot_in_use_it);
|
||||
}
|
||||
return new_csn;
|
||||
}
|
||||
@ -89,6 +92,7 @@ void TransactionLog::rollbackTransaction(const MergeTreeTransactionPtr & txn) no
|
||||
bool removed = running_list.erase(txn->tid.getHash());
|
||||
if (!removed)
|
||||
abort();
|
||||
snapshots_in_use.erase(txn->snapshot_in_use_it);
|
||||
}
|
||||
}
|
||||
|
||||
@ -120,4 +124,12 @@ CSN TransactionLog::getCSN(const TIDHash & tid) const
|
||||
return it->second;
|
||||
}
|
||||
|
||||
Snapshot TransactionLog::getOldestSnapshot() const
|
||||
{
|
||||
std::lock_guard lock{running_list_mutex};
|
||||
if (snapshots_in_use.empty())
|
||||
return getLatestSnapshot();
|
||||
return snapshots_in_use.front();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -29,6 +29,8 @@ public:
|
||||
|
||||
MergeTreeTransactionPtr tryGetRunningTransaction(const TIDHash & tid);
|
||||
|
||||
Snapshot getOldestSnapshot() const;
|
||||
|
||||
private:
|
||||
Poco::Logger * log;
|
||||
|
||||
@ -42,6 +44,7 @@ private:
|
||||
|
||||
mutable std::mutex running_list_mutex;
|
||||
std::unordered_map<TIDHash, MergeTreeTransactionPtr> running_list;
|
||||
std::list<Snapshot> snapshots_in_use;
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -23,6 +23,7 @@
|
||||
#include <Interpreters/MergeTreeTransaction.h>
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/InterpreterSelectQuery.h>
|
||||
#include <Interpreters/TransactionLog.h>
|
||||
#include <Parsers/ASTFunction.h>
|
||||
#include <Parsers/ASTLiteral.h>
|
||||
#include <Parsers/ASTNameTypePair.h>
|
||||
@ -1208,6 +1209,10 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
|
||||
{
|
||||
const DataPartPtr & part = *it;
|
||||
|
||||
/// Do not remove outdated part if it may be visible for some transaction
|
||||
if (part->versions.isVisible(TransactionLog::instance().getOldestSnapshot()))
|
||||
continue;
|
||||
|
||||
auto part_remove_time = part->remove_time.load(std::memory_order_relaxed);
|
||||
|
||||
if (part.unique() && /// Grab only parts that are not used by anyone (SELECTs for example).
|
||||
@ -2157,7 +2162,6 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, MergeTreeTra
|
||||
DataPartsVector covered_parts;
|
||||
{
|
||||
auto lock = lockParts();
|
||||
if (!renameTempPartAndReplace(part, txn, increment, out_transaction, lock, &covered_parts))
|
||||
if (!renameTempPartAndReplace(part, txn, increment, out_transaction, lock, &covered_parts, deduplication_log))
|
||||
return false;
|
||||
}
|
||||
@ -2225,10 +2229,6 @@ bool MergeTreeData::renameTempPartAndReplace(
|
||||
return false;
|
||||
}
|
||||
|
||||
/// FIXME Transactions: it's not the best place for checking and setting maxtid,
|
||||
/// because it's too optimistic. We should lock maxtid of covered parts at the beginning of operation.
|
||||
MergeTreeTransaction::addNewPartAndRemoveCovered(part, covered_parts, txn);
|
||||
|
||||
/// Deduplication log used only from non-replicated MergeTree. Replicated
|
||||
/// tables have their own mechanism. We try to deduplicate at such deep
|
||||
/// level, because only here we know real part name which is required for
|
||||
@ -2255,6 +2255,9 @@ bool MergeTreeData::renameTempPartAndReplace(
|
||||
part->setState(DataPartState::PreCommitted);
|
||||
part->renameTo(part_name, true);
|
||||
|
||||
/// FIXME Transactions: it's not the best place for checking and setting maxtid,
|
||||
/// because it's too optimistic. We should lock maxtid of covered parts at the beginning of operation.
|
||||
MergeTreeTransaction::addNewPartAndRemoveCovered(shared_from_this(), part, covered_parts, txn);
|
||||
auto part_it = data_parts_indexes.insert(part).first;
|
||||
|
||||
if (out_transaction)
|
||||
@ -2349,6 +2352,7 @@ void MergeTreeData::removePartsFromWorkingSetImmediatelyAndSetTemporaryState(con
|
||||
if (it_part == data_parts_by_info.end())
|
||||
throw Exception("Part " + part->getNameWithState() + " not found in data_parts", ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
assert(part->getState() == IMergeTreeDataPart::State::PreCommitted);
|
||||
modifyPartState(part, IMergeTreeDataPart::State::Temporary);
|
||||
/// Erase immediately
|
||||
data_parts_indexes.erase(it_part);
|
||||
@ -2417,6 +2421,15 @@ MergeTreeData::DataPartsVector MergeTreeData::removePartsInRangeFromWorkingSet(c
|
||||
return parts_to_remove;
|
||||
}
|
||||
|
||||
void MergeTreeData::restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock)
|
||||
{
|
||||
auto lock = (acquired_lock) ? DataPartsLock() : lockParts();
|
||||
assert(part->getState() != DataPartState::Committed);
|
||||
addPartContributionToColumnSizes(part);
|
||||
addPartContributionToDataVolume(part);
|
||||
modifyPartState(part, DataPartState::Committed);
|
||||
}
|
||||
|
||||
void MergeTreeData::forgetPartAndMoveToDetached(const MergeTreeData::DataPartPtr & part_to_detach, const String & prefix, bool
|
||||
restore_covered)
|
||||
{
|
||||
@ -3762,6 +3775,18 @@ void MergeTreeData::Transaction::rollback()
|
||||
buf << ".";
|
||||
LOG_DEBUG(data.log, "Undoing transaction.{}", buf.str());
|
||||
|
||||
if (!txn)
|
||||
{
|
||||
auto lock = data.lockParts();
|
||||
for (const auto & part : precommitted_parts)
|
||||
{
|
||||
DataPartPtr covering_part;
|
||||
DataPartsVector covered_parts = data.getActivePartsToReplace(part->info, part->name, covering_part, lock);
|
||||
for (auto & covered : covered_parts)
|
||||
covered->versions.unlockMaxTID(Tx::PrehistoricTID);
|
||||
}
|
||||
}
|
||||
|
||||
data.removePartsFromWorkingSet(
|
||||
DataPartsVector(precommitted_parts.begin(), precommitted_parts.end()),
|
||||
/* clear_without_timeout = */ true);
|
||||
|
@ -239,7 +239,7 @@ public:
|
||||
class Transaction : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
Transaction(MergeTreeData & data_) : data(data_) {}
|
||||
Transaction(MergeTreeData & data_, MergeTreeTransaction * txn_) : data(data_), txn(txn_) {}
|
||||
|
||||
DataPartsVector commit(MergeTreeData::DataPartsLock * acquired_parts_lock = nullptr);
|
||||
|
||||
@ -268,6 +268,7 @@ public:
|
||||
friend class MergeTreeData;
|
||||
|
||||
MergeTreeData & data;
|
||||
MergeTreeTransaction * txn;
|
||||
DataParts precommitted_parts;
|
||||
|
||||
void clear() { precommitted_parts.clear(); }
|
||||
@ -503,6 +504,9 @@ public:
|
||||
DataPartsVector removePartsInRangeFromWorkingSet(const MergeTreePartInfo & drop_range, bool clear_without_timeout,
|
||||
DataPartsLock & lock);
|
||||
|
||||
/// Restores Outdated part and adds it to working set
|
||||
void restoreAndActivatePart(const DataPartPtr & part, DataPartsLock * acquired_lock = nullptr);
|
||||
|
||||
/// Renames the part to detached/<prefix>_<part> and removes it from data_parts,
|
||||
//// so it will not be deleted in clearOldParts.
|
||||
/// If restore_covered is true, adds to the working set inactive parts, which were merged into the deleted part.
|
||||
|
@ -382,7 +382,7 @@ void ReplicatedMergeTreeBlockOutputStream::commitPart(
|
||||
/// Information about the part.
|
||||
storage.getCommitPartOps(ops, part, block_id_path);
|
||||
|
||||
MergeTreeData::Transaction transaction(storage); /// If you can not add a part to ZK, we'll remove it back from the working set.
|
||||
MergeTreeData::Transaction transaction(storage, nullptr); /// If you can not add a part to ZK, we'll remove it back from the working set.
|
||||
bool renamed = false;
|
||||
try
|
||||
{
|
||||
|
@ -1418,7 +1418,7 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
|
||||
{
|
||||
/// Here we use the transaction just like RAII since rare errors in renameTempPartAndReplace() are possible
|
||||
/// and we should be able to rollback already added (Precomitted) parts
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, local_context->getCurrentTransaction().get());
|
||||
|
||||
auto data_parts_lock = lockParts();
|
||||
|
||||
@ -1491,7 +1491,7 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
|
||||
try
|
||||
{
|
||||
{
|
||||
Transaction transaction(*dest_table_storage);
|
||||
Transaction transaction(*dest_table_storage, local_context->getCurrentTransaction().get());
|
||||
|
||||
auto src_data_parts_lock = lockParts();
|
||||
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
||||
|
@ -1499,7 +1499,7 @@ bool StorageReplicatedMergeTree::executeLogEntry(LogEntry & entry)
|
||||
{
|
||||
LOG_TRACE(log, "Found valid part to attach from local data, preparing the transaction");
|
||||
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
|
||||
renameTempPartAndReplace(part, nullptr, nullptr, &transaction);
|
||||
checkPartChecksumsAndCommit(transaction, part);
|
||||
@ -1726,7 +1726,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
|
||||
/// Add merge to list
|
||||
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
|
||||
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
MutableDataPartPtr part;
|
||||
|
||||
Stopwatch stopwatch;
|
||||
@ -1859,7 +1859,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
|
||||
StorageMetadataPtr metadata_snapshot = getInMemoryMetadataPtr();
|
||||
|
||||
MutableDataPartPtr new_part;
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
|
||||
FutureMergedMutatedPart future_mutated_part;
|
||||
future_mutated_part.name = entry.new_part_name;
|
||||
@ -2530,7 +2530,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
|
||||
{
|
||||
/// Commit parts
|
||||
auto zookeeper = getZooKeeper();
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
|
||||
Coordination::Requests ops;
|
||||
for (PartDescriptionPtr & part_desc : final_parts)
|
||||
@ -4044,7 +4044,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
|
||||
|
||||
if (!to_detached)
|
||||
{
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
renameTempPartAndReplace(part, nullptr, nullptr, &transaction);
|
||||
|
||||
replaced_parts = checkPartChecksumsAndCommit(transaction, part);
|
||||
@ -6465,7 +6465,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
|
||||
ops.emplace_back(zkutil::makeSetRequest(fs::path(zookeeper_path) / "log", "", -1));
|
||||
ops.emplace_back(zkutil::makeCreateRequest(fs::path(zookeeper_path) / "log/log-", entry.toString(), zkutil::CreateMode::PersistentSequential));
|
||||
|
||||
Transaction transaction(*this);
|
||||
Transaction transaction(*this, nullptr);
|
||||
{
|
||||
auto data_parts_lock = lockParts();
|
||||
|
||||
@ -6655,7 +6655,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
|
||||
entry.toString(), zkutil::CreateMode::PersistentSequential));
|
||||
|
||||
{
|
||||
Transaction transaction(*dest_table_storage);
|
||||
Transaction transaction(*dest_table_storage, nullptr);
|
||||
|
||||
auto src_data_parts_lock = lockParts();
|
||||
auto dest_data_parts_lock = dest_table_storage->lockParts();
|
||||
|
@ -1,2 +1,4 @@
|
||||
275 0 138 136 0
|
||||
275 0
|
||||
275 0 138 136 0
|
||||
275 0
|
||||
|
@ -54,7 +54,7 @@ function thread_optimize()
|
||||
optimize_query="$optimize_query FINAL"
|
||||
fi
|
||||
action="COMMIT"
|
||||
if (( RANDOM % 2 )); then
|
||||
if (( RANDOM % 4 )); then
|
||||
action="ROLLBACK"
|
||||
fi
|
||||
|
||||
@ -62,7 +62,7 @@ function thread_optimize()
|
||||
BEGIN TRANSACTION;
|
||||
$optimize_query;
|
||||
$action;
|
||||
"
|
||||
" 2>&1| grep -Fv "already exists, but it will be deleted soon" | grep -F "Received from " ||:
|
||||
sleep 0.$RANDOM;
|
||||
done
|
||||
}
|
||||
@ -126,6 +126,9 @@ BEGIN TRANSACTION;
|
||||
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src;
|
||||
SELECT count(), sum(nm) FROM mv";
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM src"
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(nm) FROM mv"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE src";
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE dst";
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE mv";
|
||||
|
@ -1 +1,2 @@
|
||||
200 0 100 100 0
|
||||
200 0 100 100 0
|
||||
|
@ -58,4 +58,6 @@ $CLICKHOUSE_CLIENT --multiquery --query "
|
||||
BEGIN TRANSACTION;
|
||||
SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM mt;";
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "SELECT count(), sum(n), sum(m=1), sum(m=2), sum(m=3) FROM mt;"
|
||||
|
||||
$CLICKHOUSE_CLIENT --query "DROP TABLE mt";
|
||||
|
Loading…
Reference in New Issue
Block a user