write part version before other files

This commit is contained in:
Alexander Tokmakov 2022-02-14 22:50:08 +03:00
parent cbd3b45646
commit ae5aa8c12d
28 changed files with 212 additions and 87 deletions

View File

@ -2,6 +2,7 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/TransactionsInfoLog.h>
namespace DB
{
@ -33,7 +34,10 @@ void MergeTreeTransaction::addNewPart(const StoragePtr & storage, const DataPart
{
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
new_part->version.setCreationTID(tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
/// Now we know actual part name and can write it to system log table.
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, TransactionInfoContext{storage->getStorageID(), new_part->name});
new_part->assertHasVersionMetadata(txn);
if (txn)
txn->addNewPart(storage, new_part);
}
@ -52,7 +56,9 @@ void MergeTreeTransaction::addNewPartAndRemoveCovered(const StoragePtr & storage
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
TransactionInfoContext context{storage->getStorageID(), new_part->name};
new_part->version.setCreationTID(tid, context);
tryWriteEventToSystemLog(new_part->version.log, TransactionsInfoLogElement::ADD_PART, tid, context);
new_part->assertHasVersionMetadata(txn);
if (txn)
txn->addNewPart(storage, new_part);
@ -94,12 +100,14 @@ void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataP
/// Registers a mutation with this transaction.
/// The table is remembered in `storages` and the (table, mutation_id) pair is appended to `mutations`.
void MergeTreeTransaction::addMutation(const StoragePtr & table, const String & mutation_id)
{
/// Record the table first: isReadOnly() relies on `storages` being non-empty whenever `mutations` is.
storages.insert(table);
mutations.emplace_back(table, mutation_id);
}
/// Returns true if the transaction has not registered any change
/// (no created parts, no removed parts and no mutations), i.e. it touches no storage.
bool MergeTreeTransaction::isReadOnly() const
{
    /// Invariant: `storages` is empty exactly when there are no created/removed parts and no mutations.
    /// The previous early `return creating_parts.empty() && removing_parts.empty();` made this check
    /// unreachable and ignored mutations, so a transaction with only mutations was reported as read-only.
    assert((creating_parts.empty() && removing_parts.empty() && mutations.empty()) == storages.empty());
    return storages.empty();
}
void MergeTreeTransaction::beforeCommit()
@ -169,20 +177,45 @@ void MergeTreeTransaction::onException()
String MergeTreeTransaction::dumpDescription() const
{
String res = "\ncreating parts:\n";
for (const auto & part : creating_parts)
String res = fmt::format("{} state: {}, snapshot: {}", tid, getState(), snapshot);
if (isReadOnly())
{
res += part->name;
res += "\n";
res += ", readonly";
return res;
}
res += "removing parts:\n";
res += fmt::format(", affects {} tables:", storages.size());
using ChangesInTable = std::tuple<Strings, Strings, Strings>;
std::unordered_map<const IStorage *, ChangesInTable> storage_to_changes;
for (const auto & part : creating_parts)
std::get<0>(storage_to_changes[&(part->storage)]).push_back(part->name);
for (const auto & part : removing_parts)
{
res += part->name;
res += fmt::format(" (created by {}, {})\n", part->version.getCreationTID(), part->version.creation_csn);
String info = fmt::format("{} (created by {}, {})", part->name, part->version.getCreationTID(), part->version.creation_csn);
std::get<1>(storage_to_changes[&(part->storage)]).push_back(std::move(info));
assert(!part->version.creation_csn || part->version.creation_csn <= snapshot);
assert(!part->version.removal_csn);
}
for (const auto & mutation : mutations)
std::get<2>(storage_to_changes[mutation.first.get()]).push_back(mutation.second);
for (const auto & storage_changes : storage_to_changes)
{
res += fmt::format("\n\t{}:", storage_changes.first->getStorageID().getNameForLogs());
const auto & creating_info = std::get<0>(storage_changes.second);
const auto & removing_info = std::get<1>(storage_changes.second);
const auto & mutations_info = std::get<2>(storage_changes.second);
if (!creating_info.empty())
res += fmt::format("\n\t\tcreating parts:\n\t\t\t{}", fmt::join(creating_info, "\n\t\t\t"));
if (!removing_info.empty())
res += fmt::format("\n\t\tremoving parts:\n\t\t\t{}", fmt::join(removing_info, "\n\t\t\t"));
if (!mutations_info.empty())
res += fmt::format("\n\t\tmutations:\n\t\t\t{}", fmt::join(mutations_info, "\n\t\t\t"));
}
return res;

View File

@ -20,26 +20,6 @@ namespace ErrorCodes
extern const int CANNOT_PARSE_TEXT;
}
/// Best-effort write of a transaction event to the transactions info system log.
/// Does nothing if the global context has no such log (getTransactionsInfoLog() returns null).
/// Never throws: the whole body is a function-try-block and any exception is passed to tryLogCurrentException(log).
static void tryWriteEventToSystemLog(Poco::Logger * log,
TransactionsInfoLogElement::Type type, const TransactionID & tid,
const TransactionInfoContext & context)
try
{
auto system_log = Context::getGlobalContextInstance()->getTransactionsInfoLog();
if (!system_log)
return;
TransactionsInfoLogElement elem;
elem.type = type;
elem.tid = tid;
/// Remaining fields are filled from the supplied context by fillCommonFields().
elem.fillCommonFields(&context);
system_log->add(elem);
}
catch (...)
{
tryLogCurrentException(log);
}
VersionMetadata::VersionMetadata()
{
/// It would be better to make it static, but static loggers do not work for some reason (initialization order?)
@ -143,12 +123,13 @@ bool VersionMetadata::isMaxTIDLocked() const
return removal_tid_lock.load() != 0;
}
void VersionMetadata::setCreationTID(const TransactionID & tid, const TransactionInfoContext & context)
void VersionMetadata::setCreationTID(const TransactionID & tid, TransactionInfoContext * context)
{
/// NOTE ReplicatedMergeTreeBlockOutputStream may add one part multiple times
assert(creation_tid.isEmpty() || creation_tid == tid);
creation_tid = tid;
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, context);
if (context)
tryWriteEventToSystemLog(log, TransactionsInfoLogElement::ADD_PART, tid, *context);
}
bool VersionMetadata::isVisible(const MergeTreeTransaction & txn)

View File

@ -43,7 +43,7 @@ struct VersionMetadata
bool isMaxTIDLocked() const;
/// It can be called only from MergeTreeTransaction or on server startup
void setCreationTID(const TransactionID & tid, const TransactionInfoContext & context);
void setCreationTID(const TransactionID & tid, TransactionInfoContext * context);
/// Checks if it's safe to remove outdated version of an object
bool canBeRemoved();

View File

@ -1,5 +1,6 @@
#include <Interpreters/TransactionsInfoLog.h>
#include <Interpreters/TransactionVersionMetadata.h>
#include <Interpreters/Context.h>
#include <Common/TransactionID.h>
#include <Common/CurrentThread.h>
#include <Core/NamesAndTypes.h>
@ -86,4 +87,25 @@ void TransactionsInfoLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(part_name);
}
/// Best-effort write of a transaction event to the transactions info system log.
///  - log:     logger that receives any exception raised while writing;
///  - type:    kind of the event (stored in the element's `type`);
///  - tid:     transaction the event belongs to (stored in the element's `tid`);
///  - context: passed to fillCommonFields() to populate the remaining fields.
/// Silently does nothing when the global context has no transactions info log. Never throws.
void tryWriteEventToSystemLog(Poco::Logger * log,
    TransactionsInfoLogElement::Type type, const TransactionID & tid,
    const TransactionInfoContext & context)
{
    try
    {
        if (auto transactions_info_log = Context::getGlobalContextInstance()->getTransactionsInfoLog())
        {
            TransactionsInfoLogElement event;
            event.type = type;
            event.tid = tid;
            event.fillCommonFields(&context);
            transactions_info_log->add(event);
        }
    }
    catch (...)
    {
        /// Writing to the system log must not break the caller: report and continue.
        tryLogCurrentException(log);
    }
}
}

