Some initial code

This commit is contained in:
alesapin 2021-03-31 18:20:30 +03:00
parent 124d9e14ad
commit c15d7e009d
3 changed files with 154 additions and 0 deletions

View File

@ -1349,6 +1349,24 @@ String IMergeTreeDataPart::getUniqueId() const
return id;
}
String IMergeTreeDataPart::getZeroLevelPartBlockID() const
{
if (info.level != 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to get block id for non zero level part {}", name);
SipHash hash;
checksums.computeTotalChecksumDataOnly(hash);
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
@ -1364,5 +1382,6 @@ bool isInMemoryPart(const MergeTreeDataPartPtr & data_part)
return (data_part && data_part->getType() == MergeTreeDataPartType::IN_MEMORY);
}
}

View File

@ -0,0 +1,78 @@
#include <Storages/MergeTree/MergeTreeDeduplicationLog.h>
#include <filesystem>
#include <boost/algorithm/string/split.hpp>
#include <boost/algorithm/string/join.hpp>
#include <boost/algorithm/string/trim.hpp>
namespace DB
{
namespace
{
std::string getLogPath(const std::string & prefix, size_t number)
{
std::filesystem::path path(prefix);
path /= std::filesystem::path(std::string{"deduplication_log_"} + std::to_string(number) + ".txt");
return path;
}
size_t getLogNumber(const std::string & path_str)
{
std::filesystem::path path(path_str);
std::string filename = path.stem();
Strings filename_parts;
boost::split(filename_parts, filename, boost::is_any_of("_"));
return parse<size_t>(filename_parts[2]);
}
}
MergeTreeDeduplicationLog::MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
size_t rotate_interval_)
: logs_dir(logs_dir_)
, deduplication_window(deduplication_window_)
, rotate_interval(rotate_interval_)
{}
void MergeTreeDeduplicationLog::load()
{
namespace fs = std::filesystem;
if (!fs::exists(logs_dir))
fs::create_directories(logs_dir);
for (const auto & p : fs::directory_iterator(logs_dir))
{
auto path = p.path();
auto log_number = getLogNumber(path);
existing_logs[log_description.log_number] = {path, 0};
}
}
std::unordered_set<std::string> MergeTreeDeduplicationLog::loadSingleLog(const std::string & path)
{
ReadBufferFromFile read_buf(path);
while (!read_buf.eof())
{
readIntBinary(record_checksum, read_buf);
}
}
void MergeTreeDeduplicationLog::rotate()
{
size_t new_log_number = log_counter++;
auto new_description = getLogDescription(logs_dir, new_log_number, rotate_interval);
existing_logs.emplace(new_log_number, new_description);
current_writer->sync();
current_writer = std::make_unique<ChangelogWriter>(description.path, WriteMode::Append, description.from_log_index);
}
}

View File

@ -0,0 +1,57 @@
#pragma once
#include <Core/Types.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeData.h>
namespace DB
{
enum class MergeTreeDeduplicationOp : uint8_t
{
ADD = 1,
DROP = 2,
};
struct MergeTreeDeduplicationLogRecord
{
MergeTreeDeduplicationOp operation;
std::string part_name;
std::string block_id;
};
struct MergeTreeDeduplicationLogNameDescription
{
std::string path;
size_t entries_count;
};
class MergeTreeDeduplicationLog
{
public:
MergeTreeDeduplicationLog(
const std::string & logs_dir_,
size_t deduplication_window_,
size_t rotate_interval_);
bool addPart(const MergeTreeData::MutableDataPartPtr & part);
void dropPart(const MergeTreeData::MutableDataPartPtr & part);
void dropPartition(const std::string & partition_id);
void load();
private:
const std::string logs_dir;
const size_t deduplication_window;
const size_t rotate_interval;
size_t log_counter = 1;
std::map<size_t, MergeTreeDeduplicationLogNameDescription> existing_logs;
std::unordered_set<std::string> deduplication_set;
std::unique_ptr<WriteBufferFromFile> current_writer;
size_t entries_written_in_current_file;
void rotate();
std::unordered_set<std::string> loadSingleLog(const std::string & path) const;
};
}