mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-19 16:20:50 +00:00
Compare commits
46 Commits
2a5c94b0ee
...
e0b5ec379d
Author | SHA1 | Date | |
---|---|---|---|
|
e0b5ec379d | ||
|
c0c83236b6 | ||
|
3eb5bc1a0f | ||
|
b88cd79959 | ||
|
143d9f0201 | ||
|
f52cdfb795 | ||
|
3a7c68a052 | ||
|
e8d50aa97f | ||
|
cb92aaf968 | ||
|
0cdec0acf1 | ||
|
04f23332c3 | ||
|
7d5203f8a7 | ||
|
0d1d750437 | ||
|
ad31d86a15 | ||
|
991279e5c6 | ||
|
c184aae686 | ||
|
14a6b0422b | ||
|
e8cec05d08 | ||
|
2876a4e714 | ||
|
a903e1a726 | ||
|
2fa6be55ff | ||
|
8896d1b78b | ||
|
f688b903db | ||
|
21f9669836 | ||
|
1a386ae4d5 | ||
|
24f4e87f8b | ||
|
620640a042 | ||
|
ec469a117d | ||
|
7a879980d8 | ||
|
2adc61c215 | ||
|
afc4d08aad | ||
|
edc5d8dd92 | ||
|
d6b2a9d534 | ||
|
dc97bd6b92 | ||
|
60c6eb2610 | ||
|
9133505952 | ||
|
2741bf00e4 | ||
|
4eca00a666 | ||
|
c6804122cb | ||
|
189cbe25fe | ||
|
0c16dc0531 | ||
|
74b870710f | ||
|
4b8d315e87 | ||
|
d20f56c69d | ||
|
c6e5824e59 | ||
|
bb36d392be |
@ -146,6 +146,7 @@ endif()
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/Web)
|
||||
add_headers_and_sources(dbms Disks/ObjectStorages/VFS)
|
||||
|
||||
add_headers_and_sources(dbms Storages/Cache)
|
||||
if (TARGET ch_contrib::hivemetastore)
|
||||
|
@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
|
||||
return res;
|
||||
}
|
||||
|
||||
/// Build a set containing the name of every column in the list.
NameSet NamesAndTypesList::getNameSet() const
{
    NameSet names;
    names.reserve(size());

    for (auto it = begin(); it != end(); ++it)
        names.insert(it->name);

    return names;
}
|
||||
|
||||
DataTypes NamesAndTypesList::getTypes() const
|
||||
{
|
||||
DataTypes res;
|
||||
|
@ -100,6 +100,7 @@ public:
|
||||
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
|
||||
|
||||
Names getNames() const;
|
||||
NameSet getNameSet() const;
|
||||
DataTypes getTypes() const;
|
||||
|
||||
/// Remove columns which names are not in the `names`.
|
||||
|
425
src/Disks/ObjectStorages/VFS/AppendLog.cpp
Normal file
425
src/Disks/ObjectStorages/VFS/AppendLog.cpp
Normal file
@ -0,0 +1,425 @@
|
||||
#include "AppendLog.h"
|
||||
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <IO/copyData.h>
|
||||
#include <Poco/Checksum.h>
|
||||
#include <Common/re2.h>
|
||||
|
||||
#include <ranges>
|
||||
#include <string>
|
||||
|
||||
namespace DB::ErrorCodes
|
||||
{
|
||||
extern const int BAD_ARGUMENTS;
|
||||
extern const int ARGUMENT_OUT_OF_BOUND;
|
||||
extern const int CORRUPTED_DATA;
|
||||
extern const int INVALID_STATE;
|
||||
}
|
||||
|
||||
namespace DB::WAL
|
||||
{
|
||||
|
||||
constexpr char first_ext[] = ".first";
|
||||
constexpr char tmp_ext[] = ".tmp";
|
||||
constexpr char prefix[] = "log_";
|
||||
|
||||
/// Parse the start index encoded in a segment file name of the form "<prefix><digits>[.<ext>]".
/// The caller must pass a name that begins with `prefix`.
/// Throws whatever std::stoull throws if the part after the prefix does not start with a number.
static UInt64 extract_start_index(const String & filename)
{
    const size_t digits_begin = strlen(prefix);

    /// substr() takes a length, not an end position, so convert the dot position into a length.
    const size_t dot_pos = filename.find('.', digits_begin);
    const size_t digits_count = (dot_pos == String::npos) ? String::npos : dot_pos - digits_begin;

    return static_cast<UInt64>(std::stoull(filename.substr(digits_begin, digits_count)));
}
|
||||
|
||||
/// Compose a segment file name: the common prefix, the start index zero-padded to 20 digits,
/// and an optional extension appended verbatim.
static String makeSegmentName(UInt64 start_index, std::string_view ext = "")
{
    String name = fmt::format("{}{:020}{}", prefix, start_index, ext);
    return name;
}
|
||||
|
||||
static void verifyChecksum(const Entry & entry)
|
||||
{
|
||||
Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
|
||||
crc.update(reinterpret_cast<const char *>(&entry.index), sizeof(entry.index));
|
||||
crc.update(entry.data.data(), static_cast<unsigned int>(entry.data.size()));
|
||||
|
||||
if (crc.checksum() != entry.checksum)
|
||||
throw DB::Exception(ErrorCodes::CORRUPTED_DATA, "Checksum for the entry {} does not match the computed one", entry.index);
|
||||
}
|
||||
|
||||
struct EntryNonOwning
|
||||
{
|
||||
EntryNonOwning(UInt64 index_, std::span<const char> data_) : index(index_), data(data_)
|
||||
{
|
||||
Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
|
||||
crc.update(reinterpret_cast<const char *>(&index), sizeof(index));
|
||||
crc.update(data.data(), static_cast<unsigned int>(data.size()));
|
||||
checksum = crc.checksum();
|
||||
}
|
||||
|
||||
UInt64 index;
|
||||
std::span<const char> data;
|
||||
UInt32 checksum;
|
||||
};
|
||||
|
||||
|
||||
class EntrySerializer
|
||||
{
|
||||
public:
|
||||
static void serialize(const EntryNonOwning & entry, WriteBuffer & out)
|
||||
{
|
||||
writeVarUInt(entry.index, out);
|
||||
writeStringBinary(StringRef(entry.data.data(), entry.data.size()), out);
|
||||
writeVarUInt(entry.checksum, out);
|
||||
}
|
||||
|
||||
static Entry deserialize(ReadBuffer & in)
|
||||
{
|
||||
Entry entry;
|
||||
try
|
||||
{
|
||||
readVarUInt(entry.index, in);
|
||||
readBinary(entry.data, in);
|
||||
readVarUInt(entry.checksum, in);
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
|
||||
{
|
||||
throw Exception(
|
||||
ErrorCodes::CORRUPTED_DATA, "Cannot read data. Source data is possibly corrupted");
|
||||
}
|
||||
else
|
||||
throw;
|
||||
}
|
||||
|
||||
verifyChecksum(entry);
|
||||
return entry;
|
||||
}
|
||||
|
||||
static size_t calculateSerializedSize(const EntryNonOwning & entry)
|
||||
{
|
||||
size_t res = 0;
|
||||
|
||||
res += getLengthOfVarUInt(entry.index);
|
||||
res += getLengthOfVarUInt(entry.data.size_bytes());
|
||||
res += entry.data.size_bytes();
|
||||
res += getLengthOfVarUInt(entry.checksum);
|
||||
|
||||
return res;
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Open the log stored in `log_dir_`, creating the directory first when it does not exist,
/// and load the segments found there.
AppendLog::AppendLog(const fs::path & log_dir_, const Settings & settings_) : log_dir(log_dir_), settings(settings_)
{
    const bool dir_missing = !fs::exists(log_dir);
    if (dir_missing)
        fs::create_directories(log_dir);

    load();
}
|
||||
|
||||
/// Return the log identifier (read under the mutex).
UUID AppendLog::getID() const
{
    std::lock_guard guard(mutex);
    const UUID id = log_id;
    return id;
}
|
||||
|
||||
|
||||
void AppendLog::load()
|
||||
{
|
||||
// TODO: load uuid file
|
||||
|
||||
for (const auto & dir_entry : fs::directory_iterator(log_dir))
|
||||
{
|
||||
const String filename = dir_entry.path().filename();
|
||||
|
||||
if (!dir_entry.is_regular_file())
|
||||
continue;
|
||||
if (!filename.starts_with(prefix))
|
||||
continue;
|
||||
if (filename.ends_with(tmp_ext))
|
||||
{
|
||||
fs::remove(dir_entry.path());
|
||||
continue;
|
||||
}
|
||||
|
||||
segments.emplace_back(dir_entry.path(), extract_start_index(filename));
|
||||
}
|
||||
// The last segment at the beginning
|
||||
std::ranges::sort(segments, [](const auto & lhs, const auto & rhs) { return lhs.path > rhs.path; });
|
||||
|
||||
if (segments.empty())
|
||||
{
|
||||
next_index = 0;
|
||||
const auto path = log_dir / makeSegmentName(next_index);
|
||||
segments.emplace_back(path, next_index);
|
||||
|
||||
log_file = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND);
|
||||
active_segment_size = 0;
|
||||
return;
|
||||
}
|
||||
|
||||
// Find the segment marked as first with the largest index
|
||||
auto first_segment = std::ranges::find_if(segments, [](auto && s) { return s.path.extension() == first_ext; });
|
||||
if (first_segment != segments.end())
|
||||
{
|
||||
// Remove outdated segments
|
||||
std::for_each(std::next(first_segment), segments.end(), [](auto && s) { fs::remove(s.path); });
|
||||
segments.erase(std::next(first_segment), segments.end());
|
||||
|
||||
fs::path new_path = first_segment->path;
|
||||
new_path.replace_extension();
|
||||
fs::rename(first_segment->path, new_path);
|
||||
first_segment->path = new_path;
|
||||
}
|
||||
|
||||
Segment & last_segment = segments.front();
|
||||
next_index = findNextIndex(last_segment);
|
||||
// Open the last segment for appending. File must exists
|
||||
log_file = std::make_unique<WriteBufferFromFile>(last_segment.path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND);
|
||||
active_segment_size = log_file->size();
|
||||
}
|
||||
|
||||
/// Return the index that the next appended entry will receive.
/// Throws if corruption has been detected.
UInt64 AppendLog::getNextIndex() const
{
    std::lock_guard guard(mutex);
    assertNotCorrupted();

    const UInt64 result = next_index;
    return result;
}
|
||||
|
||||
UInt64 AppendLog::segmentsCount() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
assertNotCorrupted();
|
||||
return segments.size();
|
||||
}
|
||||
|
||||
/// Return the index of the first entry of the log.
/// Throws if corruption has been detected.
UInt64 AppendLog::getStartIndex() const
{
    std::lock_guard guard(mutex);
    assertNotCorrupted();

    const UInt64 start = getStartIndexNoLock();
    return start;
}
|
||||
|
||||
/// Same as getStartIndex() but expects the mutex to be held by the caller.
/// The segment list must not be empty.
UInt64 AppendLog::getStartIndexNoLock() const
{
    chassert(!segments.empty());

    const Segment & first = segments.front();
    return first.start_index;
}
|
||||
|
||||
void AppendLog::reload()
|
||||
{
|
||||
segments.clear();
|
||||
load();
|
||||
}
|
||||
|
||||
size_t AppendLog::size() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
assertNotCorrupted();
|
||||
|
||||
if (segments.empty())
|
||||
return 0;
|
||||
return next_index - segments.front().start_index;
|
||||
}
|
||||
|
||||
|
||||
/// Read the whole segment and return the index following its last entry
/// (the segment start index when the file holds no entries).
/// If an entry turns out to be corrupted, the log is flagged as corrupt and the exception is rethrown.
UInt64 AppendLog::findNextIndex(const Segment & segment) const
{
    ReadBufferFromFile reader(segment.path);
    UInt64 next = segment.start_index;

    // TODO: read and check version of segment
    for (;;)
    {
        if (reader.eof())
            break;

        try
        {
            const Entry last_read = EntrySerializer::deserialize(reader);
            next = last_read.index + 1;
        }
        catch (const DB::Exception & e)
        {
            if (e.code() == ErrorCodes::CORRUPTED_DATA)
                is_corrupt = true;
            throw;
        }
    }

    return next;
}
|
||||
|
||||
/// Start a new empty segment beginning at `start_index` and make it the active one.
/// The previously active file (if any) is synced first.
/// The segment is registered only after its file has been opened, so a failure to create
/// the file does not leave an entry in `segments` that has no file behind it.
void AppendLog::appendSegment(UInt64 start_index)
{
    auto path = log_dir / makeSegmentName(start_index);

    if (log_file)
        log_file->sync();

    auto new_file = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);

    segments.emplace_back(std::move(path), start_index);
    log_file = std::move(new_file);
    active_segment_size = 0;
}
|
||||
|
||||
/// Append `message` as a new entry and return the index assigned to it.
/// A new segment is started when the entry would not fit into the active one.
/// Throws BAD_ARGUMENTS if the serialized entry alone is larger than max_segment_size,
/// and throws if corruption has been detected.
UInt64 AppendLog::append(std::span<const char> message)
{
    std::lock_guard guard(mutex);
    assertNotCorrupted();

    const UInt64 assigned_index = next_index;
    EntryNonOwning entry(assigned_index, message);
    const size_t serialized_size = EntrySerializer::calculateSerializedSize(entry);

    if (serialized_size > settings.max_segment_size)
    {
        throw DB::Exception(
            DB::ErrorCodes::BAD_ARGUMENTS,
            "Log entry size: {} is greater than max_segment_size: {}",
            serialized_size,
            settings.max_segment_size);
    }

    const bool fits_active_segment = active_segment_size + serialized_size <= settings.max_segment_size;
    if (!fits_active_segment)
        appendSegment(next_index);

    // The growth of the active segment is measured through the write buffer byte counter
    const size_t written_before = log_file->count();
    EntrySerializer::serialize(entry, *log_file);
    log_file->sync();
    next_index++;
    active_segment_size += log_file->count() - written_before;

    return assigned_index;
}
|
||||
|
||||
/// Read up to `count` entries starting from the beginning of the log.
/// Returns fewer entries when the log holds less than `count`, and an empty result for `count == 0`.
/// Throws if corruption has been detected.
Entries AppendLog::readFront(size_t count) const
{
    std::lock_guard lock(mutex);
    assertNotCorrupted();

    Entries entries;
    // Nothing requested: readEntries() would otherwise still fetch one entry per call
    if (count == 0)
        return entries;

    // Never reserve more than the log can hold, so a huge `count` ("read everything") does not over-allocate
    const size_t available = segments.empty() ? 0 : next_index - segments.front().start_index;
    entries.reserve(std::min(count, available));
    size_t read = 0;

    for (const auto & segment : segments)
    {
        read += readEntries(segment, entries, count - read);
        if (read >= count)
            break;
    }
    chassert(entries.size() <= count);

    return entries;
}
|
||||
|
||||
size_t AppendLog::readEntries(const Segment & segment, Entries & entries, size_t limit) const
|
||||
{
|
||||
// TODO: Make a segment cache(LRU) for keeping entries in memory
|
||||
ReadBufferFromFile in(segment.path);
|
||||
size_t loaded = 0;
|
||||
|
||||
// TODO: read and check version of segment
|
||||
while (!in.eof())
|
||||
{
|
||||
try
|
||||
{
|
||||
auto entry = EntrySerializer::deserialize(in);
|
||||
entries.push_back(entry);
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
if (e.code() == ErrorCodes::CORRUPTED_DATA)
|
||||
is_corrupt = true;
|
||||
throw;
|
||||
}
|
||||
loaded++;
|
||||
|
||||
if (loaded >= limit)
|
||||
break;
|
||||
}
|
||||
return loaded;
|
||||
}
|
||||
|
||||
/// Copy the contents of `segment` into `out`, leaving out its first `entries_to_skip` entries, then sync `out`.
/// The skipped entries are still deserialized (and checksum-verified) to find where the remainder starts.
/// If a skipped entry turns out to be corrupted, the log is flagged as corrupt and the exception is rethrown.
void AppendLog::copyEntriesWithSkip(Segment & segment, DB::WriteBuffer & out, size_t entries_to_skip)
{
    ReadBufferFromFile source(segment.path);

    // TODO: read and check version of segment
    size_t remaining_to_skip = entries_to_skip;
    while (remaining_to_skip > 0)
    {
        try
        {
            EntrySerializer::deserialize(source);
        }
        catch (const DB::Exception & e)
        {
            if (e.code() == DB::ErrorCodes::CORRUPTED_DATA)
                is_corrupt = true;
            throw;
        }
        --remaining_to_skip;
    }

    // Everything after the skipped entries is copied as raw bytes
    copyData(source, out);
    out.sync();
}
|
||||
|
||||
/// Return the position in `segments` of the segment whose index range contains `index`, or -1 if there is none.
/// A segment covers [its start index, start index of the following segment); the last one ends at `next_index`.
int AppendLog::findSegment(UInt64 index) const
{
    const size_t total = segments.size();

    for (size_t pos = 0; pos < total; ++pos)
    {
        const bool is_last = pos == total - 1;
        const UInt64 range_end = is_last ? next_index : segments[pos + 1].start_index;

        const bool in_range = segments[pos].start_index <= index && index < range_end;
        if (in_range)
            return static_cast<int>(pos);
    }

    return -1;
}
|
||||
|
||||
|
||||
/// Drop all entries with indices below `index` and return how many were dropped.
/// The surviving tail of the affected segment is written to a temporary file, which is then renamed
/// to a segment marked as "first"; reload() removes the older segments and drops the mark.
/// Throws ARGUMENT_OUT_OF_BOUND if `index` lies outside [start index, next index],
/// and throws if corruption has been detected.
size_t AppendLog::dropUpTo(UInt64 index)
{
    std::lock_guard guard(mutex);

    assertNotCorrupted();
    chassert(!segments.empty());
    size_t old_start_index = getStartIndexNoLock();

    const bool index_in_range = index >= old_start_index && index <= next_index;
    if (!index_in_range)
        throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The index to drop up to is out of range");

    // Nothing to drop
    if (index == old_start_index)
        return 0;

    // Find a segment containing the last index to drop
    int segment_idx = findSegment(index - 1);
    chassert(segment_idx != -1);

    size_t whole_segments_to_drop = segment_idx;
    size_t entries_to_drop = index - segments[segment_idx].start_index;
    const bool nothing_to_keep = whole_segments_to_drop == segments.size();

    size_t new_start_index = nothing_to_keep ? next_index : segments[whole_segments_to_drop].start_index + entries_to_drop;

    const String first_segment_name = makeSegmentName(new_start_index, first_ext);

    // Create temporary file
    const String tmp_name = first_segment_name + tmp_ext;
    const fs::path tmp_path = log_dir / tmp_name;
    WriteBufferFromFile tmp_file(tmp_path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT);

    if (!nothing_to_keep)
        copyEntriesWithSkip(segments[whole_segments_to_drop], tmp_file, entries_to_drop);

    fs::rename(tmp_path, log_dir / first_segment_name);
    reload();

    chassert(getStartIndexNoLock() == index);
    return getStartIndexNoLock() - old_start_index;
}
|
||||
|
||||
void AppendLog::assertNotCorrupted() const
|
||||
{
|
||||
if (is_corrupt)
|
||||
throw Exception(ErrorCodes::INVALID_STATE, "WAL possibly contains corrupted data");
|
||||
}
|
||||
|
||||
}
|
103
src/Disks/ObjectStorages/VFS/AppendLog.h
Normal file
103
src/Disks/ObjectStorages/VFS/AppendLog.h
Normal file
@ -0,0 +1,103 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/WriteBufferFromFile.h>
|
||||
#include <Core/Types.h>
|
||||
|
||||
#include <filesystem>
|
||||
#include <mutex>
|
||||
#include <span>
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
namespace DB::WAL
|
||||
{
|
||||
|
||||
/// Sync policy selectable through Settings::sync_mode.
/// NOTE(review): presumably controls when appended data is synced to disk; the exact
/// meaning of each value is not visible in this header — confirm against the implementation.
enum class SyncMode
{
    NO_SYNC,
    TIMED_SYNC,
    ALL_SYNC,
};
|
||||
|
||||
struct Segment
|
||||
{
|
||||
fs::path path;
|
||||
UInt64 start_index;
|
||||
};
|
||||
|
||||
struct Entry
|
||||
{
|
||||
UInt64 index;
|
||||
std::vector<char> data;
|
||||
UInt32 checksum;
|
||||
|
||||
bool operator==(const Entry &) const = default;
|
||||
};
|
||||
using Entries = std::vector<Entry>;
|
||||
|
||||
struct Settings
|
||||
{
|
||||
SyncMode sync_mode = SyncMode::ALL_SYNC;
|
||||
UInt64 max_segment_size = 10 * 1024 * 1024;
|
||||
};
|
||||
|
||||
class AppendLog
|
||||
{
|
||||
public:
|
||||
AppendLog(const fs::path & log_dir, const Settings & settings_ = Settings{});
|
||||
/// Append message at the end of the log and return index of the inserted entry
|
||||
UInt64 append(std::span<const char> message);
|
||||
/// Read count entries(maybe less) from the front of the log
|
||||
Entries readFront(size_t count) const;
|
||||
/// Drop all entries from the beginning of the log up to one with the specified index (excluding)
|
||||
size_t dropUpTo(UInt64 index);
|
||||
/// Return the number of entries in the log
|
||||
size_t size() const;
|
||||
/// Return the index of the next entry to append
|
||||
UInt64 getNextIndex() const;
|
||||
/// Return the index of the first entry in the log
|
||||
UInt64 getStartIndex() const;
|
||||
/// Return the number of segments the log contains
|
||||
size_t segmentsCount() const;
|
||||
/// Return unique log ID
|
||||
UUID getID() const;
|
||||
|
||||
private:
|
||||
/// Load all segments from the log directory and clean outdated segments
|
||||
void load() TSA_REQUIRES(mutex);
|
||||
/// Reload segments from the log directory
|
||||
void reload() TSA_REQUIRES(mutex);
|
||||
/// Read no more than limit entries from the segment and return how much was read
|
||||
size_t readEntries(const Segment & segment, Entries & entries, size_t limit) const TSA_REQUIRES(mutex);
|
||||
/// Copy entries from the beginning of the segment to the output buffer skipping the first ones
|
||||
void copyEntriesWithSkip(Segment & segment, WriteBuffer & out, size_t entries_to_skip) TSA_REQUIRES(mutex);
|
||||
/// Create and append new empty segment to the log
|
||||
void appendSegment(UInt64 start_index) TSA_REQUIRES(mutex);
|
||||
|
||||
/// Return the sequence number of the segment containing entry with the index
|
||||
int findSegment(UInt64 index) const TSA_REQUIRES(mutex);
|
||||
/// Return the index after the last entry of the segment
|
||||
UInt64 findNextIndex(const Segment & segment) const TSA_REQUIRES(mutex);
|
||||
void assertNotCorrupted() const TSA_REQUIRES(mutex);
|
||||
|
||||
UInt64 getStartIndexNoLock() const TSA_REQUIRES(mutex);
|
||||
|
||||
mutable std::mutex mutex;
|
||||
/// The active segment file to append
|
||||
std::unique_ptr<WriteBufferFromFile> log_file TSA_GUARDED_BY(mutex);
|
||||
size_t active_segment_size TSA_GUARDED_BY(mutex) = 0;
|
||||
|
||||
UUID log_id TSA_GUARDED_BY(mutex) {};
|
||||
/// Index of the next entry to append
|
||||
UInt64 next_index TSA_GUARDED_BY(mutex) = 0;
|
||||
/// Log working directory
|
||||
fs::path log_dir TSA_GUARDED_BY(mutex);
|
||||
/// Settings
|
||||
Settings settings TSA_GUARDED_BY(mutex);
|
||||
/// Segments compriing the log
|
||||
std::vector<Segment> segments TSA_GUARDED_BY(mutex);
|
||||
/// Whether corrupted data has been detected
|
||||
mutable bool is_corrupt TSA_GUARDED_BY(mutex) = false;
|
||||
};
|
||||
|
||||
}
|
117
src/Disks/ObjectStorages/VFS/JSONSerializer.h
Normal file
117
src/Disks/ObjectStorages/VFS/JSONSerializer.h
Normal file
@ -0,0 +1,117 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/Exception.h>
|
||||
|
||||
#include <Disks/ObjectStorages/VFS/VFSLog.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <Poco/JSON/JSON.h>
|
||||
#include <Poco/JSON/Object.h>
|
||||
#include <Poco/JSON/Parser.h>
|
||||
#include <Poco/JSON/Stringifier.h>
|
||||
|
||||
#include <variant>
|
||||
#include <vector>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CORRUPTED_DATA;
|
||||
}
|
||||
|
||||
class JSONSerializer
|
||||
{
|
||||
public:
|
||||
static std::vector<char> serialize(const VFSEvent & event)
|
||||
{
|
||||
Poco::JSON::Object object;
|
||||
|
||||
serializeToJSON(event, object);
|
||||
String str = to_string(object);
|
||||
|
||||
return {str.begin(), str.end()};
|
||||
}
|
||||
|
||||
static VFSEvent deserialize(std::span<char> buffer)
|
||||
{
|
||||
String str(buffer.data(), buffer.size());
|
||||
auto object = from_string(std::move(str));
|
||||
|
||||
if (!object)
|
||||
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot parse VFS log item buffer as JSON");
|
||||
|
||||
VFSEvent event;
|
||||
deserializeFromJSON(event, *object);
|
||||
|
||||
return event;
|
||||
}
|
||||
|
||||
private:
|
||||
template <typename Value>
|
||||
static Value getOrThrow(const Poco::JSON::Object & object, const String & key)
|
||||
{
|
||||
if (!object.has(key))
|
||||
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key {} is not found in VFS log item", key);
|
||||
return object.getValue<Value>(key);
|
||||
}
|
||||
|
||||
static String to_string(const Poco::JSON::Object & object)
|
||||
{
|
||||
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
|
||||
oss.exceptions(std::ios::failbit);
|
||||
Poco::JSON::Stringifier::stringify(object, oss);
|
||||
|
||||
return oss.str();
|
||||
}
|
||||
|
||||
static Poco::JSON::Object::Ptr from_string(const String & json_str)
|
||||
{
|
||||
Poco::JSON::Parser parser;
|
||||
return parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();
|
||||
}
|
||||
|
||||
static void serializeToJSON(const VFSEvent & event, Poco::JSON::Object & object)
|
||||
{
|
||||
object.set("remote_path", event.remote_path);
|
||||
object.set("local_path", event.local_path);
|
||||
object.set("action", static_cast<std::underlying_type_t<VFSAction>>(event.action));
|
||||
object.set("timestamp", event.timestamp.epochMicroseconds());
|
||||
|
||||
if (event.orig_wal.has_value())
|
||||
{
|
||||
Poco::JSON::Object orig_wal;
|
||||
serializeToJSON(*event.orig_wal, orig_wal);
|
||||
|
||||
object.set("orig_wal", orig_wal);
|
||||
}
|
||||
}
|
||||
|
||||
static void serializeToJSON(const WALInfo & orig_wal, Poco::JSON::Object & object)
|
||||
{
|
||||
object.set("index", orig_wal.index);
|
||||
object.set("id", toString(orig_wal.id));
|
||||
object.set("replica", orig_wal.replica);
|
||||
}
|
||||
|
||||
static void deserializeFromJSON(VFSEvent & event, const Poco::JSON::Object & json)
|
||||
{
|
||||
event.remote_path = getOrThrow<String>(json, "remote_path");
|
||||
event.local_path = getOrThrow<String>(json, "local_path");
|
||||
event.action = static_cast<VFSAction>(getOrThrow<std::underlying_type_t<VFSAction>>(json, "action"));
|
||||
event.timestamp = getOrThrow<UInt64>(json, "timestamp");
|
||||
|
||||
if (auto orig_wal = json.getObject("orig_wal"); orig_wal)
|
||||
deserializeFromJSON(event.orig_wal.emplace(), *orig_wal);
|
||||
}
|
||||
|
||||
static void deserializeFromJSON(WALInfo & orig_wal, const Poco::JSON::Object & json)
|
||||
{
|
||||
orig_wal.id = parseFromString<UUID>(getOrThrow<String>(json, "id"));
|
||||
orig_wal.index = getOrThrow<UInt64>(json, "index");
|
||||
orig_wal.replica = getOrThrow<String>(json, "replica");
|
||||
}
|
||||
};
|
||||
|
||||
}
|
157
src/Disks/ObjectStorages/VFS/VFSGarbageCollector.cpp
Normal file
157
src/Disks/ObjectStorages/VFS/VFSGarbageCollector.cpp
Normal file
@ -0,0 +1,157 @@
|
||||
#include "VFSGarbageCollector.h"
|
||||
#include <IO/ReadBufferFromEmptyFile.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/S3Common.h>
|
||||
#include <Interpreters/Context.h>
|
||||
|
||||
#include <Common/FailPoint.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/Stopwatch.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperLock.h>
|
||||
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event VFSGcRunsCompleted;
|
||||
extern const Event VFSGcRunsException;
|
||||
extern const Event VFSGcRunsSkipped;
|
||||
extern const Event VFSGcTotalSeconds;
|
||||
extern const Event VFSGcCumulativeWALSItemsMerged;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
using ms = std::chrono::milliseconds;
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int INVALID_STATE;
|
||||
}
|
||||
namespace FailPoints
|
||||
{
|
||||
extern const char vfs_gc_optimistic_lock_delay[];
|
||||
}
|
||||
|
||||
/// Set up the collector state, make sure its ZooKeeper node exists and start the background task.
/// The task performs one run(), counts and logs any exception instead of propagating it,
/// and reschedules itself after `settings.gc_sleep_ms`.
VFSGarbageCollector::VFSGarbageCollector(
    const String & gc_name_,
    ObjectStoragePtr object_storage_,
    VFSLog & wal_,
    BackgroundSchedulePool & pool,
    const GarbageCollectorSettings & settings_)
    : gc_name(gc_name_)
    , object_storage(object_storage_)
    , vfs_shapshot_data(object_storage, gc_name)
    , wal(wal_)
    , settings(settings_)
    , log(getLogger(fmt::format("VFSGC({})", gc_name)))
{
    LOG_INFO(log, "GC started");
    initGCState();

    auto iteration = [this]
    {
        try
        {
            run();
        }
        catch (...)
        {
            ProfileEvents::increment(ProfileEvents::VFSGcRunsException);
            tryLogCurrentException(log, __PRETTY_FUNCTION__);
        }
        (*this)->scheduleAfter(settings.gc_sleep_ms);
    };

    // The task holder is a (private) base class of the collector
    auto & task_holder = static_cast<BackgroundSchedulePoolTaskHolder &>(*this);
    task_holder = pool.createTask(log->name(), iteration);
    task_holder->activateAndSchedule();
}
|
||||
|
||||
void VFSGarbageCollector::initGCState() const
|
||||
{
|
||||
auto zookeeper = getZookeeper();
|
||||
auto path = settings.zk_gc_path + "/garbage_collector";
|
||||
zookeeper->tryCreate(path, "", zkutil::CreateMode::Persistent);
|
||||
}
|
||||
|
||||
|
||||
void VFSGarbageCollector::run()
|
||||
{
|
||||
Stopwatch stop_watch;
|
||||
|
||||
/// Optimization to reduce probability of clashes between replicas
|
||||
/// TODO alexfvk: templatize ZooKeeperLock to support ZooKeeperWithFaultInjection
|
||||
auto lock = zkutil::createSimpleZooKeeperLock(
|
||||
getZookeeper()->getKeeper(), settings.zk_gc_path + "/garbage_collector", "lock", fmt::format("garbare_collector_run"));
|
||||
if (!lock->tryLock())
|
||||
{
|
||||
LOG_DEBUG(log, "Skipped run due to pessimistic lock already acquired");
|
||||
return;
|
||||
}
|
||||
LOG_DEBUG(log, "GC acquired lock");
|
||||
|
||||
updateSnapshot();
|
||||
|
||||
ProfileEvents::increment(ProfileEvents::VFSGcRunsCompleted);
|
||||
ProfileEvents::increment(ProfileEvents::VFSGcTotalSeconds, stop_watch.elapsedMilliseconds() / 1000);
|
||||
LOG_DEBUG(log, "GC iteration finished");
|
||||
}
|
||||
|
||||
/// Path of the ZooKeeper node that stores the snapshot metadata.
String VFSGarbageCollector::getZKSnapshotPath() const
{
    String path = settings.zk_gc_path;
    path += "/garbage_collector/shapshot_meta";
    return path;
}
|
||||
|
||||
|
||||
/// Read the snapshot metadata node from ZooKeeper and deserialize it together with the node version.
/// Throws Coordination::Exception carrying the error code when the read does not report ZOK.
SnapshotMetadata VFSGarbageCollector::getSnapshotMetadata() const
{
    const auto metadata_path = getZKSnapshotPath();
    auto keeper = getZookeeper();

    Coordination::Stat node_stat;
    Coordination::Error status;
    String payload;

    keeper->tryGet(metadata_path, payload, &node_stat, nullptr, &status);

    if (status != Coordination::Error::ZOK)
        throw Coordination::Exception(status);

    return SnapshotMetadata::deserialize(payload, node_stat.version);
}
|
||||
|
||||
/// Create a fault-injecting ZooKeeper wrapper over the ZooKeeper client of the global context,
/// configured with the fault injection probability and seed from the collector settings.
ZooKeeperWithFaultInjectionPtr VFSGarbageCollector::getZookeeper() const
{
    auto keeper = Context::getGlobalContextInstance()->getZooKeeper();
    auto client_name = fmt::format("VFSGarbageCollector({})", gc_name);

    return ZooKeeperWithFaultInjection::createInstance(
        settings.keeper_fault_injection_probability, settings.keeper_fault_injection_seed, keeper, client_name, log);
}
|
||||
|
||||
void VFSGarbageCollector::updateShapshotMetadata(const SnapshotMetadata & new_snapshot, int32_t znode_required_version) const
|
||||
{
|
||||
auto zk_shapshot_metadata_path = getZKSnapshotPath();
|
||||
auto zookeeper = getZookeeper();
|
||||
/// For consistency
|
||||
zookeeper->set(zk_shapshot_metadata_path, new_snapshot.serialize(), znode_required_version);
|
||||
}
|
||||
|
||||
void VFSGarbageCollector::updateSnapshot()
|
||||
{
|
||||
SnapshotMetadata snapshot_meta = getSnapshotMetadata();
|
||||
auto wal_items_batch = wal.read(settings.batch_size);
|
||||
if (wal_items_batch.size() == 0ul)
|
||||
{
|
||||
LOG_DEBUG(log, "Merge snapshot exit due to empty wal.");
|
||||
return;
|
||||
}
|
||||
LOG_DEBUG(log, "Merge snapshot with {} entries from wal.", wal_items_batch.size());
|
||||
|
||||
auto new_snaphot_meta = vfs_shapshot_data.mergeWithWals(std::move(wal_items_batch), snapshot_meta);
|
||||
|
||||
updateShapshotMetadata(new_snaphot_meta, snapshot_meta.znode_version);
|
||||
wal.dropUpTo(wal_items_batch.back().wal.index + 1);
|
||||
|
||||
LOG_DEBUG(log, "Snapshot update finished with new shapshot key {}", new_snaphot_meta.object_storage_key);
|
||||
}
|
||||
}
|
62
src/Disks/ObjectStorages/VFS/VFSGarbageCollector.h
Normal file
62
src/Disks/ObjectStorages/VFS/VFSGarbageCollector.h
Normal file
@ -0,0 +1,62 @@
|
||||
#pragma once
|
||||
#include "AppendLog.h"
|
||||
#include "VFSSnapshot.h"
|
||||
|
||||
#include <vector>
|
||||
#include <Core/BackgroundSchedulePool.h>
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <Disks/ObjectStorages/VFS/VFSSnapshot.h>
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <fmt/chrono.h>
|
||||
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
class Context;
|
||||
|
||||
struct GarbageCollectorSettings
|
||||
{
|
||||
String zk_gc_path;
|
||||
size_t gc_sleep_ms;
|
||||
double keeper_fault_injection_probability;
|
||||
UInt64 keeper_fault_injection_seed;
|
||||
size_t batch_size;
|
||||
};
|
||||
|
||||
class ZooKeeperWithFaultInjection;
|
||||
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
|
||||
|
||||
class VFSGarbageCollector : private BackgroundSchedulePoolTaskHolder
|
||||
{
|
||||
public:
|
||||
VFSGarbageCollector(
|
||||
const String & gc_name_,
|
||||
ObjectStoragePtr object_storage_,
|
||||
VFSLog & wal_,
|
||||
BackgroundSchedulePool & pool,
|
||||
const GarbageCollectorSettings & settings_);
|
||||
|
||||
private:
|
||||
void run();
|
||||
void updateSnapshot();
|
||||
|
||||
|
||||
String getZKSnapshotPath() const;
|
||||
// Get current shapshot object path from zookeeper.
|
||||
SnapshotMetadata getSnapshotMetadata() const;
|
||||
void updateShapshotMetadata(const SnapshotMetadata & new_snapshot, int32_t znode_required_version) const;
|
||||
void initGCState() const;
|
||||
ZooKeeperWithFaultInjectionPtr getZookeeper() const;
|
||||
|
||||
|
||||
String gc_name;
|
||||
ObjectStoragePtr object_storage;
|
||||
VFSSnapshotDataFromObjectStorage vfs_shapshot_data;
|
||||
VFSLog & wal;
|
||||
|
||||
const GarbageCollectorSettings settings;
|
||||
LoggerPtr log;
|
||||
};
|
||||
}
|
53
src/Disks/ObjectStorages/VFS/VFSLog.cpp
Normal file
53
src/Disks/ObjectStorages/VFS/VFSLog.cpp
Normal file
@ -0,0 +1,53 @@
|
||||
#include "VFSLog.h"
|
||||
|
||||
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
|
||||
#include <Common/Concepts.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Open the write-ahead log located in `log_dir` with default WAL settings.
VFSLog::VFSLog(const String & log_dir)
    : wal(log_dir) // TODO: read settings from config
{
}
|
||||
|
||||
|
||||
/// Serialize the event to JSON, append it to the WAL and return the index of the new entry.
UInt64 VFSLog::write(const VFSEvent & event)
{
    const auto payload = JSONSerializer::serialize(event);
    return wal.append(payload);
}
|
||||
|
||||
/// Read up to `count` entries from the front of the WAL and convert each into a log item:
/// the event is parsed from the entry payload, and the item WAL info receives the entry index and the WAL id.
VFSLogItems VFSLog::read(size_t count) const
{
    WAL::Entries wal_items = wal.readFront(count);
    auto wal_id = wal.getID(); // TODO: It makes sense to return wal id from readFront

    VFSLogItems vfs_items;
    vfs_items.reserve(wal_items.size());

    for (auto & wal_item : wal_items)
    {
        VFSLogItem item;
        item.event = JSONSerializer::deserialize(wal_item.data);
        item.wal.index = wal_item.index;
        item.wal.id = wal_id;

        vfs_items.push_back(std::move(item));
    }

    return vfs_items;
}
|
||||
|
||||
/// Forward to the WAL: drop the entries preceding `index` and return the result of the WAL call.
size_t VFSLog::dropUpTo(UInt64 index)
{
    const size_t dropped = wal.dropUpTo(index);
    return dropped;
}
|
||||
|
||||
size_t VFSLog::size() const
|
||||
{
|
||||
return wal.size();
|
||||
}
|
||||
|
||||
}
|
65
src/Disks/ObjectStorages/VFS/VFSLog.h
Normal file
65
src/Disks/ObjectStorages/VFS/VFSLog.h
Normal file
@ -0,0 +1,65 @@
|
||||
#pragma once
|
||||
|
||||
#include <Core/UUID.h>
|
||||
#include <Disks/ObjectStorages/VFS/AppendLog.h>
|
||||
|
||||
|
||||
#include <variant>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/// Kind of operation recorded in a VFS log event.
/// When WAL items are merged into a snapshot, LINK and REQUEST each add one link
/// to the remote path, while UNLINK removes one.
enum class VFSAction : uint8_t
{
    LINK = 0,
    UNLINK = 1,
    REQUEST = 2,
};
|
||||
|
||||
|
||||
struct WALInfo
|
||||
{
|
||||
String replica;
|
||||
UUID id;
|
||||
UInt64 index;
|
||||
|
||||
bool operator==(const WALInfo &) const = default;
|
||||
};
|
||||
|
||||
struct VFSEvent
|
||||
{
|
||||
String remote_path;
|
||||
String local_path;
|
||||
std::optional<WALInfo> orig_wal;
|
||||
Poco::Timestamp timestamp;
|
||||
VFSAction action;
|
||||
|
||||
bool operator==(const VFSEvent &) const = default;
|
||||
};
|
||||
|
||||
struct VFSLogItem
|
||||
{
|
||||
VFSEvent event;
|
||||
WALInfo wal;
|
||||
|
||||
bool operator==(const VFSLogItem &) const = default;
|
||||
};
|
||||
|
||||
using VFSLogItems = std::vector<VFSLogItem>;
|
||||
|
||||
class VFSLog
|
||||
{
|
||||
public:
|
||||
VFSLog(const String & log_dir);
|
||||
UInt64 write(const VFSEvent & event);
|
||||
VFSLogItems read(size_t count) const;
|
||||
size_t dropUpTo(UInt64 index);
|
||||
size_t size() const;
|
||||
|
||||
private:
|
||||
WAL::AppendLog wal;
|
||||
};
|
||||
|
||||
using VFSLogPtr = std::shared_ptr<VFSLog>;
|
||||
|
||||
}
|
74
src/Disks/ObjectStorages/VFS/VFSShapshotMetadata.h
Normal file
74
src/Disks/ObjectStorages/VFS/VFSShapshotMetadata.h
Normal file
@ -0,0 +1,74 @@
|
||||
#pragma once
|
||||
|
||||
#include <IO/ReadBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
#include <fmt/chrono.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct SnapshotMetadata
|
||||
{
|
||||
uint64_t metadata_version;
|
||||
String object_storage_key;
|
||||
uint64_t total_size;
|
||||
int32_t znode_version;
|
||||
bool is_initial_snaphot;
|
||||
|
||||
SnapshotMetadata(
|
||||
uint64_t metadata_version_ = 0ull,
|
||||
const String & object_storage_key_ = "",
|
||||
uint64_t total_size_ = 0ull,
|
||||
int32_t znode_version_ = -1,
|
||||
bool is_initial_ = false)
|
||||
: metadata_version(metadata_version_)
|
||||
, object_storage_key(object_storage_key_)
|
||||
, total_size(total_size_)
|
||||
, znode_version(znode_version_)
|
||||
, is_initial_snaphot(is_initial_)
|
||||
{
|
||||
}
|
||||
|
||||
SnapshotMetadata(
|
||||
const String & object_storage_key_,
|
||||
uint64_t metadata_version_ = 0ull,
|
||||
uint64_t total_size_ = 0ull,
|
||||
int32_t znode_version_ = -1,
|
||||
bool is_initial_ = false)
|
||||
: metadata_version(metadata_version_)
|
||||
, object_storage_key(object_storage_key_)
|
||||
, total_size(total_size_)
|
||||
, znode_version(znode_version_)
|
||||
, is_initial_snaphot(is_initial_)
|
||||
{
|
||||
}
|
||||
|
||||
void update(const String & new_snapshot_key) { object_storage_key = new_snapshot_key; }
|
||||
|
||||
String serialize() const { return fmt::format("{} {} {} ", metadata_version, object_storage_key, total_size); }
|
||||
|
||||
static SnapshotMetadata deserialize(const String & str, int32_t znode_version)
|
||||
{
|
||||
SnapshotMetadata result;
|
||||
result.znode_version = znode_version;
|
||||
/// In case of initial snaphot, the content will be empty.
|
||||
if (str.empty())
|
||||
{
|
||||
result.is_initial_snaphot = true;
|
||||
return result;
|
||||
}
|
||||
ReadBufferFromString rb(str);
|
||||
|
||||
readIntTextUnsafe(result.metadata_version, rb);
|
||||
checkChar(' ', rb);
|
||||
readStringUntilWhitespace(result.object_storage_key, rb);
|
||||
checkChar(' ', rb);
|
||||
readIntTextUnsafe(result.total_size, rb);
|
||||
result.is_initial_snaphot = false;
|
||||
return result;
|
||||
}
|
||||
};
|
||||
}
|
200
src/Disks/ObjectStorages/VFS/VFSSnapshot.cpp
Normal file
200
src/Disks/ObjectStorages/VFS/VFSSnapshot.cpp
Normal file
@ -0,0 +1,200 @@
|
||||
#include "VFSSnapshot.h"
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include <IO/ReadBufferFromEmptyFile.h>
|
||||
|
||||
#include "IO/ReadBufferFromFileBase.h"
|
||||
#include "IO/ReadHelpers.h"
|
||||
#include "IO/WriteHelpers.h"
|
||||
|
||||
namespace ProfileEvents
|
||||
{
|
||||
extern const Event VFSGcCumulativeSnapshotBytesRead;
|
||||
}
|
||||
|
||||
namespace DB
|
||||
{
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
}
|
||||
|
||||
bool VFSSnapshotEntry::operator==(const VFSSnapshotEntry & entry) const
|
||||
{
|
||||
return remote_path == entry.remote_path && link_count == entry.link_count;
|
||||
}
|
||||
|
||||
/// Reads one "<remote_path> <link_count>\n" line from `buf`.
/// Returns std::nullopt when the buffer is already at EOF.
std::optional<VFSSnapshotEntry> VFSSnapshotEntry::deserialize(ReadBuffer & buf)
{
    std::optional<VFSSnapshotEntry> parsed;

    if (!buf.eof())
    {
        parsed.emplace();
        readStringUntilWhitespace(parsed->remote_path, buf);
        checkChar(' ', buf);
        readIntTextUnsafe(parsed->link_count, buf);
        checkChar('\n', buf);
    }

    return parsed;
}
|
||||
|
||||
/// Writes the entry to `buf` as a single "<remote_path> <link_count>\n" line.
void VFSSnapshotEntry::serialize(WriteBuffer & buf) const
{
    const String line = fmt::format("{} {}\n", remote_path, link_count);
    writeString(line, buf);
}
|
||||
|
||||
|
||||
void VFSSnapshotDataBase::writeEntryInSnaphot(
|
||||
const VFSSnapshotEntry & entry, WriteBuffer & write_buffer, VFSSnapshotEntries & entries_to_remove)
|
||||
{
|
||||
if (entry.link_count < 0)
|
||||
{
|
||||
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Broken links count for file with remote path {}.", entry.remote_path);
|
||||
}
|
||||
else if (entry.link_count == 0)
|
||||
{
|
||||
entries_to_remove.emplace_back(entry);
|
||||
}
|
||||
else if (entry.link_count > 0)
|
||||
{
|
||||
entry.serialize(write_buffer);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/// Applies `wal_items` to the snapshot described by `old_snapshot_meta`:
/// obtains the storage-specific read/write buffers, runs the merge, asks the storage-specific
/// implementation to remove entries whose link count dropped to zero and returns metadata
/// that holds the key of the newly written snapshot object.
SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta)
{
    /// For most object storages (like S3 or Azure) we don't need the object path, it's generated randomly.
    /// But other ones require it to be set manually.
    std::unique_ptr<ReadBuffer> old_snapshot_in = getShapshotReadBuffer(old_snapshot_meta);
    auto write_target = getShapshotWriteBufferAndSnaphotObject(old_snapshot_meta);
    auto & new_snapshot_out = write_target.first;
    const String & new_snapshot_key = write_target.second;

    LOG_DEBUG(log, "Going to merge WAL batch(size {}) with snapshot (key {})", wal_items.size(), old_snapshot_meta.object_storage_key);
    auto unreferenced_entries = mergeWithWalsImpl(std::move(wal_items), *old_snapshot_in, *new_snapshot_out);
    SnapshotMetadata merged_snapshot_meta(new_snapshot_key);

    LOG_DEBUG(log, "Merge snapshot have finished, going to remove {} from object storage", unreferenced_entries.size());
    removeShapshotEntires(unreferenced_entries);
    return merged_snapshot_meta;
}
|
||||
|
||||
|
||||
/// Merges a batch of WAL items into a snapshot stream.
///
/// `read_buffer` must contain the old snapshot: entries sorted by remote_path, one per line.
/// The WAL items are sorted by remote_path here, items with the same path are folded into a single
/// link-count delta (LINK/REQUEST: +1, UNLINK: -1), and then both sorted sequences are merged:
///  - snapshot entries untouched by the batch are copied to `write_buffer` as is;
///  - an entry present on both sides gets the delta added to its counter;
///  - a path present only in the batch becomes a new entry.
/// Entries whose resulting counter is zero are not written; they are returned to the caller.
/// `write_buffer` is finalized before returning.
///
/// Throws LOGICAL_ERROR if the old snapshot turns out to be unsorted or if a resulting counter is negative.
VFSSnapshotEntries VFSSnapshotDataBase::mergeWithWalsImpl(VFSLogItems && wal_items_, ReadBuffer & read_buffer, WriteBuffer & write_buffer)
{
    VFSSnapshotEntries entries_to_remove;

    auto wal_items = std::move(wal_items_);
    std::ranges::sort(
        wal_items, [](const VFSLogItem & left, const VFSLogItem & right) { return left.event.remote_path < right.event.remote_path; });

    /// Reads the snapshot entry that follows `previous` and verifies that the snapshot order is not broken.
    /// Every advance over the old snapshot goes through this helper, so the check cannot be skipped on any path.
    auto read_next_checked = [&read_buffer](const VFSSnapshotEntry & previous)
    {
        auto next = VFSSnapshotEntry::deserialize(read_buffer);
        if (next && previous.remote_path > next->remote_path)
            throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "VFS snapshot is not sorted.");
        return next;
    };

    auto current_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);

    // Implementation similar to the merge operation:
    // iterating through 2 sorted sequences.
    // If the links count becomes 0, the entry is added to entries_to_remove,
    // otherwise the counters are summed and the entry is appended to the new snapshot.
    for (auto wal_iterator = wal_items.begin(); wal_iterator != wal_items.end();)
    {
        // Combine all wal items with the same remote_path into a single one.
        VFSSnapshotEntry entry_to_merge = {wal_iterator->event.remote_path, 0};
        while (wal_iterator != wal_items.end() && wal_iterator->event.remote_path == entry_to_merge.remote_path)
        {
            int delta_link_count = 0; // TODO: remove delta and save local_path in the snapshot
            switch (wal_iterator->event.action)
            {
                case VFSAction::LINK:
                case VFSAction::REQUEST:
                    delta_link_count = 1;
                    break;
                case VFSAction::UNLINK:
                    delta_link_count = -1;
                    break;
            }

            entry_to_merge.link_count += delta_link_count;
            ++wal_iterator;
        }

        // Write and skip entries from the snapshot which we won't update.
        while (current_snapshot_entry && current_snapshot_entry->remote_path < entry_to_merge.remote_path)
        {
            auto next_snapshot_entry = read_next_checked(*current_snapshot_entry);
            current_snapshot_entry->serialize(write_buffer);
            current_snapshot_entry = std::move(next_snapshot_entry);
        }

        if (!current_snapshot_entry || current_snapshot_entry->remote_path > entry_to_merge.remote_path)
        {
            // The path is not in the old snapshot: the batch alone defines the entry.
            writeEntryInSnaphot(entry_to_merge, write_buffer, entries_to_remove);
            continue;
        }
        else if (current_snapshot_entry->remote_path == entry_to_merge.remote_path)
        {
            current_snapshot_entry->link_count += entry_to_merge.link_count;
            writeEntryInSnaphot(*current_snapshot_entry, write_buffer, entries_to_remove);
            // The order check must also happen here, otherwise an unsorted snapshot slips through
            // whenever the out-of-order entry directly follows an updated one.
            current_snapshot_entry = read_next_checked(*current_snapshot_entry);
        }
        else
        {
            throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable");
        }
    }

    // Copy the rest of the old snapshot.
    while (current_snapshot_entry)
    {
        current_snapshot_entry->serialize(write_buffer);
        current_snapshot_entry = read_next_checked(*current_snapshot_entry);
    }

    write_buffer.finalize();
    return entries_to_remove;
}
|
||||
|
||||
std::unique_ptr<ReadBuffer> VFSSnapshotDataFromObjectStorage::getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const
|
||||
{
|
||||
if (!snapshot_meta.is_initial_snaphot)
|
||||
{
|
||||
StoredObject object(snapshot_meta.object_storage_key, "", snapshot_meta.total_size);
|
||||
// to do read settings.
|
||||
auto res = object_storage->readObject(object, {});
|
||||
return res;
|
||||
}
|
||||
return std::make_unique<ReadBufferFromEmptyFile>();
|
||||
}
|
||||
|
||||
std::pair<std::unique_ptr<WriteBuffer>, String>
|
||||
VFSSnapshotDataFromObjectStorage::getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const
|
||||
{
|
||||
String new_object_path = fmt::format("/vfs_shapshots/shapshot_{}", snapshot_meta.znode_version + 1);
|
||||
auto new_object_key = object_storage->generateObjectKeyForPath(new_object_path);
|
||||
StoredObject new_object(new_object_key.serialize());
|
||||
std::unique_ptr<WriteBuffer> new_shapshot_write_buffer = object_storage->writeObject(new_object, WriteMode::Rewrite);
|
||||
|
||||
return {std::move(new_shapshot_write_buffer), new_object_key.serialize()};
|
||||
}
|
||||
|
||||
void VFSSnapshotDataFromObjectStorage::removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove)
|
||||
{
|
||||
StoredObjects objects_to_remove;
|
||||
objects_to_remove.reserve(entires_to_remove.size());
|
||||
|
||||
for (const auto & entry : entires_to_remove)
|
||||
{
|
||||
objects_to_remove.emplace_back(entry.remote_path);
|
||||
}
|
||||
object_storage->removeObjectsIfExist(objects_to_remove);
|
||||
}
|
||||
|
||||
}
|
76
src/Disks/ObjectStorages/VFS/VFSSnapshot.h
Normal file
76
src/Disks/ObjectStorages/VFS/VFSSnapshot.h
Normal file
@ -0,0 +1,76 @@
|
||||
#pragma once
|
||||
#include <Disks/ObjectStorages/IObjectStorage.h>
|
||||
#include "AppendLog.h"
|
||||
#include "IO/ReadHelpers.h"
|
||||
#include "VFSShapshotMetadata.h"
|
||||
#include "VFSLog.h"
|
||||
|
||||
#include <fstream>
|
||||
#include <optional>
|
||||
#include <IO/Lz4DeflatingWriteBuffer.h>
|
||||
#include <IO/Lz4InflatingReadBuffer.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <fmt/chrono.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
struct VFSSnapshotEntry
|
||||
{
|
||||
String remote_path;
|
||||
Int32 link_count = 0;
|
||||
|
||||
bool operator==(const VFSSnapshotEntry & entry) const;
|
||||
static std::optional<VFSSnapshotEntry> deserialize(ReadBuffer & buf);
|
||||
void serialize(WriteBuffer & buf) const;
|
||||
};
|
||||
|
||||
using VFSSnapshotEntries = std::vector<VFSSnapshotEntry>;
|
||||
|
||||
class VFSSnapshotDataBase
|
||||
{
|
||||
public:
|
||||
VFSSnapshotDataBase() : log(getLogger("VFSSnapshotDataBase(unnamed)")) { }
|
||||
virtual ~VFSSnapshotDataBase() = default;
|
||||
|
||||
VFSSnapshotDataBase(const VFSSnapshotDataBase &) = delete;
|
||||
VFSSnapshotDataBase(VFSSnapshotDataBase &&) = delete;
|
||||
|
||||
/// Apply wal on the current snapshot and returns metadata of the updated one.
|
||||
SnapshotMetadata mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta);
|
||||
|
||||
private:
|
||||
void writeEntryInSnaphot(const VFSSnapshotEntry & entry, WriteBuffer & write_buffer, VFSSnapshotEntries & entries_to_remove);
|
||||
VFSSnapshotEntries mergeWithWalsImpl(VFSLogItems && wal_items, ReadBuffer & read_buffer, WriteBuffer & write_buffer);
|
||||
|
||||
LoggerPtr log;
|
||||
|
||||
protected:
|
||||
/// Methods with storage-specific logic.
|
||||
virtual void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) = 0;
|
||||
virtual std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const = 0;
|
||||
virtual std::pair<std::unique_ptr<WriteBuffer>, String>
|
||||
getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const = 0;
|
||||
};
|
||||
|
||||
class VFSSnapshotDataFromObjectStorage : public VFSSnapshotDataBase
|
||||
{
|
||||
public:
|
||||
VFSSnapshotDataFromObjectStorage(ObjectStoragePtr object_storage_, const String & name_)
|
||||
: object_storage(object_storage_), log(getLogger(fmt::format("VFSSnapshotDataFromObjectStorage({})", name_)))
|
||||
{
|
||||
}
|
||||
|
||||
protected:
|
||||
void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) override;
|
||||
std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const override;
|
||||
std::pair<std::unique_ptr<WriteBuffer>, String>
|
||||
getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const override;
|
||||
|
||||
private:
|
||||
ObjectStoragePtr object_storage;
|
||||
LoggerPtr log;
|
||||
};
|
||||
|
||||
}
|
195
src/Disks/ObjectStorages/VFS/tests/gtest_append_log.cpp
Normal file
195
src/Disks/ObjectStorages/VFS/tests/gtest_append_log.cpp
Normal file
@ -0,0 +1,195 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <Disks/DiskLocal.h>
|
||||
#include <Disks/IDisk.h>
|
||||
#include <Disks/ObjectStorages/VFS/AppendLog.h>
|
||||
#include <Poco/Checksum.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
using namespace DB;
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
const fs::path log_dir = "tmp/";
|
||||
const std::vector<char> msg{'T', 'e', 's', 't'};
|
||||
const std::vector<char> msg1{'T', 'e', 's', 't', '1'};
|
||||
const std::vector<char> msg2{'T', 'e', 's', 't', '2'};
|
||||
const std::vector<char> msg3{'T', 'e', 's', 't', '3'};
|
||||
|
||||
|
||||
class WALTest : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
void SetUp() override { fs::remove_all(log_dir); }
|
||||
//void TearDown() override { SetUp(); }
|
||||
};
|
||||
|
||||
|
||||
/// Appended records get consecutive indexes starting from 0 and are counted by size().
TEST_F(WALTest, Main)
{
    DB::WAL::AppendLog append_log(log_dir);

    const auto first_index = append_log.append("test message");
    EXPECT_EQ(first_index, 0);

    const auto second_index = append_log.append(msg);
    EXPECT_EQ(second_index, 1);
    EXPECT_EQ(append_log.size(), 2);
}
|
||||
|
||||
/// The checksum stored with an entry is the CRC32 over the entry index followed by the payload.
TEST_F(WALTest, TestEntryChecksum)
{
    WAL::AppendLog append_log(log_dir);

    UInt64 expected_index = 0;
    Poco::Checksum expected_crc(Poco::Checksum::Type::TYPE_CRC32);
    expected_crc.update(reinterpret_cast<const char *>(&expected_index), sizeof(expected_index));
    expected_crc.update(msg.data(), static_cast<unsigned int>(msg.size()));

    append_log.append(msg);
    auto read_entries = append_log.readFront(1);

    EXPECT_EQ(read_entries[0].checksum, expected_crc.checksum());
}
|
||||
|
||||
/// The next index keeps growing after the log is reopened on the same directory.
TEST_F(WALTest, TestLastIndex)
{
    {
        WAL::AppendLog first_session(log_dir);

        for (int i = 0; i < 3; ++i)
            first_session.append(msg);

        ASSERT_EQ(first_session.getNextIndex(), 3);
    }

    {
        WAL::AppendLog second_session(log_dir);
        second_session.append(msg);

        ASSERT_EQ(second_session.getNextIndex(), 4);
    }
}
|
||||
|
||||
/// A single appended record is returned by readFront with index 0 and the original payload.
TEST_F(WALTest, TestReadFront)
{
    WAL::AppendLog append_log(log_dir);
    append_log.append(msg);

    WAL::Entries read_entries = append_log.readFront(1);

    EXPECT_EQ(read_entries.size(), 1);
    EXPECT_EQ(read_entries[0].index, 0);
    EXPECT_EQ(read_entries[0].data, msg);
}
|
||||
|
||||
/// readFront(n) returns the first n records in append order and at most as many as the log holds.
TEST_F(WALTest, TestReadFrontMultiple)
{
    WAL::AppendLog append_log(log_dir);
    append_log.append(msg1);
    append_log.append(msg2);

    {
        // Only the oldest record is requested.
        WAL::Entries one_entry = append_log.readFront(1);
        EXPECT_EQ(one_entry[0].data, msg1);
        EXPECT_EQ(one_entry[0].index, 0);
    }

    {
        // Both records are requested.
        WAL::Entries two_entries = append_log.readFront(2);
        EXPECT_EQ(two_entries[0].data, msg1);
        EXPECT_EQ(two_entries[0].index, 0);
        EXPECT_EQ(two_entries[1].data, msg2);
        EXPECT_EQ(two_entries[1].index, 1);
    }

    {
        // More records are requested than the log contains.
        WAL::Entries all_entries = append_log.readFront(3);
        EXPECT_EQ(all_entries.size(), 2);
    }
}
|
||||
|
||||
/// dropUpTo(1) removes the first record; size and next index stay the same after reopening the log.
TEST_F(WALTest, TestDrop)
{
    {
        WAL::AppendLog append_log(log_dir);
        append_log.append(msg1);
        append_log.append(msg2);

        const size_t dropped_count = append_log.dropUpTo(1);

        EXPECT_EQ(dropped_count, 1);
        EXPECT_EQ(append_log.size(), 1);
        EXPECT_EQ(append_log.getNextIndex(), 2);
    }

    {
        WAL::AppendLog reopened_log(log_dir);

        EXPECT_EQ(reopened_log.size(), 1);
        EXPECT_EQ(reopened_log.getNextIndex(), 2);
    }
}
|
||||
|
||||
/// After a drop readFront starts from the first remaining record; dropping everything leaves an empty log.
TEST_F(WALTest, TestReadAfterDrop)
{
    WAL::AppendLog append_log(log_dir);
    append_log.append(msg1);
    append_log.append(msg2);

    const size_t dropped_count = append_log.dropUpTo(1);
    EXPECT_EQ(dropped_count, 1);
    EXPECT_EQ(append_log.size(), 1);

    WAL::Entries remaining = append_log.readFront(1);
    EXPECT_EQ(remaining.size(), 1);
    EXPECT_EQ(remaining[0].index, 1);

    append_log.dropUpTo(2);
    remaining = append_log.readFront(1);
    EXPECT_EQ(remaining.size(), 0);
    EXPECT_EQ(append_log.size(), 0);
}
|
||||
|
||||
/// With max_segment_size = 100 two 50-byte records end up in two segments and are still read back in order.
TEST_F(WALTest, TestSegmentRotation)
{
    WAL::AppendLog append_log(log_dir, WAL::Settings{.max_segment_size = 100});

    std::vector<char> first_payload(50, 0xAB);
    std::vector<char> second_payload(50, 0xCD);
    append_log.append(first_payload);
    append_log.append(second_payload);

    EXPECT_EQ(append_log.segmentsCount(), 2);

    WAL::Entries read_entries = append_log.readFront(2);
    EXPECT_EQ(read_entries.size(), 2);
    EXPECT_EQ(read_entries[0].data, first_payload);
    EXPECT_EQ(read_entries[0].index, 0);
    EXPECT_EQ(read_entries[1].data, second_payload);
    EXPECT_EQ(read_entries[1].index, 1);
}
|
||||
|
||||
/// Once a read hits a corrupted segment, every public method of the log throws.
TEST_F(WALTest, TestCorruptedException)
{
    fs::path first_segment_path = log_dir / "log_00000000000000000000";
    WAL::AppendLog append_log(log_dir, WAL::Settings{.max_segment_size = 100});
    append_log.append(msg);

    // Break the size byte in the segment file.
    DB::WriteBufferFromFile segment_out(first_segment_path);
    segment_out.write(0x01);
    segment_out.sync();

    // The corruption is not noticed until the segment is actually read.
    EXPECT_NO_THROW(append_log.size());
    EXPECT_THROW(append_log.readFront(1), DB::Exception);

    // Now all public methods fail.
    EXPECT_THROW(append_log.readFront(1), DB::Exception);
    EXPECT_THROW(append_log.append(msg), DB::Exception);
    EXPECT_THROW(append_log.size(), DB::Exception);
    EXPECT_THROW(append_log.dropUpTo(1), DB::Exception);
    EXPECT_THROW(append_log.getNextIndex(), DB::Exception);
    EXPECT_THROW(append_log.getStartIndex(), DB::Exception);
    EXPECT_THROW(append_log.segmentsCount(), DB::Exception);
}
|
150
src/Disks/ObjectStorages/VFS/tests/gtest_snaphot_merge.cpp
Normal file
150
src/Disks/ObjectStorages/VFS/tests/gtest_snaphot_merge.cpp
Normal file
@ -0,0 +1,150 @@
|
||||
#include <algorithm>
|
||||
#include <iostream>
|
||||
#include <Disks/ObjectStorages/VFS/VFSShapshotMetadata.h>
|
||||
#include <Disks/ObjectStorages/VFS/VFSSnapshot.h>
|
||||
#include <IO/WriteBufferFromString.h>
|
||||
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
|
||||
using namespace DB;
|
||||
|
||||
class VFSSnapshotDataFromString : public DB::VFSSnapshotDataBase
|
||||
{
|
||||
public:
|
||||
VFSSnapshotDataFromString() = default;
|
||||
|
||||
void setSnaphotData(String str) { old_snapshot_data = str; }
|
||||
String getSnapshotdata() const { return new_snaphot_data; }
|
||||
VFSSnapshotEntries getEnriesToRemoveAfterLastMerge() const { return latest_entires_to_remove; }
|
||||
|
||||
protected:
|
||||
std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata &) const override
|
||||
{
|
||||
return std::unique_ptr<ReadBuffer>(new ReadBufferFromString(old_snapshot_data));
|
||||
}
|
||||
std::pair<std::unique_ptr<WriteBuffer>, String> getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata &) const override
|
||||
{
|
||||
String key_placeholed = "test";
|
||||
auto write_buffer = std::unique_ptr<WriteBuffer>(new WriteBufferFromString(new_snaphot_data));
|
||||
return {std::move(write_buffer), key_placeholed};
|
||||
}
|
||||
|
||||
void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) override { latest_entires_to_remove = entires_to_remove; }
|
||||
|
||||
private:
|
||||
String old_snapshot_data;
|
||||
mutable String new_snaphot_data;
|
||||
VFSSnapshotEntries latest_entires_to_remove;
|
||||
};
|
||||
|
||||
|
||||
/// Serializes the entries one after another, in the given order, into a string.
String convertSnaphotEntiesToString(const VFSSnapshotEntries & entries)
{
    String result;
    {
        // The buffer is scoped so that it is destroyed before `result` is returned.
        WriteBufferFromString out(result);
        for (const VFSSnapshotEntry & snapshot_entry : entries)
            snapshot_entry.serialize(out);
    }
    return result;
}
|
||||
|
||||
/// Entries survive a serialize/deserialize round trip; serialization itself does not reorder them.
TEST(DiskObjectStorageVFS, SnaphotEntriesSerialization)
{
    VFSSnapshotEntries entries = {{"/b", 1}, {"/a", 1}};

    String serialized = convertSnaphotEntiesToString(entries);
    String expected_serialized = "/b 1\n/a 1\n";
    EXPECT_EQ(expected_serialized, serialized);

    VFSSnapshotEntries deserialized;
    {
        ReadBufferFromString in(serialized);
        for (auto entry = VFSSnapshotEntry::deserialize(in); entry; entry = VFSSnapshotEntry::deserialize(in))
            deserialized.emplace_back(*entry);
    }

    EXPECT_EQ(deserialized.size(), entries.size());
    EXPECT_EQ(deserialized, entries);
}
|
||||
|
||||
/// Merging (even an empty WAL batch) into a snapshot whose entries are not sorted must throw.
TEST(DiskObjectStorageVFS, ExceptionOnUnsortedSnapshot)
{
    VFSSnapshotEntries unsorted_entries = {{"/b", 1}, {"/a", 1}, {"/c", 2}};

    VFSSnapshotDataFromString snapshot_data;
    snapshot_data.setSnaphotData(convertSnaphotEntiesToString(unsorted_entries));

    SnapshotMetadata placeholder_metadata;

    EXPECT_THROW(auto new_snapshot = snapshot_data.mergeWithWals({}, placeholder_metadata), Exception) << "Snaphot is unsorted";
}
|
||||
|
||||
// TEST(DiskObjectStorageVFS, MergeWalWithSnaphot)
|
||||
// {
|
||||
// WALItems wals = {{"/c", 1}, {"/d", 2}, {"/b", 1}};
|
||||
|
||||
// String expected_snaphot_state;
|
||||
// VFSSnapshotDataFromString snapshot_data;
|
||||
// SnapshotMetadata placeholder_metadata;
|
||||
// {
|
||||
// snapshot_data.setSnaphotData("");
|
||||
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
|
||||
|
||||
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
|
||||
// VFSSnapshotEntries expected_entries_to_remove = {};
|
||||
|
||||
// EXPECT_EQ(expected_entries_to_remove, to_remove);
|
||||
// }
|
||||
// expected_snaphot_state = "/b 1\n/c 1\n/d 2\n";
|
||||
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
|
||||
|
||||
// wals = {{"/c", -1}, {"/d", -1}, {"/d", -1}, {"/e", -1}, {"/e", 1}, {"/f", 1}, {"/f", 1}, {"/f", -1}, {"/f", 1}};
|
||||
// {
|
||||
// snapshot_data.setSnaphotData(expected_snaphot_state);
|
||||
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
|
||||
|
||||
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
|
||||
// VFSSnapshotEntries expected_entries_to_remove = {{"/c", 0}, {"/d", 0}, {"/e", 0}};
|
||||
|
||||
// EXPECT_EQ(expected_entries_to_remove, to_remove);
|
||||
// }
|
||||
// expected_snaphot_state = "/b 1\n/f 2\n";
|
||||
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
|
||||
|
||||
// wals = {
|
||||
// {"/a", 1},
|
||||
// {"/b", 1},
|
||||
// {"/c", 1},
|
||||
// };
|
||||
|
||||
// {
|
||||
// snapshot_data.setSnaphotData(expected_snaphot_state);
|
||||
|
||||
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
|
||||
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
|
||||
// VFSSnapshotEntries expected_entries_to_remove;
|
||||
|
||||
// EXPECT_EQ(expected_entries_to_remove, to_remove);
|
||||
// }
|
||||
// expected_snaphot_state = "/a 1\n/b 2\n/c 1\n/f 2\n";
|
||||
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
|
||||
|
||||
// wals = {{"/a", -1}, {"/b", -2}, {"/c", -1}, {"/f", -2}};
|
||||
// {
|
||||
// snapshot_data.setSnaphotData(expected_snaphot_state);
|
||||
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
|
||||
|
||||
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
|
||||
// VFSSnapshotEntries expected_entries_to_remove = {{"/a", 0}, {"/b", 0}, {"/c", 0}, {"/f", 0}};
|
||||
|
||||
// EXPECT_EQ(expected_entries_to_remove, to_remove);
|
||||
// }
|
||||
// expected_snaphot_state = "";
|
||||
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
|
||||
// }
|
60
src/Disks/ObjectStorages/VFS/tests/gtest_vfs_log.cpp
Normal file
60
src/Disks/ObjectStorages/VFS/tests/gtest_vfs_log.cpp
Normal file
@ -0,0 +1,60 @@
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
#include <Disks/DiskLocal.h>
|
||||
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
|
||||
#include <Disks/ObjectStorages/VFS/VFSLog.h>
|
||||
|
||||
#include <Poco/Checksum.h>
|
||||
|
||||
#include <filesystem>
|
||||
|
||||
using namespace DB;
|
||||
namespace fs = std::filesystem;
|
||||
|
||||
const fs::path log_dir = "tmp/";
|
||||
const String remote_path = "/cloud_storage/remote/path";
|
||||
const String local_path = "/table/path/to/part/file";
|
||||
|
||||
|
||||
class VFSLogTest : public ::testing::Test
|
||||
{
|
||||
protected:
|
||||
void SetUp() override { fs::remove_all(log_dir); }
|
||||
};
|
||||
|
||||
/// Events survive a JSONSerializer round trip, both without and with the optional WAL info.
TEST_F(VFSLogTest, Serialization)
{
    {
        // Event without the optional WAL info.
        VFSEvent original{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};

        auto serialized = JSONSerializer::serialize(original);
        VFSEvent restored = JSONSerializer::deserialize(serialized);

        EXPECT_EQ(original, restored);
    }

    {
        // Event that carries WAL info.
        VFSEvent original{remote_path, local_path, WALInfo{"src_replica", UUIDHelpers::generateV4(), 1}, Poco::Timestamp(), VFSAction::REQUEST};

        auto serialized = JSONSerializer::serialize(original);
        VFSEvent restored = JSONSerializer::deserialize(serialized);

        EXPECT_EQ(original, restored);
    }
}
|
||||
|
||||
/// Write one event, read it back with index 0, then drop it and expect an empty log.
TEST_F(VFSLogTest, Main)
{
    DB::VFSLog vfs_log(log_dir);
    VFSEvent link_event{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};

    vfs_log.write(link_event);
    EXPECT_EQ(vfs_log.size(), 1);

    VFSLogItems read_items = vfs_log.read(1);
    EXPECT_EQ(read_items.size(), 1);
    EXPECT_EQ(read_items[0].wal.index, 0);

    vfs_log.dropUpTo(1);
    EXPECT_EQ(vfs_log.size(), 0);
}
|
@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
|
||||
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
|
||||
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
|
||||
}
|
||||
settings.ostr << "'[HIDDEN]'";
|
||||
if (!secret_arguments.replacement.empty())
|
||||
{
|
||||
settings.ostr << "'" << secret_arguments.replacement << "'";
|
||||
}
|
||||
else
|
||||
{
|
||||
settings.ostr << "'[HIDDEN]'";
|
||||
}
|
||||
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
|
||||
break; /// All other arguments should also be hidden.
|
||||
continue;
|
||||
|
@ -1,6 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <Common/KnownObjectNames.h>
|
||||
#include <Common/re2.h>
|
||||
#include <Core/QualifiedTableName.h>
|
||||
#include <base/defines.h>
|
||||
#include <boost/algorithm/string/predicate.hpp>
|
||||
@ -49,6 +50,11 @@ public:
|
||||
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
|
||||
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
|
||||
std::vector<std::string> nested_maps;
|
||||
/// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
|
||||
/// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
|
||||
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
|
||||
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
|
||||
std::string replacement;
|
||||
|
||||
bool hasSecrets() const
|
||||
{
|
||||
@ -74,6 +80,7 @@ protected:
|
||||
result.are_named = argument_is_named;
|
||||
}
|
||||
chassert(index >= result.start); /// We always check arguments consecutively
|
||||
chassert(result.replacement.empty()); /// We shouldn't use replacement with masking other arguments
|
||||
result.count = index + 1 - result.start;
|
||||
if (!argument_is_named)
|
||||
result.are_named = false;
|
||||
@ -199,32 +206,39 @@ protected:
|
||||
|
||||
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
|
||||
{
|
||||
/// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
|
||||
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
|
||||
size_t url_arg_idx = is_cluster_function ? 1 : 0;
|
||||
|
||||
if (!is_cluster_function && isNamedCollectionName(0))
|
||||
{
|
||||
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
|
||||
if (maskAzureConnectionString(-1, true, 1))
|
||||
return;
|
||||
findSecretNamedArgument("account_key", 1);
|
||||
return;
|
||||
}
|
||||
else if (is_cluster_function && isNamedCollectionName(1))
|
||||
{
|
||||
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
|
||||
if (maskAzureConnectionString(-1, true, 2))
|
||||
return;
|
||||
findSecretNamedArgument("account_key", 2);
|
||||
return;
|
||||
}
|
||||
|
||||
/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
|
||||
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
|
||||
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
|
||||
if (maskAzureConnectionString(url_arg_idx))
|
||||
return;
|
||||
|
||||
/// We should check other arguments first because we don't need to do any replacement in case of
|
||||
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
|
||||
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
|
||||
size_t count = function->arguments->size();
|
||||
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
|
||||
{
|
||||
String second_arg;
|
||||
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
|
||||
String fourth_arg;
|
||||
if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
|
||||
{
|
||||
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
|
||||
if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
|
||||
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
|
||||
}
|
||||
}
|
||||
@ -234,6 +248,40 @@ protected:
|
||||
markSecretArgument(url_arg_idx + 4);
|
||||
}
|
||||
|
||||
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
|
||||
{
|
||||
String url_arg;
|
||||
if (argument_is_named)
|
||||
{
|
||||
url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
|
||||
if (url_arg_idx == -1 || url_arg.empty())
|
||||
url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
|
||||
if (url_arg_idx == -1 || url_arg.empty())
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!url_arg.starts_with("http"))
|
||||
{
|
||||
static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
|
||||
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
|
||||
{
|
||||
chassert(result.count == 0); /// We shouldn't use replacement with masking other arguments
|
||||
result.start = url_arg_idx;
|
||||
result.are_named = argument_is_named;
|
||||
result.count = 1;
|
||||
result.replacement = url_arg;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
void findURLSecretArguments()
|
||||
{
|
||||
if (!isNamedCollectionName(0))
|
||||
@ -513,8 +561,9 @@ protected:
|
||||
return function->arguments->at(arg_idx)->isIdentifier();
|
||||
}
|
||||
|
||||
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
||||
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
|
||||
/// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
||||
/// Returns -1 if no argument was found.
|
||||
ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
|
||||
{
|
||||
for (size_t i = start; i < function->arguments->size(); ++i)
|
||||
{
|
||||
@ -531,8 +580,22 @@ protected:
|
||||
continue;
|
||||
|
||||
if (found_key == key)
|
||||
markSecretArgument(i, /* argument_is_named= */ true);
|
||||
{
|
||||
tryGetStringFromArgument(*equals_func->arguments->at(1), res);
|
||||
return i;
|
||||
}
|
||||
}
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
|
||||
/// If the argument is found, it is marked as a secret.
|
||||
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
|
||||
{
|
||||
ssize_t arg_idx = findNamedArgument(nullptr, key, start);
|
||||
if (arg_idx >= 0)
|
||||
markSecretArgument(arg_idx, /* argument_is_named= */ true);
|
||||
}
|
||||
};
|
||||
|
||||
|
@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
|
||||
{
|
||||
account_name = fourth_arg;
|
||||
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
|
||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
|
||||
if (is_format_arg(sixth_arg))
|
||||
{
|
||||
format = sixth_arg;
|
||||
@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
|
||||
}
|
||||
else if (with_structure && engine_args.size() == 8)
|
||||
{
|
||||
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
|
||||
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
|
||||
account_name = fourth_arg;
|
||||
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
|
||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
|
||||
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
|
||||
if (!is_format_arg(sixth_arg))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
|
||||
format = sixth_arg;
|
||||
|
@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
||||
else
|
||||
{
|
||||
ConfigurationPtr copy_configuration = configuration->clone();
|
||||
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
||||
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
|
||||
if (filter_dag)
|
||||
{
|
||||
auto keys = configuration->getPaths();
|
||||
@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
|
||||
|
||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
|
||||
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
|
||||
copy_configuration->setPaths(keys);
|
||||
}
|
||||
|
||||
@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
|
||||
, virtual_columns(virtual_columns_)
|
||||
, throw_on_zero_files_match(throw_on_zero_files_match_)
|
||||
, read_keys(read_keys_)
|
||||
, local_context(context_)
|
||||
, file_progress_callback(file_progress_callback_)
|
||||
{
|
||||
if (configuration->isNamespaceWithGlobs())
|
||||
@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
|
||||
}
|
||||
|
||||
recursive = key_with_globs == "/**";
|
||||
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
|
||||
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
|
||||
{
|
||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
|
||||
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||
@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
|
||||
for (const auto & object_info : new_batch)
|
||||
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
|
||||
|
||||
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
|
||||
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
|
||||
|
||||
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
|
||||
}
|
||||
|
@ -220,6 +220,7 @@ private:
|
||||
bool is_finished = false;
|
||||
bool first_iteration = true;
|
||||
std::mutex next_mutex;
|
||||
const ContextPtr local_context;
|
||||
|
||||
std::function<void(FileProgress)> file_progress_callback;
|
||||
};
|
||||
|
@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
|
||||
{
|
||||
std::optional<ActionsDAG> filter_dag;
|
||||
if (!distributed_processing && !archive_info && !files.empty())
|
||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
|
||||
|
||||
if (filter_dag)
|
||||
{
|
||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
|
||||
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -227,7 +227,7 @@ public:
|
||||
|
||||
std::optional<ActionsDAG> filter_dag;
|
||||
if (!uris.empty())
|
||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
|
||||
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
|
||||
|
||||
if (filter_dag)
|
||||
{
|
||||
@ -238,7 +238,7 @@ public:
|
||||
|
||||
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
|
||||
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
|
||||
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
|
||||
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,5 +1,6 @@
|
||||
#include <memory>
|
||||
#include <stack>
|
||||
#include <unordered_set>
|
||||
#include <Core/NamesAndTypes.h>
|
||||
#include <Core/TypeId.h>
|
||||
|
||||
@ -46,6 +47,7 @@
|
||||
#include "Functions/IFunction.h"
|
||||
#include "Functions/IFunctionAdaptors.h"
|
||||
#include "Functions/indexHint.h"
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <Interpreters/convertFieldToType.h>
|
||||
#include <Parsers/makeASTForLogicalFunction.h>
|
||||
#include <Columns/ColumnSet.h>
|
||||
@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns the names and types of the virtual columns shared by file-like storages:
/// `_path`, `_file`, `_size`, `_time` and `_etag`, in that order.
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
    /// Each call creates a fresh LowCardinality(String) type instance.
    const auto make_low_cardinality_string = []
    {
        return std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
    };

    NamesAndTypesList virtuals;
    virtuals.emplace_back("_path", make_low_cardinality_string());
    virtuals.emplace_back("_file", make_low_cardinality_string());
    virtuals.emplace_back("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
    virtuals.emplace_back("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
    virtuals.emplace_back("_etag", make_low_cardinality_string());
    return virtuals;
}
|
||||
|
||||
NameSet getVirtualNamesForFileLikeStorage()
|
||||
{
|
||||
return {"_path", "_file", "_size", "_time", "_etag"};
|
||||
return getCommonVirtualsForFileLikeStorage().getNameSet();
|
||||
}
|
||||
|
||||
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
|
||||
@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
||||
{
|
||||
VirtualColumnsDescription desc;
|
||||
|
||||
auto add_virtual = [&](const auto & name, const auto & type)
|
||||
auto add_virtual = [&](const NameAndTypePair & pair)
|
||||
{
|
||||
const auto & name = pair.getNameInStorage();
|
||||
const auto & type = pair.getTypeInStorage();
|
||||
if (storage_columns.has(name))
|
||||
{
|
||||
if (!context->getSettingsRef().use_hive_partitioning)
|
||||
@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
||||
desc.addEphemeral(name, type, "");
|
||||
};
|
||||
|
||||
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
||||
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
||||
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
|
||||
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
|
||||
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
|
||||
for (const auto & item : getCommonVirtualsForFileLikeStorage())
|
||||
add_virtual(item);
|
||||
|
||||
if (context->getSettingsRef().use_hive_partitioning)
|
||||
{
|
||||
@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
|
||||
if (type == nullptr)
|
||||
type = std::make_shared<DataTypeString>();
|
||||
if (type->canBeInsideLowCardinality())
|
||||
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
|
||||
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
|
||||
else
|
||||
add_virtual(item.first, type);
|
||||
add_virtual({item.first, type});
|
||||
}
|
||||
}
|
||||
|
||||
return desc;
|
||||
}
|
||||
|
||||
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
|
||||
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
|
||||
{
|
||||
if (block.has("_path"))
|
||||
block.getByName("_path").column->assumeMutableRef().insert(path);
|
||||
@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
|
||||
block.getByName("_file").column->assumeMutableRef().insert(file);
|
||||
}
|
||||
|
||||
if (use_hive_partitioning)
|
||||
{
|
||||
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
|
||||
for (const auto & [key, value] : keys_and_values)
|
||||
{
|
||||
if (const auto * column = block.findByName(key))
|
||||
{
|
||||
ReadBufferFromString buf(value);
|
||||
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
block.getByName("_idx").column->assumeMutableRef().insert(idx);
|
||||
}
|
||||
|
||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
|
||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
{
|
||||
if (!predicate || virtual_columns.empty())
|
||||
return {};
|
||||
|
||||
Block block;
|
||||
NameSet common_virtuals;
|
||||
if (context->getSettingsRef().use_hive_partitioning)
|
||||
common_virtuals = getVirtualNamesForFileLikeStorage();
|
||||
for (const auto & column : virtual_columns)
|
||||
{
|
||||
if (column.name == "_file" || column.name == "_path")
|
||||
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
|
||||
block.insert({column.type->createColumn(), column.type, column.name});
|
||||
}
|
||||
|
||||
@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
|
||||
return splitFilterDagForAllowedInputs(predicate, &block);
|
||||
}
|
||||
|
||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
|
||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
{
|
||||
Block block;
|
||||
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
|
||||
for (const auto & column : virtual_columns)
|
||||
{
|
||||
if (column.name == "_file" || column.name == "_path")
|
||||
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
|
||||
block.insert({column.type->createColumn(), column.type, column.name});
|
||||
}
|
||||
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
|
||||
|
||||
for (size_t i = 0; i != paths.size(); ++i)
|
||||
addPathAndFileToVirtualColumns(block, paths[i], i);
|
||||
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
|
||||
|
||||
filterBlockWithExpression(actions, block);
|
||||
|
||||
|
@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
|
||||
const std::string & sample_path = "",
|
||||
std::optional<FormatSettings> format_settings_ = std::nullopt);
|
||||
|
||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
|
||||
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
|
||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
|
||||
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
|
||||
|
||||
template <typename T>
|
||||
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
|
||||
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
|
||||
{
|
||||
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
|
||||
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
|
||||
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
|
||||
if (indexes.size() == sources.size())
|
||||
return;
|
||||
|
@ -1,5 +1,6 @@
|
||||
import pytest
|
||||
import random, string
|
||||
import re
|
||||
from helpers.cluster import ClickHouseCluster
|
||||
from helpers.test_tools import TSV
|
||||
|
||||
@ -336,6 +337,10 @@ def test_create_database():
|
||||
def test_table_functions():
|
||||
password = new_password()
|
||||
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
|
||||
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
|
||||
masked_azure_conn_string = re.sub(
|
||||
account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
|
||||
)
|
||||
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
|
||||
azure_account_name = "devstoreaccount1"
|
||||
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
|
||||
@ -467,23 +472,23 @@ def test_table_functions():
|
||||
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
|
||||
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
|
||||
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||
f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
|
||||
f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
|
||||
f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
|
||||
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
||||
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
|
||||
f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
|
||||
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
||||
f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
|
||||
f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
|
||||
f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
|
||||
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
|
||||
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
|
||||
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
|
||||
f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||
f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
|
||||
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
|
||||
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
|
||||
],
|
||||
|
@ -33,8 +33,8 @@ Cross Elizabeth
|
||||
[1,2,3] 42.42
|
||||
Array(Int64) LowCardinality(Float64)
|
||||
101
|
||||
2070
|
||||
2070
|
||||
2071
|
||||
2071
|
||||
b
|
||||
1
|
||||
1
|
||||
|
@ -0,0 +1,6 @@
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
||||
1
|
72
tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
72
tests/queries/0_stateless/03231_hive_partitioning_filtering.sh
Executable file
@ -0,0 +1,72 @@
|
||||
#!/usr/bin/env bash
# Tags: no-fasttest

# Runs three queries that filter on a hive-partitioning column and, for each one, checks via
# system.query_log that the EngineFileLikeReadFiles profile event equals 1.

CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh

# Paths are quoted everywhere so that a directory containing spaces or glob characters
# cannot be split or expanded (especially important for the final `rm -rf`).
DATA_DIR="$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME"
mkdir -p "$DATA_DIR"
cp -r "$CURDIR/data_hive/" "$DATA_DIR"

# Usage: run_and_check_files_read <query_id> <query>
# Executes <query> under <query_id>, flushes the logs, then polls system.query_log up to 5 times
# (1 second apart) until EngineFileLikeReadFiles for that query is 1.
# Prints "1" once the expected value is seen; prints nothing if it never appears.
function run_and_check_files_read()
{
    local query_id="$1"
    local query="$2"
    local count

    $CLICKHOUSE_CLIENT --query_id="$query_id" --query "$query"

    $CLICKHOUSE_CLIENT --query "
        SYSTEM FLUSH LOGS;
    "

    for _ in {1..5}; do
        count=$( $CLICKHOUSE_CLIENT --query "
            SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
            WHERE query_id='$query_id' AND
                current_database = currentDatabase() and type='QueryFinish';" )
        if [[ "$count" == "1" ]]; then
            echo "1"
            break
        fi
        sleep 1
    done
}

run_and_check_files_read "test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" "
    SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"

run_and_check_files_read "test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" "
    SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"

run_and_check_files_read "test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" "
    SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"

rm -rf "$DATA_DIR"
|
@ -1 +1 @@
|
||||
data_hive/partitioning/column0=Elizabeth/sample.parquet
|
||||
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet
|
||||
|
@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
|
||||
# shellcheck source=../shell_config.sh
|
||||
. "$CURDIR"/../shell_config.sh
|
||||
|
||||
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
|
||||
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"
|
||||
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
@ -0,0 +1,5 @@
|
||||
_login_email,_identifier,_first_name,_last_name
|
||||
laura@example.com,2070,Laura,Grey
|
||||
craig@example.com,4081,Craig,Johnson
|
||||
mary@example.com,9346,Mary,Jenkins
|
||||
jamie@example.com,5079,Jamie,Smith
|
|
Loading…
Reference in New Issue
Block a user