Remove in-memory data parts, step 2

This commit is contained in:
Alexey Milovidov 2024-03-10 06:44:26 +01:00
parent c240c33037
commit d337379385
33 changed files with 25 additions and 1108 deletions

View File

@ -513,10 +513,6 @@ Part was moved to another disk and should be deleted in own destructor.
Not active data part with identity refcounter, it is deleting right now by a cleaner.
### PartsInMemory
In-memory parts.
### PartsOutdated
Not active data part, but could be used by only current SELECTs, could be deleted after SELECTs finishes.

View File

@ -226,7 +226,6 @@
M(PartsDeleteOnDestroy, "Part was moved to another disk and should be deleted in own destructor.") \
M(PartsWide, "Wide parts.") \
M(PartsCompact, "Compact parts.") \
M(PartsInMemory, "In-memory parts.") \
M(MMappedFiles, "Total number of mmapped files.") \
M(MMappedFileBytes, "Sum size of mmapped file regions.") \
M(AsynchronousReadWait, "Number of threads waiting for asynchronous read.") \

View File

@ -1,7 +1,6 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
namespace DB
@ -10,7 +9,7 @@ namespace DB
/* Allow to compute more accurate progress statistics */
class ColumnSizeEstimator
{
using ColumnToSize = MergeTreeDataPartInMemory::ColumnToSize;
using ColumnToSize = std::map<String, size_t>;
ColumnToSize map;
public:

View File

@ -10,7 +10,6 @@
#include <IO/S3Common.h>
#include <Server/HTTP/HTMLForm.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/StorageReplicatedMergeTree.h>
@ -191,8 +190,6 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
if (data_settings->allow_remote_fs_zero_copy_replication &&
/// In memory data part does not have metadata yet.
!isInMemoryPart(part) &&
client_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
{
auto disk_type = part->getDataPartStorage().getDiskType();
@ -205,11 +202,7 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
}
if (isInMemoryPart(part))
sendPartFromMemory(part, out, send_projections);
else
sendPartFromDisk(part, out, client_protocol_version, false, send_projections);
data.addLastSentPart(part->info);
}
catch (const NetException &)
@ -231,36 +224,6 @@ void Service::processQuery(const HTMLForm & params, ReadBuffer & /*body*/, Write
}
}
void Service::sendPartFromMemory(
const MergeTreeData::DataPartPtr & part, WriteBuffer & out, bool send_projections)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
if (send_projections)
{
for (const auto & [name, projection] : part->getProjectionParts())
{
auto projection_sample_block = metadata_snapshot->projections.get(name).sample_block;
auto part_in_memory = asInMemoryPart(projection);
if (!part_in_memory)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Projection {} of part {} is not stored in memory", name, part->name);
writeStringBinary(name, out);
projection->checksums.write(out);
NativeWriter block_out(out, 0, projection_sample_block);
block_out.write(part_in_memory->block);
}
}
auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Part {} is not stored in memory", part->name);
NativeWriter block_out(out, 0, metadata_snapshot->getSampleBlock());
part->checksums.write(out);
block_out.write(part_in_memory->block);
data.getSendsThrottler()->add(part_in_memory->block.bytes());
}
MergeTreeData::DataPart::Checksums Service::sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
@ -641,8 +604,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
remote_fs_metadata, fmt::join(capability, ", "));
if (server_protocol_version < REPLICATION_PROTOCOL_VERSION_WITH_PARTS_ZERO_COPY)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Got 'remote_fs_metadata' cookie with old protocol version {}", server_protocol_version);
if (part_type == PartType::InMemory)
throw Exception(ErrorCodes::INCORRECT_PART_TYPE, "Got 'remote_fs_metadata' cookie for in-memory part");
try
{
@ -701,7 +662,7 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
}
auto storage_id = data.getStorageID();
String new_part_path = part_type == PartType::InMemory ? "memory" : fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
String new_part_path = fs::path(data.getFullPathOnDisk(disk)) / part_name / "";
auto entry = data.getContext()->getReplicatedFetchList().insert(
storage_id.getDatabaseName(), storage_id.getTableName(),
part_info.partition_id, part_name, new_part_path,
@ -709,22 +670,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
in->setNextCallback(ReplicatedFetchReadCallback(*entry));
if (part_type == PartType::InMemory)
{
auto volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
auto data_part_storage = std::make_shared<DataPartStorageOnDiskFull>(
volume,
data.getRelativeDataPath(),
part_name);
return std::make_pair(downloadPartToMemory(
data_part_storage, part_name,
MergeTreePartInfo::fromPartName(part_name, data.format_version),
part_uuid, metadata_snapshot, context, *in,
projections, false, throttler), std::move(temporary_directory_lock));
}
auto output_buffer_getter = [](IDataPartStorage & part_storage, const String & file_name, size_t file_size)
{
return part_storage.writeFile(file_name, std::min<UInt64>(file_size, DBMS_DEFAULT_BUFFER_SIZE), {});
@ -736,66 +681,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> Fetcher::fetchSelected
projections, throttler, sync),std::move(temporary_directory_lock));
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler)
{
auto new_data_part = std::make_shared<MergeTreeDataPartInMemory>(data, part_name, part_info, data_part_storage);
for (size_t i = 0; i < projections; ++i)
{
String projection_name;
readStringBinary(projection_name, in);
MergeTreePartInfo new_part_info("all", 0, 0, 0);
auto projection_part_storage = data_part_storage->getProjection(projection_name + ".proj");
auto new_projection_part = downloadPartToMemory(
projection_part_storage, projection_name,
new_part_info, part_uuid, metadata_snapshot,
context, in, 0, true, throttler);
new_data_part->addProjectionPart(projection_name, std::move(new_projection_part));
}
MergeTreeData::DataPart::Checksums checksums;
if (!checksums.read(in))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot deserialize checksums");
NativeReader block_in(in, 0);
auto block = block_in.read();
throttler->add(block.bytes());
new_data_part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (!is_projection)
{
new_data_part->version.setCreationTID(Tx::PrehistoricTID, nullptr);
new_data_part->uuid = part_uuid;
new_data_part->is_temp = true;
new_data_part->minmax_idx->update(block, data.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
new_data_part->partition.create(metadata_snapshot, block, 0, context);
}
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, {},
CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR);
part_out.write(block);
part_out.finalizePart(new_data_part, false);
new_data_part->checksums.checkEqual(checksums, /* have_uncompressed = */ true);
return new_data_part;
}
void Fetcher::downloadBaseOrProjectionPartToDisk(
const String & replica_path,
const MutableDataPartStoragePtr & data_part_storage,

View File

@ -40,10 +40,6 @@ public:
private:
MergeTreeData::DataPartPtr findPart(const String & name);
void sendPartFromMemory(
const MergeTreeData::DataPartPtr & part,
WriteBuffer & out,
bool send_projections);
MergeTreeData::DataPart::Checksums sendPartFromDisk(
const MergeTreeData::DataPartPtr & part,
@ -113,18 +109,6 @@ private:
ThrottlerPtr throttler,
bool sync);
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
MutableDataPartStoragePtr data_part_storage,
const String & part_name,
const MergeTreePartInfo & part_info,
const UUID & part_uuid,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
PooledReadWriteBufferFromHTTP & in,
size_t projections,
bool is_projection,
ThrottlerPtr throttler);
MergeTreeData::MutableDataPartPtr downloadPartToDiskRemoteMeta(
const String & part_name,
const String & replica_path,

View File

@ -51,7 +51,6 @@ namespace CurrentMetrics
extern const Metric PartsWide;
extern const Metric PartsCompact;
extern const Metric PartsInMemory;
}
namespace DB
@ -278,9 +277,6 @@ static void incrementTypeMetric(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
CurrentMetrics::add(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::InMemory:
CurrentMetrics::add(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::Unknown:
return;
}
@ -296,9 +292,6 @@ static void decrementTypeMetric(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
CurrentMetrics::sub(CurrentMetrics::PartsCompact);
return;
case MergeTreeDataPartType::InMemory:
CurrentMetrics::sub(CurrentMetrics::PartsInMemory);
return;
case MergeTreeDataPartType::Unknown:
return;
}
@ -2207,11 +2200,6 @@ bool isWidePart(const MergeTreeDataPartPtr & data_part)
return (data_part && data_part->getType() == MergeTreeDataPartType::Wide);
}
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::InMemory);
}
std::optional<std::string> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage)
{
if (data_part_storage.exists())

View File

@ -710,7 +710,6 @@ using MergeTreeMutableDataPartPtr = std::shared_ptr<IMergeTreeDataPart>;
bool isCompactPart(const MergeTreeDataPartPtr & data_part);
bool isWidePart(const MergeTreeDataPartPtr & data_part);
bool isInMemoryPart(const MergeTreeDataPartPtr & data_part);
inline String getIndexExtension(bool is_compressed_primary_key) { return is_compressed_primary_key ? ".cidx" : ".idx"; }
std::optional<String> getIndexExtensionFromFilesystem(const IDataPartStorage & data_part_storage);

View File

@ -34,8 +34,6 @@ public:
virtual bool isWidePart() const = 0;
virtual bool isInMemoryPart() const = 0;
virtual bool isProjectionPart() const = 0;
virtual DataPartStoragePtr getDataPartStorage() const = 0;

View File

@ -22,8 +22,6 @@ public:
bool isWidePart() const override { return DB::isWidePart(data_part); }
bool isInMemoryPart() const override { return DB::isInMemoryPart(data_part); }
bool isProjectionPart() const override { return data_part->isProjectionPart(); }
DataPartStoragePtr getDataPartStorage() const override { return data_part->getDataPartStoragePtr(); }

View File

@ -309,7 +309,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
ctx->rows_sources_uncompressed_write_buf = ctx->tmp_disk->createRawStream();
ctx->rows_sources_write_buf = std::make_unique<CompressedWriteBuffer>(*ctx->rows_sources_uncompressed_write_buf);
MergeTreeDataPartInMemory::ColumnToSize local_merged_column_to_size;
std::map<String, size_t> local_merged_column_to_size;
for (const MergeTreeData::DataPartPtr & part : global_ctx->future_part->parts)
part->accumulateColumnSizes(local_merged_column_to_size);

View File

@ -72,7 +72,6 @@
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/Statistics/Estimator.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/checkDataPart.h>
@ -1707,8 +1706,7 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks, std::optional<std::un
{
/// Skip temporary directories, file 'format_version.txt' and directory 'detached'.
if (startsWith(it->name(), "tmp") || it->name() == MergeTreeData::FORMAT_VERSION_FILE_NAME
|| it->name() == MergeTreeData::DETACHED_DIR_NAME
|| startsWith(it->name(), MergeTreeWriteAheadLog::WAL_FILE_NAME))
|| it->name() == MergeTreeData::DETACHED_DIR_NAME)
continue;
if (auto part_info = MergeTreePartInfo::tryParsePartName(it->name(), format_version))
@ -2261,7 +2259,6 @@ MergeTreeData::DataPartsVector MergeTreeData::grabOldParts(bool force)
bool reached_removal_time = part_remove_time <= time_now && time_now - part_remove_time >= getSettings()->old_parts_lifetime.totalSeconds();
if ((reached_removal_time && !has_skipped_mutation_parent(part))
|| force
|| isInMemoryPart(part) /// Remove in-memory parts immediately to not store excessive data in RAM
|| (part->version.creation_csn == Tx::RolledBackCSN && getSettings()->remove_rolled_back_parts_immediately))
{
part->removal_state.store(DataPartRemovalState::REMOVED, std::memory_order_relaxed);
@ -5225,14 +5222,14 @@ Pipe MergeTreeData::alterPartition(
case PartitionCommand::FREEZE_PARTITION:
{
auto lock = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
current_command_results = freezePartition(command.partition, metadata_snapshot, command.with_name, query_context, lock);
current_command_results = freezePartition(command.partition, command.with_name, query_context, lock);
}
break;
case PartitionCommand::FREEZE_ALL_PARTITIONS:
{
auto lock = lockForShare(query_context->getCurrentQueryId(), query_context->getSettingsRef().lock_acquire_timeout);
current_command_results = freezeAll(command.with_name, metadata_snapshot, query_context, lock);
current_command_results = freezeAll(command.with_name, query_context, lock);
}
break;
@ -7103,27 +7100,6 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::cloneAn
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
/// If source part is in memory, flush it to disk and clone it already in on-disk format
/// Protect tmp dir from removing by cleanup thread with src_flushed_tmp_dir_lock
/// Construct src_flushed_tmp_part in order to delete part with its directory at destructor
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
auto flushed_part_path = *src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
auto tmp_src_part_file_name = fs::path(tmp_dst_part_name).filename();
src_flushed_tmp_dir_lock = src_part->storage.getTemporaryPartDirectoryHolder(tmp_src_part_file_name);
auto flushed_part_storage = src_part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot);
src_flushed_tmp_part = MergeTreeDataPartBuilder(*this, src_part->name, flushed_part_storage)
.withPartInfo(src_part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
src_part_storage = flushed_part_storage;
}
String with_copy;
if (params.copy_instead_of_hardlink)
with_copy = " (copying data)";
@ -7305,26 +7281,23 @@ MergeTreeData::MatcherFn MergeTreeData::getPartitionMatcher(const ASTPtr & parti
PartitionCommandsResultInfo MergeTreeData::freezePartition(
const ASTPtr & partition_ast,
const StorageMetadataPtr & metadata_snapshot,
const String & with_name,
ContextPtr local_context,
TableLockHolder &)
{
return freezePartitionsByMatcher(getPartitionMatcher(partition_ast, local_context), metadata_snapshot, with_name, local_context);
return freezePartitionsByMatcher(getPartitionMatcher(partition_ast, local_context), with_name, local_context);
}
PartitionCommandsResultInfo MergeTreeData::freezeAll(
const String & with_name,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr local_context,
TableLockHolder &)
{
return freezePartitionsByMatcher([] (const String &) { return true; }, metadata_snapshot, with_name, local_context);
return freezePartitionsByMatcher([] (const String &) { return true; }, with_name, local_context);
}
PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
MatcherFn matcher,
const StorageMetadataPtr & metadata_snapshot,
const String & with_name,
ContextPtr local_context)
{
@ -7376,22 +7349,6 @@ PartitionCommandsResultInfo MergeTreeData::freezePartitionsByMatcher(
scope_guard src_flushed_tmp_dir_lock;
MergeTreeData::MutableDataPartPtr src_flushed_tmp_part;
if (auto part_in_memory = asInMemoryPart(part))
{
auto flushed_part_path = *part_in_memory->getRelativePathForPrefix("tmp_freeze");
src_flushed_tmp_dir_lock = part->storage.getTemporaryPartDirectoryHolder("tmp_freeze" + part->name);
auto flushed_part_storage = part_in_memory->flushToDisk(flushed_part_path, metadata_snapshot);
src_flushed_tmp_part = MergeTreeDataPartBuilder(*this, part->name, flushed_part_storage)
.withPartInfo(part->info)
.withPartFormatFromDisk()
.build();
src_flushed_tmp_part->is_temp = true;
data_part_storage = flushed_part_storage;
}
auto callback = [this, &part, &backup_part_path](const DiskPtr & disk)
{
// Store metadata for replicated table.

View File

@ -23,9 +23,7 @@
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
@ -752,7 +750,6 @@ public:
*/
PartitionCommandsResultInfo freezePartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & with_name,
ContextPtr context,
TableLockHolder & table_lock_holder);
@ -760,7 +757,6 @@ public:
/// Freezes all parts.
PartitionCommandsResultInfo freezeAll(
const String & with_name,
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
TableLockHolder & table_lock_holder);
@ -1307,7 +1303,7 @@ protected:
bool isPrimaryOrMinMaxKeyColumnPossiblyWrappedInFunctions(const ASTPtr & node, const StorageMetadataPtr & metadata_snapshot) const;
/// Common part for |freezePartition()| and |freezeAll()|.
PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const StorageMetadataPtr & metadata_snapshot, const String & with_name, ContextPtr context);
PartitionCommandsResultInfo freezePartitionsByMatcher(MatcherFn matcher, const String & with_name, ContextPtr context);
PartitionCommandsResultInfo unfreezePartitionsByMatcher(MatcherFn matcher, const String & backup_name, ContextPtr context);
// Partition helpers

View File

@ -1,5 +1,4 @@
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
@ -64,8 +63,6 @@ std::shared_ptr<IMergeTreeDataPart> MergeTreeDataPartBuilder::build()
return std::make_shared<MergeTreeDataPartWide>(data, name, *part_info, part_storage, parent_part);
case PartType::Compact:
return std::make_shared<MergeTreeDataPartCompact>(data, name, *part_info, part_storage, parent_part);
case PartType::InMemory:
return std::make_shared<MergeTreeDataPartInMemory>(data, name, *part_info, part_storage, parent_part);
default:
throw Exception(ErrorCodes::UNKNOWN_PART_TYPE,
"Unknown type of part {}", part_storage->getRelativePath());

View File

@ -1,115 +0,0 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeReaderInMemory.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <DataTypes/NestedUtils.h>
#include <Disks/createVolume.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
MergeTreeDataPartInMemory::MergeTreeDataPartInMemory(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MutableDataPartStoragePtr & data_part_storage_,
const IMergeTreeDataPart * parent_part_)
: IMergeTreeDataPart(storage_, name_, info_, data_part_storage_, Type::InMemory, parent_part_)
{
default_codec = CompressionCodecFactory::instance().get("NONE", {});
}
IMergeTreeDataPart::MergeTreeReaderPtr MergeTreeDataPartInMemory::getReader(
const NamesAndTypesList & columns_to_read,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const VirtualFields & virtual_fields,
UncompressedCache * /* uncompressed_cache */,
MarkCache * /* mark_cache */,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReaderSettings & reader_settings,
const ValueSizeMap & /* avg_value_size_hints */,
const ReadBufferFromFileBase::ProfileCallback & /* profile_callback */) const
{
auto read_info = std::make_shared<LoadedMergeTreeDataPartInfoForReader>(shared_from_this(), alter_conversions);
auto ptr = std::static_pointer_cast<const MergeTreeDataPartInMemory>(shared_from_this());
return std::make_unique<MergeTreeReaderInMemory>(
read_info,
ptr,
columns_to_read,
virtual_fields,
storage_snapshot,
mark_ranges,
reader_settings);
}
IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartInMemory::getWriter(
const NamesAndTypesList &,
const StorageMetadataPtr &,
const std::vector<MergeTreeIndexPtr> &,
const Statistics &,
const CompressionCodecPtr &,
const MergeTreeWriterSettings &,
const MergeTreeIndexGranularity &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing");
}
MutableDataPartStoragePtr MergeTreeDataPartInMemory::flushToDisk(const String &, const StorageMetadataPtr &) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "In-memory data parts are obsolete and no longer supported for writing");
}
DataPartStoragePtr MergeTreeDataPartInMemory::makeCloneInDetached(const String & prefix,
const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const
{
if (disk_transaction)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "InMemory parts are not compatible with disk transactions");
String detached_path = *getRelativePathForDetachedPart(prefix, /* broken */ false);
return flushToDisk(detached_path, metadata_snapshot);
}
void MergeTreeDataPartInMemory::renameTo(const String & new_relative_path, bool /* remove_new_dir_if_exists */)
{
getDataPartStorage().setRelativePath(new_relative_path);
}
void MergeTreeDataPartInMemory::calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const
{
auto it = checksums.files.find("data.bin");
if (it != checksums.files.end())
total_size.data_uncompressed += it->second.uncompressed_size;
for (const auto & column : columns)
each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize();
}
IMergeTreeDataPart::Checksum MergeTreeDataPartInMemory::calculateBlockChecksum() const
{
SipHash hash;
IMergeTreeDataPart::Checksum checksum;
for (const auto & column : block)
column.column->updateHashFast(hash);
checksum.uncompressed_size = block.bytes();
checksum.uncompressed_hash = getSipHash128AsPair(hash);
return checksum;
}
DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part)
{
return std::dynamic_pointer_cast<const MergeTreeDataPartInMemory>(part);
}
}

