Merge pull request #22514 from ClickHouse/merge_tree_deduplication

Non replicated merge tree deduplication
This commit is contained in:
alesapin 2021-04-07 10:19:43 +03:00 committed by GitHub
commit 9fd251eaaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 878 additions and 24 deletions

View File

@ -1356,6 +1356,24 @@ String IMergeTreeDataPart::getUniqueId() const
return id;
}
String IMergeTreeDataPart::getZeroLevelPartBlockID() const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);
SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
@ -1372,4 +1390,3 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
}
}

View File

@ -164,6 +164,9 @@ public:
bool isEmpty() const { return rows_count == 0; }
/// Compute part block id for zero level part. Otherwise throws an exception.
String getZeroLevelPartBlockID() const;
const MergeTreeData & storage;
String name;

View File

@ -35,13 +35,15 @@ void MergeTreeBlockOutputStream::write(const Block & block)
if (!part)
continue;
storage.renameTempPartAndAdd(part, &storage.increment);
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
{
PartLog::addNewPart(storage.global_context, part, watch.elapsed());
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
storage.background_executor.triggerTask();
}
}
}
}

View File

@ -71,6 +71,7 @@ namespace ProfileEvents
extern const Event RejectedInserts;
extern const Event DelayedInserts;
extern const Event DelayedInsertsMilliseconds;
extern const Event DuplicatedInsertedBlocks;
}
namespace CurrentMetrics
@ -2022,7 +2023,7 @@ MergeTreeData::DataPartsVector MergeTreeData::getActivePartsToReplace(
}
bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2031,7 +2032,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
DataPartsVector covered_parts;
{
auto lock = lockParts();
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts))
if (!renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log))
return false;
}
if (!covered_parts.empty())
@ -2044,7 +2045,7 @@ bool MergeTreeData::renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrem
bool MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction,
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts)
std::unique_lock<std::mutex> & lock, DataPartsVector * out_covered_parts, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2099,6 +2100,22 @@ bool MergeTreeData::renameTempPartAndReplace(
return false;
}
/// Deduplication log used only from non-replicated MergeTree. Replicated
/// tables have their own mechanism. We try to deduplicate at such deep
/// level, because only here we know real part name which is required for
/// deduplication.
if (deduplication_log)
{
String block_id = part->getZeroLevelPartBlockID();
auto res = deduplication_log->addPart(block_id, part_info);
if (!res.second)
{
ProfileEvents::increment(ProfileEvents::DuplicatedInsertedBlocks);
LOG_INFO(log, "Block with ID {} already exists as part {}; ignoring it", block_id, res.first.getPartName());
return false;
}
}
/// All checks are passed. Now we can rename the part on disk.
/// So, we maintain invariant: if a non-temporary part in filesystem then it is in data_parts
///
@ -2155,7 +2172,7 @@ bool MergeTreeData::renameTempPartAndReplace(
}
MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction)
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, MergeTreeDeduplicationLog * deduplication_log)
{
if (out_transaction && &out_transaction->data != this)
throw Exception("MergeTreeData::Transaction for one table cannot be used with another. It is a bug.",
@ -2164,7 +2181,7 @@ MergeTreeData::DataPartsVector MergeTreeData::renameTempPartAndReplace(
DataPartsVector covered_parts;
{
auto lock = lockParts();
renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts);
renameTempPartAndReplace(part, increment, out_transaction, lock, &covered_parts, deduplication_log);
}
return covered_parts;
}

View File

@ -54,6 +54,7 @@ struct CurrentlySubmergingEmergingTagger;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
using ManyExpressionActions = std::vector<ExpressionActionsPtr>;
class MergeTreeDeduplicationLog;
namespace ErrorCodes
{
@ -447,18 +448,18 @@ public:
/// active set later with out_transaction->commit()).
/// Else, commits the part immediately.
/// Returns true if part was added. Returns false if part is covered by bigger part.
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
bool renameTempPartAndAdd(MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// The same as renameTempPartAndAdd but the block range of the part can contain existing parts.
/// Returns all parts covered by the added part (in ascending order).
/// If out_transaction == nullptr, marks covered parts as Outdated.
DataPartsVector renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr);
MutableDataPartPtr & part, SimpleIncrement * increment = nullptr, Transaction * out_transaction = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// Low-level version of previous one, doesn't lock mutex
bool renameTempPartAndReplace(
MutableDataPartPtr & part, SimpleIncrement * increment, Transaction * out_transaction, DataPartsLock & lock,
DataPartsVector * out_covered_parts = nullptr);
DataPartsVector * out_covered_parts = nullptr, MergeTreeDeduplicationLog * deduplication_log = nullptr);
/// Remove parts from working set immediately (without wait for background

View File

@ -0,0 +1,311 @@
#include <Storages/MergeTree/MergeTreeDeduplicationLog.h>
#include <filesystem>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
namespace DB
{
namespace
{
/// Deduplication operation part was dropped or added
enum class MergeTreeDeduplicationOp : uint8_t
{
ADD = 1,
DROP = 2,
};
/// Record for deduplication on disk
struct MergeTreeDeduplicationLogRecord
{
MergeTreeDeduplicationOp operation;
std::string part_name;
std::string block_id;
};
void writeRecord(const MergeTreeDeduplicationLogRecord & record, WriteBuffer & out)
{
writeIntText(static_cast<uint8_t>(record.operation), out);
writeChar('\t', out);
writeString(record.part_name, out);
writeChar('\t', out);
writeString(record.block_id, out);
writeChar('\n', out);
out.next();
}
void readRecord(MergeTreeDeduplicationLogRecord & record, ReadBuffer & in)
{
uint8_t op;
readIntText(op, in);
record.operation = static_cast<MergeTreeDeduplicationOp>(op);
assertChar('\t', in);
readString(record.part_name, in);
assertChar('\t', in);
readString(record.block_id, in);
assertChar('\n', in);
}
std::string getLogPath(const std::string & prefix, size_t number)
{
std::filesystem::path path(prefix);
path /= std::filesystem::path(std::string{"deduplication_log_"} + std::to_string(number) + ".txt");
return path;
}
size_t getLogNumber(const std::string & path_str)
{
std::filesystem::path path(path_str);
std::string filename = path.stem();
Strings filename_parts;
boost::split(filename_parts, filename, boost::is_any_of("_"));
return parse<size_t>(filename_parts[2]);
}
}
MergeTreeDeduplicationLog::MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_)
: logs_dir(logs_dir_)
, deduplication_window(deduplication_window_)
, rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter
, format_version(format_version_)
, deduplication_map(deduplication_window)
{
namespace fs = std::filesystem;
if (deduplication_window != 0 && !fs::exists(logs_dir))
fs::create_directories(logs_dir);
}
void MergeTreeDeduplicationLog::load()
{
namespace fs = std::filesystem;
if (!fs::exists(logs_dir))
return;
for (const auto & p : fs::directory_iterator(logs_dir))
{
const auto & path = p.path();
auto log_number = getLogNumber(path);
existing_logs[log_number] = {path, 0};
}
/// We should know which logs are exist even in case
/// of deduplication_window = 0
if (!existing_logs.empty())
current_log_number = existing_logs.rbegin()->first;
if (deduplication_window != 0)
{
/// Order important, we load history from the begging to the end
for (auto & [log_number, desc] : existing_logs)
{
try
{
desc.entries_count = loadSingleLog(desc.path);
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path);
}
}
/// Start new log, drop previous
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
}
size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
{
ReadBufferFromFile read_buf(path);
size_t total_entries = 0;
while (!read_buf.eof())
{
MergeTreeDeduplicationLogRecord record;
readRecord(record, read_buf);
if (record.operation == MergeTreeDeduplicationOp::DROP)
deduplication_map.erase(record.block_id);
else
deduplication_map.insert(record.block_id, MergeTreePartInfo::fromPartName(record.part_name, format_version));
total_entries++;
}
return total_entries;
}
void MergeTreeDeduplicationLog::rotate()
{
/// We don't deduplicate anything so we don't need any writers
if (deduplication_window == 0)
return;
current_log_number++;
auto new_path = getLogPath(logs_dir, current_log_number);
MergeTreeDeduplicationLogNameDescription log_description{new_path, 0};
existing_logs.emplace(current_log_number, log_description);
if (current_writer)
current_writer->sync();
current_writer = std::make_unique<WriteBufferFromFile>(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
void MergeTreeDeduplicationLog::dropOutdatedLogs()
{
size_t current_sum = 0;
size_t remove_from_value = 0;
/// Go from end to the beginning
for (auto itr = existing_logs.rbegin(); itr != existing_logs.rend(); ++itr)
{
if (current_sum > deduplication_window)
{
/// We have more logs than required, all older files (including current) can be dropped
remove_from_value = itr->first;
break;
}
auto & description = itr->second;
current_sum += description.entries_count;
}
/// If we found some logs to drop
if (remove_from_value != 0)
{
/// Go from the beginning to the end and drop all outdated logs
for (auto itr = existing_logs.begin(); itr != existing_logs.end();)
{
size_t number = itr->first;
std::filesystem::remove(itr->second.path);
itr = existing_logs.erase(itr);
if (remove_from_value == number)
break;
}
}
}
void MergeTreeDeduplicationLog::rotateAndDropIfNeeded()
{
/// If we don't have logs at all or already have enough records in current
if (existing_logs.empty() || existing_logs[current_log_number].entries_count >= rotate_interval)
{
rotate();
dropOutdatedLogs();
}
}
std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std::string & block_id, const MergeTreePartInfo & part_info)
{
std::lock_guard lock(state_mutex);
/// We support zero case because user may want to disable deduplication with
/// ALTER MODIFY SETTING query. It's much more simpler to handle zero case
/// here then destroy whole object, check for null pointer from different
/// threads and so on.
if (deduplication_window == 0)
return std::make_pair(part_info, true);
/// If we already have this block let's deduplicate it
if (deduplication_map.contains(block_id))
{
auto info = deduplication_map.get(block_id);
return std::make_pair(info, false);
}
assert(current_writer != nullptr);
/// Create new record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::ADD;
record.part_name = part_info.getPartName();
record.block_id = block_id;
/// Write it to disk
writeRecord(record, *current_writer);
/// We have one more record in current log
existing_logs[current_log_number].entries_count++;
/// Add to deduplication map
deduplication_map.insert(record.block_id, part_info);
/// Rotate and drop old logs if needed
rotateAndDropIfNeeded();
return std::make_pair(part_info, true);
}
void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_info)
{
std::lock_guard lock(state_mutex);
/// We support zero case because user may want to disable deduplication with
/// ALTER MODIFY SETTING query. It's much more simpler to handle zero case
/// here then destroy whole object, check for null pointer from different
/// threads and so on.
if (deduplication_window == 0)
return;
assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */)
{
const auto & part_info = itr->value;
/// Part is covered by dropped part, let's remove it from
/// deduplication history
if (drop_part_info.contains(part_info))
{
/// Create drop record
MergeTreeDeduplicationLogRecord record;
record.operation = MergeTreeDeduplicationOp::DROP;
record.part_name = part_info.getPartName();
record.block_id = itr->key;
/// Write it to disk
writeRecord(record, *current_writer);
/// We have one more record on disk
existing_logs[current_log_number].entries_count++;
/// Increment itr before erase, otherwise it will invalidated
++itr;
/// Remove block_id from in-memory table
deduplication_map.erase(record.block_id);
/// Rotate and drop old logs if needed
rotateAndDropIfNeeded();
}
else
{
++itr;
}
}
}
void MergeTreeDeduplicationLog::setDeduplicationWindowSize(size_t deduplication_window_)
{
std::lock_guard lock(state_mutex);
deduplication_window = deduplication_window_;
rotate_interval = deduplication_window * 2;
/// If settings was set for the first time with ALTER MODIFY SETTING query
if (deduplication_window != 0 && !std::filesystem::exists(logs_dir))
std::filesystem::create_directories(logs_dir);
deduplication_map.setMaxSize(deduplication_window);
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
}

