Fix some trash

This commit is contained in:
alesapin 2022-05-27 16:08:49 +02:00
parent 32167cb6fb
commit be1c3c132b
4 changed files with 63 additions and 24 deletions

View File

@ -3,9 +3,10 @@
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Disks/WriteMode.h>
#include <Disks/IDisk.h>
namespace DB
{
@ -74,27 +75,27 @@ size_t getLogNumber(const std::string & path_str)
MergeTreeDeduplicationLog::MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_)
const MergeTreeDataFormatVersion & format_version_,
DiskPtr disk_)
: logs_dir(logs_dir_)
, deduplication_window(deduplication_window_)
, rotate_interval(deduplication_window_ * 2) /// actually it doesn't matter
, format_version(format_version_)
, deduplication_map(deduplication_window)
, disk(disk_)
{
namespace fs = std::filesystem;
if (deduplication_window != 0 && !fs::exists(logs_dir))
fs::create_directories(logs_dir);
if (deduplication_window != 0 && !disk->exists(logs_dir))
disk->createDirectories(logs_dir);
}
void MergeTreeDeduplicationLog::load()
{
namespace fs = std::filesystem;
if (!fs::exists(logs_dir))
if (!disk->exists(logs_dir))
return;
for (const auto & p : fs::directory_iterator(logs_dir))
for (auto it = disk->iterateDirectory(logs_dir); it->isValid(); it->next())
{
const auto & path = p.path();
const auto & path = it->path();
auto log_number = getLogNumber(path);
existing_logs[log_number] = {path, 0};
}
@ -124,19 +125,19 @@ void MergeTreeDeduplicationLog::load()
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
current_writer = disk->writeFile(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
}
}
size_t MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
{
ReadBufferFromFile read_buf(path);
auto read_buf = disk->readFile(path);
size_t total_entries = 0;
while (!read_buf.eof())
while (!read_buf->eof())
{
MergeTreeDeduplicationLogRecord record;
readRecord(record, read_buf);
readRecord(record, *read_buf);
if (record.operation == MergeTreeDeduplicationOp::DROP)
deduplication_map.erase(record.block_id);
else
@ -160,7 +161,7 @@ void MergeTreeDeduplicationLog::rotate()
if (current_writer)
current_writer->sync();
current_writer = std::make_unique<WriteBufferFromFile>(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
current_writer = disk->writeFile(log_description.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
}
void MergeTreeDeduplicationLog::dropOutdatedLogs()
@ -188,7 +189,7 @@ void MergeTreeDeduplicationLog::dropOutdatedLogs()
for (auto itr = existing_logs.begin(); itr != existing_logs.end();)
{
size_t number = itr->first;
std::filesystem::remove(itr->second.path);
disk->removeFile(itr->second.path);
itr = existing_logs.erase(itr);
if (remove_from_value == number)
break;
@ -297,15 +298,38 @@ void MergeTreeDeduplicationLog::setDeduplicationWindowSize(size_t deduplication_
rotate_interval = deduplication_window * 2;
/// If settings was set for the first time with ALTER MODIFY SETTING query
if (deduplication_window != 0 && !std::filesystem::exists(logs_dir))
std::filesystem::create_directories(logs_dir);
if (deduplication_window != 0 && !disk->exists(logs_dir))
disk->createDirectories(logs_dir);
deduplication_map.setMaxSize(deduplication_window);
rotateAndDropIfNeeded();
/// Can happen in case we have unfinished log
if (!current_writer)
current_writer = std::make_unique<WriteBufferFromFile>(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY);
current_writer = disk->writeFile(existing_logs.rbegin()->second.path, DBMS_DEFAULT_BUFFER_SIZE, WriteMode::Append);
}
void MergeTreeDeduplicationLog::shutdown()
{
std::lock_guard lock(state_mutex);
if (stopped)
return;
stopped = true;
current_writer->finalize();
}
MergeTreeDeduplicationLog::~MergeTreeDeduplicationLog()
{
try
{
shutdown();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Core/Types.h>
#include <base/StringRef.h>
#include <IO/WriteBufferFromFile.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Disks/IDisk.h>
#include <map>
#include <list>
#include <mutex>
@ -137,7 +137,8 @@ public:
MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
const MergeTreeDataFormatVersion & format_version_);
const MergeTreeDataFormatVersion & format_version_,
DiskPtr disk_);
/// Add part into in-memory hash table and to disk
/// Return true and part info if insertion was successful.
@ -151,6 +152,10 @@ public:
void load();
void setDeduplicationWindowSize(size_t deduplication_window_);
void shutdown();
~MergeTreeDeduplicationLog();
private:
const std::string logs_dir;
/// Size of deduplication window
@ -171,11 +176,16 @@ private:
LimitedOrderedHashMap<MergeTreePartInfo> deduplication_map;
/// Writer to the current log file
std::unique_ptr<WriteBufferFromFile> current_writer;
std::unique_ptr<WriteBufferFromFileBase> current_writer;
/// Overall mutex because we can have a lot of cocurrent inserts
std::mutex state_mutex;
/// Disk where log is stored
DiskPtr disk;
bool stopped{false};
/// Start new log
void rotate();

View File

@ -182,6 +182,9 @@ void StorageMergeTree::shutdown()
background_operations_assignee.finish();
background_moves_assignee.finish();
if (deduplication_log)
deduplication_log->shutdown();
try
{
/// We clear all old parts after stopping all background operations.
@ -715,8 +718,9 @@ void StorageMergeTree::loadDeduplicationLog()
if (settings->non_replicated_deduplication_window != 0 && format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING)
throw Exception("Deduplication for non-replicated MergeTree in old syntax is not supported", ErrorCodes::BAD_ARGUMENTS);
std::string path = getDataPaths()[0] + "/deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version);
auto disk = getDisks()[0];
std::string path = fs::path(relative_data_path) / "deduplication_logs";
deduplication_log = std::make_unique<MergeTreeDeduplicationLog>(path, settings->non_replicated_deduplication_window, format_version, disk);
deduplication_log->load();
}

View File

@ -1,4 +1,5 @@
#!/usr/bin/env bash
# Tags: no-s3-storage
set -eo pipefail
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)