View File

@ -51,4 +51,8 @@ class TransactionsInfoLog : public SystemLog<TransactionsInfoLogElement>
using SystemLog<TransactionsInfoLogElement>::SystemLog;
};
void tryWriteEventToSystemLog(Poco::Logger * log, TransactionsInfoLogElement::Type type,
const TransactionID & tid, const TransactionInfoContext & context);
}

View File

@ -592,7 +592,8 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
metadata_snapshot->projections.get(projection_name).metadata,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}));
CompressionCodecFactory::instance().get("NONE", {}),
nullptr);
part_out.write(block);
part_out.finalizePart(new_projection_part, false);
@ -616,7 +617,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
CompressionCodecFactory::instance().get("NONE", {}));
CompressionCodecFactory::instance().get("NONE", {}), nullptr);
part_out.write(block);
part_out.finalizePart(new_data_part, false);

View File

@ -25,6 +25,7 @@
#include <Parsers/ExpressionElementParsers.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Interpreters/MergeTreeTransaction.h>
namespace CurrentMetrics
@ -1103,13 +1104,31 @@ void IMergeTreeDataPart::loadColumns(bool require)
setSerializationInfos(infos);
}
/// Sanity check that the part's version metadata matches the transaction that is adding it.
/// `txn` may be null, in which case the part is expected to carry Tx::PrehistoricTID.
/// Throws LOGICAL_ERROR if the part's creation TID differs from the expected one.
/// In debug builds it additionally asserts that the storage supports transactions and,
/// when `txn` is set, that the version metadata file exists on the part's disk.
void IMergeTreeDataPart::assertHasVersionMetadata(MergeTreeTransaction * txn) const
{
    assert(storage.supportsTransactions());

    TransactionID required_tid = txn ? txn->tid : Tx::PrehistoricTID;
    if (version.creation_tid != required_tid)
    {
        throw Exception(
            ErrorCodes::LOGICAL_ERROR,
            "CreationTID of part {} (table {}) is set to unexpected value {}, it's a bug. Current transaction: {}",
            name,
            storage.getStorageID().getNameForLogs(),
            version.creation_tid,
            txn ? txn->dumpDescription() : "<none>");
    }

    /// The on-disk file is only required for parts created inside a transaction.
    assert(!txn || volume->getDisk()->exists(fs::path(getFullRelativePath()) / TXN_VERSION_METADATA_FILE_NAME));
}
void IMergeTreeDataPart::storeVersionMetadata() const
{
assert(storage.supportsTransactions());
assert(!version.creation_tid.isEmpty());
if (version.creation_tid.isPrehistoric() && (version.removal_tid.isEmpty() || version.removal_tid.isPrehistoric()))
return;
LOG_TEST(storage.log, "Writing version for {} (creation: {}, removal {})", name, version.creation_tid, version.removal_tid);
if (!isStoredOnDisk())
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Transactions are not supported for in-memory parts (table: {}, part: {})",
storage.getStorageID().getNameForLogs(), name);
String version_file_name = fs::path(getFullRelativePath()) / TXN_VERSION_METADATA_FILE_NAME;
String tmp_version_file_name = version_file_name + ".tmp";
DiskPtr disk = volume->getDisk();
@ -1121,6 +1140,9 @@ void IMergeTreeDataPart::storeVersionMetadata() const
auto out = disk->writeFile(tmp_version_file_name, 256, WriteMode::Rewrite);
version.write(*out);
out->finalize();
/// TODO Small enough appends to a file are usually atomic,
/// so maybe we should append new metadata instead of rewriting the file, to reduce the number of fsyncs.
/// (In particular, this would allow removing the fsync after writing the CSN on commit.)
out->sync();
}
@ -1160,21 +1182,18 @@ try
/// Four (?) cases are possible:
/// 1. Part was created without transactions.
/// 2. Version metadata file was not renamed from *.tmp on part creation.
/// 3. Version metadata were written to *.tmp file, but hard restart happened before fsync
/// We must remove part in this case, but we cannot distinguish it from the first case.
/// TODO Write something to some checksummed file if part was created with transaction,
/// so part will be either broken or known to be created by transaction.
/// 3. Version metadata were written to *.tmp file, but hard restart happened before fsync.
/// 4. Fsyncs in storeVersionMetadata() work incorrectly.
TransactionInfoContext txn_context{storage.getStorageID(), name};
if (!disk->exists(tmp_version_file_name))
{
/// Case 1 (or 3).
/// Case 1.
/// We do not have version metadata and transactions history for old parts,
/// so let's consider that such parts were created by some ancient transaction
/// and were committed with some prehistoric CSN.
version.setCreationTID(Tx::PrehistoricTID, txn_context);
/// NOTE It might be Case 3, but version metadata file is written on part creation before other files,
/// so it's not Case 3 if part is not broken.
version.setCreationTID(Tx::PrehistoricTID, nullptr);
version.creation_csn = Tx::PrehistoricCSN;
return;
}
@ -1182,7 +1201,7 @@ try
/// Case 2.
/// Content of *.tmp file may be broken, just use fake TID.
/// Transaction was not committed if *.tmp file was not renamed, so we should complete rollback by removing part.
version.setCreationTID(Tx::DummyTID, txn_context);
version.setCreationTID(Tx::DummyTID, nullptr);
version.creation_csn = Tx::RolledBackCSN;
remove_tmp_file();
}