View File

@ -1,70 +0,0 @@
#pragma once
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
class UncompressedCache;
class MergeTreeDataPartInMemory : public IMergeTreeDataPart
{
public:
MergeTreeDataPartInMemory(
const MergeTreeData & storage_,
const String & name_,
const MergeTreePartInfo & info_,
const MutableDataPartStoragePtr & data_part_storage_,
const IMergeTreeDataPart * parent_part_ = nullptr);
MergeTreeReaderPtr getReader(
const NamesAndTypesList & columns,
const StorageSnapshotPtr & storage_snapshot,
const MarkRanges & mark_ranges,
const VirtualFields & virtual_fields,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache,
const AlterConversionsPtr & alter_conversions,
const MergeTreeReaderSettings & reader_settings_,
const ValueSizeMap & avg_value_size_hints,
const ReadBufferFromFileBase::ProfileCallback & profile_callback) const override;
MergeTreeWriterPtr getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) override;
bool isStoredOnDisk() const override { return false; }
bool isStoredOnRemoteDisk() const override { return false; }
bool isStoredOnRemoteDiskWithZeroCopySupport() const override { return false; }
bool hasColumnFiles(const NameAndTypePair & column) const override { return !!getColumnPosition(column.getNameInStorage()); }
std::optional<String> getFileNameForColumn(const NameAndTypePair & /* column */) const override { return ""; }
void renameTo(const String & new_relative_path, bool remove_new_dir_if_exists) override;
DataPartStoragePtr makeCloneInDetached(const String & prefix, const StorageMetadataPtr & metadata_snapshot,
const DiskTransactionPtr & disk_transaction) const override;
std::optional<time_t> getColumnModificationTime(const String & /* column_name */) const override { return {}; }
MutableDataPartStoragePtr flushToDisk(const String & new_relative_path, const StorageMetadataPtr & metadata_snapshot) const;
/// Returns hash of parts's block
Checksum calculateBlockChecksum() const;
mutable Block block;
private:
mutable std::condition_variable is_merged;
/// Calculates uncompressed sizes in memory.
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};
using DataPartInMemoryPtr = std::shared_ptr<const MergeTreeDataPartInMemory>;
using MutableDataPartInMemoryPtr = std::shared_ptr<MergeTreeDataPartInMemory>;
DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part);
}