View File

@ -0,0 +1,192 @@
#pragma once
#include <Core/Types.h>
#include <common/StringRef.h>
#include <IO/WriteBufferFromFile.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <map>
#include <list>
#include <mutex>
#include <string>
#include <unordered_map>
namespace DB
{
/// Description of dedupliction log
struct MergeTreeDeduplicationLogNameDescription
{
/// Path to log
std::string path;
/// How many entries we have in log
size_t entries_count;
};
/// Simple string-key HashTable with fixed size based on STL containers.
/// Preserves order using linked list and remove elements
/// on overflow in FIFO order.
template <typename V>
class LimitedOrderedHashMap
{
private:
struct ListNode
{
std::string key;
V value;
};
using Queue = std::list<ListNode>;
using IndexMap = std::unordered_map<StringRef, typename Queue::iterator, StringRefHash>;
Queue queue;
IndexMap map;
size_t max_size;
public:
using iterator = typename Queue::iterator;
using const_iterator = typename Queue::const_iterator;
using reverse_iterator = typename Queue::reverse_iterator;
using const_reverse_iterator = typename Queue::const_reverse_iterator;
explicit LimitedOrderedHashMap(size_t max_size_)
: max_size(max_size_)
{}
bool contains(const std::string & key) const
{
return map.find(key) != map.end();
}
V get(const std::string & key) const
{
return map.at(key)->value;
}
size_t size() const
{
return queue.size();
}
void setMaxSize(size_t max_size_)
{
max_size = max_size_;
while (size() > max_size)
{
map.erase(queue.front().key);
queue.pop_front();
}
}
bool erase(const std::string & key)
{
auto it = map.find(key);
if (it == map.end())
return false;
auto queue_itr = it->second;
map.erase(it);
queue.erase(queue_itr);
return true;
}
bool insert(const std::string & key, const V & value)
{
auto it = map.find(key);
if (it != map.end())
return false;
if (size() == max_size)
{
map.erase(queue.front().key);
queue.pop_front();
}
ListNode elem{key, value};
auto itr = queue.insert(queue.end(), elem);
map.emplace(itr->key, itr);
return true;
}
void clear()
{
map.clear();
queue.clear();
}
iterator begin() { return queue.begin(); }
const_iterator begin() const { return queue.cbegin(); }
iterator end() { return queue.end(); }
const_iterator end() const { return queue.cend(); }
reverse_iterator rbegin() { return queue.rbegin(); }
const_reverse_iterator rbegin() const { return queue.crbegin(); }
reverse_iterator rend() { return queue.rend(); }
const_reverse_iterator rend() const { return queue.crend(); }
};
/// Fixed-size log for deduplication in non-replicated MergeTree.
/// Stores records on disk for zero-level parts in human-readable format:
/// operation part_name partition_id_check_sum
/// 1 88_18_18_0 88_10619499460461868496_9553701830997749308
/// 2 77_14_14_0 77_15147918179036854170_6725063583757244937
/// 2 77_15_15_0 77_14977227047908934259_8047656067364802772
/// 1 77_20_20_0 77_15147918179036854170_6725063583757244937
/// Also stores them in memory in hash table with limited size.
class MergeTreeDeduplicationLog
{
public:
MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_);
/// Add part into in-memory hash table and to disk
/// Return true and part info if insertion was successful.
/// Otherwise, in case of duplicate, return false and previous part name with same hash (useful for logging)
std::pair<MergeTreePartInfo, bool> addPart(const std::string & block_id, const MergeTreePartInfo & part);
/// Remove all covered parts from in memory table and add DROP records to the disk
void dropPart(const MergeTreePartInfo & drop_part_info);
/// Load history from disk. Ignores broken logs.
void load();
void setDeduplicationWindowSize(size_t deduplication_window_);
private:
const std::string logs_dir;
/// Size of deduplication window
size_t deduplication_window;
/// How often we create new logs. Not very important,
/// default value equals deduplication_window * 2
size_t rotate_interval;
const MergeTreeDataFormatVersion format_version;
/// Current log number. Always growing number.
size_t current_log_number = 0;
/// All existing logs in order of their numbers
std::map<size_t, MergeTreeDeduplicationLogNameDescription> existing_logs;
/// In memory hash-table
LimitedOrderedHashMap<MergeTreePartInfo> deduplication_map;
/// Writer to the current log file
std::unique_ptr<WriteBufferFromFile> current_writer;
/// Overall mutex because we can have a lot of cocurrent inserts
std::mutex state_mutex;
/// Start new log
void rotate();
/// Remove all old logs with non-needed records for deduplication_window
void dropOutdatedLogs();
/// Execute both previous methods if needed
void rotateAndDropIfNeeded();
/// Load single log from disk. In case of corruption throws exceptions
size_t loadSingleLog(const std::string & path);
};
}

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h>
#include <Core/BaseSettings.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
namespace Poco::Util
@ -54,6 +55,7 @@ struct Settings;
M(UInt64, write_ahead_log_bytes_to_fsync, 100ULL * 1024 * 1024, "Amount of bytes, accumulated in WAL to do fsync.", 0) \
M(UInt64, write_ahead_log_interval_ms_to_fsync, 100, "Interval in milliseconds after which fsync for WAL is being done.", 0) \
M(Bool, in_memory_parts_insert_sync, false, "If true insert of part with in-memory format will wait for fsync of WAL", 0) \
M(UInt64, non_replicated_deduplication_window, 0, "How many last blocks of hashes should be kept on disk (0 - disabled).", 0) \
\
/** Inserts settings. */ \
M(UInt64, parts_to_delay_insert, 150, "If table contains at least that many active parts in single partition, artificially slow down insert into table.", 0) \

