diff --git a/src/Storages/MergeTree/MergeTreeData.cpp b/src/Storages/MergeTree/MergeTreeData.cpp index 14366ff7a43..6c014307c3c 100644 --- a/src/Storages/MergeTree/MergeTreeData.cpp +++ b/src/Storages/MergeTree/MergeTreeData.cpp @@ -1744,62 +1744,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks) } } - if (settings->in_memory_parts_enable_wal) - { - std::vector disks_wal_parts(disks.size()); - std::mutex wal_init_lock; - - std::vector> wal_disks_futures; - wal_disks_futures.reserve(disks.size()); - - for (size_t i = 0; i < disks.size(); ++i) - { - const auto & disk_ptr = disks[i]; - if (disk_ptr->isBroken()) - continue; - - auto & disk_wal_parts = disks_wal_parts[i]; - - wal_disks_futures.push_back(runner([&, disk_ptr]() - { - for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) - { - if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME)) - continue; - - if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME) - { - std::lock_guard lock(wal_init_lock); - if (write_ahead_log != nullptr) - throw Exception(ErrorCodes::CORRUPTED_DATA, - "There are multiple WAL files appeared in current storage policy. " - "You need to resolve this manually"); - - write_ahead_log = std::make_shared(*this, disk_ptr, it->name()); - for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock, is_static_storage)) - disk_wal_parts.push_back(std::move(part)); - } - else - { - MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name()); - for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock, is_static_storage)) - disk_wal_parts.push_back(std::move(part)); - } - } - }, Priority{0})); - } - - /// For for iteration to be completed - waitForAllToFinishAndRethrowFirstError(wal_disks_futures); - - MutableDataPartsVector parts_from_wal; - for (auto & disk_wal_parts : disks_wal_parts) - std::move(disk_wal_parts.begin(), disk_wal_parts.end(), std::back_inserter(parts_from_wal)); - - loadDataPartsFromWAL(parts_from_wal); - num_parts += parts_from_wal.size(); - } - if (num_parts == 0) { resetObjectColumnsFromActiveParts(part_lock); @@ -2629,68 +2573,6 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t } -size_t MergeTreeData::clearOldWriteAheadLogs() -{ - DataPartsVector parts = getDataPartsVectorForInternalUsage(); - std::vector> all_block_numbers_on_disk; - std::vector> block_numbers_on_disk; - - for (const auto & part : parts) - if (part->isStoredOnDisk()) - all_block_numbers_on_disk.emplace_back(part->info.min_block, part->info.max_block); - - if (all_block_numbers_on_disk.empty()) - return 0; - - ::sort(all_block_numbers_on_disk.begin(), all_block_numbers_on_disk.end()); - block_numbers_on_disk.push_back(all_block_numbers_on_disk[0]); - for (size_t i = 1; i < all_block_numbers_on_disk.size(); ++i) - { - if (all_block_numbers_on_disk[i].first == all_block_numbers_on_disk[i - 1].second + 1) - block_numbers_on_disk.back().second = all_block_numbers_on_disk[i].second; - else - block_numbers_on_disk.push_back(all_block_numbers_on_disk[i]); - } - - auto is_range_on_disk = [&block_numbers_on_disk](Int64 min_block, Int64 max_block) - { - auto lower = std::lower_bound(block_numbers_on_disk.begin(), block_numbers_on_disk.end(), std::make_pair(min_block, Int64(-1L))); - if (lower != block_numbers_on_disk.end() && min_block >= lower->first && max_block <= lower->second) - return true; - - if (lower != block_numbers_on_disk.begin()) - { - --lower; - if (min_block >= lower->first && max_block <= lower->second) - return true; - } - - return false; - }; - - size_t cleared_count = 0; - auto disks = getStoragePolicy()->getDisks(); - for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it) - { - auto disk_ptr = *disk_it; - if (disk_ptr->isBroken()) - continue; - - for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next()) - { - auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name()); - if (min_max_block_number && is_range_on_disk(min_max_block_number->first, min_max_block_number->second)) - { - LOG_DEBUG(log, "Removing from filesystem the outdated WAL file {}", it->name()); - disk_ptr->removeFile(relative_data_path + it->name()); - ++cleared_count; - } - } - } - - return cleared_count; -} - size_t MergeTreeData::clearEmptyParts() { if (!getSettings()->remove_empty_parts) @@ -2747,17 +2629,6 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_ throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Target path already exists: {}", fullPath(disk, new_table_path)); } - { - /// Relies on storage path, so we drop it during rename - /// it will be recreated automatically. - std::lock_guard wal_lock(write_ahead_log_mutex); - if (write_ahead_log) - { - write_ahead_log->shutdown(); - write_ahead_log.reset(); - } - } - for (const auto & disk : disks) { auto new_table_path_parent = parentPath(new_table_path); @@ -2800,12 +2671,6 @@ void MergeTreeData::dropAllData() all_parts.push_back(*it); } - { - std::lock_guard wal_lock(write_ahead_log_mutex); - if (write_ahead_log) - write_ahead_log->shutdown(); - } - /// Tables in atomic databases have UUID and stored in persistent locations. /// No need to clear caches (that are keyed by filesystem path) because collision is not possible. if (!getStorageID().hasUUID()) @@ -2864,8 +2729,6 @@ void MergeTreeData::dropAllData() if (disk->exists(fs::path(relative_data_path) / MOVING_DIR_NAME)) disk->removeRecursive(fs::path(relative_data_path) / MOVING_DIR_NAME); - MergeTreeWriteAheadLog::dropAllWriteAheadLogs(disk, relative_data_path); - try { if (!disk->isDirectoryEmpty(relative_data_path) && @@ -3830,9 +3693,6 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const if (part->getState() != MergeTreeDataPartState::Outdated) modifyPartState(part, MergeTreeDataPartState::Outdated); - - if (isInMemoryPart(part) && getSettings()->in_memory_parts_enable_wal) - getWriteAheadLog()->dropPart(part->name); } if (removed_active_part) @@ -8255,30 +8115,6 @@ AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartP return result; } -MergeTreeData::WriteAheadLogPtr MergeTreeData::getWriteAheadLog() -{ - std::lock_guard lock(write_ahead_log_mutex); - if (!write_ahead_log) - { - auto reservation = reserveSpace(getSettings()->write_ahead_log_max_bytes); - for (const auto & disk: reservation->getDisks()) - { - if (!disk->isRemote()) - { - write_ahead_log = std::make_shared(*this, disk); - break; - } - } - - if (!write_ahead_log) - throw Exception( - ErrorCodes::NOT_IMPLEMENTED, - "Can't store write ahead log in remote disk. It makes no sense."); - } - - return write_ahead_log; -} - NamesAndTypesList MergeTreeData::getVirtuals() const { return NamesAndTypesList{ diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 5fb26f9a057..55b4b23e351 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -689,9 +689,6 @@ public: /// Try to clear parts from filesystem. Throw exception in case of errors. void clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error = true, NameSet * parts_failed_to_delete = nullptr); - /// Delete WAL files containing parts, that all already stored on disk. - size_t clearOldWriteAheadLogs(); - /// Delete all directories which names begin with "tmp" /// Must be called with locked lockForShare() because it's using relative_data_path. size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"}); @@ -945,9 +942,6 @@ public: /// Method is cheap and doesn't require any locks. size_t getTotalMergesWithTTLInMergeList() const; - using WriteAheadLogPtr = std::shared_ptr; - WriteAheadLogPtr getWriteAheadLog(); - constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_"; std::pair createEmptyPart( MergeTreePartInfo & new_part_info, const MergeTreePartition & partition, @@ -1513,9 +1507,6 @@ private: bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const; - std::mutex write_ahead_log_mutex; - WriteAheadLogPtr write_ahead_log; - virtual void startBackgroundMovesIfNeeded() = 0; bool allow_nullable_key{}; diff --git a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp index f04e08838a9..2d8e095ea29 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp @@ -1,7 +1,6 @@ #include #include #include -#include #include #include #include @@ -9,14 +8,13 @@ #include #include #include -#include + namespace DB { namespace ErrorCodes { - extern const int DIRECTORY_ALREADY_EXISTS; extern const int NOT_IMPLEMENTED; } @@ -50,93 +48,19 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader( } IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter( - const NamesAndTypesList & columns_list, - const StorageMetadataPtr & metadata_snapshot, - const std::vector & /* indices_to_recalc */, - const CompressionCodecPtr & /* default_codec */, - const MergeTreeWriterSettings & writer_settings, - const MergeTreeIndexGranularity & /* computed_index_granularity */) + const NamesAndTypesList &, + const StorageMetadataPtr &, + const std::vector &, + const CompressionCodecPtr &, + const MergeTreeWriterSettings &, + const MergeTreeIndexGranularity &) { - auto ptr = std::static_pointer_cast(shared_from_this()); - return std::make_unique( - ptr, columns_list, metadata_snapshot, writer_settings); + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing"); } -MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const +MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &, const StorageMetadataPtr &) const { - auto reservation = storage.reserveSpace(block.bytes(), getDataPartStorage()); - VolumePtr volume = storage.getStoragePolicy()->getVolume(0); - VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume); - - auto new_data_part = storage.getDataPartBuilder(name, data_part_volume, new_relative_path) - .withPartFormat(storage.choosePartFormatOnDisk(block.bytes(), rows_count)) - .build(); - - auto new_data_part_storage = new_data_part->getDataPartStoragePtr(); - new_data_part_storage->beginTransaction(); - - new_data_part->uuid = uuid; - new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion()); - new_data_part->partition.value = partition.value; - new_data_part->minmax_idx = minmax_idx; - - if (new_data_part_storage->exists()) - { - throw Exception( - ErrorCodes::DIRECTORY_ALREADY_EXISTS, - "Could not flush part {}. Part in {} already exists", - quoteString(getDataPartStorage().getFullPath()), - new_data_part_storage->getFullPath()); - } - - new_data_part_storage->createDirectories(); - - auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); - auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices()); - MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec, NO_TRANSACTION_PTR); - out.write(block); - - const auto & projections = metadata_snapshot->getProjections(); - for (const auto & [projection_name, projection] : projection_parts) - { - if (projections.has(projection_name)) - { - auto old_projection_part = asInMemoryPart(projection); - auto new_projection_part = new_data_part->getProjectionPartBuilder(projection_name) - .withPartFormat(storage.choosePartFormatOnDisk(old_projection_part->block.bytes(), rows_count)) - .build(); - - new_projection_part->is_temp = false; // clean up will be done on parent part - new_projection_part->setColumns(projection->getColumns(), {}, metadata_snapshot->getMetadataVersion()); - - auto new_projection_part_storage = new_projection_part->getDataPartStoragePtr(); - if (new_projection_part_storage->exists()) - { - throw Exception( - ErrorCodes::DIRECTORY_ALREADY_EXISTS, - "Could not flush projection part {}. Projection part in {} already exists", - projection_name, - new_projection_part_storage->getFullPath()); - } - - new_projection_part_storage->createDirectories(); - const auto & desc = projections.get(name); - auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0); - auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices()); - MergedBlockOutputStream projection_out( - new_projection_part, desc.metadata, - new_projection_part->getColumns(), projection_indices, - projection_compression_codec, NO_TRANSACTION_PTR); - - projection_out.write(old_projection_part->block); - projection_out.finalizePart(new_projection_part, false); - new_data_part->addProjectionPart(projection_name, std::move(new_projection_part)); - } - } - - out.finalizePart(new_data_part, false); - new_data_part_storage->commitTransaction(); - return new_data_part_storage; + throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing"); } DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix, diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp deleted file mode 100644 index 048339b58c9..00000000000 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp +++ /dev/null @@ -1,89 +0,0 @@ -#include -#include -#include - -namespace DB -{ - -namespace ErrorCodes -{ - extern const int LOGICAL_ERROR; -} - -MergeTreeDataPartWriterInMemory::MergeTreeDataPartWriterInMemory( - const MutableDataPartInMemoryPtr & part_, - const NamesAndTypesList & columns_list_, - const StorageMetadataPtr & metadata_snapshot_, - const MergeTreeWriterSettings & settings_) - : IMergeTreeDataPartWriter(part_, columns_list_, metadata_snapshot_, settings_) - , part_in_memory(part_) {} - -void MergeTreeDataPartWriterInMemory::write( - const Block & block, const IColumn::Permutation * permutation) -{ - if (part_in_memory->block) - throw Exception(ErrorCodes::LOGICAL_ERROR, "DataPartWriterInMemory supports only one write"); - - Block primary_key_block; - if (settings.rewrite_primary_key) - primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation); - - Block result_block; - if (permutation) - { - for (const auto & col : columns_list) - { - if (primary_key_block.has(col.name)) - result_block.insert(primary_key_block.getByName(col.name)); - else - { - auto permuted = block.getByName(col.name); - permuted.column = permuted.column->permute(*permutation, 0); - result_block.insert(permuted); - } - } - } - else - { - for (const auto & col : columns_list) - result_block.insert(block.getByName(col.name)); - } - - index_granularity.appendMark(result_block.rows()); - if (with_final_mark) - index_granularity.appendMark(0); - part_in_memory->block = std::move(result_block); - - if (settings.rewrite_primary_key) - calculateAndSerializePrimaryIndex(primary_key_block); -} - -void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block) -{ - size_t rows = primary_index_block.rows(); - if (!rows) - return; - - size_t primary_columns_num = primary_index_block.columns(); - index_columns.resize(primary_columns_num); - for (size_t i = 0; i < primary_columns_num; ++i) - { - const auto & primary_column = *primary_index_block.getByPosition(i).column; - index_columns[i] = primary_column.cloneEmpty(); - index_columns[i]->insertFrom(primary_column, 0); - if (with_final_mark) - index_columns[i]->insertFrom(primary_column, rows - 1); - } -} - -void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/) -{ - /// If part is empty we still need to initialize block by empty columns. - if (!part_in_memory->block) - for (const auto & column : columns_list) - part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name}); - - checksums.files["data.bin"] = part_in_memory->calculateBlockChecksum(); -} - -} diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h b/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h deleted file mode 100644 index 2d333822652..00000000000 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h +++ /dev/null @@ -1,30 +0,0 @@ -#pragma once -#include -#include - -namespace DB -{ - -/// Writes data part in memory. -class MergeTreeDataPartWriterInMemory : public IMergeTreeDataPartWriter -{ -public: - MergeTreeDataPartWriterInMemory( - const MutableDataPartInMemoryPtr & part_, - const NamesAndTypesList & columns_list_, - const StorageMetadataPtr & metadata_snapshot, - const MergeTreeWriterSettings & settings_); - - /// You can write only one block. In-memory part can be written only at INSERT. - void write(const Block & block, const IColumn::Permutation * permutation) override; - - void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override; - void finish(bool /*sync*/) override {} - -private: - void calculateAndSerializePrimaryIndex(const Block & primary_index_block); - - MutableDataPartInMemoryPtr part_in_memory; -}; - -} diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp index 65cf8bbce72..6b575b7a51c 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp @@ -178,7 +178,6 @@ void ReplicatedMergeTreeAttachThread::runImpl() /// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart), /// don't allow to reinitialize them, delete each of them immediately. storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"}); - storage.clearOldWriteAheadLogs(); storage.createNewZooKeeperNodes(); storage.syncPinnedPartUUIDs(); diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp index c425e11419d..8daee661c75 100644 --- a/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp +++ b/src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp @@ -153,7 +153,6 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate() auto lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations); /// Both use relative_data_path which changes during rename, so we /// do it under share lock - cleaned_other += storage.clearOldWriteAheadLogs(); cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds()); } diff --git a/src/Storages/StorageMergeTree.cpp b/src/Storages/StorageMergeTree.cpp index 6ae3cdef2a7..470e30b7947 100644 --- a/src/Storages/StorageMergeTree.cpp +++ b/src/Storages/StorageMergeTree.cpp @@ -130,7 +130,6 @@ StorageMergeTree::StorageMergeTree( void StorageMergeTree::startup() { - clearOldWriteAheadLogs(); clearEmptyParts(); /// Temporary directories contain incomplete results of merges (after forced restart) @@ -1382,7 +1381,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign /// so execute under share lock. size_t cleared_count = 0; cleared_count += clearOldPartsFromFilesystem(); - cleared_count += clearOldWriteAheadLogs(); cleared_count += clearOldMutations(); cleared_count += clearEmptyParts(); return cleared_count; diff --git a/tests/integration/test_backward_compatibility/test_in_memory_parts_still_read.py b/tests/integration/test_backward_compatibility/test_in_memory_parts_still_read.py deleted file mode 100644 index cd67f1f6344..00000000000 --- a/tests/integration/test_backward_compatibility/test_in_memory_parts_still_read.py +++ /dev/null @@ -1,45 +0,0 @@ -import pytest - -from helpers.cluster import ClickHouseCluster - - -cluster = ClickHouseCluster(__file__) - -# Version 23.4 is the latest version to support writing in-memory parts. -node = cluster.add_instance( - "node_old", - image="clickhouse/clickhouse-server", - tag="23.4", - stay_alive=True, - with_installed_binary=True, - allow_analyzer=False, -) - - -@pytest.fixture(scope="module") -def start_cluster(): - try: - cluster.start() - yield cluster - - finally: - cluster.shutdown() - - -def test_in_memory_parts_still_read(start_cluster): - node.query( - "CREATE TABLE t (x UInt64, s String, a Array(Tuple(Map(String, LowCardinality(String)), Date32, DateTime64(3)))) ENGINE = MergeTree ORDER BY s SETTINGS min_rows_for_compact_part = 1000000, min_bytes_for_compact_part = '1G', in_memory_parts_enable_wal = 1" - ) - node.query("INSERT INTO t SELECT * FROM generateRandom() LIMIT 100") - - assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "100\n" - - node.restart_with_latest_version() - assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "100\n" - - node.query("INSERT INTO t SELECT * FROM generateRandom() LIMIT 100") - - assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "200\n" - - node.restart_with_original_version() - assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "200\n"