mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-17 21:24:28 +00:00
Rework Disk S3 revision number storing / searching.
This commit is contained in:
parent
cad81e3742
commit
a2623a2d97
@ -952,27 +952,22 @@ void DiskS3::startup()
|
||||
|
||||
void DiskS3::findLastRevision()
|
||||
{
|
||||
UInt64 l = 0, r = LATEST_REVISION;
|
||||
while (l < r)
|
||||
/// Construct revision number from high to low bits.
|
||||
String revision;
|
||||
for (int bit = 0; bit < 64; bit++)
|
||||
{
|
||||
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check revision in bounds {}-{}", l, r);
|
||||
auto revision_prefix = revision + "1";
|
||||
|
||||
auto revision = l + (r - l + 1) / 2;
|
||||
if (revision == 0)
|
||||
break;
|
||||
|
||||
auto revision_str = revisionToString(revision);
|
||||
|
||||
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision {}", revision);
|
||||
LOG_DEBUG(&Poco::Logger::get("DiskS3"), "Check object with revision prefix {} exists", revision_prefix);
|
||||
|
||||
/// Check file or operation with such revision exists.
|
||||
if (checkObjectExists(bucket, s3_root_path + "r" + revision_str)
|
||||
|| checkObjectExists(bucket, s3_root_path + "operations/r" + revision_str))
|
||||
l = revision;
|
||||
if (checkObjectExists(bucket, s3_root_path + "r" + revision_prefix)
|
||||
|| checkObjectExists(bucket, s3_root_path + "operations/r" + revision_prefix))
|
||||
revision += "1";
|
||||
else
|
||||
r = revision - 1;
|
||||
revision += "0";
|
||||
}
|
||||
revision_counter = l;
|
||||
revision_counter = static_cast<UInt64>(std::bitset<64>(revision).to_ullong());
|
||||
LOG_INFO(&Poco::Logger::get("DiskS3"), "Found last revision number {} for disk {}", revision_counter, name);
|
||||
}
|
||||
|
||||
@ -1085,7 +1080,7 @@ void DiskS3::migrateToRestorableSchema()
|
||||
|
||||
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
|
||||
}
|
||||
catch (const Exception & e)
|
||||
catch (const Exception &)
|
||||
{
|
||||
tryLogCurrentException(&Poco::Logger::get("DiskS3"), fmt::format("Failed to migrate to restorable schema for disk {}", name));
|
||||
|
||||
@ -1173,7 +1168,7 @@ struct DiskS3::RestoreInformation
|
||||
UInt64 revision = LATEST_REVISION;
|
||||
String source_bucket;
|
||||
String source_path;
|
||||
bool detached;
|
||||
bool detached = false;
|
||||
};
|
||||
|
||||
void DiskS3::readRestoreInformation(DiskS3::RestoreInformation & restore_information)
|
||||
@ -1464,10 +1459,14 @@ void DiskS3::restoreFileOperations(const String & source_bucket, const String &
|
||||
|
||||
std::tuple<UInt64, String> DiskS3::extractRevisionAndOperationFromKey(const String & key)
|
||||
{
|
||||
UInt64 revision = UNKNOWN_REVISION;
|
||||
String revision_str;
|
||||
String operation;
|
||||
|
||||
re2::RE2::FullMatch(key, key_regexp, &revision, &operation);
|
||||
re2::RE2::FullMatch(key, key_regexp, &revision_str, &operation);
|
||||
|
||||
auto revision = revision_str.empty() ? UNKNOWN_REVISION : static_cast<UInt64>(std::bitset<64>(revision_str).to_ullong());
|
||||
|
||||
LOG_INFO(&Poco::Logger::get("DiskS3"), "Parsed revision {} {} {}", key, revision_str, revision);
|
||||
|
||||
return {revision, operation};
|
||||
}
|
||||
@ -1482,15 +1481,7 @@ String DiskS3::shrinkKey(const String & path, const String & key)
|
||||
|
||||
String DiskS3::revisionToString(UInt64 revision)
|
||||
{
|
||||
static constexpr size_t max_digits = 19; /// UInt64 max digits in decimal representation.
|
||||
|
||||
/// Align revision number with leading zeroes to have strict lexicographical order of them.
|
||||
auto revision_str = std::to_string(revision);
|
||||
auto digits_to_align = max_digits - revision_str.length();
|
||||
for (size_t i = 0; i < digits_to_align; ++i)
|
||||
revision_str = "0" + revision_str;
|
||||
|
||||
return revision_str;
|
||||
return std::bitset<64>(revision).to_string();
|
||||
}
|
||||
|
||||
void DiskS3::onFreeze(const String & path)
|
||||
|
@ -148,6 +148,7 @@ private:
|
||||
Metadata createMeta(const String & path) const;
|
||||
|
||||
void createFileOperationObject(const String & operation_name, UInt64 revision, const ObjectMetadata & metadata);
|
||||
/// Converts revision to binary string with leading zeroes (64 bit).
|
||||
static String revisionToString(UInt64 revision);
|
||||
|
||||
bool checkObjectExists(const String & source_bucket, const String & prefix);
|
||||
@ -202,7 +203,7 @@ private:
|
||||
int list_object_keys_size;
|
||||
|
||||
/// Key has format: ../../r{revision}-{operation}
|
||||
const re2::RE2 key_regexp {".*/r(\\d+)-(\\w+).*"};
|
||||
const re2::RE2 key_regexp {".*/r(\\w+)-(\\w+).*"};
|
||||
|
||||
/// Object contains information about schema version.
|
||||
inline static const String SCHEMA_VERSION_OBJECT = ".SCHEMA_VERSION";
|
||||
|
Loading…
Reference in New Issue
Block a user