View File

@ -42,9 +42,6 @@ public:
/// Data of all columns is stored in one file. Marks are also stored in single file.
Compact,
/// Format with buffering data in RAM. Obsolete - new parts cannot be created in this format.
InMemory,
Unknown,
};

View File

@ -1,5 +1,8 @@
#pragma once
#include <Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h>
#include <Formats/MarkInCompressedFile.h>
namespace DB
{

View File

@ -648,18 +648,11 @@ MergeTreeDataWriter::TemporaryPart MergeTreeDataWriter::writeProjectionPartImpl(
const auto & metadata_snapshot = projection.metadata;
MergeTreeDataPartType part_type;
if (parent_part->getType() == MergeTreeDataPartType::InMemory)
{
part_type = MergeTreeDataPartType::InMemory;
}
else
{
/// Size of part would not be greater than block.bytes() + epsilon
size_t expected_size = block.bytes();
// just check if there is enough space on parent volume
data.reserveSpace(expected_size, parent_part->getDataPartStorage());
part_type = data.choosePartFormatOnDisk(expected_size, block.rows()).part_type;
}
auto new_data_part = parent_part->getProjectionPartBuilder(part_name, is_temp).withPartType(part_type).build();
auto projection_part_storage = new_data_part->getDataPartStoragePtr();

View File

@ -81,8 +81,6 @@ std::string MarkType::getFileExtension() const
return res + "2";
case MergeTreeDataPartType::Compact:
return res + "3";
case MergeTreeDataPartType::InMemory:
return "";
case MergeTreeDataPartType::Unknown:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown data part type");
}
@ -126,8 +124,6 @@ size_t MergeTreeIndexGranularityInfo::getMarkSizeInBytes(size_t columns_num) con
return mark_type.adaptive ? getAdaptiveMrkSizeWide() : getNonAdaptiveMrkSizeWide();
else if (mark_type.part_type == MergeTreeDataPartType::Compact)
return getAdaptiveMrkSizeCompact(columns_num);
else if (mark_type.part_type == MergeTreeDataPartType::InMemory)
return 0;
else
throw Exception(ErrorCodes::UNKNOWN_PART_TYPE, "Unknown part type");
}

