mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 19:02:04 +00:00
Remove unneeded logging, better
This commit is contained in:
parent
0a1734bb69
commit
3fb26aefa3
@ -18,6 +18,9 @@
|
|||||||
|
|
||||||
#include <QueryPipeline/Pipe.h>
|
#include <QueryPipeline/Pipe.h>
|
||||||
|
|
||||||
|
#include <fmt/ranges.h>
|
||||||
|
#include <fmt/format.h>
|
||||||
|
|
||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
@ -25,6 +28,7 @@ namespace ErrorCodes
|
|||||||
{
|
{
|
||||||
extern const int S3_ERROR;
|
extern const int S3_ERROR;
|
||||||
extern const int BAD_ARGUMENTS;
|
extern const int BAD_ARGUMENTS;
|
||||||
|
extern const int INCORRECT_DATA;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DeltaLakeMetadata::add(const String & filename, uint64_t timestamp)
|
void DeltaLakeMetadata::add(const String & filename, uint64_t timestamp)
|
||||||
@ -34,7 +38,9 @@ void DeltaLakeMetadata::add(const String & filename, uint64_t timestamp)
|
|||||||
|
|
||||||
void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
|
void DeltaLakeMetadata::remove(const String & filename, uint64_t /*timestamp */)
|
||||||
{
|
{
|
||||||
file_update_time.erase(filename);
|
bool erase = file_update_time.erase(filename);
|
||||||
|
if (!erase)
|
||||||
|
throw Exception(ErrorCodes::INCORRECT_DATA, "Invalid table metadata, tried to remove {} before adding it", filename);
|
||||||
}
|
}
|
||||||
|
|
||||||
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
std::vector<String> DeltaLakeMetadata::ListCurrentFiles() &&
|
||||||
@ -95,9 +101,6 @@ void JsonMetadataGetter::Init()
|
|||||||
auto timestamp = json["add"]["modificationTime"].getInt();
|
auto timestamp = json["add"]["modificationTime"].getInt();
|
||||||
|
|
||||||
metadata.add(path, timestamp);
|
metadata.add(path, timestamp);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Path {}", path);
|
|
||||||
LOG_DEBUG(log, "Timestamp {}", timestamp);
|
|
||||||
}
|
}
|
||||||
else if (json.has("remove"))
|
else if (json.has("remove"))
|
||||||
{
|
{
|
||||||
@ -105,9 +108,6 @@ void JsonMetadataGetter::Init()
|
|||||||
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
|
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
|
||||||
|
|
||||||
metadata.remove(path, timestamp);
|
metadata.remove(path, timestamp);
|
||||||
|
|
||||||
LOG_DEBUG(log, "Path {}", path);
|
|
||||||
LOG_DEBUG(log, "Timestamp {}", timestamp);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// reset
|
// reset
|
||||||
@ -132,7 +132,7 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
|
|||||||
const auto bucket{base_configuration.uri.bucket};
|
const auto bucket{base_configuration.uri.bucket};
|
||||||
|
|
||||||
request.SetBucket(bucket);
|
request.SetBucket(bucket);
|
||||||
request.SetPrefix(table_path + "_delta_log");
|
request.SetPrefix(std::filesystem::path(table_path) / "_delta_log");
|
||||||
|
|
||||||
while (!is_finished)
|
while (!is_finished)
|
||||||
{
|
{
|
||||||
@ -151,7 +151,7 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
|
|||||||
{
|
{
|
||||||
const auto & filename = obj.GetKey();
|
const auto & filename = obj.GetKey();
|
||||||
|
|
||||||
if (filename.substr(filename.size() - 5) == ".json")
|
if (std::filesystem::path(filename).extension() == ".json")
|
||||||
keys.push_back(filename);
|
keys.push_back(filename);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -164,7 +164,6 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
|
|||||||
|
|
||||||
std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
|
std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
|
||||||
{
|
{
|
||||||
|
|
||||||
// TBD: add parallel downloads
|
// TBD: add parallel downloads
|
||||||
return std::make_unique<ReadBufferFromS3>(
|
return std::make_unique<ReadBufferFromS3>(
|
||||||
base_configuration.client,
|
base_configuration.client,
|
||||||
@ -196,11 +195,6 @@ StorageDelta::StorageDelta(
|
|||||||
|
|
||||||
auto keys = getter.getFiles();
|
auto keys = getter.getFiles();
|
||||||
|
|
||||||
for (const String & path : keys)
|
|
||||||
{
|
|
||||||
LOG_DEBUG(log, "{}", path);
|
|
||||||
}
|
|
||||||
|
|
||||||
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
|
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
|
||||||
|
|
||||||
LOG_DEBUG(log, "New uri: {}", new_uri);
|
LOG_DEBUG(log, "New uri: {}", new_uri);
|
||||||
@ -225,7 +219,7 @@ StorageDelta::StorageDelta(
|
|||||||
access_key_,
|
access_key_,
|
||||||
secret_access_key_,
|
secret_access_key_,
|
||||||
table_id_,
|
table_id_,
|
||||||
String("Parquet"), // format name
|
"Parquet", // format name
|
||||||
base_configuration.rw_settings,
|
base_configuration.rw_settings,
|
||||||
columns_,
|
columns_,
|
||||||
constraints_,
|
constraints_,
|
||||||
@ -297,18 +291,7 @@ void StorageDelta::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurat
|
|||||||
|
|
||||||
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
|
String StorageDelta::generateQueryFromKeys(std::vector<String> && keys)
|
||||||
{
|
{
|
||||||
String new_query;
|
std::string new_query = fmt::format("{{{}}}", fmt::join(keys, ","));
|
||||||
|
|
||||||
for (auto && key : keys)
|
|
||||||
{
|
|
||||||
if (!new_query.empty())
|
|
||||||
{
|
|
||||||
new_query += ",";
|
|
||||||
}
|
|
||||||
new_query += key;
|
|
||||||
}
|
|
||||||
new_query = "{" + new_query + "}";
|
|
||||||
|
|
||||||
return new_query;
|
return new_query;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,11 +29,9 @@ class DeltaLakeMetadata
|
|||||||
public:
|
public:
|
||||||
DeltaLakeMetadata() = default;
|
DeltaLakeMetadata() = default;
|
||||||
|
|
||||||
public:
|
|
||||||
void add(const String & filename, uint64_t timestamp);
|
void add(const String & filename, uint64_t timestamp);
|
||||||
void remove(const String & filename, uint64_t timestamp);
|
void remove(const String & filename, uint64_t timestamp);
|
||||||
|
|
||||||
public:
|
|
||||||
std::vector<String> ListCurrentFiles() &&;
|
std::vector<String> ListCurrentFiles() &&;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
@ -46,18 +44,15 @@ class JsonMetadataGetter
|
|||||||
public:
|
public:
|
||||||
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_);
|
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, Poco::Logger * log_);
|
||||||
|
|
||||||
|
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void Init();
|
void Init();
|
||||||
|
|
||||||
std::vector<String> getJsonLogFiles();
|
std::vector<String> getJsonLogFiles();
|
||||||
|
|
||||||
private:
|
|
||||||
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
|
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
|
||||||
|
|
||||||
public:
|
|
||||||
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
|
|
||||||
|
|
||||||
private:
|
|
||||||
StorageS3::S3Configuration base_configuration;
|
StorageS3::S3Configuration base_configuration;
|
||||||
String table_path;
|
String table_path;
|
||||||
DeltaLakeMetadata metadata;
|
DeltaLakeMetadata metadata;
|
||||||
@ -92,11 +87,8 @@ private:
|
|||||||
void Init();
|
void Init();
|
||||||
static void updateS3Configuration(ContextPtr, StorageS3::S3Configuration &);
|
static void updateS3Configuration(ContextPtr, StorageS3::S3Configuration &);
|
||||||
|
|
||||||
|
|
||||||
private:
|
|
||||||
static String generateQueryFromKeys(std::vector<String> && keys);
|
static String generateQueryFromKeys(std::vector<String> && keys);
|
||||||
|
|
||||||
private:
|
|
||||||
StorageS3::S3Configuration base_configuration;
|
StorageS3::S3Configuration base_configuration;
|
||||||
std::shared_ptr<StorageS3> s3engine;
|
std::shared_ptr<StorageS3> s3engine;
|
||||||
Poco::Logger * log;
|
Poco::Logger * log;
|
||||||
|
Loading…
Reference in New Issue
Block a user