Add tests

This commit is contained in:
alesapin 2021-04-02 19:45:18 +03:00
parent 6f36661575
commit 759dd79c76
9 changed files with 225 additions and 77 deletions

View File

@ -35,23 +35,13 @@ void MergeTreeBlockOutputStream::write(const Block & block)
if (!part)
continue;
if (auto & deduplication_log = storage.getDeduplicationLog())
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
{
String block_id = part->getZeroLevelPartBlockID();
auto res = deduplication_log->addPart(block_id, part);
if (!res.second)
{
LOG_INFO(storage.log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
continue;
}
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.background_executor.triggerTask();
}
storage.renameTempPartAndAdd(part, &storage.increment);
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.background_executor.triggerTask();
}
}

View File

@ -2022,7 +2022,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}
bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2031,7 +2031,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts))
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log))
return false;
}
if (!covered_parts.empty())
@ -2044,7 +2044,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
bool MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts)
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2099,6 +2099,17 @@ bool MergeTreeData::renameTempPartAndReplace(
return false;
}
if (deduplication_log)
{
String block_id = part->getZeroLevelPartBlockID();
auto res = deduplication_log->addPart(block_id, part_info);
if (!res.second)
{
LOG_INFO(log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
return false;
}
}
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
///
@ -2155,7 +2166,7 @@ bool MergeTreeData::renameTempPartAndReplace(
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2164,7 +2175,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
DataPartsVector covered_parts;
{
auto lock = lockParts();
renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts);
renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log);
}
return covered_parts;
}

View File

@ -54,6 +54,7 @@ struct CurrentlySubmergingEmergingTagger;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;
namespace ErrorCodes
{
@ -447,18 +448,18 @@ public:
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// Remove parts from working set immediately (without wait for background

View File

@ -3,6 +3,7 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
@ -28,11 +29,12 @@ struct MergeTreeDeduplicationLogRecord
void writeRecord(const MergeTreeDeduplicationLogRecord & record, WriteBuffer & out)
{
writeIntText(static_cast<uint8_t>(record.operation), out);
writeChar(' ', out);
writeStringBinary(record.part_name, out);
writeChar(' ', out);
writeStringBinary(record.block_id, out);
writeChar('\t', out);
writeString(record.part_name, out);
writeChar('\t', out);
writeString(record.block_id, out);
writeChar('\n', out);
out.next();
}
void readRecord(MergeTreeDeduplicationLogRecord & record, ReadBuffer & in)
@ -40,10 +42,10 @@ void readRecord(MergeTreeDeduplicationLogRecord & record, ReadBuffer & in)
uint8_t op;
readIntText(op, in);
record.operation = static_cast<MergeTreeDeduplicationOp>(op);
assertChar(' ', in);
readStringBinary(record.part_name, in);
assertChar(' ', in);
readStringBinary(record.block_id, in);
assertChar('\t', in);
readString(record.part_name, in);
assertChar('\t', in);
readString(record.block_id, in);
assertChar('\n', in);
}
@ -99,6 +101,8 @@ void MergeTreeDeduplicationLog::load()
}
rotateAndDropIfNeeded();
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
@ -163,80 +167,64 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs()
void MergeTreeDeduplicationLog::rotateAndDropIfNeeded()
{
if (existing_logs.empty() || existing_logs[current_log_number].entries_count > rotate_interval)
if (existing_logs.empty() || existing_logs[current_log_number].entries_count >= rotate_interval)
{
rotate();
dropOutdatedLogs();
}
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreeData::MutableDataPartPtr & part)
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreePartInfo & part_info)
{
std::lock_guard lock(state_mutex);
if (deduplication_map.contains(block_id))
return std::make_pair(deduplication_map.get(block_id), false);
{
auto info = deduplication_map.get(block_id);
return std::make_pair(info, false);
}
assert(current_writer != nullptr);
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::ADD;
record.part_name = part->name;
record.part_name = part_info.getPartName();
record.block_id = block_id;
writeRecord(record, *current_writer);
existing_logs[current_log_number].entries_count++;
deduplication_map.insert(record.block_id, part->info);
deduplication_map.insert(record.block_id, part_info);
rotateAndDropIfNeeded();
return std::make_pair(part->info, true);
return std::make_pair(part_info, true);
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const MergeTreeData::MutableDataPartPtr & part)
{
return addPart(part->getZeroLevelPartBlockID(), part);
}
void MergeTreeDeduplicationLog::dropPart(const MergeTreeData::DataPartPtr & part)
void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_info)
{
std::lock_guard lock(state_mutex);
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); ++itr)
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end();)
{
const auto & part_info = itr->value;
if (part->info.contains(part_info))
if (drop_part_info.contains(part_info))
{
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->key;
writeRecord(record, *current_writer);
existing_logs[current_log_number].entries_count++;
deduplication_map.erase(itr->key);
++itr;
deduplication_map.erase(record.block_id);
rotateAndDropIfNeeded();
}
}
}
void MergeTreeDeduplicationLog::dropPartition(const std::string & partition_id)
{
std::lock_guard lock(state_mutex);
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); ++itr)
{
const auto & part_info = itr->value;
if (part_info.partition_id == partition_id)
else
{
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->key;
writeRecord(record, *current_writer);
deduplication_map.erase(itr->key);
existing_logs[current_log_number].entries_count++;
rotateAndDropIfNeeded();
++itr;
}
}
}