View File

@ -1,119 +0,0 @@
#include <Storages/MergeTree/MergeTreeReaderInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Interpreters/getColumnFromBlock.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnArray.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_READ_ALL_DATA;
extern const int ARGUMENT_OUT_OF_BOUND;
}
MergeTreeReaderInMemory::MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const VirtualFields & virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_)
: IMergeTreeReader(
data_part_info_for_read_,
columns_,
virtual_fields_,
storage_snapshot_,
nullptr,
nullptr,
mark_ranges_,
settings_,
{})
, part_in_memory(std::move(data_part_))
{
for (const auto & column_to_read : columns_to_read)
{
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
if (typeid_cast<const DataTypeArray *>(column_to_read.type.get())
&& !tryGetColumnFromBlock(part_in_memory->block, column_to_read))
{
if (auto offsets_position = findColumnForOffsets(column_to_read))
{
positions_for_offsets[column_to_read.name] = *data_part_info_for_read->getColumnPosition(offsets_position->first);
partially_read_columns.insert(column_to_read.name);
}
}
}
}
size_t MergeTreeReaderInMemory::readRows(
size_t from_mark, size_t /* current_task_last_mark */, bool continue_reading, size_t max_rows_to_read, Columns & res_columns)
{
if (!continue_reading)
total_rows_read = 0;
size_t total_marks = data_part_info_for_read->getIndexGranularity().getMarksCount();
if (from_mark >= total_marks)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "Mark {} is out of bound. Max mark: {}",
toString(from_mark), toString(total_marks));
size_t num_columns = res_columns.size();
checkNumberOfColumns(num_columns);
size_t part_rows = part_in_memory->block.rows();
if (total_rows_read >= part_rows)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read data in MergeTreeReaderInMemory. "
"Rows already read: {}. Rows in part: {}", total_rows_read, part_rows);
size_t rows_to_read = std::min(max_rows_to_read, part_rows - total_rows_read);
for (size_t i = 0; i < num_columns; ++i)
{
const auto & column_to_read = columns_to_read[i];
/// Copy offsets, if array of Nested column is missing in part.
auto offsets_it = positions_for_offsets.find(column_to_read.name);
if (offsets_it != positions_for_offsets.end() && !column_to_read.isSubcolumn())
{
const auto & source_offsets = assert_cast<const ColumnArray &>(
*part_in_memory->block.getByPosition(offsets_it->second).column).getOffsets();
if (res_columns[i] == nullptr)
res_columns[i] = column_to_read.type->createColumn();
auto mutable_column = res_columns[i]->assumeMutable();
auto & res_offstes = assert_cast<ColumnArray &>(*mutable_column).getOffsets();
size_t start_offset = total_rows_read ? source_offsets[total_rows_read - 1] : 0;
for (size_t row = 0; row < rows_to_read; ++row)
res_offstes.push_back(source_offsets[total_rows_read + row] - start_offset);
res_columns[i] = std::move(mutable_column);
}
else if (part_in_memory->hasColumnFiles(column_to_read))
{
auto block_column = getColumnFromBlock(part_in_memory->block, column_to_read);
if (rows_to_read == part_rows)
{
res_columns[i] = block_column;
}
else
{
if (res_columns[i] == nullptr)
res_columns[i] = column_to_read.type->createColumn();
auto mutable_column = res_columns[i]->assumeMutable();
mutable_column->insertRangeFrom(*block_column, total_rows_read, rows_to_read);
res_columns[i] = std::move(mutable_column);
}
}
}
total_rows_read += rows_to_read;
return rows_to_read;
}
}

