Change JSON parsing

This commit is contained in:
Daniil Rubin 2022-09-20 14:16:27 +00:00
parent 745331a3f3
commit 8fe4485ee8
2 changed files with 64 additions and 42 deletions

View File

@ -70,51 +70,18 @@ void JsonMetadataGetter::Init()
for (const String & key : keys) for (const String & key : keys)
{ {
auto buf = createS3ReadBuffer(key); auto buf = createS3ReadBuffer(key);
String json_str;
size_t opening(0), closing(0);
char c;
while (buf->read(c)) while (!buf->eof())
{ {
// skip all space characters for JSON to parse correctly String json_str = readJSONStringFromBuffer(buf);
if (isspace(c))
{ if (json_str.empty()) {
continue; continue;
} }
json_str.push_back(c); const JSON json(json_str);
if (c == '{') handleJSON(json);
opening++;
else if (c == '}')
closing++;
if (opening == closing)
{
LOG_DEBUG(log, "JSON {}, {}", json_str, json_str.size());
JSON json(json_str);
if (json.has("add"))
{
auto path = json["add"]["path"].getString();
auto timestamp = json["add"]["modificationTime"].getInt();
metadata.add(path, timestamp);
}
else if (json.has("remove"))
{
auto path = json["remove"]["path"].getString();
auto timestamp = json["remove"]["deletionTimestamp"].getInt();
metadata.remove(path, timestamp);
}
// reset
opening = 0;
closing = 0;
json_str.clear();
}
} }
} }
} }
@ -162,10 +129,10 @@ std::vector<String> JsonMetadataGetter::getJsonLogFiles()
return keys; return keys;
} }
std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key) std::shared_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String & key)
{ {
// TBD: add parallel downloads // TBD: add parallel downloads
return std::make_unique<ReadBufferFromS3>( return std::make_shared<ReadBufferFromS3>(
base_configuration.client, base_configuration.client,
base_configuration.uri.bucket, base_configuration.uri.bucket,
key, key,
@ -174,6 +141,54 @@ std::unique_ptr<ReadBuffer> JsonMetadataGetter::createS3ReadBuffer(const String
ReadSettings{}); ReadSettings{});
} }
/// Reads one top-level JSON object from `buf` and returns it as a string.
///
/// Characters are consumed one at a time until the number of '{' seen equals
/// the number of '}' seen (and at least one '{' has been seen). Whitespace
/// outside of string literals is dropped so that the resulting text is compact.
///
/// Braces and whitespace that appear inside a JSON string literal are copied
/// verbatim and are not counted, so values such as paths containing spaces or
/// braces neither get altered nor break the object boundary detection.
///
/// If the buffer runs out before an object is complete, whatever has been
/// accumulated so far is returned (an empty string when only whitespace was
/// left in the buffer).
String JsonMetadataGetter::readJSONStringFromBuffer(std::shared_ptr<ReadBuffer> buf)
{
    String json_str;
    int32_t opening = 0;
    int32_t closing = 0;
    bool in_string = false; /// currently between an opening and a closing '"'
    bool escaped = false;   /// previous character inside the string was a backslash

    do
    {
        char c;
        if (!buf->read(c))
            return json_str;

        if (in_string)
        {
            /// Inside a string literal every character is significant.
            json_str.push_back(c);
            if (escaped)
                escaped = false;
            else if (c == '\\')
                escaped = true;
            else if (c == '"')
                in_string = false;
            continue;
        }

        /// Skip structural whitespace. The cast is required: passing a negative
        /// `char` to isspace() is undefined behaviour.
        if (isspace(static_cast<unsigned char>(c)))
            continue;

        json_str.push_back(c);
        if (c == '"')
            in_string = true;
        else if (c == '{')
            ++opening;
        else if (c == '}')
            ++closing;
    } while (opening != closing || opening == 0);

    LOG_DEBUG(log, "JSON {}", json_str);
    return json_str;
}
/// Applies a single parsed JSON entry to `metadata`.
///
/// An entry with an "add" key registers its "path" together with its
/// "modificationTime"; otherwise an entry with a "remove" key unregisters its
/// "path" together with its "deletionTimestamp". Entries with neither key are
/// ignored.
void JsonMetadataGetter::handleJSON(const JSON & json)
{
    if (json.has("add"))
    {
        auto added_path = json["add"]["path"].getString();
        auto modified_at = json["add"]["modificationTime"].getInt();
        metadata.add(added_path, modified_at);
        return;
    }

    /// Nothing to do for entries that are neither "add" nor "remove".
    if (!json.has("remove"))
        return;

    auto removed_path = json["remove"]["path"].getString();
    auto deleted_at = json["remove"]["deletionTimestamp"].getInt();
    metadata.remove(removed_path, deleted_at);
}
StorageDelta::StorageDelta( StorageDelta::StorageDelta(
const S3::URI & uri_, const S3::URI & uri_,
const String & access_key_, const String & access_key_,

View File

@ -51,7 +51,14 @@ private:
std::vector<String> getJsonLogFiles(); std::vector<String> getJsonLogFiles();
std::unique_ptr<ReadBuffer> createS3ReadBuffer(const String & key); std::shared_ptr<ReadBuffer> createS3ReadBuffer(const String & key);
/* Every metadata file contains multiple JSON objects;
this function reads a single JSON object from the buffer.
*/
String readJSONStringFromBuffer(std::shared_ptr<ReadBuffer> buf);
void handleJSON(const JSON & json);
StorageS3::S3Configuration base_configuration; StorageS3::S3Configuration base_configuration;
String table_path; String table_path;