View File

@ -155,18 +155,9 @@ void ReplicatedMergeTreeBlockOutputStream::write(const Block & block)
if (deduplicate)
{
SipHash hash;
part->checksums.computeTotalChecksumDataOnly(hash);
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
/// We add the hash from the data and partition identifier to deduplication ID.
/// That is, do not insert the same data to the same partition twice.
block_id = part->info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
block_id = part->getZeroLevelPartBlockID();
LOG_DEBUG(log, "Wrote block with ID '{}', {} rows", block_id, current_block.block.rows());
}

View File

@ -93,6 +93,8 @@ StorageMergeTree::StorageMergeTree(
increment.set(getMaxBlockNumber());
loadMutations();
loadDeduplicationLog();
}
@ -265,6 +267,7 @@ void StorageMergeTree::alter(
TableLockHolder & table_lock_holder)
{
auto table_id = getStorageID();
auto old_storage_settings = getSettings();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
@ -299,6 +302,21 @@ void StorageMergeTree::alter(
if (!maybe_mutation_commands.empty())
waitForMutation(mutation_version, mutation_file_name);
}
{
/// Some additional changes in settings
auto new_storage_settings = getSettings();
if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window)
{
/// We cannot place this check into settings sanityCheck because it depends on format_version.
/// sanityCheck must work event without storage.
if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window);
}
}
}
@ -614,6 +632,16 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
return CancellationCode::CancelSent;
}
void StorageMergeTree::loadDeduplicationLog()
{
auto settings = getSettings();
if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
std::string path = getDataPaths()[0] + "/deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version);
deduplication_log->load();
}
void StorageMergeTree::loadMutations()
{
@ -1209,6 +1237,12 @@ void StorageMergeTree::dropPartition(const ASTPtr & partition, bool detach, bool
}
}
if (deduplication_log)
{
for (const auto & part : parts_to_remove)
deduplication_log->dropPart(part->info);
}
if (detach)
LOG_INFO(log, "Detached {} parts.", parts_to_remove.size());
else

