diff --git a/src/Storages/StorageDelta.cpp b/src/Storages/StorageDelta.cpp index 8da5b5ce323..05b418e4208 100644 --- a/src/Storages/StorageDelta.cpp +++ b/src/Storages/StorageDelta.cpp @@ -56,15 +56,58 @@ JsonMetadataGetter::JsonMetadataGetter(StorageS3::S3Configuration & configuratio void JsonMetadataGetter::Init() { auto keys = getJsonLogFiles(); - char localbuf[100]; - + + // read data from every json log file for (const String & key : keys) { auto buf = createS3ReadBuffer(key); + String json_str; + size_t opening(0), closing(0); + char c; - while (!buf->eof()) { - buf->read(localbuf, 100); + while (buf->read(c)) { + // skip all space characters for JSON to parse correctly + if (isspace(c)) { + continue; + } + + json_str.push_back(c); + + if (c == '{') + opening++; + else if (c == '}') + closing++; + + if (opening == closing) { + + LOG_DEBUG(log, "JSON {}, {}", json_str, json_str.size()); + + JSON json(json_str); + + if (json.has("add")) { + auto path = json["add"]["path"].getString(); + auto timestamp = json["add"]["modificationTime"].getInt(); + + metadata.add(path, timestamp); + + LOG_DEBUG(log, "Path {}", path); + LOG_DEBUG(log, "Timestamp {}", timestamp); + } else if (json.has("remove")) { + auto path = json["remove"]["path"].getString(); + auto timestamp = json["remove"]["modificationTime"].getInt(); + + metadata.remove(path, timestamp); + + LOG_DEBUG(log, "Path {}", path); + LOG_DEBUG(log, "Timestamp {}", timestamp); + } + + // reset + opening = 0; + closing = 0; + json_str.clear(); + + } - LOG_DEBUG(log, "{}", String(localbuf)); } } @@ -124,57 +167,57 @@ StorageDelta::StorageDelta( const String & access_key_, const String & secret_access_key_, const StorageID & table_id_, - ColumnsDescription /*columns_*/, - const ConstraintsDescription & /*constraints_*/, - const String & /*comment*/, + ColumnsDescription columns_, + const ConstraintsDescription & constraints_, + const String & comment, ContextPtr context_) : IStorage(table_id_) , base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}}) , log(&Poco::Logger::get("StorageDeltaLake (" + table_id_.table_name + ")")) , table_path(uri_.key) { - //StorageInMemoryMetadata storage_metadata; + StorageInMemoryMetadata storage_metadata; updateS3Configuration(context_, base_configuration); - Init(); - - // auto keys = getKeysFromS3(); - - // auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); - - // LOG_DEBUG(log, "New uri: {}", new_uri); - // LOG_DEBUG(log, "Table path: {}", table_path); - // auto s3_uri = S3::URI(Poco::URI(new_uri)); - - // if (columns_.empty()) - // { - // columns_ - // = StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_); - // storage_metadata.setColumns(columns_); - // } - // else - // storage_metadata.setColumns(columns_); - - // storage_metadata.setConstraints(constraints_); - // storage_metadata.setComment(comment); - // setInMemoryMetadata(storage_metadata); - - // s3engine = std::make_shared( - // s3_uri, - // access_key_, - // secret_access_key_, - // table_id_, - // String("Parquet"), // format name - // base_configuration.rw_settings, - // columns_, - // constraints_, - // comment, - // context_, - // std::nullopt); -} - -void StorageDelta::Init() { JsonMetadataGetter getter{base_configuration, table_path, log}; + + auto keys = getter.getFiles(); + + for (const String & path : keys) { + LOG_DEBUG(log, "{}", path); + } + + auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys)); + + LOG_DEBUG(log, "New uri: {}", new_uri); + LOG_DEBUG(log, "Table path: {}", table_path); + auto s3_uri = S3::URI(Poco::URI(new_uri)); + + if (columns_.empty()) + { + columns_ + = StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_); + storage_metadata.setColumns(columns_); + } + else + storage_metadata.setColumns(columns_); + + storage_metadata.setConstraints(constraints_); + storage_metadata.setComment(comment); + setInMemoryMetadata(storage_metadata); + + s3engine = std::make_shared( + s3_uri, + access_key_, + secret_access_key_, + table_id_, + String("Parquet"), // format name + base_configuration.rw_settings, + columns_, + constraints_, + comment, + context_, + std::nullopt); } Pipe StorageDelta::read(