View File

@ -1,40 +0,0 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Storages/MergeTree/IMergeTreeReader.h>
namespace DB
{
class MergeTreeDataPartInMemory;
using DataPartInMemoryPtr = std::shared_ptr<const MergeTreeDataPartInMemory>;
/// Reader for InMemory parts
class MergeTreeReaderInMemory : public IMergeTreeReader
{
public:
MergeTreeReaderInMemory(
MergeTreeDataPartInfoForReaderPtr data_part_info_for_read_,
DataPartInMemoryPtr data_part_,
NamesAndTypesList columns_,
const VirtualFields & virtual_fields_,
const StorageSnapshotPtr & storage_snapshot_,
MarkRanges mark_ranges_,
MergeTreeReaderSettings settings_);
/// Return the number of rows has been read or zero if there is no columns to read.
/// If continue_reading is true, continue reading from last state, otherwise seek to from_mark
size_t readRows(size_t from_mark, size_t current_tasl_last_mark,
bool continue_reading, size_t max_rows_to_read, Columns & res_columns) override;
bool canReadIncompleteGranules() const override { return true; }
private:
size_t total_rows_read = 0;
DataPartInMemoryPtr part_in_memory;
std::unordered_map<String, size_t> positions_for_offsets;
};
}

View File

@ -1,5 +1,4 @@
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/StorageMergeTree.h>
#include <Interpreters/PartLog.h>
#include <DataTypes/ObjectUtils.h>

