flush all InMemoryDataParts when wal is not enabled

This commit is contained in:
nautaa 2021-12-14 16:31:17 +08:00
parent fc4e947f45
commit 0da3881ff4
7 changed files with 57 additions and 1 deletions

View File

@ -511,7 +511,7 @@ public:
virtual void shutdown() {}
/// Called before shutdown() to flush data to underlying storage
/// (for Buffer)
/// Data in memory need to be persistent
virtual void flush() {}
/// Asks table to stop executing some action identified by action_type

View File

@ -1525,6 +1525,24 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
}
}
void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if(getSettings()->in_memory_parts_enable_wal)
return ;
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVector();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
const auto & storage_relative_path = part_in_memory->storage.relative_data_path;
part_in_memory->flushToDisk(storage_relative_path, part_in_memory->relative_path, metadata_snapshot);
}
}
}
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);

View File

@ -548,6 +548,9 @@ public:
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// When WAL is not enabled, the InMemoryParts need to be persistent.
void flushAllInMemoryPartsIfNeeded();
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);

View File

@ -139,6 +139,10 @@ void StorageMergeTree::startup()
}
}
void StorageMergeTree::flush()
{
flushAllInMemoryPartsIfNeeded();
}
void StorageMergeTree::shutdown()
{

View File

@ -31,6 +31,7 @@ class StorageMergeTree final : public shared_ptr_helper<StorageMergeTree>, publi
friend struct shared_ptr_helper<StorageMergeTree>;
public:
void startup() override;
void flush() override;
void shutdown() override;
~StorageMergeTree() override;

View File

@ -0,0 +1,4 @@
before DETACH TABLE
500
after DETACH TABLE
500

View File

@ -0,0 +1,26 @@
DROP TABLE IF EXISTS mem_part_flush;
CREATE TABLE mem_part_flush
(
`key` UInt32,
`ts` DateTime,
`db_time` DateTime DEFAULT now()
)
ENGINE = MergeTree
ORDER BY (key, ts)
SETTINGS min_rows_for_compact_part = 1000000, min_bytes_for_compact_part = 200000000, in_memory_parts_enable_wal = 0;
INSERT INTO mem_part_flush(key, ts) SELECT number % 1000, now() + intDiv(number,1000) FROM numbers(500);
SELECT 'before DETACH TABLE';
SELECT count(*) FROM mem_part_flush;
DETACH TABLE mem_part_flush;
ATTACH TABLE mem_part_flush;
SELECT 'after DETACH TABLE';
SELECT count(*) FROM mem_part_flush;
DROP TABLE mem_part_flush;