View File

@ -423,6 +423,7 @@ public:
/// Required for distinguish different copies of the same part on remote FS.
String getUniqueId() const;
void assertHasVersionMetadata(MergeTreeTransaction * txn) const;
void storeVersionMetadata() const;
void loadVersionMetadata() const;

View File

@ -207,7 +207,8 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MergeFromLogEntryT
reserved_space,
entry.deduplicate,
entry.deduplicate_by_columns,
storage.merging_params);
storage.merging_params,
nullptr);
/// Adjust priority

View File

@ -109,7 +109,8 @@ void MergePlainMergeTreeTask::prepare()
merge_mutate_entry->tagger->reserved_space,
deduplicate,
deduplicate_by_columns,
storage.merging_params);
storage.merging_params,
txn);
}

View File

@ -262,6 +262,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->merging_columns,
MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()),
ctx->compression_codec,
global_ctx->txn,
/*reset_columns=*/ true,
ctx->blocks_are_granules_size);
@ -599,6 +600,7 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
projection_merging_params,
global_ctx->new_data_part.get(),
".proj",
nullptr,
global_ctx->data,
global_ctx->mutator,
global_ctx->merges_blocker,

View File

@ -60,6 +60,7 @@ public:
MergeTreeData::MergingParams merging_params_,
const IMergeTreeDataPart * parent_part_,
String suffix_,
MergeTreeTransactionPtr txn,
MergeTreeData * data_,
MergeTreeDataMergerMutator * mutator_,
ActionBlocker * merges_blocker_,
@ -83,6 +84,7 @@ public:
global_ctx->mutator = std::move(mutator_);
global_ctx->merges_blocker = std::move(merges_blocker_);
global_ctx->ttl_merges_blocker = std::move(ttl_merges_blocker_);
global_ctx->txn = std::move(txn);
auto prepare_stage_ctx = std::make_shared<ExecuteAndFinalizeHorizontalPartRuntimeContext>();
@ -164,6 +166,8 @@ private:
std::promise<MergeTreeData::MutableDataPartPtr> promise{};
IMergedBlockOutputStream::WrittenOffsetColumns written_offset_columns{};
MergeTreeTransactionPtr txn;
};
using GlobalRuntimeContextPtr = std::shared_ptr<GlobalRuntimeContext>;

