Remove unneeded comments, format

This commit is contained in:
Daniil Rubin 2022-09-06 15:01:34 +00:00 committed by Daniil Rubin
parent 616b72d665
commit a77aa6b2b0
2 changed files with 52 additions and 43 deletions

View File

@ -2,9 +2,9 @@
#include <Common/config.h>
#include <Common/logger_useful.h>
#include <IO/S3Common.h>
#include <IO/ReadBufferFromS3.h>
#include <IO/ReadSettings.h>
#include <IO/S3Common.h>
#include <Storages/StorageFactory.h>
#include <Storages/checkAndGetLiteralArgument.h>
@ -24,49 +24,52 @@ namespace ErrorCodes
extern const int S3_ERROR;
}
void DeltaLakeMetadata::add(const String & key, uint64_t timestamp) {
void DeltaLakeMetadata::add(const String & key, uint64_t timestamp)
{
file_update_time[key] = timestamp;
}
void DeltaLakeMetadata::remove(const String & key, uint64_t /*timestamp */) {
void DeltaLakeMetadata::remove(const String & key, uint64_t /*timestamp */)
{
file_update_time.erase(key);
}
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() && {
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
{
std::vector<String> keys;
keys.reserve(file_update_time.size());
for (auto && [k, _] : file_update_time) {
for (auto && [k, _] : file_update_time)
{
keys.push_back(k);
}
return keys;
}
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_,
const String & table_path_,
Poco::Logger * log_) :
base_configuration(configuration_)
, table_path(table_path_)
, metadata()
, log(log_)
{
Init();
}
JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_)
: base_configuration(configuration_), table_path(table_path_), metadata(), log(log_)
{
Init();
}
void JsonMetadataGetter::Init() {
void JsonMetadataGetter::Init()
{
auto keys = getJsonLogFiles();
// read data from every json log file
for (const String & key : keys) {
for (const String & key : keys)
{
auto buf = createS3ReadBuffer(key);
String json_str;
size_t opening(0), closing(0);
char c;
while (buf->read(c)) {
while (buf->read(c))
{
// skip all space characters for JSON to parse correctly
if (isspace(c)) {
if (isspace(c))
{
continue;
}
@ -77,13 +80,14 @@ void JsonMetadataGetter::Init() {
else if (c == '}')
closing++;
if (opening == closing) {
if (opening == closing)
{
LOG_DEBUG(log, "JSON {}, {}", json_str, json_str.size());
JSON json(json_str);
if (json.has("add")) {
if (json.has("add"))
{
auto path = json["add"]["path"].getString();
auto timestamp = json["add"]["modificationTime"].getInt();
@ -91,7 +95,9 @@ void JsonMetadataGetter::Init() {
LOG_DEBUG(log, "Path {}", path);
LOG_DEBUG(log, "Timestamp {}", timestamp);
} else if (json.has("remove")) {
}
else if (json.has("remove"))
{
auto path = json["remove"]["path"].getString();
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
@ -100,20 +106,18 @@ void JsonMetadataGetter::Init() {
LOG_DEBUG(log, "Path {}", path);
LOG_DEBUG(log, "Timestamp {}", timestamp);
}
// reset
opening = 0;
closing = 0;
json_str.clear();
}
}
}
}
std::vector<String> JsonMetadataGetter::getJsonLogFiles() {
std::vector<String> JsonMetadataGetter::getJsonLogFiles()
{
std::vector<String> keys;
const auto & client = base_configuration.client;
@ -143,7 +147,7 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles() {
for (const auto & obj : result_batch)
{
const auto & filename = obj.GetKey();
if (filename.substr(filename.size() - 5) == ".json")
keys.push_back(filename);
}
@ -155,11 +159,17 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles() {
return keys;
}
std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key) {
// size_t object_size = DB::S3::getObjectSize(base_configuration.client, base_configuration.uri.bucket, key, base_configuration.uri.version_id, false);
std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
{
// TBD: add parallel downloads
return std::make_unique<ReadBufferFromS3>(base_configuration.client, base_configuration.uri.bucket, key, base_configuration.uri.version_id, /* max single read retries */ 10, ReadSettings{});
return std::make_unique<ReadBufferFromS3>(
base_configuration.client,
base_configuration.uri.bucket,
key,
base_configuration.uri.version_id,
/* max single read retries */ 10,
ReadSettings{});
}
StorageDelta::StorageDelta(
@ -178,12 +188,13 @@ StorageDelta::StorageDelta(
{
StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, base_configuration);
JsonMetadataGetter getter{base_configuration, table_path, log};
auto keys = getter.getFiles();
for (const String & path : keys) {
for (const String & path : keys)
{
LOG_DEBUG(log, "{}", path);
}
@ -200,7 +211,7 @@ StorageDelta::StorageDelta(
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);

View File

@ -22,7 +22,8 @@ namespace DB
{
// class to parse json deltalake metadata and find files needed for query in table
class DeltaLakeMetadata {
class DeltaLakeMetadata
{
public:
DeltaLakeMetadata() = default;
@ -38,13 +39,10 @@ private:
};
// class to get deltalake log json files and read json from them
class JsonMetadataGetter
class JsonMetadataGetter
{
public:
JsonMetadataGetter(StorageS3::S3Configuration & configuration_,
const String & table_path_,
Poco::Logger * log_
);
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_);
private:
void Init();
@ -91,7 +89,7 @@ public:
private:
void Init();
static void updateS3Configuration(ContextPtr, StorageS3::S3Configuration &);
private:
String generateQueryFromKeys(std::vector<String> && keys);