diff --git a/src/Storages/MergeTree/DataPartsExchange.cpp b/src/Storages/MergeTree/DataPartsExchange.cpp index 61561a8e3cf..7039951b256 100644 --- a/src/Storages/MergeTree/DataPartsExchange.cpp +++ b/src/Storages/MergeTree/DataPartsExchange.cpp @@ -135,7 +135,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo void Service::sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out) { - const auto * part_in_memory = dynamic_cast(part.get()); + auto part_in_memory = asInMemoryPart(part); if (!part_in_memory) throw Exception("Part " + part->name + " is not stored in memory", ErrorCodes::LOGICAL_ERROR); diff --git a/src/Storages/MergeTree/IMergeTreeDataPart.h b/src/Storages/MergeTree/IMergeTreeDataPart.h index f091d8ec519..8b65ec29b97 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPart.h +++ b/src/Storages/MergeTree/IMergeTreeDataPart.h @@ -95,9 +95,6 @@ public: virtual bool supportsVerticalMerge() const { return false; } - virtual bool waitUntilMerged(size_t /* timeout */) const { return true; } - virtual void notifyMerged() const {} - /// NOTE: Returns zeros if column files are not found in checksums. /// Otherwise return information about column size on disk. ColumnSize getColumnSize(const String & column_name, const IDataType & /* type */) const; diff --git a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp index 1b3f80b4e09..e5ee8b2be5e 100644 --- a/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp +++ b/src/Storages/MergeTree/MergeTreeBlockOutputStream.cpp @@ -32,14 +32,14 @@ void MergeTreeBlockOutputStream::write(const Block & block) PartLog::addNewPart(storage.global_context, part, watch.elapsed()); - if (auto * part_in_memory = dynamic_cast(part.get())) + if (auto part_in_memory = asInMemoryPart(part)) { storage.in_memory_merges_throttler.add(part_in_memory->block.bytes(), part_in_memory->rows_count); auto settings = storage.getSettings(); if (settings->in_memory_parts_insert_sync) { - if (!part->waitUntilMerged(in_memory_parts_timeout)) + if (!part_in_memory->waitUntilMerged(in_memory_parts_timeout)) throw Exception("Timeout exceeded while waiting to write part " + part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED); } diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index ef526552e12..098416e87ed 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1941,7 +1941,7 @@ void MergeTreeData::renameTempPartAndReplace( addPartContributionToColumnSizes(part); } - auto * part_in_memory = dynamic_cast(part.get()); + auto part_in_memory = asInMemoryPart(part); if (part_in_memory && getSettings()->in_memory_parts_enable_wal) { auto wal = getWriteAheadLog(); @@ -3271,7 +3271,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk( throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS); /// If source part is in memory, flush it to disk and clone it already in on-disk format - if (const auto * src_part_in_memory = dynamic_cast(src_part.get())) + if (auto src_part_in_memory = asInMemoryPart(src_part)) { const auto & src_relative_data_path = src_part_in_memory->storage.relative_data_path; auto flushed_part_path = src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix); @@ -3367,7 +3367,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String & LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path); String backup_part_path = backup_path + relative_data_path + part->relative_path; - if (const auto * part_in_memory = dynamic_cast(part.get())) + if (auto part_in_memory = asInMemoryPart(part)) part_in_memory->flushToDisk(backup_path + relative_data_path, part->relative_path); else localBackup(part->volume->getDisk(), part->getFullRelativePath(), backup_part_path); diff --git a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp index 4f6a5e38384..7c6204a5a32 100644 --- a/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp +++ b/src/Storages/MergeTree/MergeTreeDataMergerMutator.cpp @@ -1455,7 +1455,7 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart( { /// In compact parts we read all columns, because they all stored in a /// single file - if (isCompactPart(source_part)) + if (!isWidePart(source_part)) return updated_header.getNamesAndTypesList(); NameSet removed_columns; diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp index 0d930eba4e8..bec9d16209d 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp @@ -13,7 +13,6 @@ namespace DB namespace ErrorCodes { - extern const int NOT_IMPLEMENTED; extern const int DIRECTORY_ALREADY_EXISTS; } @@ -124,4 +123,10 @@ void MergeTreeDataPartInMemory::calculateEachColumnSizesOnDisk(ColumnSizeByName each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize(); } +DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part) +{ + return std::dynamic_pointer_cast(part); +} + + } diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.h b/src/Storages/MergeTree/MergeTreeDataPartInMemory.h index 3e2ec82b038..e48d9b8e201 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.h +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.h @@ -45,8 +45,8 @@ public: void flushToDisk(const String & base_path, const String & new_relative_path) const; - bool waitUntilMerged(size_t timeout) const override; - void notifyMerged() const override; + bool waitUntilMerged(size_t timeout) const; + void notifyMerged() const; mutable Block block; @@ -58,5 +58,6 @@ private: }; using DataPartInMemoryPtr = std::shared_ptr; +DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp index 917f2b862a9..2c50d5baee0 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp @@ -79,6 +79,11 @@ static MergeTreeDataPartChecksum createUncompressedChecksum(size_t size, SipHash void MergeTreeDataPartWriterInMemory::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums) { + /// If part is empty we still need to initialize block by empty columns. + if (!part_in_memory->block) + for (const auto & column : columns_list) + part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name}); + SipHash hash; for (const auto & column : part_in_memory->block) column.column->updateHashFast(hash); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp index 11c12d47823..b367cf73a08 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeQueue.cpp @@ -1007,7 +1007,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry( auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated}); if (part) { - if (const auto * part_in_memory = dynamic_cast(part.get())) + if (auto part_in_memory = asInMemoryPart(part)) sum_parts_size_in_bytes += part_in_memory->block.bytes(); else sum_parts_size_in_bytes += part->getBytesOnDisk(); diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 44a942551fb..b5cf716b079 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -681,11 +681,11 @@ bool StorageMergeTree::merge( auto lock = lockParts(); for (const auto & part : future_part.parts) { - part->notifyMerged(); - if (isInMemoryPart(part)) + if (auto part_in_memory = asInMemoryPart(part)) { - modifyPartState(part, DataPartState::Deleting); - parts_to_remove_immediately.push_back(part); + part_in_memory->notifyMerged(); + modifyPartState(part_in_memory, DataPartState::Deleting); + parts_to_remove_immediately.push_back(part_in_memory); } } } diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index 9babf9476d1..882b5593c76 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -1089,11 +1089,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry) DataPartsVector parts_to_remove_immediatly; for (const auto & part_ptr : parts) { - part_ptr->notifyMerged(); - if (isInMemoryPart(part_ptr)) + if (auto part_in_memory = asInMemoryPart(part_ptr)) { - modifyPartState(part_ptr, DataPartState::Deleting); - parts_to_remove_immediatly.push_back(part_ptr); + part_in_memory->notifyMerged(); + modifyPartState(part_in_memory, DataPartState::Deleting); + parts_to_remove_immediatly.push_back(part_in_memory); } } diff --git a/tests/queries/0_stateless/01130_in_memory_parts.reference b/tests/queries/0_stateless/01130_in_memory_parts.reference index 4a22f17c644..ad5435abb59 100644 --- a/tests/queries/0_stateless/01130_in_memory_parts.reference +++ b/tests/queries/0_stateless/01130_in_memory_parts.reference @@ -36,3 +36,4 @@ Mutations and Alters 4 [4,16] 5 [] 7 [7,49] +0 diff --git a/tests/queries/0_stateless/01130_in_memory_parts.sql b/tests/queries/0_stateless/01130_in_memory_parts.sql index 21665faefd6..dca12a85841 100644 --- a/tests/queries/0_stateless/01130_in_memory_parts.sql +++ b/tests/queries/0_stateless/01130_in_memory_parts.sql @@ -39,4 +39,8 @@ ALTER TABLE in_memory DROP COLUMN str; SELECT * FROM in_memory ORDER BY a LIMIT 5; +-- in-memory parts works if they're empty. +ALTER TABLE in_memory DELETE WHERE 1; +SELECT count() FROM in_memory; + DROP TABLE in_memory;