View File

@ -1,336 +0,0 @@
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeDataWriter.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <Storages/MergeTree/MergeTreeDataPartState.h>
#include <Storages/MergeTree/DataPartStorageOnDiskFull.h>
#include <IO/MemoryReadWriteBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <Poco/JSON/Parser.h>
#include <sys/time.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
extern const int CANNOT_READ_ALL_DATA;
extern const int BAD_DATA_PART_NAME;
extern const int CORRUPTED_DATA;
}
MergeTreeWriteAheadLog::MergeTreeWriteAheadLog(
MergeTreeData & storage_,
const DiskPtr & disk_,
const String & name_)
: storage(storage_)
, disk(disk_)
, name(name_)
, path(storage.getRelativeDataPath() + name_)
, pool(storage.getContext()->getSchedulePool())
, log(getLogger(storage.getLogName() + " (WriteAheadLog)"))
{
init();
sync_task = pool.createTask("MergeTreeWriteAheadLog::sync", [this]
{
std::lock_guard lock(write_mutex);
out->sync();
sync_scheduled = false;
sync_cv.notify_all();
});
}
MergeTreeWriteAheadLog::~MergeTreeWriteAheadLog()
{
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
void MergeTreeWriteAheadLog::dropAllWriteAheadLogs(DiskPtr disk_to_drop, std::string relative_data_path)
{
std::vector<std::string> files;
disk_to_drop->listFiles(relative_data_path, files);
for (const auto & file : files)
{
if (file.starts_with(WAL_FILE_NAME))
disk_to_drop->removeFile(fs::path(relative_data_path) / file);
}
}
void MergeTreeWriteAheadLog::init()
{
out = disk->writeFile(path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
/// Small hack: in NativeWriter header is used only in `getHeader` method.
/// To avoid complex logic of changing it during ALTERs we leave it empty.
block_out = std::make_unique<NativeWriter>(*out, 0, Block{});
min_block_number = std::numeric_limits<Int64>::max();
max_block_number = -1;
bytes_at_last_sync = 0;
}
void MergeTreeWriteAheadLog::dropPart(const String & part_name)
{
std::unique_lock lock(write_mutex);
writeIntBinary(WAL_VERSION, *out);
ActionMetadata metadata{};
metadata.write(*out);
writeIntBinary(static_cast<UInt8>(ActionType::DROP_PART), *out);
writeStringBinary(part_name, *out);
out->next();
}
void MergeTreeWriteAheadLog::rotate(const std::unique_lock<std::mutex> &)
{
String new_name = String(WAL_FILE_NAME) + "_"
+ toString(min_block_number) + "_"
+ toString(max_block_number) + WAL_FILE_EXTENSION;
/// Finalize stream before file rename
out->finalize();
disk->replaceFile(path, storage.getRelativeDataPath() + new_name);
init();
}
MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
DataPartsLock & parts_lock,
bool readonly)
{
std::unique_lock lock(write_mutex);
MergeTreeData::MutableDataPartsVector parts;
auto in = disk->readFile(path);
NativeReader block_in(*in, 0);
NameSet dropped_parts;
while (!in->eof())
{
MergeTreeData::MutableDataPartPtr part;
UInt8 version;
String part_name;
Block block;
ActionType action_type;
try
{
ActionMetadata metadata;
readIntBinary(version, *in);
if (version > 0)
{
metadata.read(*in);
}
readIntBinary(action_type, *in);
readStringBinary(part_name, *in);
if (action_type == ActionType::DROP_PART)
{
dropped_parts.insert(part_name);
}
else if (action_type == ActionType::ADD_PART)
{
auto single_disk_volume = std::make_shared<SingleDiskVolume>("volume_" + part_name, disk, 0);
part = storage.getDataPartBuilder(part_name, single_disk_volume, part_name)
.withPartType(MergeTreeDataPartType::InMemory)
.withPartStorageType(MergeTreeDataPartStorageType::Full)
.build();
part->uuid = metadata.part_uuid;
block = block_in.read();
if (storage.getActiveContainingPart(part->info, MergeTreeDataPartState::Active, parts_lock))
continue;
}
else
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Unknown action type: {}", toString(static_cast<UInt8>(action_type)));
}
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|| e.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION
|| e.code() == ErrorCodes::BAD_DATA_PART_NAME
|| e.code() == ErrorCodes::CORRUPTED_DATA)
{
LOG_WARNING(log, "WAL file '{}' is broken. {}", path, e.displayText());
/// If file is broken, do not write new parts to it.
/// But if it contains any part rotate and save them.
if (max_block_number == -1)
{
if (!readonly)
disk->removeFile(path);
}
else if (name == DEFAULT_WAL_FILE_NAME)
rotate(lock);
break;
}
throw;
}
if (action_type == ActionType::ADD_PART)
{
MergedBlockOutputStream part_out(
part,
metadata_snapshot,
block.getNamesAndTypesList(),
{}, {},
CompressionCodecFactory::instance().get("NONE", {}),
NO_TRANSACTION_PTR);
part->minmax_idx->update(block, storage.getMinMaxColumnsNames(metadata_snapshot->getPartitionKey()));
part->partition.create(metadata_snapshot, block, 0, context);
part->setColumns(block.getNamesAndTypesList(), {}, metadata_snapshot->getMetadataVersion());
if (metadata_snapshot->hasSortingKey())
metadata_snapshot->getSortingKey().expression->execute(block);
part_out.write(block);
for (const auto & projection : metadata_snapshot->getProjections())
{
auto projection_block = projection.calculate(block, context);
auto temp_part = MergeTreeDataWriter::writeProjectionPart(storage, log, projection_block, projection, part.get());
temp_part.finalize();
if (projection_block.rows())
part->addProjectionPart(projection.name, std::move(temp_part.part));
}
part_out.finalizePart(part, false);
min_block_number = std::min(min_block_number, part->info.min_block);
max_block_number = std::max(max_block_number, part->info.max_block);
parts.push_back(std::move(part));
}
}
MergeTreeData::MutableDataPartsVector result;
std::copy_if(parts.begin(), parts.end(), std::back_inserter(result),
[&dropped_parts](const auto & part) { return dropped_parts.count(part->name) == 0; });
/// All parts in WAL had been already committed into the disk -> clear the WAL
if (!readonly && result.empty())
{
LOG_DEBUG(log, "WAL file '{}' had been completely processed. Removing.", path);
disk->removeFile(path);
init();
return {};
}
return result;
}
void MergeTreeWriteAheadLog::shutdown()
{
{
std::unique_lock lock(write_mutex);
if (shutdown_called)
return;
if (sync_scheduled)
sync_cv.wait(lock, [this] { return !sync_scheduled; });
shutdown_called = true;
out->finalize();
out.reset();
}
/// Do it without lock, otherwise inversion between pool lock and write_mutex is possible
sync_task->deactivate();
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
{
Int64 min_block;
Int64 max_block;
ReadBufferFromString in(filename);
if (!checkString(WAL_FILE_NAME, in)
|| !checkChar('_', in)
|| !tryReadIntText(min_block, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_block, in))
{
return {};
}
return std::make_pair(min_block, max_block);
}
String MergeTreeWriteAheadLog::ActionMetadata::toJSON() const
{
Poco::JSON::Object json;
if (part_uuid != UUIDHelpers::Nil)
json.set(JSON_KEY_PART_UUID, toString(part_uuid));
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
json.stringify(oss);
return oss.str();
}
void MergeTreeWriteAheadLog::ActionMetadata::fromJSON(const String & buf)
{
Poco::JSON::Parser parser;
auto json = parser.parse(buf).extract<Poco::JSON::Object::Ptr>();
if (json->has(JSON_KEY_PART_UUID))
part_uuid = parseFromString<UUID>(json->getValue<std::string>(JSON_KEY_PART_UUID));
}
void MergeTreeWriteAheadLog::ActionMetadata::read(ReadBuffer & meta_in)
{
readIntBinary(min_compatible_version, meta_in);
if (min_compatible_version > WAL_VERSION)
throw Exception(ErrorCodes::UNKNOWN_FORMAT_VERSION,
"WAL metadata version {} is not compatible with this ClickHouse version",
toString(min_compatible_version));
size_t metadata_size;
readVarUInt(metadata_size, meta_in);
if (metadata_size == 0)
return;
String buf(metadata_size, ' ');
meta_in.readStrict(buf.data(), metadata_size);
fromJSON(buf);
}
void MergeTreeWriteAheadLog::ActionMetadata::write(WriteBuffer & meta_out) const
{
writeIntBinary(min_compatible_version, meta_out);
String ser_meta = toJSON();
writeVarUInt(static_cast<UInt32>(ser_meta.length()), meta_out);
writeString(ser_meta, meta_out);
}
}

