Compare commits

...

46 Commits

Author SHA1 Message Date
Aleksei Filatov
e0b5ec379d
Merge 0c16dc0531 into c0c83236b6 2024-09-19 10:23:40 +03:00
Yakov Olkhovskiy
c0c83236b6
Merge pull request #69570 from alexkats/fix-azure
Mask azure connection string sensitive info
2024-09-19 05:40:47 +00:00
Yarik Briukhovetskyi
3eb5bc1a0f
Merge pull request #68963 from yariks5s/hive_partitioning_filtration
Filtering for hive partitioning
2024-09-18 22:16:58 +00:00
Alex Katsman
b88cd79959 Mask azure connection string sensitive info 2024-09-18 18:32:22 +00:00
Yarik Briukhovetskyi
143d9f0201
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-18 11:11:04 +02:00
Yarik Briukhovetskyi
f52cdfb795
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-17 18:50:43 +02:00
Yarik Briukhovetskyi
3a7c68a052
Update src/Storages/VirtualColumnUtils.cpp
Co-authored-by: Kruglov Pavel <48961922+Avogar@users.noreply.github.com>
2024-09-17 15:39:26 +02:00
Yarik Briukhovetskyi
e8d50aa97f
review 2024-09-17 15:02:33 +02:00
Yarik Briukhovetskyi
cb92aaf968
fix 03232_file_path_normalizing 2024-09-17 11:26:13 +02:00
Yarik Briukhovetskyi
0cdec0acf1
fix logical error 2024-09-16 19:13:30 +02:00
Yarik Briukhovetskyi
04f23332c3
fix filter issue 2024-09-16 15:59:22 +02:00
Yarik Briukhovetskyi
7d5203f8a7
add resize for partitioning_columns 2024-09-13 21:38:48 +02:00
Yarik Briukhovetskyi
0d1d750437
fix crash 2024-09-13 20:43:51 +02:00
Yarik Briukhovetskyi
ad31d86a15
move the block inserting 2024-09-13 19:58:19 +02:00
Yarik Briukhovetskyi
991279e5c6
revert 2024-09-13 19:23:00 +02:00
Yarik Briukhovetskyi
c184aae686
review 2024-09-13 16:40:01 +02:00
Yarik Briukhovetskyi
14a6b0422b
disable optimize_count_from_files 2024-09-13 16:33:17 +02:00
Yarik Briukhovetskyi
e8cec05d08
shellcheck 2024-09-11 13:52:20 +02:00
Yarik Briukhovetskyi
2876a4e714
add retries 2024-09-11 13:32:12 +02:00
Yarik Briukhovetskyi
a903e1a726
remove logging + fixing bug 2024-09-06 20:24:18 +02:00
Yarik Briukhovetskyi
2fa6be55ff
tests fix 2024-09-04 17:02:01 +02:00
Yarik Briukhovetskyi
8896d1b78b
try to fix tests 2024-09-04 14:46:29 +02:00
Yarik Briukhovetskyi
f688b903db
empty commit 2024-09-03 15:58:22 +02:00
Yarik Briukhovetskyi
21f9669836
empty commit 2024-09-03 15:41:43 +02:00
Yarik Briukhovetskyi
1a386ae4d5
Merge branch 'ClickHouse:master' into hive_partitioning_filtration 2024-09-03 15:35:31 +02:00
Yarik Briukhovetskyi
24f4e87f8b
revert debugging in tests 2024-09-03 15:20:22 +02:00
Yarik Briukhovetskyi
620640a042
just to test 2024-08-30 12:58:21 +02:00
Yarik Briukhovetskyi
ec469a117d
testing 2024-08-30 00:56:35 +02:00
Yarik Briukhovetskyi
7a879980d8
try to fix tests 2024-08-29 18:25:11 +02:00
Yarik Briukhovetskyi
2adc61c215
add flush logs 2024-08-29 16:39:22 +02:00
Yarik Briukhovetskyi
afc4d08aad
add no-fasttest tag 2024-08-29 13:31:05 +02:00
yariks5s
edc5d8dd92 fix path 2024-08-28 23:15:01 +00:00
yariks5s
d6b2a9d534 CLICKHOUSE_LOCAL -> CLIENT 2024-08-28 22:32:44 +00:00
yariks5s
dc97bd6b92 review + testing the code 2024-08-28 17:22:47 +00:00
Yarik Briukhovetskyi
60c6eb2610
trying to fix the test 2024-08-27 19:42:47 +02:00
Yarik Briukhovetskyi
9133505952
fix the test 2024-08-27 19:16:05 +02:00
Yarik Briukhovetskyi
2741bf00e4 chmod +x 2024-08-27 16:53:14 +00:00
Yarik Briukhovetskyi
4eca00a666
fix style 2024-08-27 18:10:41 +02:00
Yarik Briukhovetskyi
c6804122cb
fix shell 2024-08-27 16:52:29 +02:00
Yarik Briukhovetskyi
189cbe25fe
init 2024-08-27 16:28:18 +02:00
Aleksei Filatov
0c16dc0531 Add initial version VFSLog with JSON serializer 2024-07-29 12:53:28 +03:00
MikhailBurdukov
74b870710f
Snapshot Interfaces (#2) 2024-07-22 15:54:06 +03:00
MikhailBurdukov
4b8d315e87
Initial version of Garbage Collector (#1)
* Initial version of Garbage Collector

* Review fixes
2024-07-11 10:58:28 +03:00
Aleksei Filatov
d20f56c69d Merge remote-tracking branch 'upstream/master' into vfs_log_disk 2024-07-03 18:56:27 +03:00
Aleksei Filatov
c6e5824e59 Add is_corrupt flag and initial corruption detection 2024-07-03 18:55:02 +03:00
Aleksei Filatov
bb36d392be WAL initial implementation 2024-06-26 12:50:06 +03:00
35 changed files with 1988 additions and 53 deletions

View File

@ -146,6 +146,7 @@ endif()
add_headers_and_sources(dbms Disks/ObjectStorages/Cached)
add_headers_and_sources(dbms Disks/ObjectStorages/Local)
add_headers_and_sources(dbms Disks/ObjectStorages/Web)
add_headers_and_sources(dbms Disks/ObjectStorages/VFS)
add_headers_and_sources(dbms Storages/Cache)
if (TARGET ch_contrib::hivemetastore)

View File

@ -151,6 +151,15 @@ Names NamesAndTypesList::getNames() const
return res;
}
NameSet NamesAndTypesList::getNameSet() const
{
NameSet res;
res.reserve(size());
for (const NameAndTypePair & column : *this)
res.insert(column.name);
return res;
}
DataTypes NamesAndTypesList::getTypes() const
{
DataTypes res;

View File

@ -100,6 +100,7 @@ public:
void getDifference(const NamesAndTypesList & rhs, NamesAndTypesList & deleted, NamesAndTypesList & added) const;
Names getNames() const;
NameSet getNameSet() const;
DataTypes getTypes() const;
/// Remove columns which names are not in the `names`.

View File

@ -0,0 +1,425 @@
#include "AppendLog.h"
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Poco/Checksum.h>
#include <Common/re2.h>
#include <ranges>
#include <string>
namespace DB::ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int CORRUPTED_DATA;
extern const int INVALID_STATE;
}
namespace DB::WAL
{
constexpr char first_ext[] = ".first";
constexpr char tmp_ext[] = ".tmp";
constexpr char prefix[] = "log_";
static UInt64 extract_start_index(const String & filename)
{
return static_cast<UInt64>(std::stoull(filename.substr(strlen(prefix), filename.find("."))));
}
static String makeSegmentName(UInt64 start_index, std::string_view ext = "")
{
return fmt::format("{}{:020}{}", prefix, start_index, ext);
}
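// e.g. makeSegmentName(42, first_ext) yields "log_00000000000000000042.first"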
static void verifyChecksum(const Entry & entry)
{
Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
crc.update(reinterpret_cast<const char *>(&entry.index), sizeof(entry.index));
crc.update(entry.data.data(), static_cast<unsigned int>(entry.data.size()));
if (crc.checksum() != entry.checksum)
throw DB::Exception(ErrorCodes::CORRUPTED_DATA, "Checksum for the entry {} does not match the computed one", entry.index);
}
struct EntryNonOwning
{
EntryNonOwning(UInt64 index_, std::span<const char> data_) : index(index_), data(data_)
{
Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
crc.update(reinterpret_cast<const char *>(&index), sizeof(index));
crc.update(data.data(), static_cast<unsigned int>(data.size()));
checksum = crc.checksum();
}
UInt64 index;
std::span<const char> data;
UInt32 checksum;
};
class EntrySerializer
{
public:
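/// Serialized entry layout (see serialize/deserialize below):
/// varUInt(index) | varUInt(data size) | data bytes | varUInt(checksum)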
static void serialize(const EntryNonOwning & entry, WriteBuffer & out)
{
writeVarUInt(entry.index, out);
writeStringBinary(StringRef(entry.data.data(), entry.data.size()), out);
writeVarUInt(entry.checksum, out);
}
static Entry deserialize(ReadBuffer & in)
{
Entry entry;
try
{
readVarUInt(entry.index, in);
readBinary(entry.data, in);
readVarUInt(entry.checksum, in);
}
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF)
{
throw Exception(
ErrorCodes::CORRUPTED_DATA, "Cannot read data. Source data is possibly corrupted");
}
else
throw;
}
verifyChecksum(entry);
return entry;
}
static size_t calculateSerializedSize(const EntryNonOwning & entry)
{
size_t res = 0;
res += getLengthOfVarUInt(entry.index);
res += getLengthOfVarUInt(entry.data.size_bytes());
res += entry.data.size_bytes();
res += getLengthOfVarUInt(entry.checksum);
return res;
}
};
AppendLog::AppendLog(const fs::path & log_dir_, const Settings & settings_) : log_dir(log_dir_), settings(settings_)
{
if (!fs::exists(log_dir))
fs::create_directories(log_dir);
load();
}
UUID AppendLog::getID() const
{
std::lock_guard lock(mutex);
return log_id;
}
void AppendLog::load()
{
// TODO: load uuid file
for (const auto & dir_entry : fs::directory_iterator(log_dir))
{
const String filename = dir_entry.path().filename();
if (!dir_entry.is_regular_file())
continue;
if (!filename.starts_with(prefix))
continue;
if (filename.ends_with(tmp_ext))
{
fs::remove(dir_entry.path());
continue;
}
segments.emplace_back(dir_entry.path(), extract_start_index(filename));
}
// Sort so that the last (newest) segment is at the front
std::ranges::sort(segments, [](const auto & lhs, const auto & rhs) { return lhs.path > rhs.path; });
if (segments.empty())
{
next_index = 0;
const auto path = log_dir / makeSegmentName(next_index);
segments.emplace_back(path, next_index);
log_file = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT | O_TRUNC | O_APPEND);
active_segment_size = 0;
return;
}
// Find the segment marked as first with the largest index
auto first_segment = std::ranges::find_if(segments, [](auto && s) { return s.path.extension() == first_ext; });
if (first_segment != segments.end())
{
// Remove outdated segments
std::for_each(std::next(first_segment), segments.end(), [](auto && s) { fs::remove(s.path); });
segments.erase(std::next(first_segment), segments.end());
fs::path new_path = first_segment->path;
new_path.replace_extension();
fs::rename(first_segment->path, new_path);
first_segment->path = new_path;
}
Segment & last_segment = segments.front();
next_index = findNextIndex(last_segment);
// Open the last segment for appending. The file must exist
log_file = std::make_unique<WriteBufferFromFile>(last_segment.path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND);
active_segment_size = log_file->size();
}
UInt64 AppendLog::getNextIndex() const
{
std::lock_guard lock(mutex);
assertNotCorrupted();
return next_index;
}
UInt64 AppendLog::segmentsCount() const
{
std::lock_guard lock(mutex);
assertNotCorrupted();
return segments.size();
}
UInt64 AppendLog::getStartIndex() const
{
std::lock_guard lock(mutex);
assertNotCorrupted();
return getStartIndexNoLock();
}
UInt64 AppendLog::getStartIndexNoLock() const
{
chassert(!segments.empty());
return segments.front().start_index;
}
void AppendLog::reload()
{
segments.clear();
load();
}
size_t AppendLog::size() const
{
std::lock_guard lock(mutex);
assertNotCorrupted();
if (segments.empty())
return 0;
return next_index - segments.front().start_index;
}
UInt64 AppendLog::findNextIndex(const Segment & segment) const
{
ReadBufferFromFile in(segment.path);
UInt64 res = segment.start_index;
// TODO: read and check version of segment
while (!in.eof())
{
try
{
Entry entry = EntrySerializer::deserialize(in);
res = entry.index + 1;
}
catch (const DB::Exception & e)
{
if (e.code() == ErrorCodes::CORRUPTED_DATA)
is_corrupt = true;
throw;
}
}
return res;
}
void AppendLog::appendSegment(UInt64 start_index)
{
const auto path = log_dir / makeSegmentName(start_index);
segments.emplace_back(path, start_index);
if (log_file)
log_file->sync();
log_file = std::make_unique<WriteBufferFromFile>(path, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
active_segment_size = 0;
}
UInt64 AppendLog::append(std::span<const char> message)
{
std::lock_guard lock(mutex);
assertNotCorrupted();
UInt64 index = next_index;
EntryNonOwning entry(index, message);
size_t entry_size = EntrySerializer::calculateSerializedSize(entry);
if (entry_size > settings.max_segment_size)
throw DB::Exception(
DB::ErrorCodes::BAD_ARGUMENTS,
"Log entry size: {} is greater than max_segment_size: {}",
entry_size,
settings.max_segment_size);
if (active_segment_size + entry_size > settings.max_segment_size)
appendSegment(next_index);
size_t old_count = log_file->count();
EntrySerializer::serialize(entry, *log_file);
log_file->sync();
next_index++;
active_segment_size += log_file->count() - old_count;
return index;
}
Entries AppendLog::readFront(size_t count) const
{
std::lock_guard lock(mutex);
assertNotCorrupted();
Entries entries;
entries.reserve(count);
size_t read = 0;
for (const auto & segment : segments)
{
read += readEntries(segment, entries, count - read);
if (read >= count)
break;
}
chassert(entries.size() <= count);
return entries;
}
size_t AppendLog::readEntries(const Segment & segment, Entries & entries, size_t limit) const
{
// TODO: Make a segment cache (LRU) for keeping entries in memory
ReadBufferFromFile in(segment.path);
size_t loaded = 0;
// TODO: read and check version of segment
while (!in.eof())
{
try
{
auto entry = EntrySerializer::deserialize(in);
entries.push_back(entry);
}
catch (const DB::Exception & e)
{
if (e.code() == ErrorCodes::CORRUPTED_DATA)
is_corrupt = true;
throw;
}
loaded++;
if (loaded >= limit)
break;
}
return loaded;
}
void AppendLog::copyEntriesWithSkip(Segment & segment, DB::WriteBuffer & out, size_t entries_to_skip)
{
ReadBufferFromFile in(segment.path);
// TODO: read and check version of segment
for (size_t i = 0; i < entries_to_skip; i++)
{
try
{
EntrySerializer::deserialize(in);
}
catch (const DB::Exception & e)
{
if (e.code() == DB::ErrorCodes::CORRUPTED_DATA)
is_corrupt = true;
throw;
}
}
// Just copy the rest
copyData(in, out);
out.sync();
}
int AppendLog::findSegment(UInt64 index) const
{
for (size_t i = 0; i < segments.size(); i++)
{
UInt64 last_index{};
if (i == segments.size() - 1)
last_index = next_index;
else
last_index = segments[i + 1].start_index;
if (index >= segments[i].start_index && index < last_index)
return static_cast<int>(i);
}
return -1;
}
size_t AppendLog::dropUpTo(UInt64 index)
{
std::lock_guard lock(mutex);
assertNotCorrupted();
chassert(!segments.empty());
size_t start_index = getStartIndexNoLock();
if (index < start_index || index > next_index)
throw Exception(ErrorCodes::ARGUMENT_OUT_OF_BOUND, "The index to drop up to is out of range");
if (index == start_index)
return 0;
// Find a segment containing the last index to drop
int segment_idx = findSegment(index - 1);
chassert(segment_idx != -1);
size_t whole_segments_to_drop = segment_idx;
size_t entries_to_drop = index - segments[segment_idx].start_index;
size_t new_start_index{};
if (whole_segments_to_drop == segments.size())
new_start_index = next_index;
else
new_start_index = segments[whole_segments_to_drop].start_index + entries_to_drop;
String new_first_segment_name = makeSegmentName(new_start_index, first_ext);
// Create temporary file
String tmp_name = new_first_segment_name + tmp_ext;
auto tmp_file = WriteBufferFromFile(log_dir / tmp_name, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_CREAT);
if (whole_segments_to_drop != segments.size())
copyEntriesWithSkip(segments[whole_segments_to_drop], tmp_file, entries_to_drop);
fs::rename(log_dir / tmp_name, log_dir / new_first_segment_name);
reload();
chassert(getStartIndexNoLock() == index);
return getStartIndexNoLock() - start_index;
}
void AppendLog::assertNotCorrupted() const
{
if (is_corrupt)
throw Exception(ErrorCodes::INVALID_STATE, "WAL possibly contains corrupted data");
}
}
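A minimal standalone sketch of the checksum scheme used by EntryNonOwning and verifyChecksum above (CRC32 over the raw index bytes in host byte order, then the payload); the helper name is illustrative:

#include <Poco/Checksum.h>
#include <cstdint>
#include <vector>

// Illustrative helper mirroring the entry checksum computation above.
static uint32_t walEntryChecksum(uint64_t index, const std::vector<char> & data)
{
    Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
    // CRC32 over the raw bytes of the index (host byte order), then the payload
    crc.update(reinterpret_cast<const char *>(&index), sizeof(index));
    crc.update(data.data(), static_cast<unsigned int>(data.size()));
    return crc.checksum();
}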

View File

@ -0,0 +1,103 @@
#pragma once
#include <IO/WriteBufferFromFile.h>
#include <Core/Types.h>
#include <filesystem>
#include <mutex>
#include <span>
namespace fs = std::filesystem;
namespace DB::WAL
{
enum class SyncMode
{
NO_SYNC,
TIMED_SYNC,
ALL_SYNC
};
struct Segment
{
fs::path path;
UInt64 start_index;
};
struct Entry
{
UInt64 index;
std::vector<char> data;
UInt32 checksum;
bool operator==(const Entry &) const = default;
};
using Entries = std::vector<Entry>;
struct Settings
{
SyncMode sync_mode = SyncMode::ALL_SYNC;
UInt64 max_segment_size = 10 * 1024 * 1024;
};
class AppendLog
{
public:
AppendLog(const fs::path & log_dir, const Settings & settings_ = Settings{});
/// Append message at the end of the log and return index of the inserted entry
UInt64 append(std::span<const char> message);
/// Read count entries (maybe fewer) from the front of the log
Entries readFront(size_t count) const;
/// Drop all entries from the beginning of the log up to the one with the specified index (exclusive)
size_t dropUpTo(UInt64 index);
/// Return the number of entries in the log
size_t size() const;
/// Return the index of the next entry to append
UInt64 getNextIndex() const;
/// Return the index of the first entry in the log
UInt64 getStartIndex() const;
/// Return the number of segments the log contains
size_t segmentsCount() const;
/// Return unique log ID
UUID getID() const;
private:
/// Load all segments from the log directory and clean outdated segments
void load() TSA_REQUIRES(mutex);
/// Reload segments from the log directory
void reload() TSA_REQUIRES(mutex);
/// Read no more than limit entries from the segment and return how many were read
size_t readEntries(const Segment & segment, Entries & entries, size_t limit) const TSA_REQUIRES(mutex);
/// Copy entries from the beginning of the segment to the output buffer, skipping the first entries_to_skip entries
void copyEntriesWithSkip(Segment & segment, WriteBuffer & out, size_t entries_to_skip) TSA_REQUIRES(mutex);
/// Create and append a new empty segment to the log
void appendSegment(UInt64 start_index) TSA_REQUIRES(mutex);
/// Return the sequence number of the segment containing the entry with the given index, or -1 if not found
int findSegment(UInt64 index) const TSA_REQUIRES(mutex);
/// Return the index after the last entry of the segment
UInt64 findNextIndex(const Segment & segment) const TSA_REQUIRES(mutex);
void assertNotCorrupted() const TSA_REQUIRES(mutex);
UInt64 getStartIndexNoLock() const TSA_REQUIRES(mutex);
mutable std::mutex mutex;
/// The active segment file to append to
std::unique_ptr<WriteBufferFromFile> log_file TSA_GUARDED_BY(mutex);
size_t active_segment_size TSA_GUARDED_BY(mutex) = 0;
UUID log_id TSA_GUARDED_BY(mutex) {};
/// Index of the next entry to append
UInt64 next_index TSA_GUARDED_BY(mutex) = 0;
/// Log working directory
fs::path log_dir TSA_GUARDED_BY(mutex);
/// Settings
Settings settings TSA_GUARDED_BY(mutex);
/// Segments comprising the log
std::vector<Segment> segments TSA_GUARDED_BY(mutex);
/// Whether corrupted data has been detected
mutable bool is_corrupt TSA_GUARDED_BY(mutex) = false;
};
}
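A minimal usage sketch of the API declared above, assuming an illustrative directory path (the unit tests further down in this diff exercise the same calls):

std::vector<char> payload{'h', 'i'};
DB::WAL::AppendLog alog("/tmp/wal");          // creates the directory if needed
UInt64 idx = alog.append(payload);            // returns the index of the new entry
DB::WAL::Entries front = alog.readFront(10);  // up to 10 entries from the front
alog.dropUpTo(idx + 1);                       // drops all entries with index < idx + 1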

View File

@ -0,0 +1,117 @@
#pragma once
#include <Common/Exception.h>
#include <Disks/ObjectStorages/VFS/VFSLog.h>
#include <IO/ReadHelpers.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/Stringifier.h>
#include <variant>
#include <vector>
namespace DB
{
namespace ErrorCodes
{
extern const int CORRUPTED_DATA;
}
class JSONSerializer
{
public:
static std::vector<char> serialize(const VFSEvent & event)
{
Poco::JSON::Object object;
serializeToJSON(event, object);
String str = to_string(object);
return {str.begin(), str.end()};
}
static VFSEvent deserialize(std::span<char> buffer)
{
String str(buffer.data(), buffer.size());
auto object = from_string(std::move(str));
if (!object)
throw Exception(ErrorCodes::CORRUPTED_DATA, "Cannot parse VFS log item buffer as JSON");
VFSEvent event;
deserializeFromJSON(event, *object);
return event;
}
private:
template <typename Value>
static Value getOrThrow(const Poco::JSON::Object & object, const String & key)
{
if (!object.has(key))
throw Exception(ErrorCodes::CORRUPTED_DATA, "Key {} is not found in VFS log item", key);
return object.getValue<Value>(key);
}
static String to_string(const Poco::JSON::Object & object)
{
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(object, oss);
return oss.str();
}
static Poco::JSON::Object::Ptr from_string(const String & json_str)
{
Poco::JSON::Parser parser;
return parser.parse(json_str).extract<Poco::JSON::Object::Ptr>();
}
static void serializeToJSON(const VFSEvent & event, Poco::JSON::Object & object)
{
object.set("remote_path", event.remote_path);
object.set("local_path", event.local_path);
object.set("action", static_cast<std::underlying_type_t<VFSAction>>(event.action));
object.set("timestamp", event.timestamp.epochMicroseconds());
if (event.orig_wal.has_value())
{
Poco::JSON::Object orig_wal;
serializeToJSON(*event.orig_wal, orig_wal);
object.set("orig_wal", orig_wal);
}
}
static void serializeToJSON(const WALInfo & orig_wal, Poco::JSON::Object & object)
{
object.set("index", orig_wal.index);
object.set("id", toString(orig_wal.id));
object.set("replica", orig_wal.replica);
}
static void deserializeFromJSON(VFSEvent & event, const Poco::JSON::Object & json)
{
event.remote_path = getOrThrow<String>(json, "remote_path");
event.local_path = getOrThrow<String>(json, "local_path");
event.action = static_cast<VFSAction>(getOrThrow<std::underlying_type_t<VFSAction>>(json, "action"));
event.timestamp = getOrThrow<UInt64>(json, "timestamp");
if (auto orig_wal = json.getObject("orig_wal"); orig_wal)
deserializeFromJSON(event.orig_wal.emplace(), *orig_wal);
}
static void deserializeFromJSON(WALInfo & orig_wal, const Poco::JSON::Object & json)
{
orig_wal.id = parseFromString<UUID>(getOrThrow<String>(json, "id"));
orig_wal.index = getOrThrow<UInt64>(json, "index");
orig_wal.replica = getOrThrow<String>(json, "replica");
}
};
}
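For illustration, a LINK event carrying an orig_wal would serialize to JSON along these lines (values are made up; key order depends on Poco and is not guaranteed):

{"remote_path":"/remote/obj","local_path":"/local/file","action":0,"timestamp":1726000000000000,"orig_wal":{"index":7,"id":"<uuid>","replica":"replica_1"}}

Here action is the underlying value of VFSAction (0 == LINK) and timestamp is epoch microseconds, matching serializeToJSON above.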

View File

@ -0,0 +1,157 @@
#include "VFSGarbageCollector.h"
#include <IO/ReadBufferFromEmptyFile.h>
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <Interpreters/Context.h>
#include <Common/FailPoint.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
#include <Common/ZooKeeper/ZooKeeperLock.h>
namespace ProfileEvents
{
extern const Event VFSGcRunsCompleted;
extern const Event VFSGcRunsException;
extern const Event VFSGcRunsSkipped;
extern const Event VFSGcTotalSeconds;
extern const Event VFSGcCumulativeWALSItemsMerged;
}
namespace DB
{
using ms = std::chrono::milliseconds;
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int INVALID_STATE;
}
namespace FailPoints
{
extern const char vfs_gc_optimistic_lock_delay[];
}
VFSGarbageCollector::VFSGarbageCollector(
const String & gc_name_,
ObjectStoragePtr object_storage_,
VFSLog & wal_,
BackgroundSchedulePool & pool,
const GarbageCollectorSettings & settings_)
: gc_name(gc_name_)
, object_storage(object_storage_)
, vfs_shapshot_data(object_storage, gc_name)
, wal(wal_)
, settings(settings_)
, log(getLogger(fmt::format("VFSGC({})", gc_name)))
{
LOG_INFO(log, "GC started");
initGCState();
*static_cast<BackgroundSchedulePoolTaskHolder *>(this) = pool.createTask(
log->name(),
[this]
{
try
{
run();
}
catch (...)
{
ProfileEvents::increment(ProfileEvents::VFSGcRunsException);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
(*this)->scheduleAfter(settings.gc_sleep_ms);
});
(*this)->activateAndSchedule();
}
void VFSGarbageCollector::initGCState() const
{
auto zookeeper = getZookeeper();
auto path = settings.zk_gc_path + "/garbage_collector";
zookeeper->tryCreate(path, "", zkutil::CreateMode::Persistent);
}
void VFSGarbageCollector::run()
{
Stopwatch stop_watch;
/// Optimization to reduce probability of clashes between replicas
/// TODO alexfvk: templatize ZooKeeperLock to support ZooKeeperWithFaultInjection
auto lock = zkutil::createSimpleZooKeeperLock(
getZookeeper()->getKeeper(), settings.zk_gc_path + "/garbage_collector", "lock", fmt::format("garbage_collector_run"));
if (!lock->tryLock())
{
LOG_DEBUG(log, "Skipped run due to pessimistic lock already acquired");
return;
}
LOG_DEBUG(log, "GC acquired lock");
updateSnapshot();
ProfileEvents::increment(ProfileEvents::VFSGcRunsCompleted);
ProfileEvents::increment(ProfileEvents::VFSGcTotalSeconds, stop_watch.elapsedMilliseconds() / 1000);
LOG_DEBUG(log, "GC iteration finished");
}
String VFSGarbageCollector::getZKSnapshotPath() const
{
return settings.zk_gc_path + "/garbage_collector/shapshot_meta";
}
SnapshotMetadata VFSGarbageCollector::getSnapshotMetadata() const
{
auto zk_shapshot_metadata_path = getZKSnapshotPath();
auto zookeeper = getZookeeper();
Coordination::Stat stat;
Coordination::Error error;
String content;
zookeeper->tryGet(zk_shapshot_metadata_path, content, &stat, nullptr, &error);
if (error == Coordination::Error::ZOK)
return SnapshotMetadata::deserialize(content, stat.version);
throw Coordination::Exception(error);
}
ZooKeeperWithFaultInjectionPtr VFSGarbageCollector::getZookeeper() const
{
return ZooKeeperWithFaultInjection::createInstance(
settings.keeper_fault_injection_probability,
settings.keeper_fault_injection_seed,
Context::getGlobalContextInstance()->getZooKeeper(),
fmt::format("VFSGarbageCollector({})", gc_name),
log);
}
void VFSGarbageCollector::updateShapshotMetadata(const SnapshotMetadata & new_snapshot, int32_t znode_required_version) const
{
auto zk_shapshot_metadata_path = getZKSnapshotPath();
auto zookeeper = getZookeeper();
/// Pass the required znode version so a concurrent update by another replica fails with a version mismatch
zookeeper->set(zk_shapshot_metadata_path, new_snapshot.serialize(), znode_required_version);
}
void VFSGarbageCollector::updateSnapshot()
{
SnapshotMetadata snapshot_meta = getSnapshotMetadata();
auto wal_items_batch = wal.read(settings.batch_size);
if (wal_items_batch.size() == 0ul)
{
LOG_DEBUG(log, "Merge snapshot exit due to empty wal.");
return;
}
LOG_DEBUG(log, "Merge snapshot with {} entries from wal.", wal_items_batch.size());
auto new_snaphot_meta = vfs_shapshot_data.mergeWithWals(std::move(wal_items_batch), snapshot_meta);
updateShapshotMetadata(new_snaphot_meta, snapshot_meta.znode_version);
wal.dropUpTo(wal_items_batch.back().wal.index + 1);
LOG_DEBUG(log, "Snapshot update finished with new shapshot key {}", new_snaphot_meta.object_storage_key);
}
}
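The snapshot update above is guarded twice: a pessimistic ZooKeeper lock around the whole run, plus an optimistic version check, since getSnapshotMetadata() captures stat.version and updateShapshotMetadata() passes it to set() as the required version. A condensed sketch of the optimistic part (path illustrative, using the same ZooKeeper calls as above):

Coordination::Stat stat;
String content;
Coordination::Error error;
zookeeper->tryGet("/vfs/gc/shapshot_meta", content, &stat, nullptr, &error);
auto meta = SnapshotMetadata::deserialize(content, stat.version);
// ... merge the WAL batch into a new snapshot object ...
// Fails with a version-mismatch error if another replica updated the znode in between:
zookeeper->set("/vfs/gc/shapshot_meta", new_meta.serialize(), meta.znode_version);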

View File

@ -0,0 +1,62 @@
#pragma once
#include "AppendLog.h"
#include "VFSSnapshot.h"
#include <vector>
#include <Core/BackgroundSchedulePool.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Disks/ObjectStorages/VFS/VFSSnapshot.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <fmt/chrono.h>
#include <Common/ZooKeeper/ZooKeeperWithFaultInjection.h>
namespace DB
{
class Context;
struct GarbageCollectorSettings
{
String zk_gc_path;
size_t gc_sleep_ms;
double keeper_fault_injection_probability;
UInt64 keeper_fault_injection_seed;
size_t batch_size;
};
class ZooKeeperWithFaultInjection;
using ZooKeeperWithFaultInjectionPtr = std::shared_ptr<ZooKeeperWithFaultInjection>;
class VFSGarbageCollector : private BackgroundSchedulePoolTaskHolder
{
public:
VFSGarbageCollector(
const String & gc_name_,
ObjectStoragePtr object_storage_,
VFSLog & wal_,
BackgroundSchedulePool & pool,
const GarbageCollectorSettings & settings_);
private:
void run();
void updateSnapshot();
String getZKSnapshotPath() const;
// Get the current snapshot object path from ZooKeeper.
SnapshotMetadata getSnapshotMetadata() const;
void updateShapshotMetadata(const SnapshotMetadata & new_snapshot, int32_t znode_required_version) const;
void initGCState() const;
ZooKeeperWithFaultInjectionPtr getZookeeper() const;
String gc_name;
ObjectStoragePtr object_storage;
VFSSnapshotDataFromObjectStorage vfs_shapshot_data;
VFSLog & wal;
const GarbageCollectorSettings settings;
LoggerPtr log;
};
}

View File

@ -0,0 +1,53 @@
#include "VFSLog.h"
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
#include <Common/Concepts.h>
namespace DB
{
VFSLog::VFSLog(const String & log_dir) : wal(log_dir) // TODO: read settings from config
{
}
UInt64 VFSLog::write(const VFSEvent & event)
{
return wal.append(JSONSerializer::serialize(event));
}
VFSLogItems VFSLog::read(size_t count) const
{
VFSLogItems vfs_items;
WAL::Entries wal_items = wal.readFront(count);
auto wal_id = wal.getID(); // TODO: It makes sense to return wal id from readFront
vfs_items.reserve(wal_items.size());
std::ranges::transform(
wal_items,
std::back_inserter(vfs_items),
[&wal_id](auto && wal_item) -> VFSLogItem
{
VFSLogItem item;
item.event = JSONSerializer::deserialize(wal_item.data);
item.wal.index = wal_item.index;
item.wal.id = wal_id;
return item;
});
return vfs_items;
}
size_t VFSLog::dropUpTo(UInt64 index)
{
return wal.dropUpTo(index);
}
size_t VFSLog::size() const
{
return wal.size();
}
}

View File

@ -0,0 +1,65 @@
#pragma once
#include <Core/UUID.h>
#include <Disks/ObjectStorages/VFS/AppendLog.h>
#include <variant>
namespace DB
{
enum class VFSAction : uint8_t
{
LINK,
UNLINK,
REQUEST
};
struct WALInfo
{
String replica;
UUID id;
UInt64 index;
bool operator==(const WALInfo &) const = default;
};
struct VFSEvent
{
String remote_path;
String local_path;
std::optional<WALInfo> orig_wal;
Poco::Timestamp timestamp;
VFSAction action;
bool operator==(const VFSEvent &) const = default;
};
struct VFSLogItem
{
VFSEvent event;
WALInfo wal;
bool operator==(const VFSLogItem &) const = default;
};
using VFSLogItems = std::vector<VFSLogItem>;
class VFSLog
{
public:
VFSLog(const String & log_dir);
UInt64 write(const VFSEvent & event);
VFSLogItems read(size_t count) const;
size_t dropUpTo(UInt64 index);
size_t size() const;
private:
WAL::AppendLog wal;
};
using VFSLogPtr = std::shared_ptr<VFSLog>;
}

View File

@ -0,0 +1,74 @@
#pragma once
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromString.h>
#include <fmt/chrono.h>
namespace DB
{
struct SnapshotMetadata
{
uint64_t metadata_version;
String object_storage_key;
uint64_t total_size;
int32_t znode_version;
bool is_initial_snaphot;
SnapshotMetadata(
uint64_t metadata_version_ = 0ull,
const String & object_storage_key_ = "",
uint64_t total_size_ = 0ull,
int32_t znode_version_ = -1,
bool is_initial_ = false)
: metadata_version(metadata_version_)
, object_storage_key(object_storage_key_)
, total_size(total_size_)
, znode_version(znode_version_)
, is_initial_snaphot(is_initial_)
{
}
SnapshotMetadata(
const String & object_storage_key_,
uint64_t metadata_version_ = 0ull,
uint64_t total_size_ = 0ull,
int32_t znode_version_ = -1,
bool is_initial_ = false)
: metadata_version(metadata_version_)
, object_storage_key(object_storage_key_)
, total_size(total_size_)
, znode_version(znode_version_)
, is_initial_snaphot(is_initial_)
{
}
void update(const String & new_snapshot_key) { object_storage_key = new_snapshot_key; }
String serialize() const { return fmt::format("{} {} {} ", metadata_version, object_storage_key, total_size); }
static SnapshotMetadata deserialize(const String & str, int32_t znode_version)
{
SnapshotMetadata result;
result.znode_version = znode_version;
/// In case of the initial snapshot, the content will be empty.
if (str.empty())
{
result.is_initial_snaphot = true;
return result;
}
ReadBufferFromString rb(str);
readIntTextUnsafe(result.metadata_version, rb);
checkChar(' ', rb);
readStringUntilWhitespace(result.object_storage_key, rb);
checkChar(' ', rb);
readIntTextUnsafe(result.total_size, rb);
result.is_initial_snaphot = false;
return result;
}
};
}
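A worked example of the format above: metadata_version = 3, object_storage_key = "vfs/snap_2", total_size = 1024 serializes to "3 vfs/snap_2 1024 " (note the trailing space), and deserialize() reads the three fields back while taking znode_version from the ZooKeeper stat rather than from the payload; an empty payload marks the initial snapshot.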

View File

@ -0,0 +1,200 @@
#include "VFSSnapshot.h"
#include <algorithm>
#include <iostream>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <IO/ReadBufferFromEmptyFile.h>
#include "IO/ReadBufferFromFileBase.h"
#include "IO/ReadHelpers.h"
#include "IO/WriteHelpers.h"
namespace ProfileEvents
{
extern const Event VFSGcCumulativeSnapshotBytesRead;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
bool VFSSnapshotEntry::operator==(const VFSSnapshotEntry & entry) const
{
return remote_path == entry.remote_path && link_count == entry.link_count;
}
std::optional<VFSSnapshotEntry> VFSSnapshotEntry::deserialize(ReadBuffer & buf)
{
if (buf.eof())
return std::nullopt;
VFSSnapshotEntry entry;
readStringUntilWhitespace(entry.remote_path, buf);
checkChar(' ', buf);
readIntTextUnsafe(entry.link_count, buf);
checkChar('\n', buf);
return entry;
}
void VFSSnapshotEntry::serialize(WriteBuffer & buf) const
{
writeString(fmt::format("{} {}\n", remote_path, link_count), buf);
}
void VFSSnapshotDataBase::writeEntryInSnaphot(
const VFSSnapshotEntry & entry, WriteBuffer & write_buffer, VFSSnapshotEntries & entries_to_remove)
{
if (entry.link_count < 0)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Broken links count for file with remote path {}.", entry.remote_path);
}
else if (entry.link_count == 0)
{
entries_to_remove.emplace_back(entry);
}
else if (entry.link_count > 0)
{
entry.serialize(write_buffer);
}
}
SnapshotMetadata VFSSnapshotDataBase::mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta)
{
/// For most object storages (like S3 or Azure) we don't need the object path, it's generated randomly.
/// But others require setting it manually.
std::unique_ptr<ReadBuffer> shapshot_read_buffer = getShapshotReadBuffer(old_snapshot_meta);
auto [new_shapshot_write_buffer, new_object_key] = getShapshotWriteBufferAndSnaphotObject(old_snapshot_meta);
LOG_DEBUG(log, "Going to merge WAL batch(size {}) with snapshot (key {})", wal_items.size(), old_snapshot_meta.object_storage_key);
auto entires_to_remove = mergeWithWalsImpl(std::move(wal_items), *shapshot_read_buffer, *new_shapshot_write_buffer);
SnapshotMetadata new_snaphot_meta(new_object_key);
LOG_DEBUG(log, "Merge snapshot have finished, going to remove {} from object storage", entires_to_remove.size());
removeShapshotEntires(entires_to_remove);
return new_snaphot_meta;
}
VFSSnapshotEntries VFSSnapshotDataBase::mergeWithWalsImpl(VFSLogItems && wal_items_, ReadBuffer & read_buffer, WriteBuffer & write_buffer)
{
VFSSnapshotEntries entries_to_remove;
auto wal_items = std::move(wal_items_);
std::ranges::sort(
wal_items, [](const VFSLogItem & left, const VFSLogItem & right) { return left.event.remote_path < right.event.remote_path; });
auto current_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);
// The implementation is similar to a merge operation:
// iterate through 2 sorted sequences;
// if the resulting link count == 0, add the entry to entries_to_remove,
// else sum the counts and append the entry into the snapshot.
for (auto wal_iterator = wal_items.begin(); wal_iterator != wal_items.end();)
{
// Combine all wal items with the same remote_path into a single one.
VFSSnapshotEntry entry_to_merge = {wal_iterator->event.remote_path, 0};
while (wal_iterator != wal_items.end() && wal_iterator->event.remote_path == entry_to_merge.remote_path)
{
int delta_link_count = 0; // TODO: remove delta and save local_path in the snapshot
switch (wal_iterator->event.action)
{
case VFSAction::LINK:
case VFSAction::REQUEST:
delta_link_count = 1;
break;
case VFSAction::UNLINK:
delta_link_count = -1;
break;
}
entry_to_merge.link_count += delta_link_count;
++wal_iterator;
}
// Write and skip entries from the snapshot which we won't update
while (current_snapshot_entry && current_snapshot_entry->remote_path < entry_to_merge.remote_path)
{
auto next_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);
if (next_snapshot_entry && current_snapshot_entry->remote_path > next_snapshot_entry->remote_path)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "VFS snapshot is not sorted.");
}
current_snapshot_entry->serialize(write_buffer);
std::swap(current_snapshot_entry, next_snapshot_entry);
}
if (!current_snapshot_entry || current_snapshot_entry->remote_path > entry_to_merge.remote_path)
{
writeEntryInSnaphot(entry_to_merge, write_buffer, entries_to_remove);
continue;
}
else if (current_snapshot_entry->remote_path == entry_to_merge.remote_path)
{
current_snapshot_entry->link_count += entry_to_merge.link_count;
writeEntryInSnaphot(*current_snapshot_entry, write_buffer, entries_to_remove);
current_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);
}
else
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Unreachable");
}
}
while (current_snapshot_entry)
{
current_snapshot_entry->serialize(write_buffer);
auto next_snapshot_entry = VFSSnapshotEntry::deserialize(read_buffer);
if (next_snapshot_entry && current_snapshot_entry->remote_path > next_snapshot_entry->remote_path)
{
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "VFS snapshot is not sorted.");
}
std::swap(current_snapshot_entry, next_snapshot_entry);
}
write_buffer.finalize();
return entries_to_remove;
}
std::unique_ptr<ReadBuffer> VFSSnapshotDataFromObjectStorage::getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const
{
if (!snapshot_meta.is_initial_snaphot)
{
StoredObject object(snapshot_meta.object_storage_key, "", snapshot_meta.total_size);
// TODO: pass read settings.
auto res = object_storage->readObject(object, {});
return res;
}
return std::make_unique<ReadBufferFromEmptyFile>();
}
std::pair<std::unique_ptr<WriteBuffer>, String>
VFSSnapshotDataFromObjectStorage::getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const
{
String new_object_path = fmt::format("/vfs_shapshots/shapshot_{}", snapshot_meta.znode_version + 1);
auto new_object_key = object_storage->generateObjectKeyForPath(new_object_path);
StoredObject new_object(new_object_key.serialize());
std::unique_ptr<WriteBuffer> new_shapshot_write_buffer = object_storage->writeObject(new_object, WriteMode::Rewrite);
return {std::move(new_shapshot_write_buffer), new_object_key.serialize()};
}
void VFSSnapshotDataFromObjectStorage::removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove)
{
StoredObjects objects_to_remove;
objects_to_remove.reserve(entires_to_remove.size());
for (const auto & entry : entires_to_remove)
{
objects_to_remove.emplace_back(entry.remote_path);
}
object_storage->removeObjectsIfExist(objects_to_remove);
}
}
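A worked example of mergeWithWalsImpl: with an old snapshot containing

/a 2
/c 1

and a WAL batch [LINK /b, UNLINK /c], the sorted merge writes

/a 2
/b 1

to the new snapshot and collects {"/c", 0} into entries_to_remove, after which removeShapshotEntires() deletes that object from object storage.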

View File

@ -0,0 +1,76 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include "AppendLog.h"
#include "IO/ReadHelpers.h"
#include "VFSShapshotMetadata.h"
#include "VFSLog.h"
#include <fstream>
#include <optional>
#include <IO/Lz4DeflatingWriteBuffer.h>
#include <IO/Lz4InflatingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <fmt/chrono.h>
namespace DB
{
struct VFSSnapshotEntry
{
String remote_path;
Int32 link_count = 0;
bool operator==(const VFSSnapshotEntry & entry) const;
static std::optional<VFSSnapshotEntry> deserialize(ReadBuffer & buf);
void serialize(WriteBuffer & buf) const;
};
using VFSSnapshotEntries = std::vector<VFSSnapshotEntry>;
class VFSSnapshotDataBase
{
public:
VFSSnapshotDataBase() : log(getLogger("VFSSnapshotDataBase(unnamed)")) { }
virtual ~VFSSnapshotDataBase() = default;
VFSSnapshotDataBase(const VFSSnapshotDataBase &) = delete;
VFSSnapshotDataBase(VFSSnapshotDataBase &&) = delete;
/// Apply the WAL to the current snapshot and return metadata of the updated one.
SnapshotMetadata mergeWithWals(VFSLogItems && wal_items, const SnapshotMetadata & old_snapshot_meta);
private:
void writeEntryInSnaphot(const VFSSnapshotEntry & entry, WriteBuffer & write_buffer, VFSSnapshotEntries & entries_to_remove);
VFSSnapshotEntries mergeWithWalsImpl(VFSLogItems && wal_items, ReadBuffer & read_buffer, WriteBuffer & write_buffer);
LoggerPtr log;
protected:
/// Methods with storage-specific logic.
virtual void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) = 0;
virtual std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const = 0;
virtual std::pair<std::unique_ptr<WriteBuffer>, String>
getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const = 0;
};
class VFSSnapshotDataFromObjectStorage : public VFSSnapshotDataBase
{
public:
VFSSnapshotDataFromObjectStorage(ObjectStoragePtr object_storage_, const String & name_)
: object_storage(object_storage_), log(getLogger(fmt::format("VFSSnapshotDataFromObjectStorage({})", name_)))
{
}
protected:
void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) override;
std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata & snapshot_meta) const override;
std::pair<std::unique_ptr<WriteBuffer>, String>
getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata & snapshot_meta) const override;
private:
ObjectStoragePtr object_storage;
LoggerPtr log;
};
}

View File

@ -0,0 +1,195 @@
#include <gtest/gtest.h>
#include <Disks/DiskLocal.h>
#include <Disks/IDisk.h>
#include <Disks/ObjectStorages/VFS/AppendLog.h>
#include <Poco/Checksum.h>
#include <filesystem>
using namespace DB;
namespace fs = std::filesystem;
const fs::path log_dir = "tmp/";
const std::vector<char> msg{'T', 'e', 's', 't'};
const std::vector<char> msg1{'T', 'e', 's', 't', '1'};
const std::vector<char> msg2{'T', 'e', 's', 't', '2'};
const std::vector<char> msg3{'T', 'e', 's', 't', '3'};
class WALTest : public ::testing::Test
{
protected:
void SetUp() override { fs::remove_all(log_dir); }
//void TearDown() override { SetUp(); }
};
TEST_F(WALTest, Main)
{
DB::WAL::AppendLog alog(log_dir);
auto idx = alog.append("test message");
EXPECT_EQ(idx, 0);
idx = alog.append(msg);
EXPECT_EQ(idx, 1);
EXPECT_EQ(alog.size(), 2);
}
TEST_F(WALTest, TestEntryChecksum)
{
WAL::AppendLog alog(log_dir);
UInt64 expected_index = 0;
Poco::Checksum crc(Poco::Checksum::Type::TYPE_CRC32);
crc.update(reinterpret_cast<const char *>(&expected_index), sizeof(expected_index));
crc.update(msg.data(), static_cast<unsigned int>(msg.size()));
alog.append(msg);
auto entries = alog.readFront(1);
EXPECT_EQ(entries[0].checksum, crc.checksum());
}
TEST_F(WALTest, TestLastIndex)
{
{
WAL::AppendLog alog(log_dir);
alog.append(msg);
alog.append(msg);
alog.append(msg);
ASSERT_EQ(alog.getNextIndex(), 3);
}
{
WAL::AppendLog alog(log_dir);
alog.append(msg);
ASSERT_EQ(alog.getNextIndex(), 4);
}
}
TEST_F(WALTest, TestReadFront)
{
WAL::AppendLog alog(log_dir);
alog.append(msg);
WAL::Entries entries = alog.readFront(1);
EXPECT_EQ(entries.size(), 1);
EXPECT_EQ(entries[0].index, 0);
EXPECT_EQ(entries[0].data, msg);
}
TEST_F(WALTest, TestReadFrontMultiple)
{
WAL::AppendLog alog(log_dir);
alog.append(msg1);
alog.append(msg2);
{
WAL::Entries entries = alog.readFront(1);
EXPECT_EQ(entries[0].data, msg1);
EXPECT_EQ(entries[0].index, 0);
}
{
WAL::Entries entries = alog.readFront(2);
EXPECT_EQ(entries[0].data, msg1);
EXPECT_EQ(entries[0].index, 0);
EXPECT_EQ(entries[1].data, msg2);
EXPECT_EQ(entries[1].index, 1);
}
{
WAL::Entries entries = alog.readFront(3);
EXPECT_EQ(entries.size(), 2);
}
}
TEST_F(WALTest, TestDrop)
{
{
WAL::AppendLog alog(log_dir);
alog.append(msg1);
alog.append(msg2);
size_t dropped = alog.dropUpTo(1);
EXPECT_EQ(dropped, 1);
EXPECT_EQ(alog.size(), 1);
EXPECT_EQ(alog.getNextIndex(), 2);
}
{
WAL::AppendLog alog(log_dir);
EXPECT_EQ(alog.size(), 1);
EXPECT_EQ(alog.getNextIndex(), 2);
}
}
TEST_F(WALTest, TestReadAfterDrop)
{
WAL::AppendLog alog(log_dir);
alog.append(msg1);
alog.append(msg2);
size_t dropped = alog.dropUpTo(1);
EXPECT_EQ(dropped, 1);
EXPECT_EQ(alog.size(), 1);
WAL::Entries entries = alog.readFront(1);
EXPECT_EQ(entries.size(), 1);
EXPECT_EQ(entries[0].index, 1);
alog.dropUpTo(2);
entries = alog.readFront(1);
EXPECT_EQ(entries.size(), 0);
EXPECT_EQ(alog.size(), 0);
}
TEST_F(WALTest, TestSegmentRotation)
{
WAL::AppendLog alog(log_dir, WAL::Settings{.max_segment_size = 100});
std::vector<char> buf_1(50, 0xAB);
std::vector<char> buf_2(50, 0xCD);
alog.append(buf_1);
alog.append(buf_2);
EXPECT_EQ(alog.segmentsCount(), 2);
WAL::Entries entries = alog.readFront(2);
EXPECT_EQ(entries.size(), 2);
EXPECT_EQ(entries[0].data, buf_1);
EXPECT_EQ(entries[0].index, 0);
EXPECT_EQ(entries[1].data, buf_2);
EXPECT_EQ(entries[1].index, 1);
}
TEST_F(WALTest, TestCorruptedException)
{
fs::path segment_path = log_dir / "log_00000000000000000000";
WAL::AppendLog alog(log_dir, WAL::Settings{.max_segment_size = 100});
alog.append(msg);
// Corrupt a size byte in the segment file
DB::WriteBufferFromFile buf(segment_path);
buf.write(0x01);
buf.sync();
EXPECT_NO_THROW(alog.size());
EXPECT_THROW(alog.readFront(1), DB::Exception);
// Now all public methods fail
EXPECT_THROW(alog.readFront(1), DB::Exception);
EXPECT_THROW(alog.append(msg), DB::Exception);
EXPECT_THROW(alog.size(), DB::Exception);
EXPECT_THROW(alog.dropUpTo(1), DB::Exception);
EXPECT_THROW(alog.getNextIndex(), DB::Exception);
EXPECT_THROW(alog.getStartIndex(), DB::Exception);
EXPECT_THROW(alog.segmentsCount(), DB::Exception);
}

View File

@ -0,0 +1,150 @@
#include <algorithm>
#include <iostream>
#include <Disks/ObjectStorages/VFS/VFSShapshotMetadata.h>
#include <Disks/ObjectStorages/VFS/VFSSnapshot.h>
#include <IO/WriteBufferFromString.h>
#include <gtest/gtest.h>
using namespace DB;
class VFSSnapshotDataFromString : public DB::VFSSnapshotDataBase
{
public:
VFSSnapshotDataFromString() = default;
void setSnaphotData(String str) { old_snapshot_data = str; }
String getSnapshotdata() const { return new_snaphot_data; }
VFSSnapshotEntries getEnriesToRemoveAfterLastMerge() const { return latest_entires_to_remove; }
protected:
std::unique_ptr<ReadBuffer> getShapshotReadBuffer(const SnapshotMetadata &) const override
{
return std::unique_ptr<ReadBuffer>(new ReadBufferFromString(old_snapshot_data));
}
std::pair<std::unique_ptr<WriteBuffer>, String> getShapshotWriteBufferAndSnaphotObject(const SnapshotMetadata &) const override
{
String key_placeholed = "test";
auto write_buffer = std::unique_ptr<WriteBuffer>(new WriteBufferFromString(new_snaphot_data));
return {std::move(write_buffer), key_placeholed};
}
void removeShapshotEntires(const VFSSnapshotEntries & entires_to_remove) override { latest_entires_to_remove = entires_to_remove; }
private:
String old_snapshot_data;
mutable String new_snaphot_data;
VFSSnapshotEntries latest_entires_to_remove;
};
String convertSnaphotEntiesToString(const VFSSnapshotEntries & entries)
{
String serialized;
{
WriteBufferFromString wb(serialized);
for (const auto & entry : entries)
{
entry.serialize(wb);
}
}
return serialized;
}
TEST(DiskObjectStorageVFS, SnaphotEntriesSerialization)
{
VFSSnapshotEntries entries = {{"/b", 1}, {"/a", 1}};
String serialized = convertSnaphotEntiesToString(entries);
String expected_serialized = "/b 1\n/a 1\n";
EXPECT_EQ(expected_serialized, serialized);
VFSSnapshotEntries deserialized;
{
ReadBufferFromString rb(serialized);
while (auto entry = VFSSnapshotEntry::deserialize(rb))
{
deserialized.emplace_back(*entry);
}
}
EXPECT_EQ(deserialized.size(), entries.size());
EXPECT_EQ(deserialized, entries);
}
TEST(DiskObjectStorageVFS, ExceptionOnUnsortedSnapshot)
{
VFSSnapshotEntries entries = {{"/b", 1}, {"/a", 1}, {"/c", 2}};
VFSSnapshotDataFromString snapshot_data;
snapshot_data.setSnaphotData(convertSnaphotEntiesToString(entries));
SnapshotMetadata placeholder_metadata;
EXPECT_THROW(auto new_snapshot = snapshot_data.mergeWithWals({}, placeholder_metadata), Exception) << "Snapshot is unsorted";
}
// TEST(DiskObjectStorageVFS, MergeWalWithSnaphot)
// {
// WALItems wals = {{"/c", 1}, {"/d", 2}, {"/b", 1}};
// String expected_snaphot_state;
// VFSSnapshotDataFromString snapshot_data;
// SnapshotMetadata placeholder_metadata;
// {
// snapshot_data.setSnaphotData("");
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {};
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/b 1\n/c 1\n/d 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// wals = {{"/c", -1}, {"/d", -1}, {"/d", -1}, {"/e", -1}, {"/e", 1}, {"/f", 1}, {"/f", 1}, {"/f", -1}, {"/f", 1}};
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {{"/c", 0}, {"/d", 0}, {"/e", 0}};
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/b 1\n/f 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// wals = {
// {"/a", 1},
// {"/b", 1},
// {"/c", 1},
// };
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove;
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "/a 1\n/b 2\n/c 1\n/f 2\n";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// wals = {{"/a", -1}, {"/b", -2}, {"/c", -1}, {"/f", -2}};
// {
// snapshot_data.setSnaphotData(expected_snaphot_state);
// snapshot_data.mergeWithWals(wals, placeholder_metadata);
// auto to_remove = snapshot_data.getEnriesToRemoveAfterLastMerge();
// VFSSnapshotEntries expected_entries_to_remove = {{"/a", 0}, {"/b", 0}, {"/c", 0}, {"/f", 0}};
// EXPECT_EQ(expected_entries_to_remove, to_remove);
// }
// expected_snaphot_state = "";
// EXPECT_EQ(expected_snaphot_state, snapshot_data.getSnapshotdata());
// }

View File

@ -0,0 +1,60 @@
#include <gtest/gtest.h>
#include <Disks/DiskLocal.h>
#include <Disks/ObjectStorages/VFS/JSONSerializer.h>
#include <Disks/ObjectStorages/VFS/VFSLog.h>
#include <Poco/Checksum.h>
#include <filesystem>
using namespace DB;
namespace fs = std::filesystem;
const fs::path log_dir = "tmp/";
const String remote_path = "/cloud_storage/remote/path";
const String local_path = "/table/path/to/part/file";
class VFSLogTest : public ::testing::Test
{
protected:
void SetUp() override { fs::remove_all(log_dir); }
};
TEST_F(VFSLogTest, Serialization)
{
{
VFSEvent event_in{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};
auto buf = JSONSerializer::serialize(event_in);
VFSEvent event_out = JSONSerializer::deserialize(buf);
EXPECT_EQ(event_in, event_out);
}
{
VFSEvent event_in{remote_path, local_path, WALInfo{"src_replica", UUIDHelpers::generateV4(), 1}, Poco::Timestamp(), VFSAction::REQUEST};
auto buf = JSONSerializer::serialize(event_in);
VFSEvent event_out = JSONSerializer::deserialize(buf);
EXPECT_EQ(event_in, event_out);
}
}
TEST_F(VFSLogTest, Main)
{
DB::VFSLog vfs_log(log_dir);
VFSEvent event{remote_path, local_path, {}, Poco::Timestamp(), VFSAction::LINK};
vfs_log.write(event);
EXPECT_EQ(vfs_log.size(), 1);
VFSLogItems items = vfs_log.read(1);
EXPECT_EQ(items.size(), 1);
EXPECT_EQ(items[0].wal.index, 0);
vfs_log.dropUpTo(1);
EXPECT_EQ(vfs_log.size(), 0);
}

View File

@ -722,7 +722,14 @@ void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, Format
assert_cast<const ASTFunction *>(argument.get())->arguments->children[0]->formatImpl(settings, state, nested_dont_need_parens);
settings.ostr << (settings.hilite ? hilite_operator : "") << " = " << (settings.hilite ? hilite_none : "");
}
settings.ostr << "'[HIDDEN]'";
if (!secret_arguments.replacement.empty())
{
settings.ostr << "'" << secret_arguments.replacement << "'";
}
else
{
settings.ostr << "'[HIDDEN]'";
}
if (size <= secret_arguments.start + secret_arguments.count && !secret_arguments.are_named)
break; /// All other arguments should also be hidden.
continue;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/KnownObjectNames.h>
#include <Common/re2.h>
#include <Core/QualifiedTableName.h>
#include <base/defines.h>
#include <boost/algorithm/string/predicate.hpp>
@ -49,6 +50,11 @@ public:
bool are_named = false; /// Arguments like `password = 'password'` are considered as named arguments.
/// E.g. "headers" in `url('..', headers('foo' = '[HIDDEN]'))`
std::vector<std::string> nested_maps;
/// Full replacement of an argument. Only supported when count is 1, otherwise all arguments will be replaced with this string.
/// It's needed in cases when we don't want to hide the entire parameter, but some part of it, e.g. "connection_string" in
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=secretkey;...', ...)` should be replaced with
/// `azureBlobStorage('DefaultEndpointsProtocol=https;AccountKey=[HIDDEN];...', ...)`.
std::string replacement;
bool hasSecrets() const
{
@ -74,6 +80,7 @@ protected:
result.are_named = argument_is_named;
}
chassert(index >= result.start); /// We always check arguments consecutively
chassert(result.replacement.empty()); /// A replacement must not be combined with masking of other arguments
result.count = index + 1 - result.start;
if (!argument_is_named)
result.are_named = false;
@ -199,32 +206,39 @@ protected:
void findAzureBlobStorageFunctionSecretArguments(bool is_cluster_function)
{
/// azureBlobStorage('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
/// azureBlobStorageCluster('cluster_name', 'conn_string/storage_account_url', ...) has 'conn_string/storage_account_url' as its second argument.
size_t url_arg_idx = is_cluster_function ? 1 : 0;
if (!is_cluster_function && isNamedCollectionName(0))
{
/// azureBlobStorage(named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 1))
return;
findSecretNamedArgument("account_key", 1);
return;
}
else if (is_cluster_function && isNamedCollectionName(1))
{
/// azureBlobStorageCluster(cluster, named_collection, ..., account_key = 'account_key', ...)
if (maskAzureConnectionString(-1, true, 2))
return;
findSecretNamedArgument("account_key", 2);
return;
}
/// We should check other arguments first because we don't need to do any replacement in case storage_account_url is not used
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, account_name, account_key, format, compression, structure)
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, [account_name, account_key, format, compression, structure])
if (maskAzureConnectionString(url_arg_idx))
return;
/// We should check other arguments first because we don't need to do any replacement in case of
/// azureBlobStorage(connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
/// azureBlobStorageCluster(cluster, connection_string|storage_account_url, container_name, blobpath, format, [account_name, account_key, ...])
size_t count = function->arguments->size();
if ((url_arg_idx + 4 <= count) && (count <= url_arg_idx + 7))
{
String second_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &second_arg))
String fourth_arg;
if (tryGetStringFromArgument(url_arg_idx + 3, &fourth_arg))
{
if (second_arg == "auto" || KnownFormatNames::instance().exists(second_arg))
if (fourth_arg == "auto" || KnownFormatNames::instance().exists(fourth_arg))
return; /// The argument after 'url' is a format: s3('url', 'format', ...)
}
}
@ -234,6 +248,40 @@ protected:
markSecretArgument(url_arg_idx + 4);
}
bool maskAzureConnectionString(ssize_t url_arg_idx, bool argument_is_named = false, size_t start = 0)
{
String url_arg;
if (argument_is_named)
{
url_arg_idx = findNamedArgument(&url_arg, "connection_string", start);
if (url_arg_idx == -1 || url_arg.empty())
url_arg_idx = findNamedArgument(&url_arg, "storage_account_url", start);
if (url_arg_idx == -1 || url_arg.empty())
return false;
}
else
{
if (!tryGetStringFromArgument(url_arg_idx, &url_arg))
return false;
}
if (!url_arg.starts_with("http"))
{
static re2::RE2 account_key_pattern = "AccountKey=.*?(;|$)";
if (RE2::Replace(&url_arg, account_key_pattern, "AccountKey=[HIDDEN]\\1"))
{
chassert(result.count == 0); /// A replacement must not be combined with masking of other arguments
result.start = url_arg_idx;
result.are_named = argument_is_named;
result.count = 1;
result.replacement = url_arg;
return true;
}
}
return false;
}
void findURLSecretArguments()
{
if (!isNamedCollectionName(0))
@ -513,8 +561,9 @@ protected:
return function->arguments->at(arg_idx)->isIdentifier();
}
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
/// Looks for an argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// Returns -1 if no argument was found.
ssize_t findNamedArgument(String * res, const std::string_view & key, size_t start = 0)
{
for (size_t i = start; i < function->arguments->size(); ++i)
{
@ -531,8 +580,22 @@ protected:
continue;
if (found_key == key)
markSecretArgument(i, /* argument_is_named= */ true);
{
tryGetStringFromArgument(*equals_func->arguments->at(1), res);
return i;
}
}
return -1;
}
/// Looks for a secret argument with a specified name. This function looks for arguments in format `key=value` where the key is specified.
/// If the argument is found, it is marked as a secret.
void findSecretNamedArgument(const std::string_view & key, size_t start = 0)
{
ssize_t arg_idx = findNamedArgument(nullptr, key, start);
if (arg_idx >= 0)
markSecretArgument(arg_idx, /* argument_is_named= */ true);
}
};
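With this change, for example, azureBlobStorage('DefaultEndpointsProtocol=https;AccountName=acc;AccountKey=secretkey;EndpointSuffix=core.windows.net', ...) is formatted as azureBlobStorage('DefaultEndpointsProtocol=https;AccountName=acc;AccountKey=[HIDDEN];EndpointSuffix=core.windows.net', ...) instead of hiding the entire first argument, while a storage_account_url value starting with "http" is left untouched by maskAzureConnectionString().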

View File

@ -223,7 +223,7 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
{
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/structure");
if (is_format_arg(sixth_arg))
{
format = sixth_arg;
@ -257,10 +257,10 @@ void StorageAzureConfiguration::fromAST(ASTs & engine_args, ContextPtr context,
}
else if (with_structure && engine_args.size() == 8)
{
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "format/account_name");
auto fourth_arg = checkAndGetLiteralArgument<String>(engine_args[3], "account_name");
account_name = fourth_arg;
account_key = checkAndGetLiteralArgument<String>(engine_args[4], "account_key");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format/account_name");
auto sixth_arg = checkAndGetLiteralArgument<String>(engine_args[5], "format");
if (!is_format_arg(sixth_arg))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown format {}", sixth_arg);
format = sixth_arg;


@@ -131,7 +131,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
else
{
ConfigurationPtr copy_configuration = configuration->clone();
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context);
if (filter_dag)
{
auto keys = configuration->getPaths();
@@ -142,7 +142,7 @@ std::shared_ptr<StorageObjectStorageSource::IIterator> StorageObjectStorageSourc
VirtualColumnUtils::buildSetsForDAG(*filter_dag, local_context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(keys, paths, actions, virtual_columns, local_context);
copy_configuration->setPaths(keys);
}
@@ -489,6 +489,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
, virtual_columns(virtual_columns_)
, throw_on_zero_files_match(throw_on_zero_files_match_)
, read_keys(read_keys_)
, local_context(context_)
, file_progress_callback(file_progress_callback_)
{
if (configuration->isNamespaceWithGlobs())
@@ -510,7 +511,7 @@ StorageObjectStorageSource::GlobIterator::GlobIterator(
}
recursive = key_with_globs == "/**";
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns))
if (auto filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, local_context))
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, getContext());
filter_expr = std::make_shared<ExpressionActions>(std::move(*filter_dag));
@@ -585,7 +586,7 @@ StorageObjectStorage::ObjectInfoPtr StorageObjectStorageSource::GlobIterator::ne
for (const auto & object_info : new_batch)
paths.push_back(getUniqueStoragePathIdentifier(*configuration, *object_info, false));
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(new_batch, paths, filter_expr, virtual_columns, local_context);
LOG_TEST(logger, "Filtered files: {} -> {}", paths.size(), new_batch.size());
}


@@ -220,6 +220,7 @@ private:
bool is_finished = false;
bool first_iteration = true;
std::mutex next_mutex;
const ContextPtr local_context;
std::function<void(FileProgress)> file_progress_callback;
};


@@ -1141,13 +1141,13 @@ StorageFileSource::FilesIterator::FilesIterator(
{
std::optional<ActionsDAG> filter_dag;
if (!distributed_processing && !archive_info && !files.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context_);
if (filter_dag)
{
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context_);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(files, files, actions, virtual_columns, context_);
}
}


@@ -227,7 +227,7 @@ public:
std::optional<ActionsDAG> filter_dag;
if (!uris.empty())
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns);
filter_dag = VirtualColumnUtils::createPathAndFileFilterDAG(predicate, virtual_columns, context);
if (filter_dag)
{
@@ -238,7 +238,7 @@
VirtualColumnUtils::buildSetsForDAG(*filter_dag, context);
auto actions = std::make_shared<ExpressionActions>(std::move(*filter_dag));
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns);
VirtualColumnUtils::filterByPathOrFile(uris, paths, actions, virtual_columns, context);
}
}


@@ -1,5 +1,6 @@
#include <memory>
#include <stack>
#include <unordered_set>
#include <Core/NamesAndTypes.h>
#include <Core/TypeId.h>
@@ -46,6 +47,7 @@
#include "Functions/IFunction.h"
#include "Functions/IFunctionAdaptors.h"
#include "Functions/indexHint.h"
#include <IO/ReadBufferFromString.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/makeASTForLogicalFunction.h>
#include <Columns/ColumnSet.h>
@@ -124,9 +126,18 @@ void filterBlockWithExpression(const ExpressionActionsPtr & actions, Block & blo
}
}
NamesAndTypesList getCommonVirtualsForFileLikeStorage()
{
return {{"_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())},
{"_size", makeNullable(std::make_shared<DataTypeUInt64>())},
{"_time", makeNullable(std::make_shared<DataTypeDateTime>())},
{"_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>())}};
}
NameSet getVirtualNamesForFileLikeStorage()
{
return {"_path", "_file", "_size", "_time", "_etag"};
return getCommonVirtualsForFileLikeStorage().getNameSet();
}
std::unordered_map<std::string, std::string> parseHivePartitioningKeysAndValues(const String & path)
@@ -154,8 +165,10 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
{
VirtualColumnsDescription desc;
auto add_virtual = [&](const auto & name, const auto & type)
auto add_virtual = [&](const NameAndTypePair & pair)
{
const auto & name = pair.getNameInStorage();
const auto & type = pair.getTypeInStorage();
if (storage_columns.has(name))
{
if (!context->getSettingsRef().use_hive_partitioning)
@@ -172,11 +185,8 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
desc.addEphemeral(name, type, "");
};
add_virtual("_path", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_file", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
add_virtual("_size", makeNullable(std::make_shared<DataTypeUInt64>()));
add_virtual("_time", makeNullable(std::make_shared<DataTypeDateTime>()));
add_virtual("_etag", std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>()));
for (const auto & item : getCommonVirtualsForFileLikeStorage())
add_virtual(item);
if (context->getSettingsRef().use_hive_partitioning)
{
@@ -188,16 +198,16 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(ColumnsDescription & sto
if (type == nullptr)
type = std::make_shared<DataTypeString>();
if (type->canBeInsideLowCardinality())
add_virtual(item.first, std::make_shared<DataTypeLowCardinality>(type));
add_virtual({item.first, std::make_shared<DataTypeLowCardinality>(type)});
else
add_virtual(item.first, type);
add_virtual({item.first, type});
}
}
return desc;
}
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx)
static void addPathAndFileToVirtualColumns(Block & block, const String & path, size_t idx, const FormatSettings & format_settings, bool use_hive_partitioning)
{
if (block.has("_path"))
block.getByName("_path").column->assumeMutableRef().insert(path);
@@ -214,18 +224,34 @@ static void addPathAndFileToVirtualColumns(Block & block, const String & path, s
block.getByName("_file").column->assumeMutableRef().insert(file);
}
if (use_hive_partitioning)
{
auto keys_and_values = parseHivePartitioningKeysAndValues(path);
for (const auto & [key, value] : keys_and_values)
{
if (const auto * column = block.findByName(key))
{
ReadBufferFromString buf(value);
column->type->getDefaultSerialization()->deserializeWholeText(column->column->assumeMutableRef(), buf, format_settings);
}
}
}
block.getByName("_idx").column->assumeMutableRef().insert(idx);
}
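The hive-partitioning branch above pulls `key=value` pairs out of the file path and text-deserializes each value into the matching virtual column. The deserialization half needs ClickHouse's ISerialization machinery, but the path-splitting half can be shown standalone; a hypothetical re-implementation (the real parseHivePartitioningKeysAndValues may differ in detail):

#include <iostream>
#include <map>
#include <string>

std::map<std::string, std::string> parseHivePath(const std::string & path)
{
    std::map<std::string, std::string> res;
    size_t pos = 0;
    while ((pos = path.find('/', pos)) != std::string::npos)
    {
        size_t next = path.find('/', pos + 1);
        if (next == std::string::npos)
            break; /// the last segment is the file name, not a partition
        std::string segment = path.substr(pos + 1, next - pos - 1);
        if (size_t eq = segment.find('='); eq != std::string::npos)
            res[segment.substr(0, eq)] = segment.substr(eq + 1);
        pos = next;
    }
    return res;
}

int main()
{
    /// Prints: column0 -> Elizabeth
    for (const auto & [k, v] : parseHivePath("/data_hive/partitioning/column0=Elizabeth/sample.parquet"))
        std::cout << k << " -> " << v << '\n';
}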
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns)
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
if (!predicate || virtual_columns.empty())
return {};
Block block;
NameSet common_virtuals;
if (context->getSettingsRef().use_hive_partitioning)
common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
@@ -233,18 +259,19 @@ std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * pr
return splitFilterDagForAllowedInputs(predicate, &block);
}
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
Block block;
NameSet common_virtuals = getVirtualNamesForFileLikeStorage();
for (const auto & column : virtual_columns)
{
if (column.name == "_file" || column.name == "_path")
if (column.name == "_file" || column.name == "_path" || !common_virtuals.contains(column.name))
block.insert({column.type->createColumn(), column.type, column.name});
}
block.insert({ColumnUInt64::create(), std::make_shared<DataTypeUInt64>(), "_idx"});
for (size_t i = 0; i != paths.size(); ++i)
addPathAndFileToVirtualColumns(block, paths[i], i);
addPathAndFileToVirtualColumns(block, paths[i], i, getFormatSettings(context), context->getSettingsRef().use_hive_partitioning);
filterBlockWithExpression(actions, block);


@@ -75,14 +75,14 @@ VirtualColumnsDescription getVirtualsForFileLikeStorage(
const std::string & sample_path = "",
std::optional<FormatSettings> format_settings_ = std::nullopt);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns);
std::optional<ActionsDAG> createPathAndFileFilterDAG(const ActionsDAG::Node * predicate, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns);
ColumnPtr getFilterByPathAndFileIndexes(const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context);
template <typename T>
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns)
void filterByPathOrFile(std::vector<T> & sources, const std::vector<String> & paths, const ExpressionActionsPtr & actions, const NamesAndTypesList & virtual_columns, const ContextPtr & context)
{
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns);
auto indexes_column = getFilterByPathAndFileIndexes(paths, actions, virtual_columns, context);
const auto & indexes = typeid_cast<const ColumnUInt64 &>(*indexes_column).getData();
if (indexes.size() == sources.size())
return;
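The hunk cuts off after the fast path; in the real function, what follows rebuilds `sources` from the surviving row indexes. An approximation of that step, with a plain vector standing in for the ColumnUInt64 returned by getFilterByPathAndFileIndexes:

#include <cstdint>
#include <utility>
#include <vector>

template <typename T>
void filterByIndexes(std::vector<T> & sources, const std::vector<uint64_t> & indexes)
{
    if (indexes.size() == sources.size())
        return; /// the filter kept every row, nothing to rebuild

    std::vector<T> filtered;
    filtered.reserve(indexes.size());
    for (uint64_t idx : indexes)
        filtered.push_back(std::move(sources[idx]));
    sources = std::move(filtered);
}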


@@ -1,5 +1,6 @@
import pytest
import random, string
import re
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
@@ -336,6 +337,10 @@ def test_create_database():
def test_table_functions():
password = new_password()
azure_conn_string = cluster.env_variables["AZURITE_CONNECTION_STRING"]
account_key_pattern = re.compile("AccountKey=.*?(;|$)")
masked_azure_conn_string = re.sub(
account_key_pattern, "AccountKey=[HIDDEN]\\1", azure_conn_string
)
azure_storage_account_url = cluster.env_variables["AZURITE_STORAGE_ACCOUNT_URL"]
azure_account_name = "devstoreaccount1"
azure_account_key = "Eby8vdM02xNOcqFlqUwJPLlmEtlCDXJ1OUzFT50uSRZ6IFsuFq2UVErCz4I6tq/K1SZFPTOtr/KBHBeksoGMGw=="
@@ -467,23 +472,23 @@
"CREATE TABLE tablefunc30 (x int) AS s3('http://minio1:9001/root/data/test9.csv.gz', 'NOSIGN', 'CSV')",
"CREATE TABLE tablefunc31 (`x` int) AS s3('http://minio1:9001/root/data/test10.csv.gz', 'minio', '[HIDDEN]')",
"CREATE TABLE tablefunc32 (`x` int) AS deltaLake('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
f"CREATE TABLE tablefunc33 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"CREATE TABLE tablefunc34 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc35 (x int) AS azureBlobStorage('{azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc33 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple.csv', 'CSV')",
f"CREATE TABLE tablefunc34 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_1.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc35 (`x` int) AS azureBlobStorage('{masked_azure_conn_string}', 'cont', 'test_simple_2.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc36 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_3.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc37 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_4.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc38 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_5.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc39 (`x` int) AS azureBlobStorage('{azure_storage_account_url}', 'cont', 'test_simple_6.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc40 (x int) AS azureBlobStorage(named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc40 (`x` int) AS azureBlobStorage(named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_7.csv', format = 'CSV')",
f"CREATE TABLE tablefunc41 (`x` int) AS azureBlobStorage(named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_8.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
f"CREATE TABLE tablefunc42 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
f"CREATE TABLE tablefunc43 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc44 (x int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc42 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_9.csv', 'CSV')",
f"CREATE TABLE tablefunc43 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_10.csv', 'CSV', 'none')",
f"CREATE TABLE tablefunc44 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{masked_azure_conn_string}', 'cont', 'test_simple_11.csv', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc45 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_12.csv', '{azure_account_name}', '[HIDDEN]')",
f"CREATE TABLE tablefunc46 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_13.csv', '{azure_account_name}', '[HIDDEN]', 'CSV')",
f"CREATE TABLE tablefunc47 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_14.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none')",
f"CREATE TABLE tablefunc48 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', '{azure_storage_account_url}', 'cont', 'test_simple_15.csv', '{azure_account_name}', '[HIDDEN]', 'CSV', 'none', 'auto')",
f"CREATE TABLE tablefunc49 (x int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc49 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, connection_string = '{masked_azure_conn_string}', container = 'cont', blob_path = 'test_simple_16.csv', format = 'CSV')",
f"CREATE TABLE tablefunc50 (`x` int) AS azureBlobStorageCluster('test_shard_localhost', named_collection_2, storage_account_url = '{azure_storage_account_url}', container = 'cont', blob_path = 'test_simple_17.csv', account_name = '{azure_account_name}', account_key = '[HIDDEN]')",
"CREATE TABLE tablefunc51 (`x` int) AS iceberg('http://minio1:9001/root/data/test11.csv.gz', 'minio', '[HIDDEN]')",
],


@@ -33,8 +33,8 @@ Cross Elizabeth
[1,2,3] 42.42
Array(Int64) LowCardinality(Float64)
101
2070
2070
2071
2071
b
1
1


@@ -0,0 +1,6 @@
1
1
1
1
1
1


@@ -0,0 +1,72 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
DATA_DIR=$USER_FILES_PATH/$CLICKHOUSE_TEST_UNIQUE_NAME
mkdir -p $DATA_DIR
cp -r $CURDIR/data_hive/ $DATA_DIR
$CLICKHOUSE_CLIENT --query_id="test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/column0=*/sample.parquet') WHERE column0 = 'Elizabeth' SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_1_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/identifier=*/email.csv') WHERE identifier = 2070 SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_2_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
$CLICKHOUSE_CLIENT --query_id="test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME" --query "
SELECT countDistinct(_path) FROM file('$DATA_DIR/data_hive/partitioning/array=*/sample.parquet') WHERE array = [1,2,3] SETTINGS use_hive_partitioning=1, optimize_count_from_files=0;
"
$CLICKHOUSE_CLIENT --query "
SYSTEM FLUSH LOGS;
"
for _ in {1..5}; do
count=$( $CLICKHOUSE_CLIENT --query "
SELECT ProfileEvents['EngineFileLikeReadFiles'] FROM system.query_log
WHERE query_id='test_03231_3_$CLICKHOUSE_TEST_UNIQUE_NAME' AND
current_database = currentDatabase() and type='QueryFinish';" )
if [[ "$count" == "1" ]]; then
echo "1"
break
fi
sleep 1
done
rm -rf $DATA_DIR


@@ -1 +1 @@
data_hive/partitioning/column0=Elizabeth/sample.parquet
data_hive/partitioning/non_existing_column=Elizabeth/sample.parquet


@@ -5,4 +5,4 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/column0=*/sample.parquet') LIMIT 1;"
$CLICKHOUSE_LOCAL -q "SELECT substring(_path, position(_path, 'data_hive')) FROM file('$CURDIR/data_hive/partitioning/non_existing_column=*/sample.parquet') LIMIT 1;"


@@ -0,0 +1,5 @@
_login_email,_identifier,_first_name,_last_name
laura@example.com,2070,Laura,Grey
craig@example.com,4081,Craig,Johnson
mary@example.com,9346,Mary,Jenkins
jamie@example.com,5079,Jamie,Smith