in-memory parts: better restore and clear stale wal files

Anton Popov 2020-05-27 23:05:55 +03:00
parent 6d8749b100
commit d8342e5b12
15 changed files with 213 additions and 48 deletions

View File

@@ -276,13 +276,13 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchPart(
if (server_protocol_version >= REPLICATION_PROTOCOL_VERSION_WITH_PARTS_SIZE)
readStringBinary(part_type, in);
- return part_type == "InMemory" ? downloadPartToMemory(part_name, replica_path, in)
+ return part_type == "InMemory" ? downloadPartToMemory(part_name, std::move(reservation), in)
: downloadPartToDisk(part_name, replica_path, to_detached, tmp_prefix_, std::move(reservation), in);
}
MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
const String & part_name,
- const String & /* replica_path */,
+ ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in)
{
MergeTreeData::DataPart::Checksums checksums;
@@ -292,7 +292,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
NativeBlockInputStream block_in(in, 0);
auto block = block_in.read();
MergeTreeData::MutableDataPartPtr new_data_part =
- std::make_shared<MergeTreeDataPartInMemory>(data, part_name, nullptr);
+ std::make_shared<MergeTreeDataPartInMemory>(data, part_name, reservation->getDisk());
new_data_part->is_temp = true;
new_data_part->setColumns(block.getNamesAndTypesList());

View File

@@ -78,7 +78,7 @@ private:
MergeTreeData::MutableDataPartPtr downloadPartToMemory(
const String & part_name,
- const String & replica_path,
+ ReservationPtr reservation,
PooledReadWriteBufferFromHTTP & in);
MergeTreeData & data;

View File

@@ -380,8 +380,6 @@ String IMergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
String IMergeTreeDataPart::getFullPath() const
{
- // assertOnDisk(); //TODO
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);
@@ -390,8 +388,6 @@ String IMergeTreeDataPart::getFullPath() const
String IMergeTreeDataPart::getFullRelativePath() const
{
- // assertOnDisk(); //TODO
if (relative_path.empty())
throw Exception("Part relative_path cannot be empty. It's bug.", ErrorCodes::LOGICAL_ERROR);

View File

@@ -49,12 +49,13 @@ void MergeTreeBlockOutputStream::write(const Block & block)
storage.merging_mutating_task_handle->wake();
}
- return;
+ continue;
}
- /// Initiate async merge - it will be done if it's a good time for merge and if there is space in 'background_pool'.
- if (storage.merging_mutating_task_handle)
+ else if (storage.merging_mutating_task_handle)
+ {
+ /// Initiate async merge - it will be done if it's a good time for merge and if there is space in 'background_pool'.
storage.merging_mutating_task_handle->wake();
+ }
}
}

View File

@@ -58,6 +58,7 @@
#include <typeinfo>
#include <typeindex>
#include <unordered_set>
#include <algorithm>
namespace ProfileEvents
@@ -1046,6 +1047,9 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
for (auto & part : parts_from_wal)
{
if (getActiveContainingPart(part->info, DataPartState::Committed, part_lock))
continue;
part->modification_time = time(nullptr);
/// Assume that all parts are Committed, covered parts will be detected and marked as Outdated later
part->state = DataPartState::Committed;
@@ -1322,6 +1326,61 @@ void MergeTreeData::clearPartsFromFilesystem(const DataPartsVector & parts_to_re
}
}
void MergeTreeData::clearOldWriteAheadLogs()
{
DataPartsVector parts = getDataPartsVector();
std::vector<std::pair<Int64, Int64>> all_block_numbers_on_disk;
std::vector<std::pair<Int64, Int64>> block_numbers_on_disk;
for (const auto & part : parts)
if (part->isStoredOnDisk())
all_block_numbers_on_disk.emplace_back(part->info.min_block, part->info.max_block);
if (all_block_numbers_on_disk.empty())
return;
std::sort(all_block_numbers_on_disk.begin(), all_block_numbers_on_disk.end());
block_numbers_on_disk.push_back(all_block_numbers_on_disk[0]);
for (size_t i = 1; i < all_block_numbers_on_disk.size(); ++i)
{
if (all_block_numbers_on_disk[i].first == all_block_numbers_on_disk[i - 1].second + 1)
block_numbers_on_disk.back().second = all_block_numbers_on_disk[i].second;
else
block_numbers_on_disk.push_back(all_block_numbers_on_disk[i]);
}
auto is_range_on_disk = [&block_numbers_on_disk](Int64 min_block, Int64 max_block)
{
auto lower = std::upper_bound(block_numbers_on_disk.begin(), block_numbers_on_disk.end(), std::make_pair(min_block, -1L));
if (lower != block_numbers_on_disk.end() && min_block >= lower->first && max_block <= lower->second)
return true;
if (lower != block_numbers_on_disk.begin())
{
--lower;
if (min_block >= lower->first && max_block <= lower->second)
return true;
}
return false;
};
auto disks = getStoragePolicy()->getDisks();
for (auto disk_it = disks.rbegin(); disk_it != disks.rend(); ++disk_it)
{
auto disk_ptr = *disk_it;
for (auto it = disk_ptr->iterateDirectory(relative_data_path); it->isValid(); it->next())
{
auto min_max_block_number = MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(it->name());
if (min_max_block_number && is_range_on_disk(min_max_block_number->first, min_max_block_number->second))
{
LOG_DEBUG(log, "Removing from filesystem outdated WAL file " + it->name());
disk_ptr->remove(relative_data_path + it->name());
}
}
}
}
void MergeTreeData::rename(const String & new_table_path, const StorageID & new_table_id)
{
auto disks = getStoragePolicy()->getDisks();
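The new clearOldWriteAheadLogs above works in two steps: it coalesces the sorted [min_block, max_block] ranges of all on-disk parts into maximal contiguous runs, then treats a WAL file as stale when a single run covers the file's whole range. A minimal standalone sketch of that interval logic (type and helper names are mine, not the commit's):

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <utility>
#include <vector>

using BlockRange = std::pair<int64_t, int64_t>;

// Coalesce ranges whose block numbers are contiguous, mirroring the loop
// over all_block_numbers_on_disk above.
std::vector<BlockRange> coalesce(std::vector<BlockRange> ranges)
{
    std::sort(ranges.begin(), ranges.end());
    std::vector<BlockRange> merged;
    for (const auto & r : ranges)
    {
        if (!merged.empty() && r.first == merged.back().second + 1)
            merged.back().second = r.second;
        else
            merged.push_back(r);
    }
    return merged;
}

// A WAL covering [min_block, max_block] is redundant when one coalesced
// on-disk run fully contains it (cf. the is_range_on_disk lambda).
bool isRangeOnDisk(const std::vector<BlockRange> & merged, int64_t min_block, int64_t max_block)
{
    auto it = std::upper_bound(merged.begin(), merged.end(), BlockRange(min_block, -1));
    if (it != merged.end() && min_block >= it->first && max_block <= it->second)
        return true;
    if (it != merged.begin())
    {
        --it;
        if (min_block >= it->first && max_block <= it->second)
            return true;
    }
    return false;
}

int main()
{
    // Parts 1_2, 3_5 and 8_8 are on disk; they coalesce to [1,5] and [8,8].
    auto merged = coalesce({{1, 2}, {3, 5}, {8, 8}});
    assert(isRangeOnDisk(merged, 2, 4));   // a wal_2_4.bin would be stale
    assert(!isRangeOnDisk(merged, 4, 8));  // blocks 6 and 7 live only in the WAL
}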
@@ -1875,6 +1934,7 @@ void MergeTreeData::renameTempPartAndReplace(
DataPartPtr covering_part;
DataPartsVector covered_parts = getActivePartsToReplace(part_info, part_name, covering_part, lock);
DataPartsVector covered_parts_in_memory;
if (covering_part)
{
@@ -3690,6 +3750,7 @@ void MergeTreeData::MergesThrottler::add(size_t bytes, size_t rows)
void MergeTreeData::MergesThrottler::reset()
{
std::lock_guard lock(mutex);
have_bytes = 0;
have_rows = 0;
}

View File

@@ -492,6 +492,9 @@ public:
void clearOldPartsFromFilesystem(bool force = false);
void clearPartsFromFilesystem(const DataPartsVector & parts);
/// Delete WAL files that contain only parts which are already stored on disk.
void clearOldWriteAheadLogs();
/// Delete all directories whose names begin with "tmp"
/// Set non-negative parameter value to override MergeTreeSettings temporary_directories_lifetime
/// Must be called with locked lockStructureForShare().

View File

@@ -35,6 +35,7 @@ struct MergeTreeSettings : public SettingsCollection<MergeTreeSettings>
M(SettingUInt64, min_rows_for_compact_part, 0, "Minimal number of rows to create part in compact format instead of saving it in RAM", 0) \
M(SettingBool, in_memory_parts_enable_wal, 1, "Whether to write blocks in Native format to write-ahead-log before creating an in-memory part", 0) \
M(SettingBool, in_memory_parts_insert_sync, 0, "", 0) \
M(SettingUInt64, write_ahead_log_max_bytes, 1024 * 1024 * 1024, "Rotate WAL, if it exceeds that amount of bytes", 0) \
\
/** Merge settings. */ \
M(SettingUInt64, merge_max_block_size, DEFAULT_MERGE_BLOCK_SIZE, "How many rows in blocks should be formed for merge operations.", 0) \

View File

@@ -2,6 +2,7 @@
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergedBlockOutputStream.h>
#include <IO/ReadHelpers.h>
#include <Poco/File.h>
namespace DB
@@ -48,7 +49,8 @@ void MergeTreeWriteAheadLog::write(const Block & block, const String & part_name
block_out->write(block);
block_out->flush();
- if (out->count() > MAX_WAL_BYTES)
+ auto max_wal_bytes = storage.getSettings()->write_ahead_log_max_bytes;
+ if (out->count() > max_wal_bytes)
rotate();
}
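rotate() itself is not shown in this hunk; judging by the file names that the cleanup code and the tests below expect, it presumably renames the active wal.bin to wal_<min_block>_<max_block>.bin and starts a fresh wal.bin. A hedged sketch of just the naming step (the helper is hypothetical):

#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical helper: build the rotated file name "wal_<min>_<max>.bin"
// for the block range the current WAL covers; the real rotate() would also
// have to move the file on the disk and re-initialize the output stream.
std::string rotatedWalName(int64_t min_block, int64_t max_block)
{
    return "wal_" + std::to_string(min_block) + "_" + std::to_string(max_block) + ".bin";
}

int main()
{
    // The name the integration test below expects after the broken WAL
    // holding blocks 0..4 has been rotated away.
    assert(rotatedWalName(0, 4) == "wal_0_4.bin");
}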
@@ -106,7 +108,7 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
/// If file is broken, do not write new parts to it.
/// But if it contains any parts, rotate it to save them.
if (max_block_number == -1)
- Poco::File(path).remove();
+ disk->remove(path);
else if (name == DEFAULT_WAL_FILE)
rotate();
@@ -135,4 +137,22 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
return result;
}
std::optional<MergeTreeWriteAheadLog::MinMaxBlockNumber>
MergeTreeWriteAheadLog::tryParseMinMaxBlockNumber(const String & filename)
{
Int64 min_block;
Int64 max_block;
ReadBufferFromString in(filename);
if (!checkString(WAL_FILE_NAME, in)
|| !checkChar('_', in)
|| !tryReadIntText(min_block, in)
|| !checkChar('_', in)
|| !tryReadIntText(max_block, in))
{
return {};
}
return std::make_pair(min_block, max_block);
}
}
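Given that naming scheme, tryParseMinMaxBlockNumber is the inverse mapping: wal_0_4.bin yields {0, 4}, while the active wal.bin (or any unrelated directory entry) yields nothing and is therefore never touched by the range-based cleanup. A compilable toy version of the same parse (sscanf-based; my own helper, not the commit's code — note that, like the original, it does not insist on the ".bin" suffix):

#include <cassert>
#include <cinttypes>
#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>
#include <utility>

// Accept "wal_<min>_<max>..." and reject anything else.
std::optional<std::pair<int64_t, int64_t>> parseMinMax(const std::string & name)
{
    int64_t min_block = 0;
    int64_t max_block = 0;
    if (std::sscanf(name.c_str(), "wal_%" SCNd64 "_%" SCNd64, &min_block, &max_block) != 2)
        return std::nullopt;
    return std::make_pair(min_block, max_block);
}

int main()
{
    assert(parseMinMax("wal_0_4.bin") == std::make_pair(int64_t(0), int64_t(4)));
    assert(!parseMinMax("wal.bin"));    // the active WAL has no block range
    assert(!parseMinMax("detached"));   // unrelated entries are skipped
}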

View File

@@ -16,7 +16,6 @@ public:
constexpr static auto WAL_FILE_NAME = "wal";
constexpr static auto WAL_FILE_EXTENSION = ".bin";
constexpr static auto DEFAULT_WAL_FILE = "wal.bin";
- constexpr static size_t MAX_WAL_BYTES = 1024 * 1024 * 1024;
MergeTreeWriteAheadLog(const MergeTreeData & storage_, const DiskPtr & disk_,
const String & name = DEFAULT_WAL_FILE);
@@ -24,6 +23,9 @@ public:
void write(const Block & block, const String & part_name);
std::vector<MergeTreeMutableDataPartPtr> restore();
using MinMaxBlockNumber = std::pair<Int64, Int64>;
static std::optional<MinMaxBlockNumber> tryParseMinMaxBlockNumber(const String & filename);
private:
void init();
void rotate();

View File

@@ -54,6 +54,7 @@ void ReplicatedMergeTreeCleanupThread::run()
void ReplicatedMergeTreeCleanupThread::iterate()
{
storage.clearOldPartsAndRemoveFromZK();
storage.clearOldWriteAheadLogs();
{
/// TODO: Implement tryLockStructureForShare.

View File

@@ -1014,7 +1014,12 @@ bool ReplicatedMergeTreeQueue::shouldExecuteLogEntry(
auto part = data.getPartIfExists(name, {MergeTreeDataPartState::PreCommitted, MergeTreeDataPartState::Committed, MergeTreeDataPartState::Outdated});
if (part)
- sum_parts_size_in_bytes += part->getBytesOnDisk();
+ {
+ if (auto * part_in_memory = dynamic_cast<const MergeTreeDataPartInMemory *>(part.get()))
+ sum_parts_size_in_bytes += part_in_memory->block.bytes();
+ else
+ sum_parts_size_in_bytes += part->getBytesOnDisk();
+ }
}
if (merger_mutator.merges_blocker.isCancelled())
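The hunk above fixes the queue's size estimate: an in-memory part reports zero bytes on disk, which would let the size-based throttle wave through arbitrarily many entries for such parts, so the uncompressed size of the block held in RAM is used instead. A self-contained toy model of that choice (the classes are illustrative, not ClickHouse's):

#include <cassert>
#include <cstddef>

// Toy part hierarchy, loosely modeled on IMergeTreeDataPart and
// MergeTreeDataPartInMemory for illustration only.
struct Part
{
    virtual ~Part() = default;
    size_t bytes_on_disk = 0;
};

struct InMemoryPart : Part
{
    size_t block_bytes = 0;   // uncompressed size of the block held in RAM
};

// Mirrors the dynamic_cast in shouldExecuteLogEntry: fall back to the
// block size when there is nothing on disk to measure.
size_t sizeForQueueThrottling(const Part & part)
{
    if (auto * in_memory = dynamic_cast<const InMemoryPart *>(&part))
        return in_memory->block_bytes;
    return part.bytes_on_disk;
}

int main()
{
    InMemoryPart mem;
    mem.block_bytes = 4096;
    Part on_disk;
    on_disk.bytes_on_disk = 1024;
    assert(sizeForQueueThrottling(mem) == 4096);
    assert(sizeForQueueThrottling(on_disk) == 1024);
}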

View File

@@ -87,6 +87,7 @@ StorageMergeTree::StorageMergeTree(
void StorageMergeTree::startup()
{
clearOldPartsFromFilesystem();
clearOldWriteAheadLogs();
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
@@ -632,8 +633,22 @@ bool StorageMergeTree::merge(
new_part = merger_mutator.mergePartsToTemporaryPart(
future_part, *merge_entry, table_lock_holder, time(nullptr),
merging_tagger->reserved_space, deduplicate, force_ttl);
merger_mutator.renameMergedTemporaryPart(new_part, future_part.parts, nullptr);
DataPartsVector parts_to_remove_immediately;
for (const auto & part : future_part.parts)
{
part->notifyMerged();
if (isInMemoryPart(part))
{
modifyPartState(part, DataPartState::Deleting);
parts_to_remove_immediately.push_back(part);
}
}
removePartsFinally(parts_to_remove_immediately);
merging_tagger->is_successful = true;
write_part_log({});
}
@@ -644,9 +659,6 @@ bool StorageMergeTree::merge(
throw;
}
- for (const auto & part : future_part.parts)
- part->notifyMerged();
return true;
}
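This hunk and the replicated variant further down share one pattern: once a merge commits, in-memory source parts are moved straight to Deleting and dropped, because they have no files whose removal would need to wait out the usual Outdated grace period (the replicated path additionally clears them from ZooKeeper first). A self-contained toy model of that state decision (not ClickHouse's types):

#include <cassert>
#include <memory>
#include <vector>

enum class State { Committed, Outdated, Deleting };

struct Part
{
    bool in_memory = false;
    State state = State::Committed;
};

// After a merge, source parts either wait for the cleanup thread
// (on-disk) or are scheduled for immediate removal (in-memory).
void retireSourceParts(std::vector<std::shared_ptr<Part>> & source_parts)
{
    std::vector<std::shared_ptr<Part>> remove_immediately;
    for (auto & part : source_parts)
    {
        if (part->in_memory)
        {
            part->state = State::Deleting;   // nothing on disk, drop right away
            remove_immediately.push_back(part);
        }
        else
            part->state = State::Outdated;   // files removed later by cleanup
    }
    // removePartsFinally(remove_immediately) would run here.
}

int main()
{
    auto mem = std::make_shared<Part>(Part{true});
    auto disk = std::make_shared<Part>(Part{false});
    std::vector<std::shared_ptr<Part>> parts{mem, disk};
    retireSourceParts(parts);
    assert(mem->state == State::Deleting);
    assert(disk->state == State::Outdated);
}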
@@ -818,6 +830,7 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
clearOldTemporaryDirectories();
}
clearOldMutations();
clearOldWriteAheadLogs();
}
///TODO: read deduplicate option from table config

View File

@@ -298,6 +298,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
/// Temporary directories contain unfinalized results of Merges or Fetches (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
clearOldTemporaryDirectories(0);
clearOldWriteAheadLogs();
}
createNewZooKeeperNodes();
@@ -1050,7 +1051,8 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
for (auto & part_ptr : parts)
{
ttl_infos.update(part_ptr->ttl_infos);
- max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->disk));
+ if (part_ptr->isStoredOnDisk())
+ max_volume_index = std::max(max_volume_index, getStoragePolicy()->getVolumeIndexByDisk(part_ptr->disk));
}
ReservationPtr reserved_space = reserveSpacePreferringTTLRules(estimated_space_for_merge,
ttl_infos, time(nullptr), max_volume_index);
@@ -1091,6 +1093,20 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
try
{
checkPartChecksumsAndCommit(transaction, part);
DataPartsVector parts_to_remove_immediately;
for (const auto & part_ptr : parts)
{
part_ptr->notifyMerged();
if (isInMemoryPart(part_ptr))
{
modifyPartState(part_ptr, DataPartState::Deleting);
parts_to_remove_immediately.push_back(part_ptr);
}
}
tryRemovePartsFromZooKeeperWithRetries(parts_to_remove_immediately);
removePartsFinally(parts_to_remove_immediately);
}
catch (const Exception & e)
{

View File

@@ -2,5 +2,7 @@
<merge_tree>
<max_bytes_to_merge_at_min_space_in_pool>1</max_bytes_to_merge_at_min_space_in_pool>
<max_bytes_to_merge_at_max_space_in_pool>2</max_bytes_to_merge_at_max_space_in_pool>
<number_of_free_entries_in_pool_to_lower_max_size_of_merge>100</number_of_free_entries_in_pool_to_lower_max_size_of_merge>
<max_replicated_merges_in_queue>0</max_replicated_merges_in_queue>
</merge_tree>
</yandex>

View File

@@ -70,8 +70,11 @@ node6 = cluster.add_instance('node6', config_dir='configs', main_configs=['confi
settings_in_memory = {'index_granularity_bytes' : 10485760, 'min_rows_for_wide_part' : 512, 'min_rows_for_compact_part' : 256}
- node9 = cluster.add_instance('node9', config_dir="configs", main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
- node10 = cluster.add_instance('node10', config_dir="configs", main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
+ node9 = cluster.add_instance('node9', config_dir="configs", with_zookeeper=True)
+ node10 = cluster.add_instance('node10', config_dir="configs", with_zookeeper=True)
+ node11 = cluster.add_instance('node11', config_dir="configs", main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
+ node12 = cluster.add_instance('node12', config_dir="configs", main_configs=['configs/do_not_merge.xml'], with_zookeeper=True, stay_alive=True)
@pytest.fixture(scope="module")
def start_cluster():
@@ -85,7 +88,8 @@ def start_cluster():
create_tables('polymorphic_table_wide', [node3, node4], [settings_wide, settings_compact], "shard2")
create_tables_old_format('polymorphic_table', [node5, node6], "shard3")
create_tables('in_memory_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
- create_tables('wal_table', [node9, node10], [settings_in_memory, settings_in_memory], "shard4")
+ create_tables('wal_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard4")
+ create_tables('restore_table', [node11, node12], [settings_in_memory, settings_in_memory], "shard5")
yield cluster
@@ -317,66 +321,106 @@ def test_in_memory(start_cluster):
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV(expected)
node9.query("SYSTEM START MERGES")
node10.query("SYSTEM START MERGES")
assert_eq_with_retry(node9, "OPTIMIZE TABLE in_memory_table FINAL SETTINGS optimize_throw_if_noop = 1", "")
node10.query("SYSTEM SYNC REPLICA in_memory_table", timeout=20)
assert node9.query("SELECT count() FROM in_memory_table") == "1300\n"
assert node10.query("SELECT count() FROM in_memory_table") == "1300\n"
assert TSV(node9.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t1\n")
assert TSV(node10.query("SELECT part_type, count() FROM system.parts " \
"WHERE table = 'in_memory_table' AND active GROUP BY part_type ORDER BY part_type")) == TSV("Wide\t1\n")
def test_in_memory_wal(start_cluster):
# Merges are disabled in config
for i in range(5):
- insert_random_data('wal_table', node9, 50)
- node10.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
+ insert_random_data('wal_table', node11, 50)
+ node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
def check(node, rows, parts):
assert node.query("SELECT count() FROM wal_table") == "{}\n".format(rows)
assert node.query("SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == "{}\n".format(parts)
- check(node9, 250, 5)
- check(node10, 250, 5)
+ check(node11, 250, 5)
+ check(node12, 250, 5)
# WAL works for inserts
- node9.restart_clickhouse(kill=True)
- check(node9, 250, 5)
+ node11.restart_clickhouse(kill=True)
+ check(node11, 250, 5)
# WAL works for fetches
- node10.restart_clickhouse(kill=True)
- check(node10, 250, 5)
+ node12.restart_clickhouse(kill=True)
+ check(node12, 250, 5)
- insert_random_data('wal_table', node9, 50)
- node10.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
+ insert_random_data('wal_table', node11, 50)
+ node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
# Disable replication
with PartitionManager() as pm:
- pm.partition_instances(node9, node10)
- check(node9, 300, 6)
+ pm.partition_instances(node11, node12)
+ check(node11, 300, 6)
- wal_file = os.path.join(node9.path, "database/data/default/wal_table/wal.bin")
+ wal_file = os.path.join(node11.path, "database/data/default/wal_table/wal.bin")
# Corrupt wal file
open(wal_file, 'r+').truncate(os.path.getsize(wal_file) - 10)
- node9.restart_clickhouse(kill=True)
+ node11.restart_clickhouse(kill=True)
# Broken part is lost, but others are restored successfully
- check(node9, 250, 5)
+ check(node11, 250, 5)
# WAL with blocks from 0 to 4
- broken_wal_file = os.path.join(node9.path, "database/data/default/wal_table/wal_0_4.bin")
+ broken_wal_file = os.path.join(node11.path, "database/data/default/wal_table/wal_0_4.bin")
assert os.path.exists(broken_wal_file)
# Fetch lost part from replica
- node9.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
- check(node9, 300, 6)
+ node11.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
+ check(node11, 300, 6)
# Check that new data is written to the new wal, but the old one still exists for restoring
assert os.path.getsize(wal_file) > 0
- assert os.path.getsize(broken_wal_file)
+ assert os.path.exists(broken_wal_file)
# Data is lost without WAL
- node9.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
+ node11.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
with PartitionManager() as pm:
- pm.partition_instances(node9, node10)
+ pm.partition_instances(node11, node12)
- insert_random_data('wal_table', node9, 50)
- check(node9, 350, 7)
+ insert_random_data('wal_table', node11, 50)
+ check(node11, 350, 7)
- node9.restart_clickhouse(kill=True)
- check(node9, 300, 6)
+ node11.restart_clickhouse(kill=True)
+ check(node11, 300, 6)
def test_in_memory_wal_rotate(start_cluster):
# Write every part to a separate wal (tiny size limit forces rotation after each insert)
node11.query("ALTER TABLE restore_table MODIFY SETTING write_ahead_log_max_bytes = 10")
for i in range(5):
insert_random_data('restore_table', node11, 50)
for i in range(5):
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal_{0}_{0}.bin".format(i))
assert os.path.exists(wal_file)
for node in [node11, node12]:
node.query("ALTER TABLE restore_table MODIFY SETTING number_of_free_entries_in_pool_to_lower_max_size_of_merge = 0")
node.query("ALTER TABLE restore_table MODIFY SETTING max_bytes_to_merge_at_max_space_in_pool = 10000000")
assert_eq_with_retry(node11, "OPTIMIZE TABLE restore_table FINAL SETTINGS optimize_throw_if_noop = 1", "")
# Restart to be sure that the task clearing stale logs has run
node11.restart_clickhouse(kill=True)
for i in range(5):
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal_{0}_{0}.bin".format(i))
assert not os.path.exists(wal_file)
# New wal file was created and is ready for writing parts to it
wal_file = os.path.join(node11.path, "database/data/default/restore_table/wal.bin")
assert os.path.exists(wal_file)
assert os.path.getsize(wal_file) == 0
def test_polymorphic_parts_index(start_cluster):
node1.query('''