View File

@ -1310,7 +1310,18 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// Helper used while loading parts: marks the part pointed to by `it` as Outdated.
/// It sets remove_time to the part's modification_time, verifies the transactional metadata,
/// switches the state to Outdated and removes the part's contribution to the data volume.
/// Throws LOGICAL_ERROR if the part has a creation CSN other than RolledBackCSN/PrehistoricCSN
/// and isMaxTIDLocked() is false.
auto deactivate_part = [&] (DataPartIteratorByStateAndInfo it)
{
(*it)->remove_time.store((*it)->modification_time, std::memory_order_relaxed);
auto creation_csn = (*it)->version.creation_csn.load(std::memory_order_relaxed);
if (creation_csn != Tx::RolledBackCSN && creation_csn != Tx::PrehistoricCSN && !(*it)->version.isMaxTIDLocked())
{
/// It's possible that covering part was created without transaction,
/// but if covered part was created with transaction (i.e. creation_tid is not prehistoric),
/// then it must have removal tid in metadata file.
throw Exception(ErrorCodes::LOGICAL_ERROR, "Data part {} is Outdated and has creation TID {} and CSN {}, "
"but does not have removal tid. It's a bug or a result of manual intervention.",
(*it)->name, (*it)->version.creation_tid, creation_csn);
}
modifyPartState(it, DataPartState::Outdated);
removePartContributionToDataVolume(*it);
};
@ -1405,40 +1416,45 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
/// were merged), but that for some reason are still not deleted from the filesystem.
/// Deletion of files will be performed later in the clearOldParts() method.
if (data_parts_indexes.size() >= 2)
auto active_parts_range = getDataPartsStateRange(DataPartState::Active);
auto prev_it = active_parts_range.begin();
auto end_it = active_parts_range.end();
bool less_than_two_active_parts = prev_it == end_it || std::next(prev_it) == end_it;
if (!less_than_two_active_parts)
{
/// Now all parts are committed, so data_parts_by_state_and_info == committed_parts_range
auto prev_jt = data_parts_by_state_and_info.begin();
auto curr_jt = std::next(prev_jt);
(*prev_it)->assertState({DataPartState::Active});
auto curr_it = std::next(prev_it);
(*prev_jt)->assertState({DataPartState::Active});
while (curr_jt != data_parts_by_state_and_info.end() && (*curr_jt)->getState() == DataPartState::Active)
while (curr_it != data_parts_by_state_and_info.end() && (*curr_it)->getState() == DataPartState::Active)
{
(*curr_it)->assertState({DataPartState::Active});
/// Don't consider data parts belonging to different partitions.
if ((*curr_jt)->info.partition_id != (*prev_jt)->info.partition_id)
if ((*curr_it)->info.partition_id != (*prev_it)->info.partition_id)
{
++prev_jt;
++curr_jt;
++prev_it;
++curr_it;
continue;
}
if ((*curr_jt)->contains(**prev_jt))
if ((*curr_it)->contains(**prev_it))
{
deactivate_part(prev_jt);
prev_jt = curr_jt;
++curr_jt;
deactivate_part(prev_it);
prev_it = curr_it;
++curr_it;
}
else if ((*prev_jt)->contains(**curr_jt))
else if ((*prev_it)->contains(**curr_it))
{
auto next = std::next(curr_jt);
deactivate_part(curr_jt);
curr_jt = next;
auto next = std::next(curr_it);
deactivate_part(curr_it);
curr_it = next;
}
else
{
++prev_jt;
++curr_jt;
++prev_it;
++curr_it;
}
}
}
@ -5411,7 +5427,8 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
const MergeTreeData::DataPartPtr & src_part,
const String & tmp_part_prefix,
const MergeTreePartInfo & dst_part_info,
const StorageMetadataPtr & metadata_snapshot)
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeTransactionPtr & txn)
{
/// Check that the storage policy contains the disk where the src_part is located.
bool does_storage_policy_allow_same_disk = false;
@ -5429,6 +5446,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
ErrorCodes::BAD_ARGUMENTS);
String dst_part_name = src_part->getNewName(dst_part_info);
assert(!tmp_part_prefix.empty());
String tmp_dst_part_name = tmp_part_prefix + dst_part_name;
auto reservation = reserveSpace(src_part->getBytesOnDisk(), src_part->volume->getDisk());
@ -5455,6 +5473,13 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
auto single_disk_volume = std::make_shared<SingleDiskVolume>(disk->getName(), disk, 0);
auto dst_data_part = createPart(dst_part_name, dst_part_info, single_disk_volume, tmp_dst_part_name);
if (supportsTransactions())
{
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
dst_data_part->version.setCreationTID(tid, nullptr);
dst_data_part->storeVersionMetadata();
}
dst_data_part->is_temp = true;

