in-memory parts: several fixes

This commit is contained in:
Anton Popov 2020-06-05 23:47:46 +03:00
parent b312ac9786
commit 66e31d4311
13 changed files with 35 additions and 22 deletions

View File

@ -135,7 +135,7 @@ void Service::processQuery(const Poco::Net::HTMLForm & params, ReadBuffer & /*bo
void Service::sendPartFromMemory(const MergeTreeData::DataPartPtr & part, WriteBuffer & out)
{
const auto * part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(part.get());
auto part_in_memory = asInMemoryPart(part);
if (!part_in_memory)
throw Exception("Part " + part->name + " is not stored in memory", ErrorCodes::LOGICAL_ERROR);

View File

@ -95,9 +95,6 @@ public:
virtual bool supportsVerticalMerge() const { return false; }
virtual bool waitUntilMerged(size_t /* timeout */) const { return true; }
virtual void notifyMerged() const {}
/// NOTE: Returns zeros if column files are not found in checksums.
/// Otherwise return information about column size on disk.
ColumnSize getColumnSize(const String & column_name, const IDataType & /* type */) const;

View File

@ -32,14 +32,14 @@ void MergeTreeBlockOutputStream::write(const Block & block)
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
if (auto * part_in_memory = dynamic_cast<MergeTreeDataPartInMemory *>(part.get()))
if (auto part_in_memory = asInMemoryPart(part))
{
storage.in_memory_merges_throttler.add(part_in_memory->block.bytes(), part_in_memory->rows_count);
auto settings = storage.getSettings();
if (settings->in_memory_parts_insert_sync)
{
if (!part->waitUntilMerged(in_memory_parts_timeout))
if (!part_in_memory->waitUntilMerged(in_memory_parts_timeout))
throw Exception("Timeout exceeded while waiting to write part "
+ part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
}

View File

@ -1941,7 +1941,7 @@ void MergeTreeData::renameTempPartAndReplace(
addPartContributionToColumnSizes(part);
}
auto * part_in_memory = dynamic_cast<MergeTreeDataPartInMemory *>(part.get());
auto part_in_memory = asInMemoryPart(part);
if (part_in_memory && getSettings()->in_memory_parts_enable_wal)
{
auto wal = getWriteAheadLog();
@ -3271,7 +3271,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeData::cloneAndLoadDataPartOnSameDisk(
throw Exception("Part in " + fullPath(disk, dst_part_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
/// If source part is in memory, flush it to disk and clone it already in on-disk format
if (const auto * src_part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(src_part.get()))
if (auto src_part_in_memory = asInMemoryPart(src_part))
{
const auto & src_relative_data_path = src_part_in_memory->storage.relative_data_path;
auto flushed_part_path = src_part_in_memory->getRelativePathForPrefix(tmp_part_prefix);
@ -3367,7 +3367,7 @@ void MergeTreeData::freezePartitionsByMatcher(MatcherFn matcher, const String &
LOG_DEBUG(log, "Freezing part {} snapshot will be placed at {}", part->name, backup_path);
String backup_part_path = backup_path + relative_data_path + part->relative_path;
if (const auto * part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(part.get()))
if (auto part_in_memory = asInMemoryPart(part))
part_in_memory->flushToDisk(backup_path + relative_data_path, part->relative_path);
else
localBackup(part->volume->getDisk(), part->getFullRelativePath(), backup_part_path);

View File

@ -1455,7 +1455,7 @@ NamesAndTypesList MergeTreeDataMergerMutator::getColumnsForNewDataPart(
{
/// In compact parts we read all columns, because they all stored in a
/// single file
if (isCompactPart(source_part))
if (!isWidePart(source_part))
return updated_header.getNamesAndTypesList();
NameSet removed_columns;

View File

@ -13,7 +13,6 @@ namespace DB
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int DIRECTORY_ALREADY_EXISTS;
}
@ -124,4 +123,10 @@ void MergeTreeDataPartInMemory::calculateEachColumnSizesOnDisk(ColumnSizeByName
each_columns_size[column.name].data_uncompressed += block.getByName(column.name).column->byteSize();
}
DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part)
{
return std::dynamic_pointer_cast<const MergeTreeDataPartInMemory>(part);
}
}

View File

@ -45,8 +45,8 @@ public:
void flushToDisk(const String & base_path, const String & new_relative_path) const;
bool waitUntilMerged(size_t timeout) const override;
void notifyMerged() const override;
bool waitUntilMerged(size_t timeout) const;
void notifyMerged() const;
mutable Block block;
@ -58,5 +58,6 @@ private:
};
using DataPartInMemoryPtr = std::shared_ptr<const MergeTreeDataPartInMemory>;
DataPartInMemoryPtr asInMemoryPart(const MergeTreeDataPartPtr & part);
}

View File

@ -79,6 +79,11 @@ static MergeTreeDataPartChecksum createUncompressedChecksum(size_t size, SipHash
void MergeTreeDataPartWriterInMemory::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums)
{
/// If part is empty we still need to initialize block by empty columns.
if (!part_in_memory->block)
for (const auto & column : columns_list)
part_in_memory->block.insert(ColumnWithTypeAndName{column.type, column.name});
SipHash hash;
for (const auto & column : part_in_memory->block)
column.column->updateHashFast(hash);

View File

@ -1007,7 +1007,7 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
if (part)
{
if (const auto * part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(part.get()))
if (auto part_in_memory = asInMemoryPart(part))
sum_parts_size_in_bytes += part_in_memory->block.bytes();
else
sum_parts_size_in_bytes += part->getBytesOnDisk();

View File

@ -681,11 +681,11 @@ bool StorageMergeTree::merge(
auto lock = lockParts();
for (const auto & part : future_part.parts)
{
part->notifyMerged();
if (isInMemoryPart(part))
if (auto part_in_memory = asInMemoryPart(part))
{
modifyPartState(part, DataPartState::Deleting);
parts_to_remove_immediately.push_back(part);
part_in_memory->notifyMerged();
modifyPartState(part_in_memory, DataPartState::Deleting);
parts_to_remove_immediately.push_back(part_in_memory);
}
}
}

View File

@ -1089,11 +1089,11 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
DataPartsVector parts_to_remove_immediatly;
for (const auto & part_ptr : parts)
{
part_ptr->notifyMerged();
if (isInMemoryPart(part_ptr))
if (auto part_in_memory = asInMemoryPart(part_ptr))
{
modifyPartState(part_ptr, DataPartState::Deleting);
parts_to_remove_immediatly.push_back(part_ptr);
part_in_memory->notifyMerged();
modifyPartState(part_in_memory, DataPartState::Deleting);
parts_to_remove_immediatly.push_back(part_in_memory);
}
}

View File

@ -36,3 +36,4 @@ Mutations and Alters
4 [4,16]
5 []
7 [7,49]
0

View File

@ -39,4 +39,8 @@ ALTER TABLE in_memory DROP COLUMN str;
SELECT * FROM in_memory ORDER BY a LIMIT 5;
-- in-memory parts works if they're empty.
ALTER TABLE in_memory DELETE WHERE 1;
SELECT count() FROM in_memory;
DROP TABLE in_memory;