View File

@ -12,6 +12,8 @@
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeMutationEntry.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeTreeDeduplicationLog.h>
#include <Disks/StoragePolicy.h>
#include <Common/SimpleIncrement.h>
#include <Storages/MergeTree/BackgroundJobsExecutor.h>
@ -93,6 +95,8 @@ public:
CheckResults checkData(const ASTPtr & query, const Context & context) override;
std::optional<JobAndPool> getDataProcessingJob() override;
MergeTreeDeduplicationLog * getDeduplicationLog() { return deduplication_log.get(); }
private:
/// Mutex and condvar for synchronous mutations wait
@ -105,6 +109,8 @@ private:
BackgroundJobsExecutor background_executor;
BackgroundMovesExecutor background_moves_executor;
std::unique_ptr<MergeTreeDeduplicationLog> deduplication_log;
/// For block numbers.
SimpleIncrement increment;
@ -128,6 +134,10 @@ private:
void loadMutations();
/// Load and initialize deduplication logs. Even if deduplication setting
/// equals zero creates object with deduplication window equals zero.
void loadDeduplicationLog();
/** Determines what parts should be merged and merges it.
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
* Returns true if merge is finished successfully.

View File

@ -57,6 +57,7 @@ SRCS(
MergeTree/MergeTreeDataPartWriterWide.cpp
MergeTree/MergeTreeDataSelectExecutor.cpp
MergeTree/MergeTreeDataWriter.cpp
MergeTree/MergeTreeDeduplicationLog.cpp
MergeTree/MergeTreeIndexAggregatorBloomFilter.cpp
MergeTree/MergeTreeIndexBloomFilter.cpp
MergeTree/MergeTreeIndexConditionBloomFilter.cpp

View File

@ -0,0 +1,85 @@
1 1
1 1
===============
1 1
1 1
2 2
3 3
4 4
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
7 7
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
7 7
8 8
9 9
10 10
11 11
12 12
===============
10 10
12 12
===============
1 1
1 1
2 2
3 3
4 4
5 5
6 6
8 8
9 9
11 11
12 12
===============
88 11 11
77 11 11
77 12 12
===============
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 33
1 1 33
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 33
1 1 33
1 1 33
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 44
2 2 44
3 3 44
4 4 44
===============
1 1
1 1
===============
1 1
1 1
1 1
2 2
3 3
4 4

View File

@ -0,0 +1,187 @@
DROP TABLE IF EXISTS merge_tree_deduplication;
CREATE TABLE merge_tree_deduplication
(
key UInt64,
value String,
part UInt8 DEFAULT 77
)
ENGINE=MergeTree()
ORDER BY key
PARTITION BY part
SETTINGS non_replicated_deduplication_window=3;
SYSTEM STOP MERGES merge_tree_deduplication;
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication;
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (2, '2');
INSERT INTO merge_tree_deduplication (key, value) VALUES (3, '3');
INSERT INTO merge_tree_deduplication (key, value) VALUES (4, '4');
INSERT INTO merge_tree_deduplication (key, value) VALUES (1, '1');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5');
INSERT INTO merge_tree_deduplication (key, value) VALUES (6, '6');
INSERT INTO merge_tree_deduplication (key, value) VALUES (7, '7');
INSERT INTO merge_tree_deduplication (key, value) VALUES (5, '5');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value) VALUES (8, '8');
INSERT INTO merge_tree_deduplication (key, value) VALUES (9, '9');
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11');
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11');
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication DROP PART '77_9_9_0'; -- some old part
INSERT INTO merge_tree_deduplication (key, value) VALUES (10, '10');
SELECT key, value FROM merge_tree_deduplication WHERE key = 10;
ALTER TABLE merge_tree_deduplication DROP PART '77_13_13_0'; -- fresh part
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12');
SELECT key, value FROM merge_tree_deduplication WHERE key = 12;
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
OPTIMIZE TABLE merge_tree_deduplication FINAL;
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- deduplicated
SELECT '===============';
SELECT key, value FROM merge_tree_deduplication ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88);
ALTER TABLE merge_tree_deduplication DROP PARTITION 77;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (11, '11', 88); --deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (11, '11'); -- not deduplicated
INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- not deduplicated
SELECT part, key, value FROM merge_tree_deduplication ORDER BY key;
-- Alters....
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 2;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 0;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 3;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (4, '4', 44);
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
SELECT * FROM merge_tree_deduplication WHERE part = 44 ORDER BY key;
DROP TABLE IF EXISTS merge_tree_deduplication;
SELECT '===============';
DROP TABLE IF EXISTS merge_tree_no_deduplication;
CREATE TABLE merge_tree_no_deduplication
(
key UInt64,
value String
)
ENGINE=MergeTree()
ORDER BY key;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
SELECT * FROM merge_tree_no_deduplication ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_no_deduplication MODIFY SETTING non_replicated_deduplication_window = 3;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (2, '2');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (3, '3');
DETACH TABLE merge_tree_no_deduplication;
ATTACH TABLE merge_tree_no_deduplication;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (4, '4');
SELECT * FROM merge_tree_no_deduplication ORDER BY key;
DROP TABLE IF EXISTS merge_tree_no_deduplication;

View File

@ -148,6 +148,7 @@
"00626_replace_partition_from_table",
"00152_insert_different_granularity",
"00054_merge_tree_partitions",
"01781_merge_tree_deduplication",
/// Old syntax is not allowed
"01062_alter_on_mutataion_zookeeper",
"00925_zookeeper_empty_replicated_merge_tree_optimize_final",