View File

@ -728,7 +728,8 @@ public:
MergeTreeData & checkStructureAndGetMergeTreeData(IStorage & source_table, const StorageMetadataPtr & src_snapshot, const StorageMetadataPtr & my_snapshot) const;
MergeTreeData::MutableDataPartPtr cloneAndLoadDataPartOnSameDisk(
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info, const StorageMetadataPtr & metadata_snapshot);
const MergeTreeData::DataPartPtr & src_part, const String & tmp_part_prefix, const MergeTreePartInfo & dst_part_info,
const StorageMetadataPtr & metadata_snapshot, const MergeTreeTransactionPtr & txn);
virtual std::vector<MergeTreeMutationStatus> getMutationsStatus() const = 0;

View File

@ -422,6 +422,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
const IMergeTreeDataPart * parent_part,
const String & suffix)
{
@ -438,6 +439,7 @@ MergeTaskPtr MergeTreeDataMergerMutator::mergePartsToTemporaryPart(
merging_params,
parent_part,
suffix,
txn,
&data,
this,
&merges_blocker,
@ -452,6 +454,7 @@ MutateTaskPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
MergeListEntry * merge_entry,
time_t time_of_mutation,
ContextPtr context,
const MergeTreeTransactionPtr & txn,
ReservationSharedPtr space_reservation,
TableLockHolder & holder)
{
@ -464,6 +467,7 @@ MutateTaskPtr MergeTreeDataMergerMutator::mutatePartToTemporaryPart(
context,
space_reservation,
holder,
txn,
data,
*this,
merges_blocker

View File

@ -113,6 +113,7 @@ public:
bool deduplicate,
const Names & deduplicate_by_columns,
const MergeTreeData::MergingParams & merging_params,
const MergeTreeTransactionPtr & txn,
const IMergeTreeDataPart * parent_part = nullptr,
const String & suffix = "");
@ -124,6 +125,7 @@ public:
MergeListEntry * merge_entry,
time_t time_of_mutation,
ContextPtr context,
const MergeTreeTransactionPtr & txn,
ReservationSharedPtr space_reservation,
TableLockHolder & table_lock_holder);

View File

@ -91,7 +91,7 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec);
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec, nullptr);
out.write(block);
const auto & projections = metadata_snapshot->getProjections();
for (const auto & [projection_name, projection] : projection_parts)
@ -122,7 +122,7 @@ void MergeTreeDataPartInMemory::flushToDisk(const String & base_path, const Stri
auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
MergedBlockOutputStream projection_out(
projection_data_part, desc.metadata, projection_part->columns, projection_indices,
projection_compression_codec);
projection_compression_codec, nullptr);
projection_out.write(projection_part->block);
projection_out.finalizePart(projection_data_part, false);