View File

@ -1,105 +0,0 @@
#pragma once
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Core/BackgroundSchedulePool.h>
#include <Disks/IDisk.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
namespace DB
{
class MergeTreeData;
struct DataPartsLock;
/** WAL stores addditions and removals of data parts in in-memory format.
* Format of data in WAL:
* - version
* - type of action (ADD or DROP)
* - part name
* - part's block in Native format. (for ADD action)
*/
class MergeTreeWriteAheadLog
{
public:
/// Append-only enum. It is serialized to WAL
enum class ActionType : UInt8
{
ADD_PART = 0,
DROP_PART = 1,
};
struct ActionMetadata
{
/// The minimum version of WAL reader that can understand metadata written by current ClickHouse version.
/// This field must be increased when making backwards incompatible changes.
///
/// The same approach can be used recursively inside metadata.
UInt8 min_compatible_version = 0;
/// Actual metadata.
UUID part_uuid = UUIDHelpers::Nil;
void write(WriteBuffer & meta_out) const;
void read(ReadBuffer & meta_in);
private:
static constexpr auto JSON_KEY_PART_UUID = "part_uuid";
String toJSON() const;
void fromJSON(const String & buf);
};
constexpr static UInt8 WAL_VERSION = 1;
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE_NAME = "wal.bin";
MergeTreeWriteAheadLog(MergeTreeData & storage_, const DiskPtr & disk_,
const String & name = DEFAULT_WAL_FILE_NAME);
~MergeTreeWriteAheadLog();
void dropPart(const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore(
const StorageMetadataPtr & metadata_snapshot,
ContextPtr context,
DataPartsLock & parts_lock,
bool readonly);
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
void shutdown();
/// Drop all write ahead logs from disk. Useful during table drop.
static void dropAllWriteAheadLogs(DiskPtr disk_to_drop, std::string relative_data_path);
private:
void init();
void rotate(const std::unique_lock<std::mutex> & lock);
const MergeTreeData & storage;
DiskPtr disk;
String name;
String path;
std::unique_ptr<WriteBuffer> out;
std::unique_ptr<NativeWriter> block_out;
Int64 min_block_number = std::numeric_limits<Int64>::max();
Int64 max_block_number = -1;
BackgroundSchedulePool & pool;
BackgroundSchedulePoolTaskHolder sync_task;
std::condition_variable sync_cv;
size_t bytes_at_last_sync = 0;
bool sync_scheduled = false;
bool shutdown_called = false;
mutable std::mutex write_mutex;
LoggerPtr log;
};
}

View File

@ -1235,13 +1235,7 @@ void PartMergerWriter::prepare()
for (size_t i = 0, size = ctx->projections_to_build.size(); i < size; ++i)
{
// If the parent part is an in-memory part, squash projection output into one block and
// build in-memory projection because we don't support merging into a new in-memory part.
// Otherwise we split the materialization into multiple stages similar to the process of
// INSERT SELECT query.
if (ctx->new_data_part->getType() == MergeTreeDataPartType::InMemory)
projection_squashes.emplace_back(0, 0);
else
// We split the materialization into multiple stages similar to the process of INSERT SELECT query.
projection_squashes.emplace_back(settings.min_insert_block_size_rows, settings.min_insert_block_size_bytes);
}
}

