Merge pull request #55186 from ClickHouse/remove-in-memory-data-parts

Remove the support for in-memory data parts (part 2)
Author: Alexey Milovidov (2023-10-09 23:53:25 +02:00), committed by GitHub
commit b50889a8ff
9 changed files with 10 additions and 427 deletions

src/Storages/MergeTree/MergeTreeData.cpp

@@ -1744,62 +1744,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
}
}
if (settings->in_memory_parts_enable_wal)
{
std::vector<MutableDataPartsVector> disks_wal_parts(disks.size());
std::mutex wal_init_lock;
std::vector<std::future<void>> wal_disks_futures;
wal_disks_futures.reserve(disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
const auto & disk_ptr = disks[i];
if (disk_ptr->isBroken())
continue;
auto & disk_wal_parts = disks_wal_parts[i];
wal_disks_futures.push_back(runner([&, disk_ptr]()
{
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
if (!startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
continue;
if (it->name() == MergeTreeWriteAheadLog::DEFAULT_WAL_FILE_NAME)
{
std::lock_guard lock(wal_init_lock);
if (write_ahead_log != nullptr)
throw Exception(ErrorCodes::CORRUPTED_DATA,
"There are multiple WAL files appeared in current storage policy. "
"You need to resolve this manually");
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk_ptr, it->name());
for (auto && part : write_ahead_log->restore(metadata_snapshot, getContext(), part_lock, is_static_storage))
disk_wal_parts.push_back(std::move(part));
}
else
{
MergeTreeWriteAheadLog wal(*this, disk_ptr, it->name());
for (auto && part : wal.restore(metadata_snapshot, getContext(), part_lock, is_static_storage))
disk_wal_parts.push_back(std::move(part));
}
}
}, Priority{0}));
}
/// Wait for all disk iteration tasks to complete
waitForAllToFinishAndRethrowFirstError(wal_disks_futures);
MutableDataPartsVector parts_from_wal;
for (auto & disk_wal_parts : disks_wal_parts)
std::move(disk_wal_parts.begin(), disk_wal_parts.end(), std::back_inserter(parts_from_wal));
loadDataPartsFromWAL(parts_from_wal);
num_parts += parts_from_wal.size();
}
if (num_parts == 0)
{
resetObjectColumnsFromActiveParts(part_lock);
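For reference, the WAL loading removed above is a scatter-gather pattern: each disk gets its own task and its own result slot in a pre-sized vector, so collecting parts needs no lock, and wal_init_lock only guards the single shared write_ahead_log pointer. A minimal standard-library sketch of the same idiom (the per-disk scan is a stub, not the ClickHouse disk API):

#include <algorithm>
#include <future>
#include <iterator>
#include <string>
#include <vector>

/// Stub standing in for iterating one disk's directory for WAL files.
static std::vector<std::string> scanDisk(size_t disk_index)
{
    return {"wal_on_disk_" + std::to_string(disk_index) + ".bin"};
}

std::vector<std::string> scanAllDisks(size_t num_disks)
{
    /// One result slot per disk: tasks never touch each other's slot,
    /// so the results vector itself needs no synchronization.
    std::vector<std::vector<std::string>> per_disk(num_disks);
    std::vector<std::future<void>> futures;
    futures.reserve(num_disks);

    for (size_t i = 0; i < num_disks; ++i)
        futures.push_back(std::async(std::launch::async,
            [i, &per_disk] { per_disk[i] = scanDisk(i); }));

    /// Like waitForAllToFinishAndRethrowFirstError: get() rethrows a failed task's exception.
    for (auto & future : futures)
        future.get();

    std::vector<std::string> all_names;
    for (auto & names : per_disk)
        std::move(names.begin(), names.end(), std::back_inserter(all_names));
    return all_names;
}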
@@ -2629,68 +2573,6 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
}
size_t MergeTreeData::clearOldWriteAheadLogs()
{
DataPartsVector parts = getDataPartsVectorForInternalUsage();
std::vector<std::pair<Int64, Int64>> all_block_numbers_on_disk;
std::vector<std::pair<Int64, Int64>> block_numbers_on_disk;
for (const auto & part : parts)
if (part->isStoredOnDisk())
all_block_numbers_on_disk.emplace_back(part->info.min_block, part->info.max_block);
if (all_block_numbers_on_disk.empty())
return 0;
::sort(all_block_numbers_on_disk.begin(), all_block_numbers_on_disk.end());
block_numbers_on_disk.push_back(all_block_numbers_on_disk[0]);
for (size_t i = 1; i < all_block_numbers_on_disk.size(); ++i)
{
if (all_block_numbers_on_disk[i].first == all_block_numbers_on_disk[i - 1].second + 1)
block_numbers_on_disk.back().second = all_block_numbers_on_disk[i].second;
else
block_numbers_on_disk.push_back(all_block_numbers_on_disk[i]);
}
auto is_range_on_disk = [&block_numbers_on_disk](Int64 min_block, Int64 max_block)
{
auto lower = std::lower_bound(block_numbers_on_disk.begin(), block_numbers_on_disk.end(), std::make_pair(min_block, Int64(-1L)));
if (lower != block_numbers_on_disk.end() && min_block >= lower->first && max_block <= lower->second)
return true;
if (lower != block_numbers_on_disk.begin())
{
--lower;
if (min_block >= lower->first && max_block <= lower->second)
return true;
}
return false;
};
size_t cleared_count = 0;
auto disks = getStoragePolicy()->getDisks();
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
if (disk_ptr->isBroken())
continue;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name());
if (min_max_block_number && is_range_on_disk(min_max_block_number->first, min_max_block_number->second))
{
LOG_DEBUG(log, "Removing from filesystem the outdated WAL file {}", it->name());
disk_ptr->removeFile(relative_data_path + it->name());
++cleared_count;
}
}
}
return cleared_count;
}
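The deleted clearOldWriteAheadLogs decides whether a WAL file is obsolete by coalescing the sorted (min_block, max_block) ranges of the on-disk parts, then testing containment with a single lower_bound probe. A self-contained sketch of that range logic, with illustrative names rather than the ClickHouse types:

#include <algorithm>
#include <cstdint>
#include <utility>
#include <vector>

using Range = std::pair<int64_t, int64_t>;  /// (min_block, max_block), inclusive

/// Sort the ranges, then merge runs where the next range starts
/// exactly one past the previous one (contiguous block numbers).
std::vector<Range> coalesce(std::vector<Range> ranges)
{
    std::sort(ranges.begin(), ranges.end());
    std::vector<Range> merged;
    for (const auto & range : ranges)
    {
        if (!merged.empty() && range.first == merged.back().second + 1)
            merged.back().second = range.second;
        else
            merged.push_back(range);
    }
    return merged;
}

/// A WAL file covering [min_block, max_block] is removable only if some
/// coalesced on-disk range fully contains it.
bool isRangeOnDisk(const std::vector<Range> & merged, int64_t min_block, int64_t max_block)
{
    auto lower = std::lower_bound(merged.begin(), merged.end(), Range{min_block, -1});
    if (lower != merged.end() && min_block >= lower->first && max_block <= lower->second)
        return true;
    if (lower != merged.begin())
    {
        --lower;
        if (min_block >= lower->first && max_block <= lower->second)
            return true;
    }
    return false;
}

For example, parts covering (1,3), (4,7) and (10,12) coalesce to (1,7) and (10,12): a WAL file spanning blocks 2..6 is removable, while one spanning 8..11 is not.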
size_t MergeTreeData::clearEmptyParts()
{
if (!getSettings()->remove_empty_parts)
@@ -2747,17 +2629,6 @@ void MergeTreeData::rename(const String & new_table_path, const StorageID & new_
throw Exception(ErrorCodes::DIRECTORY_ALREADY_EXISTS, "Target path already exists: {}", fullPath(disk, new_table_path));
}
{
/// The WAL relies on the storage path, so we drop it during rename;
/// it will be recreated automatically.
std::lock_guard wal_lock(write_ahead_log_mutex);
if (write_ahead_log)
{
write_ahead_log->shutdown();
write_ahead_log.reset();
}
}
for (const auto & disk : disks)
{
auto new_table_path_parent = parentPath(new_table_path);
@@ -2800,12 +2671,6 @@ void MergeTreeData::dropAllData()
all_parts.push_back(*it);
}
{
std::lock_guard wal_lock(write_ahead_log_mutex);
if (write_ahead_log)
write_ahead_log->shutdown();
}
/// Tables in atomic databases have a UUID and are stored in persistent locations.
/// No need to clear caches (that are keyed by filesystem path) because collision is not possible.
if (!getStorageID().hasUUID())
@@ -2864,8 +2729,6 @@ void MergeTreeData::dropAllData()
if (disk->exists(fs::path(relative_data_path) / MOVING_DIR_NAME))
disk->removeRecursive(fs::path(relative_data_path) / MOVING_DIR_NAME);
MergeTreeWriteAheadLog::dropAllWriteAheadLogs(disk, relative_data_path);
try
{
if (!disk->isDirectoryEmpty(relative_data_path) &&
@@ -3830,9 +3693,6 @@ void MergeTreeData::removePartsFromWorkingSet(MergeTreeTransaction * txn, const
if (part->getState() != MergeTreeDataPartState::Outdated)
modifyPartState(part, MergeTreeDataPartState::Outdated);
if (isInMemoryPart(part) && getSettings()->in_memory_parts_enable_wal)
getWriteAheadLog()->dropPart(part->name);
}
if (removed_active_part)
@@ -8255,30 +8115,6 @@ AlterConversionsPtr MergeTreeData::getAlterConversionsForPart(MergeTreeDataPartP
return result;
}
MergeTreeData::WriteAheadLogPtr MergeTreeData::getWriteAheadLog()
{
std::lock_guard lock(write_ahead_log_mutex);
if (!write_ahead_log)
{
auto reservation = reserveSpace(getSettings()->write_ahead_log_max_bytes);
for (const auto & disk: reservation->getDisks())
{
if (!disk->isRemote())
{
write_ahead_log = std::make_shared<MergeTreeWriteAheadLog>(*this, disk);
break;
}
}
if (!write_ahead_log)
throw Exception(
ErrorCodes::NOT_IMPLEMENTED,
"Can't store write ahead log in remote disk. It makes no sense.");
}
return write_ahead_log;
}
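The removed getter is a lazy-initialize-under-lock accessor: every caller takes write_ahead_log_mutex, and the WAL is created at most once, on the first non-remote disk of the reservation. A stripped-down sketch of the pattern with placeholder types (disk selection omitted):

#include <memory>
#include <mutex>

struct WriteAheadLog {};  /// placeholder for MergeTreeWriteAheadLog

class Storage
{
public:
    std::shared_ptr<WriteAheadLog> getWriteAheadLog()
    {
        /// Plain lock on every call: creation happens at most once and the
        /// accessor is not hot, so double-checked locking is unnecessary.
        std::lock_guard lock(wal_mutex);
        if (!wal)
            wal = std::make_shared<WriteAheadLog>();
        return wal;
    }

private:
    std::mutex wal_mutex;
    std::shared_ptr<WriteAheadLog> wal;
};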
NamesAndTypesList MergeTreeData::getVirtuals() const
{
return NamesAndTypesList{

src/Storages/MergeTree/MergeTreeData.h

@@ -689,9 +689,6 @@ public:
/// Try to clear parts from the filesystem. Throws an exception in case of errors.
void clearPartsFromFilesystem(const DataPartsVector & parts, bool throw_on_error = true, NameSet * parts_failed_to_delete = nullptr);
/// Delete WAL files whose parts are all already stored on disk.
size_t clearOldWriteAheadLogs();
/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds, const NameSet & valid_prefixes = {"tmp_", "tmp-fetch_"});
@@ -945,9 +942,6 @@ public:
/// Method is cheap and doesn't require any locks.
size_t getTotalMergesWithTTLInMergeList() const;
using WriteAheadLogPtr = std::shared_ptr<MergeTreeWriteAheadLog>;
WriteAheadLogPtr getWriteAheadLog();
constexpr static auto EMPTY_PART_TMP_PREFIX = "tmp_empty_";
std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> createEmptyPart(
MergeTreePartInfo & new_part_info, const MergeTreePartition & partition,
@@ -1513,9 +1507,6 @@ private:
bool canUsePolymorphicParts(const MergeTreeSettings & settings, String & out_reason) const;
std::mutex write_ahead_log_mutex;
WriteAheadLogPtr write_ahead_log;
virtual void startBackgroundMovesIfNeeded() = 0;
bool allow_nullable_key{};

src/Storages/MergeTree/MergeTreeDataPartInMemory.cpp

@@ -1,7 +1,6 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeReaderInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
@@ -9,14 +8,13 @@
#include <Disks/createVolume.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Common/logger_useful.h>
namespace DB
{
namespace ErrorCodes
{
extern const int DIRECTORY_ALREADY_EXISTS;
extern const int NOT_IMPLEMENTED;
}
@@ -50,93 +48,19 @@ IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & /* indices_to_recalc */,
const CompressionCodecPtr & /* default_codec */,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & /* computed_index_granularity */)
const NamesAndTypesList &,
const StorageMetadataPtr &,
const std::vector<MergeTreeIndexPtr> &,
const CompressionCodecPtr &,
const MergeTreeWriterSettings &,
const MergeTreeIndexGranularity &)
{
auto ptr = std::static_pointer_cast<MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeDataPartWriterInMemory>(
ptr, columns_list, metadata_snapshot, writer_settings);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing");
}
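Note the deprecation pattern: rather than deleting the virtual overrides, the commit keeps them and throws NOT_IMPLEMENTED, so legacy in-memory parts remain readable (getReader is untouched above) while every write path fails with a clear message. A generic sketch of the kept-but-stubbed override, using hypothetical class names:

#include <stdexcept>

struct IPart
{
    virtual ~IPart() = default;
    virtual void read() = 0;
    virtual void write() = 0;
};

/// Legacy format: reads still work so data can be queried and migrated off,
/// but the writer override is kept only to refuse loudly.
struct LegacyInMemoryPart : IPart
{
    void read() override { /* still supported */ }
    void write() override
    {
        throw std::runtime_error("In-memory data parts are obsolete and no longer supported for writing");
    }
};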
MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const
MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &, const StorageMetadataPtr &) const
{
auto reservation = storage.reserveSpace(block.bytes(), getDataPartStorage());
VolumePtr volume = storage.getStoragePolicy()->getVolume(0);
VolumePtr data_part_volume = createVolumeFromReservation(reservation, volume);
auto new_data_part = storage.getDataPartBuilder(name, data_part_volume, new_relative_path)
.withPartFormat(storage.choosePartFormatOnDisk(block.bytes(), rows_count))
.build();
auto new_data_part_storage = new_data_part->getDataPartStoragePtr();
new_data_part_storage->beginTransaction();
new_data_part->uuid = uuid;
new_data_part->setColumns(columns, {}, metadata_snapshot->getMetadataVersion());
new_data_part->partition.value = partition.value;
new_data_part->minmax_idx = minmax_idx;
if (new_data_part_storage->exists())
{
throw Exception(
ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Could not flush part {}. Part in {} already exists",
quoteString(getDataPartStorage().getFullPath()),
new_data_part_storage->getFullPath());
}
new_data_part_storage->createDirectories();
auto compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto indices = MergeTreeIndexFactory::instance().getMany(metadata_snapshot->getSecondaryIndices());
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns, indices, compression_codec, NO_TRANSACTION_PTR);
out.write(block);
const auto & projections = metadata_snapshot->getProjections();
for (const auto & [projection_name, projection] : projection_parts)
{
if (projections.has(projection_name))
{
auto old_projection_part = asInMemoryPart(projection);
auto new_projection_part = new_data_part->getProjectionPartBuilder(projection_name)
.withPartFormat(storage.choosePartFormatOnDisk(old_projection_part->block.bytes(), rows_count))
.build();
new_projection_part->is_temp = false; // cleanup will be done by the parent part
new_projection_part->setColumns(projection->getColumns(), {}, metadata_snapshot->getMetadataVersion());
auto new_projection_part_storage = new_projection_part->getDataPartStoragePtr();
if (new_projection_part_storage->exists())
{
throw Exception(
ErrorCodes::DIRECTORY_ALREADY_EXISTS,
"Could not flush projection part {}. Projection part in {} already exists",
projection_name,
new_projection_part_storage->getFullPath());
}
new_projection_part_storage->createDirectories();
const auto & desc = projections.get(projection_name);
auto projection_compression_codec = storage.getContext()->chooseCompressionCodec(0, 0);
auto projection_indices = MergeTreeIndexFactory::instance().getMany(desc.metadata->getSecondaryIndices());
MergedBlockOutputStream projection_out(
new_projection_part, desc.metadata,
new_projection_part->getColumns(), projection_indices,
projection_compression_codec, NO_TRANSACTION_PTR);
projection_out.write(old_projection_part->block);
projection_out.finalizePart(new_projection_part, false);
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
}
}
out.finalizePart(new_data_part, false);
new_data_part_storage->commitTransaction();
return new_data_part_storage;
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing");
}
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,

src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.cpp

@@ -1,89 +0,0 @@
#include <Storages/MergeTree/MergeTreeDataPartWriterInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
MergeTreeDataPartWriterInMemory::MergeTreeDataPartWriterInMemory(
const MutableDataPartInMemoryPtr & part_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeWriterSettings & settings_)
: IMergeTreeDataPartWriter(part_, columns_list_, metadata_snapshot_, settings_)
, part_in_memory(part_) {}
void MergeTreeDataPartWriterInMemory::write(
const Block & block, const IColumn::Permutation * permutation)
{
if (part_in_memory->block)
throw Exception(ErrorCodes::LOGICAL_ERROR, "DataPartWriterInMemory supports only one write");
Block primary_key_block;
if (settings.rewrite_primary_key)
primary_key_block = getBlockAndPermute(block, metadata_snapshot->getPrimaryKeyColumns(), permutation);
Block result_block;
if (permutation)
{
for (const auto & col : columns_list)
{
if (primary_key_block.has(col.name))
result_block.insert(primary_key_block.getByName(col.name));
else
{
auto permuted = block.getByName(col.name);
permuted.column = permuted.column->permute(*permutation, 0);
result_block.insert(permuted);
}
}
}
else
{
for (const auto & col : columns_list)
result_block.insert(block.getByName(col.name));
}
index_granularity.appendMark(result_block.rows());
if (with_final_mark)
index_granularity.appendMark(0);
part_in_memory->block = std::move(result_block);
if (settings.rewrite_primary_key)
calculateAndSerializePrimaryIndex(primary_key_block);
}
void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Block & primary_index_block)
{
size_t rows = primary_index_block.rows();
if (!rows)
return;
size_t primary_columns_num = primary_index_block.columns();
index_columns.resize(primary_columns_num);
for (size_t i = 0; i < primary_columns_num; ++i)
{
const auto & primary_column = *primary_index_block.getByPosition(i).column;
index_columns[i] = primary_column.cloneEmpty();
index_columns[i]->insertFrom(primary_column, 0);
if (with_final_mark)
index_columns[i]->insertFrom(primary_column, rows - 1);
}
}
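The removed index code is the MergeTree sparse primary index in miniature: one entry per granule, taken from the granule's first row, plus the part's last row when a final mark is written. Because an in-memory part is a single granule, this collapses to the front (and optionally the back) of each key column, which is exactly what the insertFrom(primary_column, 0) and insertFrom(primary_column, rows - 1) calls above do. A toy version over plain vectors:

#include <string>
#include <vector>

/// Single-granule sparse index: the first row of the key column,
/// plus the last row when a final mark is appended.
std::vector<std::string> buildSingleGranuleIndex(
    const std::vector<std::string> & key_column, bool with_final_mark)
{
    std::vector<std::string> index;
    if (key_column.empty())
        return index;
    index.push_back(key_column.front());
    if (with_final_mark)
        index.push_back(key_column.back());
    return index;
}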
void MergeTreeDataPartWriterInMemory::fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & /*checksums_to_remove*/)
{
/// If the part is empty, we still need to initialize the block with empty columns.
if (!part_in_memory->block)
for (const auto & column : columns_list)
part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name});
checksums.files["data.bin"] = part_in_memory->calculateBlockChecksum();
}
}

src/Storages/MergeTree/MergeTreeDataPartWriterInMemory.h

@@ -1,30 +0,0 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPartWriter.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
namespace DB
{
/// Writes a data part in memory.
class MergeTreeDataPartWriterInMemory : public IMergeTreeDataPartWriter
{
public:
MergeTreeDataPartWriterInMemory(
const MutableDataPartInMemoryPtr & part_,
const NamesAndTypesList & columns_list_,
const StorageMetadataPtr & metadata_snapshot,
const MergeTreeWriterSettings & settings_);
/// You can write only one block. An in-memory part can be written only at INSERT time.
void write(const Block & block, const IColumn::Permutation * permutation) override;
void fillChecksums(IMergeTreeDataPart::Checksums & checksums, NameSet & checksums_to_remove) override;
void finish(bool /*sync*/) override {}
private:
void calculateAndSerializePrimaryIndex(const Block & primary_index_block);
MutableDataPartInMemoryPtr part_in_memory;
};
}

src/Storages/MergeTree/ReplicatedMergeTreeAttachThread.cpp

@@ -178,7 +178,6 @@ void ReplicatedMergeTreeAttachThread::runImpl()
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow them to be reinitialized; delete each of them immediately.
storage.clearOldTemporaryDirectories(0, {"tmp_", "delete_tmp_", "tmp-fetch_"});
storage.clearOldWriteAheadLogs();
storage.createNewZooKeeperNodes();
storage.syncPinnedPartUUIDs();

src/Storages/MergeTree/ReplicatedMergeTreeCleanupThread.cpp

@@ -153,7 +153,6 @@ Float32 ReplicatedMergeTreeCleanupThread::iterate()
auto lock = storage.lockForShare(RWLockImpl::NO_QUERY, storage.getSettings()->lock_acquire_timeout_for_background_operations);
/// Both use relative_data_path, which changes during rename, so we
/// do it under a share lock.
cleaned_other += storage.clearOldWriteAheadLogs();
cleaned_part_like += storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
}

src/Storages/StorageMergeTree.cpp

@@ -130,7 +130,6 @@ StorageMergeTree::StorageMergeTree(
void StorageMergeTree::startup()
{
clearOldWriteAheadLogs();
clearEmptyParts();
/// Temporary directories contain incomplete results of merges (after forced restart)
@@ -1382,7 +1381,6 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
/// so execute under share lock.
size_t cleared_count = 0;
cleared_count += clearOldPartsFromFilesystem();
cleared_count += clearOldWriteAheadLogs();
cleared_count += clearOldMutations();
cleared_count += clearEmptyParts();
return cleared_count;

tests/integration (test.py): compatibility test for reading in-memory parts

@@ -1,45 +0,0 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
# Version 23.4 is the last version that supports writing in-memory parts.
node = cluster.add_instance(
"node_old",
image="clickhouse/clickhouse-server",
tag="23.4",
stay_alive=True,
with_installed_binary=True,
allow_analyzer=False,
)
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_in_memory_parts_still_read(start_cluster):
node.query(
"CREATE TABLE t (x UInt64, s String, a Array(Tuple(Map(String, LowCardinality(String)), Date32, DateTime64(3)))) ENGINE = MergeTree ORDER BY s SETTINGS min_rows_for_compact_part = 1000000, min_bytes_for_compact_part = '1G', in_memory_parts_enable_wal = 1"
)
node.query("INSERT INTO t SELECT * FROM generateRandom() LIMIT 100")
assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "100\n"
node.restart_with_latest_version()
assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "100\n"
node.query("INSERT INTO t SELECT * FROM generateRandom() LIMIT 100")
assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "200\n"
node.restart_with_original_version()
assert node.query("SELECT count() FROM t WHERE NOT ignore(*)") == "200\n"