View File

@ -7,6 +7,7 @@
#include <Interpreters/AggregationCommon.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <IO/HashingWriteBuffer.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDate.h>
@ -427,7 +428,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart(
const auto & index_factory = MergeTreeIndexFactory::instance();
auto out = std::make_unique<MergedBlockOutputStream>(new_data_part, metadata_snapshot,columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec, context->getCurrentTransaction());
out->writeWithPermutation(block, perm_ptr);
@ -545,7 +546,8 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
metadata_snapshot,
columns,
MergeTreeIndices{},
compression_codec);
compression_codec,
nullptr);
out->writeWithPermutation(block, perm_ptr);
auto finalizer = out->finalizePartAsync(new_data_part, false);

View File

@ -195,7 +195,8 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(const Stor
metadata_snapshot,
block.getNamesAndTypesList(),
{},
CompressionCodecFactory::instance().get("NONE", {}));
CompressionCodecFactory::instance().get("NONE", {}),
nullptr);
part->minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);

View File

@ -1,5 +1,6 @@
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Interpreters/Context.h>
#include <Interpreters/MergeTreeTransaction.h>
#include <Parsers/queryToString.h>
@ -18,6 +19,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec_,
const MergeTreeTransactionPtr & txn,
bool reset_columns_,
bool blocks_are_granules_size)
: IMergedBlockOutputStream(data_part, metadata_snapshot_, columns_list_, reset_columns_)
@ -34,6 +36,16 @@ MergedBlockOutputStream::MergedBlockOutputStream(
if (!part_path.empty())
volume->getDisk()->createDirectories(part_path);
if (storage.supportsTransactions())
{
/// We should write version metadata on part creation to distinguish it from parts that were created without transaction.
TransactionID tid = txn ? txn->tid : Tx::PrehistoricTID;
/// NOTE do not pass context for writing to system.transactions_info_log,
/// because part may have temporary name (with temporary block numbers). Will write it later.
data_part->version.setCreationTID(tid, nullptr);
data_part->storeVersionMetadata();
}
writer = data_part->getWriter(columns_list, metadata_snapshot, skip_indices, default_codec, writer_settings);
}

