Implement StorageDelta

This commit is contained in:
Daniil Rubin 2022-09-02 06:54:16 +00:00 committed by Daniil Rubin
parent fc2c8f37b1
commit c68257d711

View File

@ -56,15 +56,58 @@ JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuratio
void JsonMetadataGetter::Init() { void JsonMetadataGetter::Init() {
auto keys = getJsonLogFiles(); auto keys = getJsonLogFiles();
char localbuf[100];
// read data from every json log file
for (const String & key : keys) { for (const String & key : keys) {
auto buf = createS3ReadBuffer(key); auto buf = createS3ReadBuffer(key);
String json_str;
size_t opening(0), closing(0);
char c;
while (!buf->eof()) { while (buf->read(c)) {
buf->read(localbuf, 100); // skip all space characters for JSON to parse correctly
if (isspace(c)) {
continue;
}
json_str.push_back(c);
if (c == '{')
opening++;
else if (c == '}')
closing++;
if (opening == closing) {
LOG_DEBUG(log, "JSON {}, {}", json_str, json_str.size());
JSON json(json_str);
if (json.has("add")) {
auto path = json["add"]["path"].getString();
auto timestamp = json["add"]["modificationTime"].getInt();
metadata.add(path, timestamp);
LOG_DEBUG(log, "Path {}", path);
LOG_DEBUG(log, "Timestamp {}", timestamp);
} else if (json.has("remove")) {
auto path = json["remove"]["path"].getString();
auto timestamp = json["remove"]["modificationTime"].getInt();
metadata.remove(path, timestamp);
LOG_DEBUG(log, "Path {}", path);
LOG_DEBUG(log, "Timestamp {}", timestamp);
}
// reset
opening = 0;
closing = 0;
json_str.clear();
}
LOG_DEBUG(log, "{}", String(localbuf));
} }
} }
@ -124,57 +167,57 @@ StorageDelta::StorageDelta(
const String & access_key_, const String & access_key_,
const String & secret_access_key_, const String & secret_access_key_,
const StorageID & table_id_, const StorageID & table_id_,
ColumnsDescription /*columns_*/, ColumnsDescription columns_,
const ConstraintsDescription & /*constraints_*/, const ConstraintsDescription & constraints_,
const String & /*comment*/, const String & comment,
ContextPtr context_) ContextPtr context_)
: IStorage(table_id_) : IStorage(table_id_)
, base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}}) , base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}})
, log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")")) , log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")"))
, table_path(uri_.key) , table_path(uri_.key)
{ {
//StorageInMemoryMetadata storage_metadata; StorageInMemoryMetadata storage_metadata;
updateS3Configuration(context_, base_configuration); updateS3Configuration(context_, base_configuration);
Init();
// auto keys = getKeysFromS3();
// auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
// LOG_DEBUG(log, "New uri: {}", new_uri);
// LOG_DEBUG(log, "Table path: {}", table_path);
// auto s3_uri = S3::URI(Poco::URI(new_uri));
// if (columns_.empty())
// {
// columns_
// = StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_);
// storage_metadata.setColumns(columns_);
// }
// else
// storage_metadata.setColumns(columns_);
// storage_metadata.setConstraints(constraints_);
// storage_metadata.setComment(comment);
// setInMemoryMetadata(storage_metadata);
// s3engine = std::make_shared<StorageS3>(
// s3_uri,
// access_key_,
// secret_access_key_,
// table_id_,
// String("Parquet"), // format name
// base_configuration.rw_settings,
// columns_,
// constraints_,
// comment,
// context_,
// std::nullopt);
}
void StorageDelta::Init() {
JsonMetadataGetter getter{base_configuration, table_path, log}; JsonMetadataGetter getter{base_configuration, table_path, log};
auto keys = getter.getFiles();
for (const String & path : keys) {
LOG_DEBUG(log, "{}", path);
}
auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
LOG_DEBUG(log, "New uri: {}", new_uri);
LOG_DEBUG(log, "Table path: {}", table_path);
auto s3_uri = S3::URI(Poco::URI(new_uri));
if (columns_.empty())
{
columns_
= StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_);
storage_metadata.setColumns(columns_);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment);
setInMemoryMetadata(storage_metadata);
s3engine = std::make_shared<StorageS3>(
s3_uri,
access_key_,
secret_access_key_,
table_id_,
String("Parquet"), // format name
base_configuration.rw_settings,
columns_,
constraints_,
comment,
context_,
std::nullopt);
} }
Pipe StorageDelta::read( Pipe StorageDelta::read(