ClickHouse/src/Storages/StorageDelta.h

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

108 lines
2.7 KiB
C++
Raw Normal View History

#pragma once
2022-11-04 16:03:12 +00:00
#include "config.h"
2022-09-07 07:16:32 +00:00
#if USE_AWS_S3
2022-09-28 11:21:32 +00:00
# include <Storages/IStorage.h>
# include <Storages/StorageS3.h>
2022-09-28 11:21:32 +00:00
# include <unordered_map>
# include <base/JSON.h>
// Forward declaration of Poco::Logger: this header only stores a
// `Poco::Logger *` (see StorageDelta::log), so the full definition is not
// needed here.
namespace Poco
{
class Logger;
}
// Forward declaration of the AWS SDK S3 client type.
// NOTE(review): S3Client is not referenced anywhere else in this header as
// shown — confirm whether this declaration is still required.
namespace Aws::S3
{
class S3Client;
}
namespace DB
{
// class to parse json deltalake metadata and find files needed for query in table
2022-09-06 15:01:34 +00:00
class DeltaLakeMetadata
{
public:
DeltaLakeMetadata() = default;
2022-09-21 16:14:51 +00:00
void setLastModifiedTime(const String & filename, uint64_t timestamp);
void remove(const String & filename, uint64_t timestamp);
std::vector<String> ListCurrentFiles() &&;
private:
std::unordered_map<String, uint64_t> file_update_time;
};
// class to get deltalake log json files and read json from them
2022-09-06 15:01:34 +00:00
class JsonMetadataGetter
{
public:
2022-11-04 20:49:24 +00:00
JsonMetadataGetter(StorageS3::S3Configuration & configuration_, const String & table_path_, ContextPtr context);
2022-09-20 10:24:47 +00:00
std::vector<String> getFiles() { return std::move(metadata).ListCurrentFiles(); }
private:
2022-11-04 20:49:24 +00:00
void Init(ContextPtr context);
std::vector<String> getJsonLogFiles();
2022-11-04 20:49:24 +00:00
std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key, ContextPtr context);
2022-09-28 11:21:32 +00:00
2022-09-20 14:16:27 +00:00
void handleJSON(const JSON & json);
StorageS3::S3Configuration base_configuration;
String table_path;
DeltaLakeMetadata metadata;
};
class StorageDelta : public IStorage
{
public:
2022-11-04 20:49:24 +00:00
// 1. Parses internal file structure of table
// 2. Finds out parts with latest version
// 3. Creates url for underlying StorageS3 enigne to handle reads
StorageDelta(
2022-11-03 17:28:41 +00:00
const StorageS3Configuration & configuration_,
const StorageID & table_id_,
ColumnsDescription columns_,
const ConstraintsDescription & constraints_,
const String & comment,
2022-11-03 17:28:41 +00:00
ContextPtr context_,
std::optional<FormatSettings> format_settings_);
String getName() const override { return "DeltaLake"; }
2022-11-04 20:49:24 +00:00
// Reads latest version of DeltaLake table
Pipe read(
const Names & column_names,
const StorageSnapshotPtr & storage_snapshot,
SelectQueryInfo & query_info,
ContextPtr context,
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
2022-11-04 16:03:12 +00:00
size_t num_streams) override;
private:
void Init();
2022-11-04 20:51:44 +00:00
2022-11-04 20:49:24 +00:00
// DeltaLake stores data in parts in different files
// keys is vector of parts with latest version
// generateQueryFromKeys constructs query from parts filenames for
// underlying StorageS3 engine
2022-09-19 15:23:07 +00:00
static String generateQueryFromKeys(std::vector<String> && keys);
StorageS3::S3Configuration base_configuration;
std::shared_ptr<StorageS3> s3engine;
Poco::Logger * log;
String table_path;
};
}
2022-09-07 07:16:32 +00:00
#endif