View File

@ -19,6 +19,7 @@ public:
const NamesAndTypesList & columns_list_,
const MergeTreeIndices & skip_indices,
CompressionCodecPtr default_codec_,
const MergeTreeTransactionPtr & txn,
bool reset_columns_ = false,
bool blocks_are_granules_size = false);

View File

@ -89,7 +89,7 @@ std::pair<bool, ReplicatedMergeMutateTaskBase::PartLogWriter> MutateFromLogEntry
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_mutated_part, metadata_snapshot, commands, merge_mutate_entry.get(),
entry.create_time, fake_query_context, reserved_space, table_lock_holder);
entry.create_time, fake_query_context, nullptr, reserved_space, table_lock_holder);
/// Adjust priority
for (auto & item : future_mutated_part->parts)

View File

@ -57,7 +57,7 @@ void MutatePlainMergeTreeTask::prepare()
mutate_task = storage.merger_mutator.mutatePartToTemporaryPart(
future_part, metadata_snapshot, merge_mutate_entry->commands, merge_list_entry.get(),
time(nullptr), fake_query_context, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
time(nullptr), fake_query_context, merge_mutate_entry->txn, merge_mutate_entry->tagger->reserved_space, table_lock_holder);
}
bool MutatePlainMergeTreeTask::executeStep()

View File

@ -536,6 +536,8 @@ struct MutationContext
bool need_sync;
ExecuteTTLType execute_ttl_type{ExecuteTTLType::NONE};
MergeTreeTransactionPtr txn;
};
using MutationContextPtr = std::shared_ptr<MutationContext>;
@ -646,6 +648,7 @@ public:
false, // TODO Do we need deduplicate for projections
{},
projection_merging_params,
nullptr,
ctx->new_data_part.get(),
".tmp_proj");
@ -967,7 +970,8 @@ private:
ctx->metadata_snapshot,
ctx->new_data_part->getColumns(),
skip_part_indices,
ctx->compression_codec);
ctx->compression_codec,
ctx->txn);
ctx->mutating_pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
ctx->mutating_executor = std::make_unique<PullingPipelineExecutor>(ctx->mutating_pipeline);
@ -1188,6 +1192,7 @@ MutateTask::MutateTask(
ContextPtr context_,
ReservationSharedPtr space_reservation_,
TableLockHolder & table_lock_holder_,
const MergeTreeTransactionPtr & txn,
MergeTreeData & data_,
MergeTreeDataMergerMutator & mutator_,
ActionBlocker & merges_blocker_)
@ -1205,6 +1210,7 @@ MutateTask::MutateTask(
ctx->metadata_snapshot = metadata_snapshot_;
ctx->space_reservation = space_reservation_;
ctx->storage_columns = metadata_snapshot_->getColumns().getAllPhysical();
ctx->txn = txn;
}
@ -1264,7 +1270,7 @@ bool MutateTask::prepare()
storage_from_source_part, ctx->metadata_snapshot, ctx->commands_for_part, Context::createCopy(context_for_reading)))
{
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {}", ctx->source_part->name, ctx->future_part->part_info.mutation);
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot));
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn));
return false;
}
else
@ -1358,7 +1364,7 @@ bool MutateTask::prepare()
&& ctx->files_to_rename.empty())
{
LOG_TRACE(ctx->log, "Part {} doesn't change up to mutation version {} (optimized)", ctx->source_part->name, ctx->future_part->part_info.mutation);
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot));
promise.set_value(ctx->data->cloneAndLoadDataPartOnSameDisk(ctx->source_part, "tmp_clone_", ctx->future_part->part_info, ctx->metadata_snapshot, ctx->txn));
return false;
}