View File

@ -1350,9 +1350,6 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreActive, MergeTreeDataPartState::Active, MergeTreeDataPartState::Outdated});
if (part)
{
if (auto part_in_memory = asInMemoryPart(part))
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();
if (entry.type == LogEntry::MUTATE_PART && !storage.mutation_backoff_policy.partCanBeMutated(part->name))

View File

@ -7,7 +7,6 @@
#include <Storages/MergeTree/MergeTreeIndexGranularity.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
@ -310,22 +309,11 @@ static IMergeTreeDataPart::Checksums checkDataPart(
return checksums_data;
}
IMergeTreeDataPart::Checksums checkDataPartInMemory(const DataPartInMemoryPtr & data_part)
{
IMergeTreeDataPart::Checksums data_checksums;
data_checksums.files["data.bin"] = data_part->calculateBlockChecksum();
data_part->checksums.checkEqual(data_checksums, true);
return data_checksums;
}
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
bool require_checksums,
std::function<bool()> is_cancelled)
{
if (auto part_in_memory = asInMemoryPart(data_part))
return checkDataPartInMemory(part_in_memory);
/// If check of part has failed and it is stored on disk with cache
/// try to drop cache and check it once again because maybe the cache
/// is broken not the part itself.

View File

@ -1,2 +0,0 @@
clickhouse_add_executable (wal_action_metadata wal_action_metadata.cpp)
target_link_libraries (wal_action_metadata PRIVATE dbms)

View File

@ -1,61 +0,0 @@
#include <iostream>
#include <IO/MemoryReadWriteBuffer.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT_VERSION;
}
}
int main(int, char **)
{
try
{
{
std::cout << "test: dummy test" << std::endl;
DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out;
DB::MemoryWriteBuffer buf{};
metadata_out.write(buf);
buf.finalize();
metadata_out.read(*buf.tryGetReadBuffer());
}
{
std::cout << "test: min compatibility" << std::endl;
DB::MergeTreeWriteAheadLog::ActionMetadata metadata_out;
metadata_out.min_compatible_version = DB::MergeTreeWriteAheadLog::WAL_VERSION + 1;
DB::MemoryWriteBuffer buf{};
metadata_out.write(buf);
buf.finalize();
try
{
metadata_out.read(*buf.tryGetReadBuffer());
}
catch (const DB::Exception & e)
{
if (e.code() != DB::ErrorCodes::UNKNOWN_FORMAT_VERSION)
{
std::cerr << "Expected UNKNOWN_FORMAT_VERSION exception but got: "
<< e.what() << ", " << e.displayText() << std::endl;
}
}
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
return 0;
}

View File

@ -16,6 +16,7 @@
#include <Common/Exception.h>
#include <Core/ColumnsWithTypeAndName.h>
#include <Interpreters/JoinUtils.h>
#include <Formats/NativeWriter.h>
#include <Compression/CompressedWriteBuffer.h>
#include <Processors/ISource.h>
@ -25,6 +26,7 @@
#include <Poco/String.h>
#include <filesystem>
namespace fs = std::filesystem;
namespace DB

View File

@ -19,6 +19,8 @@
#include <Processors/QueryPlan/ReadFromMemoryStorageStep.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Parsers/ASTCreateQuery.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Common/FileChecker.h>
#include <Compression/CompressedReadBuffer.h>

View File

@ -37,7 +37,6 @@
#include <Storages/AlterCommands.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/MergeTreeSink.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergePlainMergeTreeTask.h>
#include <Storages/MergeTree/PartitionPruner.h>
#include <Storages/MergeTree/MergeList.h>

View File

@ -685,7 +685,6 @@ PartsCommitted
PartsCompact
PartsDeleteOnDestroy
PartsDeleting
PartsInMemory
PartsOutdated
PartsPreActive
PartsPreCommitted