diff --git a/src/Client/ClientBase.cpp b/src/Client/ClientBase.cpp index 4f1c1f4539e..c187222bb1e 100644 --- a/src/Client/ClientBase.cpp +++ b/src/Client/ClientBase.cpp @@ -1096,7 +1096,7 @@ void ClientBase::sendData(Block & sample, const ColumnsDescription & columns_des sendDataFromPipe( storage->read( sample.getNames(), - storage->getStorageSnapshot(metadata), + storage->getStorageSnapshot(metadata, global_context), query_info, global_context, {}, diff --git a/src/Interpreters/InterpreterDescribeQuery.cpp b/src/Interpreters/InterpreterDescribeQuery.cpp index da5fcedd469..9919b1272bd 100644 --- a/src/Interpreters/InterpreterDescribeQuery.cpp +++ b/src/Interpreters/InterpreterDescribeQuery.cpp @@ -89,7 +89,7 @@ BlockIO InterpreterDescribeQuery::execute() auto table_lock = table->lockForShare(getContext()->getInitialQueryId(), settings.lock_acquire_timeout); auto metadata_snapshot = table->getInMemoryMetadataPtr(); - storage_snapshot = table->getStorageSnapshot(metadata_snapshot); + storage_snapshot = table->getStorageSnapshot(metadata_snapshot, getContext()); columns = metadata_snapshot->getColumns(); } diff --git a/src/Interpreters/InterpreterOptimizeQuery.cpp b/src/Interpreters/InterpreterOptimizeQuery.cpp index 05b65ac5d51..83bf23ab4ad 100644 --- a/src/Interpreters/InterpreterOptimizeQuery.cpp +++ b/src/Interpreters/InterpreterOptimizeQuery.cpp @@ -33,7 +33,7 @@ BlockIO InterpreterOptimizeQuery::execute() StoragePtr table = DatabaseCatalog::instance().getTable(table_id, getContext()); checkStorageSupportsTransactionsIfNeeded(table, getContext()); auto metadata_snapshot = table->getInMemoryMetadataPtr(); - auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot); + auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot, getContext()); // Empty list of names means we deduplicate by all columns, but user can explicitly state which columns to use. Names column_names; diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 087108e7d79..708429acf9f 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -329,7 +329,7 @@ InterpreterSelectQuery::InterpreterSelectQuery( if (!metadata_snapshot) metadata_snapshot = storage->getInMemoryMetadataPtr(); - storage_snapshot = storage->getStorageSnapshotForQuery(metadata_snapshot, query_ptr); + storage_snapshot = storage->getStorageSnapshotForQuery(metadata_snapshot, query_ptr, context); } if (has_input || !joined_tables.resolveTables()) diff --git a/src/Interpreters/MergeTreeTransaction.cpp b/src/Interpreters/MergeTreeTransaction.cpp index 2442c16772a..b51836ddb03 100644 --- a/src/Interpreters/MergeTreeTransaction.cpp +++ b/src/Interpreters/MergeTreeTransaction.cpp @@ -65,8 +65,10 @@ void MergeTreeTransaction::removeOldPart(const StoragePtr & storage, const DataP else { /// Lock part for removal with special TID, so transactions will not try to remove it concurrently. - /// We lock it only in memory. + /// We lock it only in memory if part was not involved in any transactions. part_to_remove->version.lockRemovalTID(Tx::PrehistoricTID, transaction_context); + if (part_to_remove->wasInvolvedInTransaction()) + part_to_remove->appendRemovalTIDToVersionMetadata(); } } @@ -189,6 +191,9 @@ void MergeTreeTransaction::afterCommit(CSN assigned_csn) noexcept part->version.removal_csn.store(csn); part->appendCSNToVersionMetadata(VersionMetadata::WhichCSN::REMOVAL); } + + for (const auto & storage_and_mutation : mutations) + storage_and_mutation.first->setMutationCSN(storage_and_mutation.second, csn); } bool MergeTreeTransaction::rollback() noexcept @@ -220,7 +225,11 @@ bool MergeTreeTransaction::rollback() noexcept /// Kind of optimization: cleanup thread can remove these parts immediately for (const auto & part : parts_to_remove) + { part->version.creation_csn.store(Tx::RolledBackCSN); + /// Write special RolledBackCSN, so we will be able to cleanup transaction log + part->appendCSNToVersionMetadata(VersionMetadata::CREATION); + } for (const auto & part : parts_to_activate) { diff --git a/src/Interpreters/MutationsInterpreter.cpp b/src/Interpreters/MutationsInterpreter.cpp index 5e795c5760a..f89601094f0 100644 --- a/src/Interpreters/MutationsInterpreter.cpp +++ b/src/Interpreters/MutationsInterpreter.cpp @@ -802,7 +802,7 @@ ASTPtr MutationsInterpreter::prepareInterpreterSelectQuery(std::vector & /// e.g. ALTER referencing the same table in scalar subquery bool execute_scalar_subqueries = !dry_run; auto syntax_result = TreeRewriter(context).analyze( - all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot), + all_asts, all_columns, storage, storage->getStorageSnapshot(metadata_snapshot, context), false, true, execute_scalar_subqueries); if (execute_scalar_subqueries && context->hasQueryContext()) diff --git a/src/Interpreters/TransactionVersionMetadata.cpp b/src/Interpreters/TransactionVersionMetadata.cpp index 20ebc1170ed..7c139686916 100644 --- a/src/Interpreters/TransactionVersionMetadata.cpp +++ b/src/Interpreters/TransactionVersionMetadata.cpp @@ -383,7 +383,8 @@ void VersionMetadata::read(ReadBuffer & buf) { /// NOTE Metadata file may actually contain multiple creation TIDs, we need the last one. removal_tid = TransactionID::read(buf); - removal_tid_lock = removal_tid.getHash(); + if (!removal_tid.isEmpty()) + removal_tid_lock = removal_tid.getHash(); } else if (name == REMOVAL_CSN_STR) { diff --git a/src/QueryPipeline/RemoteQueryExecutor.cpp b/src/QueryPipeline/RemoteQueryExecutor.cpp index 110d4308236..f4a30a9fee7 100644 --- a/src/QueryPipeline/RemoteQueryExecutor.cpp +++ b/src/QueryPipeline/RemoteQueryExecutor.cpp @@ -563,7 +563,7 @@ void RemoteQueryExecutor::sendExternalTables() { SelectQueryInfo query_info; auto metadata_snapshot = cur->getInMemoryMetadataPtr(); - auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot); + auto storage_snapshot = cur->getStorageSnapshot(metadata_snapshot, context); QueryProcessingStage::Enum read_from_table_stage = cur->getQueryProcessingStage( context, QueryProcessingStage::Complete, storage_snapshot, query_info); diff --git a/src/Storages/FileLog/StorageFileLog.cpp b/src/Storages/FileLog/StorageFileLog.cpp index 32ca936f039..85bd8754cb5 100644 --- a/src/Storages/FileLog/StorageFileLog.cpp +++ b/src/Storages/FileLog/StorageFileLog.cpp @@ -679,7 +679,7 @@ bool StorageFileLog::streamToViews() throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist", ErrorCodes::LOGICAL_ERROR); auto metadata_snapshot = getInMemoryMetadataPtr(); - auto storage_snapshot = getStorageSnapshot(metadata_snapshot); + auto storage_snapshot = getStorageSnapshot(metadata_snapshot, getContext()); auto max_streams_number = std::min(filelog_settings->max_threads.value, file_infos.file_names.size()); /// No files to parse diff --git a/src/Storages/IStorage.h b/src/Storages/IStorage.h index a73cd3c4a21..734769a9b18 100644 --- a/src/Storages/IStorage.h +++ b/src/Storages/IStorage.h @@ -488,6 +488,11 @@ public: throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); } + virtual void setMutationCSN(const String & /*mutation_id*/, UInt64 /*csn*/) + { + throw Exception("Mutations are not supported by storage " + getName(), ErrorCodes::NOT_IMPLEMENTED); + } + /// Cancel a part move to shard. virtual CancellationCode killPartMoveToShard(const UUID & /*task_uuid*/) { @@ -610,15 +615,15 @@ public: virtual std::optional lifetimeBytes() const { return {}; } /// Creates a storage snapshot from given metadata. - virtual StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const + virtual StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const { return std::make_shared(*this, metadata_snapshot); } /// Creates a storage snapshot from given metadata and columns, which are used in query. - virtual StorageSnapshotPtr getStorageSnapshotForQuery(const StorageMetadataPtr & metadata_snapshot, const ASTPtr & /*query*/) const + virtual StorageSnapshotPtr getStorageSnapshotForQuery(const StorageMetadataPtr & metadata_snapshot, const ASTPtr & /*query*/, ContextPtr query_context) const { - return getStorageSnapshot(metadata_snapshot); + return getStorageSnapshot(metadata_snapshot, query_context); } private: diff --git a/src/Storages/Kafka/StorageKafka.cpp b/src/Storages/Kafka/StorageKafka.cpp index 4c7465d587d..0686c9b39cb 100644 --- a/src/Storages/Kafka/StorageKafka.cpp +++ b/src/Storages/Kafka/StorageKafka.cpp @@ -615,7 +615,7 @@ bool StorageKafka::streamToViews() if (!table) throw Exception("Engine table " + table_id.getNameForLogs() + " doesn't exist.", ErrorCodes::LOGICAL_ERROR); - auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr()); + auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext()); // Create an INSERT query for streaming data auto insert = std::make_shared(); diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 3eba1bb03ea..ef8f44dbae6 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1396,7 +1396,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } /// Sanity checks - bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn; + bool csn_order = !version.removal_csn || version.creation_csn <= version.removal_csn || version.removal_csn == Tx::PrehistoricCSN; bool min_start_csn_order = version.creation_tid.start_csn <= version.creation_csn; bool max_start_csn_order = version.removal_tid.start_csn <= version.removal_csn; bool creation_csn_known = version.creation_csn; @@ -4039,6 +4039,21 @@ DataPartsVector MergeTreeData::getVisibleDataPartsVector(ContextPtr local_contex return res; } +DataPartsVector MergeTreeData::getVisibleDataPartsVectorUnlocked(ContextPtr local_context, const DataPartsLock & lock) const +{ + DataPartsVector res; + if (const auto * txn = local_context->getCurrentTransaction().get()) + { + res = getDataPartsVectorForInternalUsage({DataPartState::Active, DataPartState::Outdated}, lock); + filterVisibleDataParts(res, txn->getSnapshot(), txn->tid); + } + else + { + res = getDataPartsVectorForInternalUsage({DataPartState::Active}, lock); + } + return res; +} + MergeTreeData::DataPartsVector MergeTreeData::getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const { DataPartsVector res; @@ -6420,12 +6435,12 @@ void MergeTreeData::updateObjectColumns(const DataPartPtr & part, const DataPart DB::updateObjectColumns(object_columns, part->getColumns()); } -StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const +StorageSnapshotPtr MergeTreeData::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const { auto snapshot_data = std::make_unique(); auto lock = lockParts(); - snapshot_data->parts = getDataPartsVectorForInternalUsage({DataPartState::Active}, lock); + snapshot_data->parts = getVisibleDataPartsVectorUnlocked(query_context, lock); return std::make_shared(*this, metadata_snapshot, object_columns, std::move(snapshot_data)); } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index eab742a23a1..8c2d31e7b08 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -435,7 +435,7 @@ public: DataPartsVector parts; }; - StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const override; + StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; /// Load the set of data parts from disk. Call once - immediately after the object is created. void loadDataParts(bool skip_sanity_checks); @@ -473,6 +473,7 @@ public: /// Returns parts that visible with current snapshot DataPartsVector getVisibleDataPartsVector(ContextPtr local_context) const; + DataPartsVector getVisibleDataPartsVectorUnlocked(ContextPtr local_context, const DataPartsLock & lock) const; DataPartsVector getVisibleDataPartsVector(const MergeTreeTransactionPtr & txn) const; DataPartsVector getVisibleDataPartsVector(CSN snapshot_version, TransactionID current_tid) const; diff --git a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp index cdf3a7b902d..758a2f6b241 100644 --- a/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp +++ b/src/Storages/MergeTree/MergeTreeDataSelectExecutor.cpp @@ -131,14 +131,11 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read( const auto & settings = context->getSettingsRef(); - auto parts_in_txn_snapshot = data.getVisibleDataPartsVector(context); - const auto & metadata_for_reading = storage_snapshot->getMetadataForQuery(); const auto & snapshot_data = assert_cast(*storage_snapshot->data); - /// FIXME: use one snapshot - const auto & parts = context->getCurrentTransaction() ? parts_in_txn_snapshot : snapshot_data.parts; + const auto & parts = snapshot_data.parts; if (!query_info.projection) { diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp index b636f7cd6af..d80a3525b4f 100644 --- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp +++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp @@ -284,7 +284,7 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeTempPart( TemporaryPart temp_part; Block & block = block_with_partition.block; auto columns = metadata_snapshot->getColumns().getAllPhysical().filter(block.getNames()); - auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot); + auto storage_snapshot = data.getStorageSnapshot(metadata_snapshot, context); if (!storage_snapshot->object_columns.empty()) { diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp index 6c29714a474..0d557ed36c2 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.cpp +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.cpp @@ -62,7 +62,11 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(MutationCommands commands_, DiskP *out << "commands: "; commands.writeText(*out); *out << "\n"; - if (!tid.isPrehistoric()) + if (tid.isPrehistoric()) + { + csn = Tx::PrehistoricCSN; + } + else { *out << "tid: "; TransactionID::write(tid, *out); @@ -99,6 +103,14 @@ void MergeTreeMutationEntry::removeFile() } } +void MergeTreeMutationEntry::writeCSN(CSN csn_) +{ + csn = csn_; + auto out = disk->writeFile(path_prefix + file_name, 256, WriteMode::Append); + *out << "csn: " << csn << "\n"; + out->finalize(); +} + MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & path_prefix_, const String & file_name_) : disk(std::move(disk_)) , path_prefix(path_prefix_) @@ -120,11 +132,21 @@ MergeTreeMutationEntry::MergeTreeMutationEntry(DiskPtr disk_, const String & pat commands.readText(*buf); *buf >> "\n"; - if (!buf->eof()) + if (buf->eof()) + { + tid = Tx::PrehistoricTID; + csn = Tx::PrehistoricCSN; + } + else { *buf >> "tid: "; tid = TransactionID::read(*buf); *buf >> "\n"; + + if (!buf->eof()) + { + *buf >> "csn: " >> csn >> "\n"; + } } assertEOF(*buf); diff --git a/src/Storages/MergeTree/MergeTreeMutationEntry.h b/src/Storages/MergeTree/MergeTreeMutationEntry.h index 5fb92b9954d..47debe47a22 100644 --- a/src/Storages/MergeTree/MergeTreeMutationEntry.h +++ b/src/Storages/MergeTree/MergeTreeMutationEntry.h @@ -31,7 +31,7 @@ struct MergeTreeMutationEntry /// ID of transaction which has created mutation. TransactionID tid = Tx::PrehistoricTID; - CSN csn; + CSN csn = Tx::UnknownCSN; /// Create a new entry and write it to a temporary file. MergeTreeMutationEntry(MutationCommands commands_, DiskPtr disk, const String & path_prefix_, UInt64 tmp_number, @@ -45,6 +45,8 @@ struct MergeTreeMutationEntry void removeFile(); + void writeCSN(CSN csn_); + static String versionToFileName(UInt64 block_number_); static UInt64 tryParseFileName(const String & file_name_); static UInt64 parseFileName(const String & file_name_); diff --git a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp index cadfa85299c..5a527ca2d23 100644 --- a/src/Storages/RabbitMQ/StorageRabbitMQ.cpp +++ b/src/Storages/RabbitMQ/StorageRabbitMQ.cpp @@ -1024,7 +1024,7 @@ bool StorageRabbitMQ::streamToViews() InterpreterInsertQuery interpreter(insert, rabbitmq_context, false, true, true); auto block_io = interpreter.execute(); - auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr()); + auto storage_snapshot = getStorageSnapshot(getInMemoryMetadataPtr(), getContext()); auto column_names = block_io.pipeline.getHeader().getNames(); auto sample_block = storage_snapshot->getSampleBlockForColumns(column_names); diff --git a/src/Storages/ReadFinalForExternalReplicaStorage.cpp b/src/Storages/ReadFinalForExternalReplicaStorage.cpp index cf1c5c35629..a03ccb5cf43 100644 --- a/src/Storages/ReadFinalForExternalReplicaStorage.cpp +++ b/src/Storages/ReadFinalForExternalReplicaStorage.cpp @@ -54,7 +54,7 @@ Pipe readFinalFromNestedStorage( filter_column_name = expressions->children.back()->getColumnName(); } - auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata); + auto nested_snapshot = nested_storage->getStorageSnapshot(nested_metadata, context); Pipe pipe = nested_storage->read(require_columns_name, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams); pipe.addTableLock(lock); pipe.addStorageHolder(nested_storage); diff --git a/src/Storages/StorageBuffer.cpp b/src/Storages/StorageBuffer.cpp index 801e1b80a20..a503e79dc2c 100644 --- a/src/Storages/StorageBuffer.cpp +++ b/src/Storages/StorageBuffer.cpp @@ -203,7 +203,7 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage( /// TODO: Find a way to support projections for StorageBuffer query_info.ignore_projections = true; const auto & destination_metadata = destination->getInMemoryMetadataPtr(); - return destination->getQueryProcessingStage(local_context, to_stage, destination->getStorageSnapshot(destination_metadata), query_info); + return destination->getQueryProcessingStage(local_context, to_stage, destination->getStorageSnapshot(destination_metadata, local_context), query_info); } return QueryProcessingStage::FetchColumns; @@ -248,7 +248,7 @@ void StorageBuffer::read( auto destination_lock = destination->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto destination_metadata_snapshot = destination->getInMemoryMetadataPtr(); - auto destination_snapshot = destination->getStorageSnapshot(destination_metadata_snapshot); + auto destination_snapshot = destination->getStorageSnapshot(destination_metadata_snapshot, local_context); const bool dst_has_same_structure = std::all_of(column_names.begin(), column_names.end(), [metadata_snapshot, destination_metadata_snapshot](const String& column_name) { diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp index 1a390f784a2..62ec2524a32 100644 --- a/src/Storages/StorageDistributed.cpp +++ b/src/Storages/StorageDistributed.cpp @@ -617,13 +617,13 @@ static bool requiresObjectColumns(const ColumnsDescription & all_columns, ASTPtr return false; } -StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const +StorageSnapshotPtr StorageDistributed::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const { - return getStorageSnapshotForQuery(metadata_snapshot, nullptr); + return getStorageSnapshotForQuery(metadata_snapshot, nullptr, query_context); } StorageSnapshotPtr StorageDistributed::getStorageSnapshotForQuery( - const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query) const + const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr /*query_context*/) const { /// If query doesn't use columns of type Object, don't deduce /// concrete types for them, because it required extra round trip. diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h index 317463783ee..a890cabd8b1 100644 --- a/src/Storages/StorageDistributed.h +++ b/src/Storages/StorageDistributed.h @@ -69,9 +69,9 @@ public: ColumnsDescriptionByShardNum objects_by_shard; }; - StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const override; + StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; StorageSnapshotPtr getStorageSnapshotForQuery( - const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query) const override; + const StorageMetadataPtr & metadata_snapshot, const ASTPtr & query, ContextPtr query_context) const override; QueryProcessingStage::Enum getQueryProcessingStage(ContextPtr, QueryProcessingStage::Enum, const StorageSnapshotPtr &, SelectQueryInfo &) const override; diff --git a/src/Storages/StorageMaterializedView.cpp b/src/Storages/StorageMaterializedView.cpp index 008b42e3299..72b0433ed6f 100644 --- a/src/Storages/StorageMaterializedView.cpp +++ b/src/Storages/StorageMaterializedView.cpp @@ -140,7 +140,7 @@ QueryProcessingStage::Enum StorageMaterializedView::getQueryProcessingStage( /// converting and use it just like a normal view. query_info.ignore_projections = true; const auto & target_metadata = getTargetTable()->getInMemoryMetadataPtr(); - return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getStorageSnapshot(target_metadata), query_info); + return getTargetTable()->getQueryProcessingStage(local_context, to_stage, getTargetTable()->getStorageSnapshot(target_metadata, local_context), query_info); } Pipe StorageMaterializedView::read( @@ -172,7 +172,7 @@ void StorageMaterializedView::read( auto storage = getTargetTable(); auto lock = storage->lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout); auto target_metadata_snapshot = storage->getInMemoryMetadataPtr(); - auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot); + auto target_storage_snapshot = storage->getStorageSnapshot(target_metadata_snapshot, local_context); if (query_info.order_optimizer) query_info.input_order_info = query_info.order_optimizer->getInputOrder(target_metadata_snapshot, local_context); diff --git a/src/Storages/StorageMemory.cpp b/src/Storages/StorageMemory.cpp index c3601b33a04..f2d53dfa0d5 100644 --- a/src/Storages/StorageMemory.cpp +++ b/src/Storages/StorageMemory.cpp @@ -110,10 +110,11 @@ class MemorySink : public SinkToStorage public: MemorySink( StorageMemory & storage_, - const StorageMetadataPtr & metadata_snapshot_) + const StorageMetadataPtr & metadata_snapshot_, + ContextPtr context) : SinkToStorage(metadata_snapshot_->getSampleBlock()) , storage(storage_) - , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_)) + , storage_snapshot(storage_.getStorageSnapshot(metadata_snapshot_, context)) { } @@ -190,7 +191,7 @@ StorageMemory::StorageMemory( setInMemoryMetadata(storage_metadata); } -StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const +StorageSnapshotPtr StorageMemory::getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr /*query_context*/) const { auto snapshot_data = std::make_unique(); snapshot_data->blocks = data.get(); @@ -260,9 +261,9 @@ Pipe StorageMemory::read( } -SinkToStoragePtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr /*context*/) +SinkToStoragePtr StorageMemory::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr context) { - return std::make_shared(*this, metadata_snapshot); + return std::make_shared(*this, metadata_snapshot, context); } diff --git a/src/Storages/StorageMemory.h b/src/Storages/StorageMemory.h index 1c4421e51a6..cb308ad5c58 100644 --- a/src/Storages/StorageMemory.h +++ b/src/Storages/StorageMemory.h @@ -36,7 +36,7 @@ public: std::shared_ptr blocks; }; - StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot) const override; + StorageSnapshotPtr getStorageSnapshot(const StorageMetadataPtr & metadata_snapshot, ContextPtr query_context) const override; Pipe read( const Names & column_names, diff --git a/src/Storages/StorageMerge.cpp b/src/Storages/StorageMerge.cpp index 96e6070e09e..8b71cfdb102 100644 --- a/src/Storages/StorageMerge.cpp +++ b/src/Storages/StorageMerge.cpp @@ -201,7 +201,7 @@ QueryProcessingStage::Enum StorageMerge::getQueryProcessingStage( stage_in_source_tables = std::max( stage_in_source_tables, table->getQueryProcessingStage(local_context, to_stage, - table->getStorageSnapshot(table->getInMemoryMetadataPtr()), query_info)); + table->getStorageSnapshot(table->getInMemoryMetadataPtr(), local_context), query_info)); } iterator->next(); @@ -338,7 +338,7 @@ Pipe StorageMerge::read( Aliases aliases; auto storage_metadata_snapshot = storage->getInMemoryMetadataPtr(); auto storage_columns = storage_metadata_snapshot->getColumns(); - auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot); + auto nested_storage_snaphsot = storage->getStorageSnapshot(storage_metadata_snapshot, local_context); auto modified_query_info = getModifiedQueryInfo(query_info, modified_context, storage->getStorageID(), storage->as()); auto syntax_result = TreeRewriter(local_context).analyzeSelect( @@ -377,7 +377,7 @@ Pipe StorageMerge::read( } syntax_result = TreeRewriter(local_context).analyze( - required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot)); + required_columns_expr_list, storage_columns.getAllPhysical(), storage, storage->getStorageSnapshot(storage_metadata_snapshot, local_context)); auto alias_actions = ExpressionAnalyzer(required_columns_expr_list, syntax_result, local_context).getActionsDAG(true); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index d37554f3d4a..f3425546023 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -539,6 +539,18 @@ void StorageMergeTree::waitForMutation(const String & mutation_id) LOG_INFO(log, "Mutation {} done", mutation_id); } +void StorageMergeTree::setMutationCSN(const String & mutation_id, CSN csn) +{ + LOG_INFO(log, "Writing CSN {} for mutation {}", csn, mutation_id); + UInt64 version = MergeTreeMutationEntry::parseFileName(mutation_id); + + std::lock_guard lock(currently_processing_in_background_mutex); + auto it = current_mutations_by_version.find(version); + if (it == current_mutations_by_version.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot find mutation {}", mutation_id); + it->second.writeCSN(csn); +} + void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr query_context) { /// Validate partition IDs (if any) before starting mutation @@ -718,9 +730,13 @@ void StorageMergeTree::loadMutations() UInt64 block_number = entry.block_number; LOG_DEBUG(log, "Loading mutation: {} entry, commands size: {}", it->name(), entry.commands.size()); - if (!entry.tid.isPrehistoric()) + if (!entry.tid.isPrehistoric() && !entry.csn) { - if (!TransactionLog::getCSN(entry.tid)) + if (auto csn = TransactionLog::getCSN(entry.tid)) + { + entry.writeCSN(csn); + } + else { TransactionLog::assertTIDIsNotOutdated(entry.tid); LOG_DEBUG(log, "Mutation entry {} was created by transaction {}, but it was not committed. Removing mutation entry", diff --git a/src/Storages/StorageMergeTree.h b/src/Storages/StorageMergeTree.h index cb0436c7af5..72900fd7a3c 100644 --- a/src/Storages/StorageMergeTree.h +++ b/src/Storages/StorageMergeTree.h @@ -175,6 +175,8 @@ private: /// Wait until mutation with version will finish mutation for all parts void waitForMutation(Int64 version); void waitForMutation(const String & mutation_id) override; + void setMutationCSN(const String & mutation_id, CSN csn) override; + friend struct CurrentlyMergingPartsTagger; diff --git a/src/Storages/StorageProxy.h b/src/Storages/StorageProxy.h index ce72633f024..b1eb190bd1d 100644 --- a/src/Storages/StorageProxy.h +++ b/src/Storages/StorageProxy.h @@ -41,7 +41,7 @@ public: /// TODO: Find a way to support projections for StorageProxy info.ignore_projections = true; const auto & nested_metadata = getNested()->getInMemoryMetadataPtr(); - return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getStorageSnapshot(nested_metadata), info); + return getNested()->getQueryProcessingStage(context, to_stage, getNested()->getStorageSnapshot(nested_metadata, context), info); } Pipe watch( diff --git a/src/Storages/StorageTableFunction.h b/src/Storages/StorageTableFunction.h index 4616421b24a..8bc1b160e77 100644 --- a/src/Storages/StorageTableFunction.h +++ b/src/Storages/StorageTableFunction.h @@ -104,7 +104,7 @@ public: for (const auto & c : column_names) cnames += c + " "; auto storage = getNested(); - auto nested_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr()); + auto nested_snapshot = storage->getStorageSnapshot(storage->getInMemoryMetadataPtr(), context); auto pipe = storage->read(column_names, nested_snapshot, query_info, context, processed_stage, max_block_size, num_streams); if (!pipe.empty() && add_conversion) diff --git a/src/Storages/tests/gtest_storage_log.cpp b/src/Storages/tests/gtest_storage_log.cpp index 4cda9d6c9f5..66922afdd9c 100644 --- a/src/Storages/tests/gtest_storage_log.cpp +++ b/src/Storages/tests/gtest_storage_log.cpp @@ -117,7 +117,7 @@ std::string readData(DB::StoragePtr & table, const DB::ContextPtr context) { using namespace DB; auto metadata_snapshot = table->getInMemoryMetadataPtr(); - auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot); + auto storage_snapshot = table->getStorageSnapshot(metadata_snapshot, context); Names column_names; column_names.push_back("a");