Add some comments

This commit is contained in:
alesapin 2021-04-02 20:46:01 +03:00
parent a555d078a2
commit ab34873344
2 changed files with 70 additions and 9 deletions

View File

@ -13,12 +13,14 @@ namespace DB
namespace
{
/// Deduplication operation part was dropped or added
enum class MergeTreeDeduplicationOp : uint8_t
{
ADD = 1,
DROP = 2,
};
/// Record for deduplication on disk
struct MergeTreeDeduplicationLogRecord
{
MergeTreeDeduplicationOp operation;
@ -93,7 +95,7 @@ void MergeTreeDeduplicationLog::load()
existing_logs[log_number] = {path, 0};
}
/// Order important
/// Order important, we load history from the begging to the end
for (auto & [log_number, desc] : existing_logs)
{
try
@ -107,7 +109,10 @@ void MergeTreeDeduplicationLog::load()
}
}
/// Start new log, drop previous
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
@ -147,19 +152,24 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs()
{
size_t current_sum = 0;
size_t remove_from_value = 0;
/// Go from end to the beginning
for (auto itr = existing_logs.rbegin(); itr != existing_logs.rend(); ++itr)
{
auto & description = itr->second;
if (current_sum > deduplication_window)
{
/// We have more logs than required, all older files (including current) can be dropped
remove_from_value = itr->first;
break;
}
auto & description = itr->second;
current_sum += description.entries_count;
}
/// If we found some logs to drop
if (remove_from_value != 0)
{
/// Go from beginning to the end and drop all outdated logs
for (auto itr = existing_logs.begin(); itr != existing_logs.end();)
{
size_t number = itr->first;
@ -174,18 +184,19 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs()
void MergeTreeDeduplicationLog::rotateAndDropIfNeeded()
{
/// If we don't have logs at all or already have enough records in current
if (existing_logs.empty() || existing_logs[current_log_number].entries_count >= rotate_interval)
{
rotate();
dropOutdatedLogs();
}
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreePartInfo & part_info)
{
std::lock_guard lock(state_mutex);
/// If we alredy have this block let's deduplicate it
if (deduplication_map.contains(block_id))
{
auto info = deduplication_map.get(block_id);
@ -194,14 +205,18 @@ std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std:
assert(current_writer != nullptr);
/// Create new record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::ADD;
record.part_name = part_info.getPartName();
record.block_id = block_id;
/// Write it to disk
writeRecord(record, *current_writer);
/// We have one more record in current log
existing_logs[current_log_number].entries_count++;
/// Add to deduplication map
deduplication_map.insert(record.block_id, part_info);
/// Rotate and drop old logs if needed
rotateAndDropIfNeeded();
return std::make_pair(part_info, true);
@ -213,20 +228,29 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end();)
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */)
{
const auto & part_info = itr->value;
/// Part is covered by dropped part, let's remove it from
/// deduplication history
if (drop_part_info.contains(part_info))
{
/// Create drop record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->key;
/// Write it to disk
writeRecord(record, *current_writer);
/// We have one more record on disk
existing_logs[current_log_number].entries_count++;
/// Increment itr before erase, otherwise it will invalidated
++itr;
/// Remove block_id from in-memory table
deduplication_map.erase(record.block_id);
/// Rotate and drop old logs if needed
rotateAndDropIfNeeded();
}
else

View File

@ -12,13 +12,19 @@
namespace DB
{
/// Description of dedupliction log
struct MergeTreeDeduplicationLogNameDescription
{
/// Path to log
std::string path;
/// How many entries we have in log
size_t entries_count;
};
/// Simple string-key HashTable with fixed size based on STL containers.
/// Preserves order using linked list and remove elements
/// on overflow in FIFO order.
template <typename V>
class LimitedOrderedHashMap
{
@ -72,7 +78,6 @@ public:
return true;
}
bool insert(const std::string & key, const V & value)
{
auto it = map.find(key);
@ -108,6 +113,14 @@ public:
const_reverse_iterator rend() const { return queue.crend(); }
};
/// Fixed-size log for deduplication in non-replicated MergeTree.
/// Stores records on disk for zero-level parts in human-readable format:
/// operation part_name partition_id_check_sum
/// 1 88_18_18_0 88_10619499460461868496_9553701830997749308
/// 2 77_14_14_0 77_15147918179036854170_6725063583757244937
/// 2 77_15_15_0 77_14977227047908934259_8047656067364802772
/// 1 77_20_20_0 77_15147918179036854170_6725063583757244937
/// Also stores them in memory in hash table with limited size.
class MergeTreeDeduplicationLog
{
public:
@ -116,27 +129,51 @@ public:
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_);
/// Add part into in-memory hash table and to disk
/// Return true and part info if insertion was successful.
/// Otherwise, in case of duplicate, return false and previous part name with same hash (useful for logging)
std::pair<MergeTreePartInfo, bool> addPart(const std::string & block_id, const MergeTreePartInfo & part);
/// Remove all covered parts from in memory table and add DROP records to the disk
void dropPart(const MergeTreePartInfo & part);
/// Load history from disk. Ignores broken logs.
void load();
private:
const std::string logs_dir;
/// Size of deduplication window
const size_t deduplication_window;
/// How often we create new logs. Not very important,
/// default value equals deduplication_window * 2
const size_t rotate_interval;
const MergeTreeDataFormatVersion format_version;
/// Current log number. Always growing number.
size_t current_log_number = 0;
/// All existing logs in order of their numbers
std::map<size_t, MergeTreeDeduplicationLogNameDescription> existing_logs;
/// In memory hash-table
LimitedOrderedHashMap<MergeTreePartInfo> deduplication_map;
/// Writer to the current log file
std::unique_ptr<WriteBufferFromFile> current_writer;
/// Overall mutex because we can have a lot of cocurrent inserts
std::mutex state_mutex;
/// Start new log
void rotate();
/// Remove all old logs with non-needed records for deduplication_window
void dropOutdatedLogs();
/// Execute both previous methods if needed
void rotateAndDropIfNeeded();
/// Load single log from disk. In case of corruption throws exceptions
size_t loadSingleLog(const std::string & path);
};