in-memory parts: delay merges

Anton Popov 2020-05-20 15:02:02 +03:00
parent c61f1dbac8
commit 4fb6492b08
7 changed files with 71 additions and 12 deletions
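
Judging from the diff, the change adds a small MergesThrottler to MergeTreeData that counts the bytes and rows of in-memory parts inserted since the last merge, and uses it to postpone waking the background merge task until the accumulated data reaches the min_bytes_for_compact_part / min_rows_for_compact_part thresholds. The insert path, the background merge/mutate task, the in-memory part writer and WAL recovery are adjusted to match.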

View File

@@ -1,4 +1,5 @@
 #include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
+#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
 #include <Storages/StorageMergeTree.h>
 #include <Interpreters/PartLog.h>
@@ -31,11 +32,24 @@ void MergeTreeBlockOutputStream::write(const Block & block)
 
         PartLog::addNewPart(storage.global_context, part, watch.elapsed());
 
-        if (isInMemoryPart(part) && storage.getSettings()->in_memory_parts_insert_sync)
+        if (auto * part_in_memory = dynamic_cast<MergeTreeDataPartInMemory *>(part.get()))
         {
-            if (!part->waitUntilMerged(in_memory_parts_timeout))
-                throw Exception("Timeout exceeded while waiting to write part "
-                    + part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
+            storage.in_memory_merges_throttler.add(part_in_memory->block.bytes(), part_in_memory->rows_count);
+
+            auto settings = storage.getSettings();
+            if (settings->in_memory_parts_insert_sync)
+            {
+                if (!part->waitUntilMerged(in_memory_parts_timeout))
+                    throw Exception("Timeout exceeded while waiting to write part "
+                        + part->name + " on disk", ErrorCodes::TIMEOUT_EXCEEDED);
+            }
+            else if (storage.merging_mutating_task_handle && !storage.in_memory_merges_throttler.needDelayMerge())
+            {
+                storage.in_memory_merges_throttler.reset();
+                storage.merging_mutating_task_handle->wake();
+            }
+
+            return;
         }
 
         /// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
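
The insert path now registers every in-memory part with storage.in_memory_merges_throttler. With in_memory_parts_insert_sync set, the writer still blocks until the part has been merged to disk (or in_memory_parts_timeout expires); otherwise the background merging/mutating task is woken only once needDelayMerge() reports that enough data has accumulated, and the throttler is reset at that point.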

View File

@@ -142,6 +142,7 @@ MergeTreeData::MergeTreeData(
     , data_parts_by_info(data_parts_indexes.get<TagByInfo>())
     , data_parts_by_state_and_info(data_parts_indexes.get<TagByStateAndInfo>())
     , parts_mover(this)
+    , in_memory_merges_throttler(storage_settings.get()->min_bytes_for_compact_part, storage_settings.get()->min_rows_for_compact_part)
 {
     if (relative_data_path.empty())
         throw Exception("MergeTree storages require data path", ErrorCodes::INCORRECT_FILE_NAME);
@@ -3673,4 +3674,24 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
         NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
     };
 }
+
+bool MergeTreeData::MergesThrottler::needDelayMerge() const
+{
+    std::lock_guard lock(mutex);
+    return (!max_bytes || have_bytes < max_bytes) && (!max_rows || have_rows < max_rows);
+}
+
+void MergeTreeData::MergesThrottler::add(size_t bytes, size_t rows)
+{
+    std::lock_guard lock(mutex);
+    have_bytes += bytes;
+    have_rows += rows;
+}
+
+void MergeTreeData::MergesThrottler::reset()
+{
+    have_bytes = 0;
+    have_rows = 0;
+}
+
 }
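
The throttler's behaviour is easiest to see in isolation. Below is a minimal, compilable sketch: MergesThrottler is copied essentially verbatim from the hunks above, while the main() driver and the concrete limits (4096 bytes / 1000 rows, standing in for min_bytes_for_compact_part / min_rows_for_compact_part) are invented purely for illustration.

// Standalone sketch of the MergesThrottler added in this commit, lifted out of
// MergeTreeData so it can be compiled and poked at on its own (C++17).
#include <cstddef>
#include <iostream>
#include <mutex>

struct MergesThrottler
{
    mutable std::mutex mutex;
    size_t have_bytes = 0;
    size_t have_rows = 0;

    size_t max_bytes;
    size_t max_rows;

    MergesThrottler(size_t max_bytes_, size_t max_rows_)
        : max_bytes(max_bytes_), max_rows(max_rows_) {}

    /// Ask to delay the merge while the accumulated in-memory data is still below both limits.
    bool needDelayMerge() const
    {
        std::lock_guard lock(mutex);
        return (!max_bytes || have_bytes < max_bytes) && (!max_rows || have_rows < max_rows);
    }

    void add(size_t bytes, size_t rows)
    {
        std::lock_guard lock(mutex);
        have_bytes += bytes;
        have_rows += rows;
    }

    void reset()
    {
        have_bytes = 0;
        have_rows = 0;
    }
};

int main()
{
    /// Hypothetical limits for the demo; in the real code they come from the storage settings.
    MergesThrottler throttler(4096, 1000);

    /// Each "insert" registers one in-memory part, as MergeTreeBlockOutputStream::write now does.
    for (int i = 1; i <= 5; ++i)
    {
        throttler.add(1024, 100);
        std::cout << "after insert " << i << ": delay merge = "
                  << std::boolalpha << throttler.needDelayMerge() << '\n';
    }

    /// Once enough data has accumulated, the write path wakes the background
    /// merge task and clears the counters.
    if (!throttler.needDelayMerge())
        throttler.reset();
}

needDelayMerge() stays true until one of the counters reaches its limit, so with the inserts above the fourth one is the first that would wake the background merge task; the write path then calls reset() and accumulation starts over. Since the real throttler is constructed from min_bytes_for_compact_part and min_rows_for_compact_part (see the MergeTreeData constructor hunk above), merges of in-memory parts are effectively held back until they would at least fill a compact part.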

View File

@@ -921,6 +921,25 @@ protected:
 
     bool areBackgroundMovesNeeded() const;
 
+    struct MergesThrottler
+    {
+        mutable std::mutex mutex;
+        size_t have_bytes = 0;
+        size_t have_rows = 0;
+
+        size_t max_bytes;
+        size_t max_rows;
+
+        MergesThrottler(size_t max_bytes_, size_t max_rows_)
+            : max_bytes(max_bytes_), max_rows(max_rows_) {}
+
+        bool needDelayMerge() const;
+        void add(size_t bytes, size_t rows);
+        void reset();
+    };
+
+    MergesThrottler in_memory_merges_throttler;
+
 private:
     /// RAII Wrapper for atomic work with currently moving parts
     /// Acuire them in constructor and remove them in destructor

View File

@@ -998,7 +998,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mutatePartToTempor
             commands_for_part.emplace_back(command);
     }
 
-    if (!isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
+    if (source_part->isStoredOnDisk() && !isStorageTouchedByMutations(storage_from_source_part, commands_for_part, context_for_reading))
     {
         LOG_TRACE(log, "Part " << source_part->name << " doesn't change up to mutation version " << future_part.part_info.mutation);
         return data.cloneAndLoadDataPartOnSameDisk(source_part, "tmp_clone_", future_part.part_info);
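
The isStorageTouchedByMutations short-cut, which clones the source part via cloneAndLoadDataPartOnSameDisk when a mutation does not actually change it, is now taken only for parts stored on disk; for an in-memory part there is nothing on disk to clone, so it presumably has to go through the regular mutation path regardless.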

View File

@@ -45,6 +45,9 @@ void MergeTreeDataPartWriterInMemory::write(
         result_block.insert(block.getByName(col.name));
     }
     index_granularity.appendMark(result_block.rows());
+    if (with_final_mark)
+        index_granularity.appendMark(0);
+
     part->block = std::move(result_block);
     block_written = true;
 }
@@ -55,10 +58,6 @@ void MergeTreeDataPartWriterInMemory::calculateAndSerializePrimaryIndex(const Bl
     if (!rows)
         return;
 
-    index_granularity.appendMark(rows);
-    if (with_final_mark)
-        index_granularity.appendMark(0);
-
     size_t primary_columns_num = primary_index_block.columns();
     index_columns.resize(primary_columns_num);
     for (size_t i = 0; i < primary_columns_num; ++i)
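
Granularity bookkeeping for in-memory parts is consolidated in write(): the final mark is appended there right after the data mark, and calculateAndSerializePrimaryIndex() no longer touches index_granularity at all, presumably so the marks are maintained the same way whether or not a primary index is calculated.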

View File

@@ -11,6 +11,7 @@ namespace ErrorCodes
 {
     extern const int UNKNOWN_FORMAT_VERSION;
     extern const int CANNOT_READ_ALL_DATA;
+    extern const int BAD_DATA_PART_NAME;
 }
@@ -95,7 +96,9 @@ MergeTreeData::MutableDataPartsVector MergeTreeWriteAheadLog::restore()
         }
         catch (const Exception & e)
         {
-            if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION)
+            if (e.code() == ErrorCodes::CANNOT_READ_ALL_DATA
+                || e.code() == ErrorCodes::UNKNOWN_FORMAT_VERSION
+                || e.code() == ErrorCodes::BAD_DATA_PART_NAME)
             {
                 LOG_WARNING(&Logger::get(storage.getLogName() + " (WriteAheadLog)"),
                     "WAL file '" << path << "' is broken. " << e.displayText());

View File

@@ -822,8 +822,11 @@ BackgroundProcessingPoolTaskResult StorageMergeTree::mergeMutateTask()
 
         ///TODO: read deduplicate option from table config
         if (merge(false /*aggressive*/, {} /*partition_id*/, false /*final*/, false /*deduplicate*/))
-            return BackgroundProcessingPoolTaskResult::SUCCESS;
+        {
+            return in_memory_merges_throttler.needDelayMerge()
+                ? BackgroundProcessingPoolTaskResult::NOTHING_TO_DO
+                : BackgroundProcessingPoolTaskResult::SUCCESS;
+        }
 
         if (tryMutatePart())
             return BackgroundProcessingPoolTaskResult::SUCCESS;
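
After a successful merge() the task no longer reports SUCCESS unconditionally: while the throttler still asks to delay, it returns NOTHING_TO_DO, presumably so the background pool backs off instead of rescheduling the task right away while too little in-memory data has accumulated.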