View File

@ -32,6 +32,7 @@ public:
ContextPtr context_,
ReservationSharedPtr space_reservation_,
TableLockHolder & table_lock_holder_,
const MergeTreeTransactionPtr & txn,
MergeTreeData & data_,
MergeTreeDataMergerMutator & mutator_,
ActionBlocker & merges_blocker_);

View File

@ -1550,7 +1550,8 @@ void StorageMergeTree::replacePartitionFrom(const StoragePtr & source_table, con
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
dst_parts.emplace_back(cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot));
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, my_metadata_snapshot, local_context->getCurrentTransaction());
dst_parts.emplace_back(std::move(dst_part));
}
/// ATTACH empty part set
@ -1635,7 +1636,8 @@ void StorageMergeTree::movePartitionToTable(const StoragePtr & dest_table, const
Int64 temp_index = insert_increment.get();
MergeTreePartInfo dst_part_info(partition_id, temp_index, temp_index, src_part->info.level);
dst_parts.emplace_back(dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot));
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, local_context->getCurrentTransaction());
dst_parts.emplace_back(std::move(dst_part));
}
/// empty part set

View File

@ -2107,7 +2107,7 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
throw Exception("Checksums of " + part_desc->src_table_part->name + " is suddenly changed", ErrorCodes::UNFINISHED);
part_desc->res_part = cloneAndLoadDataPartOnSameDisk(
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot);
part_desc->src_table_part, TMP_PREFIX + "clone_", part_desc->new_part_info, metadata_snapshot, nullptr);
}
else if (!part_desc->replica.empty())
{
@ -3821,7 +3821,7 @@ bool StorageReplicatedMergeTree::fetchPart(const String & part_name, const Stora
{
get_part = [&, part_to_clone]()
{
return cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot);
return cloneAndLoadDataPartOnSameDisk(part_to_clone, "tmp_clone_", part_info, metadata_snapshot, nullptr);
};
}
else
@ -6272,7 +6272,7 @@ void StorageReplicatedMergeTree::replacePartitionFrom(
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot);
auto dst_part = cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, metadata_snapshot, nullptr);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
@ -6483,7 +6483,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot);
auto dst_part = dest_table_storage->cloneAndLoadDataPartOnSameDisk(src_part, TMP_PREFIX, dst_part_info, dest_metadata_snapshot, nullptr);
src_parts.emplace_back(src_part);
dst_parts.emplace_back(dst_part);
@ -7544,7 +7544,7 @@ bool StorageReplicatedMergeTree::createEmptyPartInsteadOfLost(zkutil::ZooKeeperP
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec);
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec, nullptr);
bool sync_on_insert = settings->fsync_after_insert;

View File

@ -23,7 +23,6 @@
3 1 Begin 1 1 1 1
3 1 AddPart 1 1 1 1 all_3_3_0
3 1 Commit 1 1 1 0
1 1 AddPart 1 1 1 1 all_1_1_0
4 1 Begin 1 1 1 1
4 1 AddPart 1 1 1 1 all_4_4_0
4 1 Commit 1 1 1 0