View File

@ -1,8 +1,13 @@
#pragma once
#include <Core/Types.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <common/StringRef.h>
#include <IO/WriteBufferFromFile.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <map>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
namespace DB
{
@ -111,10 +116,8 @@ public:
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_);
std::pair<MergeTreePartInfo, bool> addPart(const std::string & block_id, const MergeTreeData::MutableDataPartPtr & part);
std::pair<MergeTreePartInfo, bool> addPart(const MergeTreeData::MutableDataPartPtr & part);
void dropPart(const MergeTreeData::DataPartPtr & part);
void dropPartition(const std::string & partition_id);
std::pair<MergeTreePartInfo, bool> addPart(const std::string & block_id, const MergeTreePartInfo & part);
void dropPart(const MergeTreePartInfo & part);
void load();

View File

@ -98,7 +98,7 @@ StorageMergeTree::StorageMergeTree(
if (settings->non_replicated_deduplication_window != 0)
{
std::string path = getDataPaths()[0] + "/deduplication_logs";
deduplication_log.emplace(path, settings->non_replicated_deduplication_window, format_version);
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version);
deduplication_log->load();
}
}
@ -1220,7 +1220,7 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool
if (deduplication_log)
{
for (const auto & part : parts_to_remove)
deduplication_log->dropPart(part);
deduplication_log->dropPart(part->info);
}
if (detach)

View File

@ -96,7 +96,7 @@ public:
std::optional<JobAndPool> getDataProcessingJob() override;
std::optional<MergeTreeDeduplicationLog> & getDeduplicationLog() { return deduplication_log; }
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
private:
/// Mutex and condvar for synchronous mutations wait
@ -109,7 +109,7 @@ private:
BackgroundJobsExecutor background_executor;
BackgroundMovesExecutor background_moves_executor;
std::optional<MergeTreeDeduplicationLog> deduplication_log;
std::unique_ptr<MergeTreeDeduplicationLog> deduplication_log;
/// For block numbers.
SimpleIncrement increment;

View File

@ -0,0 +1,50 @@
1 1
1 1
===============
1 1
1 1
2 2
3 3
4 4
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
7 7
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
===============
10 10
12 12
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
8 8
9 9
11 11
12 12
===============
88 11 11
77 11 11
77 12 12

View File

@ -0,0 +1,105 @@
DROP TABLE IF EXISTS merge_tree_deduplication;
CREATE TABLE merge_tree_deduplication
(
key UInt64,
value String,
part UInt8 DEFAULT 77
)
ENGINE=MergeTree()
ORDER BY key
PARTITION BY part
SETTINGS non_replicated_deduplication_window=3;
SYSTEM STOP MERGES merge_tree_deduplication;
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication;
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (2, '2');
INSERT INTO merge_tree_deduplication (key, value) VALUES (3, '3');
INSERT INTO merge_tree_deduplication (key, value) VALUES (4, '4');
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5');
INSERT INTO merge_tree_deduplication (key, value) VALUES (6, '6');
INSERT INTO merge_tree_deduplication (key, value) VALUES (7, '7');
INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (8, '8');
INSERT INTO merge_tree_deduplication (key, value) VALUES (9, '9');
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11');
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11');
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication DROP PART '77_9_9_0'; -- some old part
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
SELECT key, value FROM merge_tree_deduplication WHERE key = 10;
ALTER TABLE merge_tree_deduplication DROP PART '77_13_13_0'; -- fresh part
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
SELECT key, value FROM merge_tree_deduplication WHERE key = 12;
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
OPTIMIZE TABLE merge_tree_deduplication FINAL;
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- deduplicated
SELECT '===============';
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88);
ALTER TABLE merge_tree_deduplication DROP PARTITION 77;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88); --deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- not deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- not deduplicated
SELECT part, key, value FROM merge_tree_deduplication ORDER BY key;
DROP TABLE IF EXISTS merge_tree_deduplication;