Support alter setting

This commit is contained in:
alesapin 2021-04-06 13:14:44 +03:00
parent 6afa8abb0e
commit 6456a1507c
7 changed files with 231 additions and 32 deletions

View File

@ -80,13 +80,17 @@ MergeTreeDeduplicationLog::MergeTreeDeduplicationLog(
, rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter , rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter
, format_version(format_version_) , format_version(format_version_)
, deduplication_map(deduplication_window) , deduplication_map(deduplication_window)
{} {
namespace fs = std::filesystem;
if (deduplication_window != 0 && !fs::exists(logs_dir))
fs::create_directories(logs_dir);
}
void MergeTreeDeduplicationLog::load() void MergeTreeDeduplicationLog::load()
{ {
namespace fs = std::filesystem; namespace fs = std::filesystem;
if (!fs::exists(logs_dir)) if (!fs::exists(logs_dir))
fs::create_directories(logs_dir); return;
for (const auto & p : fs::directory_iterator(logs_dir)) for (const auto & p : fs::directory_iterator(logs_dir))
{ {
@ -95,26 +99,33 @@ void MergeTreeDeduplicationLog::load()
existing_logs[log_number] = {path, 0}; existing_logs[log_number] = {path, 0};
} }
/// Order important, we load history from the begging to the end /// We should know which logs are exist even in case
for (auto & [log_number, desc] : existing_logs) /// of deduplication_window = 0
if (!existing_logs.empty())
current_log_number = existing_logs.rbegin()->first;
if (deduplication_window != 0)
{ {
try /// Order important, we load history from the begging to the end
for (auto & [log_number, desc] : existing_logs)
{ {
desc.entries_count = loadSingleLog(desc.path); try
current_log_number = log_number; {
} desc.entries_count = loadSingleLog(desc.path);
catch (...) }
{ catch (...)
tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path); {
tryLogCurrentException(__PRETTY_FUNCTION__, "Error while loading MergeTree deduplication log on path " + desc.path);
}
} }
/// Start new log, drop previous
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
} }
/// Start new log, drop previous
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
} }
size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path) size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
@ -137,6 +148,10 @@ size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
void MergeTreeDeduplicationLog::rotate() void MergeTreeDeduplicationLog::rotate()
{ {
/// We don't deduplicate anything so we don't need any writers
if (deduplication_window == 0)
return;
current_log_number++; current_log_number++;
auto new_path = getLogPath(logs_dir, current_log_number); auto new_path = getLogPath(logs_dir, current_log_number);
MergeTreeDeduplicationLogNameDescription log_description{new_path, 0}; MergeTreeDeduplicationLogNameDescription log_description{new_path, 0};
@ -169,7 +184,7 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs()
/// If we found some logs to drop /// If we found some logs to drop
if (remove_from_value != 0) if (remove_from_value != 0)
{ {
/// Go from beginning to the end and drop all outdated logs /// Go from the beginning to the end and drop all outdated logs
for (auto itr = existing_logs.begin(); itr != existing_logs.end();) for (auto itr = existing_logs.begin(); itr != existing_logs.end();)
{ {
size_t number = itr->first; size_t number = itr->first;
@ -196,6 +211,13 @@ std::pair<MergeTreePartInfo, bool> MergeTreeDeduplicationLog::addPart(const std:
{ {
std::lock_guard lock(state_mutex); std::lock_guard lock(state_mutex);
/// We support zero case because user may want to disable deduplication with
/// ALTER MODIFY SETTING query. It's much more simplier to handle zero case
/// here then destroy whole object, check for null pointer from different
/// threads and so on.
if (deduplication_window == 0)
return std::make_pair(part_info, true);
/// If we already have this block let's deduplicate it /// If we already have this block let's deduplicate it
if (deduplication_map.contains(block_id)) if (deduplication_map.contains(block_id))
{ {
@ -226,6 +248,13 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf
{ {
std::lock_guard lock(state_mutex); std::lock_guard lock(state_mutex);
/// We support zero case because user may want to disable deduplication with
/// ALTER MODIFY SETTING query. It's much more simplier to handle zero case
/// here then destroy whole object, check for null pointer from different
/// threads and so on.
if (deduplication_window == 0)
return;
assert(current_writer != nullptr); assert(current_writer != nullptr);
for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */) for (auto itr = deduplication_map.begin(); itr != deduplication_map.end(); /* no increment here, we erasing from map */)
@ -260,4 +289,23 @@ void MergeTreeDeduplicationLog::dropPart(const MergeTreePartInfo & drop_part_inf
} }
} }
void MergeTreeDeduplicationLog::setDeduplicationWindowSize(size_t deduplication_window_)
{
std::lock_guard lock(state_mutex);
deduplication_window = deduplication_window_;
rotate_interval = deduplication_window * 2;
/// If settings was set for the first time with ALTER MODIFY SETTING query
if (deduplication_window != 0 && !std::filesystem::exists(logs_dir))
std::filesystem::create_directories(logs_dir);
deduplication_map.setMaxSize(deduplication_window);
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
}
} }

View File

@ -39,7 +39,7 @@ private:
Queue queue; Queue queue;
IndexMap map; IndexMap map;
const size_t max_size; size_t max_size;
public: public:
using iterator = typename Queue::iterator; using iterator = typename Queue::iterator;
using const_iterator = typename Queue::const_iterator; using const_iterator = typename Queue::const_iterator;
@ -65,6 +65,16 @@ public:
return queue.size(); return queue.size();
} }
void setMaxSize(size_t max_size_)
{
max_size = max_size_;
while (size() > max_size)
{
map.erase(queue.front().key);
queue.pop_front();
}
}
bool erase(const std::string & key) bool erase(const std::string & key)
{ {
auto it = map.find(key); auto it = map.find(key);
@ -139,14 +149,16 @@ public:
/// Load history from disk. Ignores broken logs. /// Load history from disk. Ignores broken logs.
void load(); void load();
void setDeduplicationWindowSize(size_t deduplication_window_);
private: private:
const std::string logs_dir; const std::string logs_dir;
/// Size of deduplication window /// Size of deduplication window
const size_t deduplication_window; size_t deduplication_window;
/// How often we create new logs. Not very important, /// How often we create new logs. Not very important,
/// default value equals deduplication_window * 2 /// default value equals deduplication_window * 2
const size_t rotate_interval; size_t rotate_interval;
const MergeTreeDataFormatVersion format_version; const MergeTreeDataFormatVersion format_version;
/// Current log number. Always growing number. /// Current log number. Always growing number.

View File

@ -2,6 +2,7 @@
#include <Core/Defines.h> #include <Core/Defines.h>
#include <Core/BaseSettings.h> #include <Core/BaseSettings.h>
#include <Storages/MergeTree/MergeTreeDataFormatVersion.h>
namespace Poco::Util namespace Poco::Util

View File

@ -94,16 +94,7 @@ StorageMergeTree::StorageMergeTree(
loadMutations(); loadMutations();
auto settings = getSettings(); loadDeduplicationLog();
if (settings->non_replicated_deduplication_window != 0)
{
if (format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
std::string path = getDataPaths()[0] + "/deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version);
deduplication_log->load();
}
} }
@ -276,6 +267,7 @@ void StorageMergeTree::alter(
TableLockHolder & table_lock_holder) TableLockHolder & table_lock_holder)
{ {
auto table_id = getStorageID(); auto table_id = getStorageID();
auto old_storage_settings = getSettings();
StorageInMemoryMetadata new_metadata = getInMemoryMetadata(); StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
StorageInMemoryMetadata old_metadata = getInMemoryMetadata(); StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
@ -310,6 +302,21 @@ void StorageMergeTree::alter(
if (!maybe_mutation_commands.empty()) if (!maybe_mutation_commands.empty())
waitForMutation(mutation_version, mutation_file_name); waitForMutation(mutation_version, mutation_file_name);
} }
{
/// Some additional changes in settings
auto new_storage_settings = getSettings();
if (old_storage_settings->non_replicated_deduplication_window != new_storage_settings->non_replicated_deduplication_window)
{
/// We cannot place this check into settings sanityCheck because it depends on format_version.
/// sanityCheck must work event without storage.
if (new_storage_settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
deduplication_log->setDeduplicationWindowSize(new_storage_settings->non_replicated_deduplication_window);
}
}
} }
@ -625,6 +632,16 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
return CancellationCode::CancelSent; return CancellationCode::CancelSent;
} }
void StorageMergeTree::loadDeduplicationLog()
{
auto settings = getSettings();
if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
std::string path = getDataPaths()[0] + "/deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version);
deduplication_log->load();
}
void StorageMergeTree::loadMutations() void StorageMergeTree::loadMutations()
{ {

View File

@ -134,6 +134,10 @@ private:
void loadMutations(); void loadMutations();
/// Load and initialize deduplication logs. Even if deduplication setting
/// equals zero creates object with deduplication window equals zero.
void loadDeduplicationLog();
/** Determines what parts should be merged and merges it. /** Determines what parts should be merged and merges it.
* If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query). * If aggressive - when selects parts don't takes into account their ratio size and novelty (used for OPTIMIZE query).
* Returns true if merge is finished successfully. * Returns true if merge is finished successfully.

View File

@ -48,3 +48,38 @@
88 11 11 88 11 11
77 11 11 77 11 11
77 12 12 77 12 12
===============
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 33
1 1 33
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 33
1 1 33
1 1 33
1 1 33
1 1 33
2 2 33
3 3 33
===============
1 1 44
2 2 44
3 3 44
4 4 44
===============
1 1
1 1
===============
1 1
1 1
1 1
2 2
3 3
4 4

View File

@ -102,4 +102,86 @@ INSERT INTO merge_tree_deduplication (key, value) VALUES (12, '12'); -- not dedu
SELECT part, key, value FROM merge_tree_deduplication ORDER BY key; SELECT part, key, value FROM merge_tree_deduplication ORDER BY key;
-- Alters....
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 2;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 0;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_deduplication MODIFY SETTING non_replicated_deduplication_window = 3;
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 33);
SELECT * FROM merge_tree_deduplication WHERE part = 33 ORDER BY key;
SELECT '===============';
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (2, '2', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (3, '3', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (1, '1', 44);
INSERT INTO merge_tree_deduplication (key, value, part) VALUES (4, '4', 44);
DETACH TABLE merge_tree_deduplication;
ATTACH TABLE merge_tree_deduplication;
SELECT * FROM merge_tree_deduplication WHERE part = 44 ORDER BY key;
DROP TABLE IF EXISTS merge_tree_deduplication; DROP TABLE IF EXISTS merge_tree_deduplication;
SELECT '===============';
DROP TABLE IF EXISTS merge_tree_no_deduplication;
CREATE TABLE merge_tree_no_deduplication
(
key UInt64,
value String
)
ENGINE=MergeTree()
ORDER BY key;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
SELECT * FROM merge_tree_no_deduplication ORDER BY key;
SELECT '===============';
ALTER TABLE merge_tree_no_deduplication MODIFY SETTING non_replicated_deduplication_window = 3;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (2, '2');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (3, '3');
DETACH TABLE merge_tree_no_deduplication;
ATTACH TABLE merge_tree_no_deduplication;
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (1, '1');
INSERT INTO merge_tree_no_deduplication (key, value) VALUES (4, '4');
SELECT * FROM merge_tree_no_deduplication ORDER BY key;
DROP TABLE IF EXISTS merge_tree_no_deduplication;