Mirror of https://github.com/ClickHouse/ClickHouse.git, synced 2024-11-18 21:51:57 +00:00

Commit: e82c4800f2 ("Better")
Parent: 66c9305668
StorageHudi.cpp

@@ -12,9 +12,11 @@
 
 #include <QueryPipeline/Pipe.h>
 
-namespace DB {
+namespace DB
+{
 
-namespace ErrorCodes {
+namespace ErrorCodes
+{
     extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
     extern const int S3_ERROR;
 }
@@ -24,11 +26,11 @@ StorageHudi::StorageHudi(
     const String & access_key_,
     const String & secret_access_key_,
     const StorageID & table_id_,
-    const ColumnsDescription & columns_,
+    ColumnsDescription columns_,
     const ConstraintsDescription & constraints_,
     const String & comment,
-    ContextPtr context_
-) : IStorage(table_id_)
+    ContextPtr context_)
+    : IStorage(table_id_)
     , base_configuration({uri_, access_key_, secret_access_key_, {}, {}, {}})
     , log(&Poco::Logger::get("StorageHudi (" + table_id_.table_name + ")"))
 {
@@ -38,25 +40,16 @@ StorageHudi::StorageHudi(
     auto keys = getKeysFromS3();
 
     auto new_uri = base_configuration.uri.uri.toString() + generateQueryFromKeys(std::move(keys));
 
     LOG_DEBUG(log, "New uri: {}", new_uri);
 
     auto s3_uri = S3::URI(Poco::URI(new_uri));
-    // StorageS3::S3Configuration s3_configuration{s3_uri, access_key_, secret_access_key_, {}, {}, {}};
 
     if (columns_.empty())
     {
-        auto columns = StorageS3::getTableStructureFromData(
-            String("Parquet"),
-            s3_uri,
-            access_key_,
-            secret_access_key_,
-            "",
-            false,
-            {},
-            context_
-        );
-        storage_metadata.setColumns(columns);
+        columns_
+            = StorageS3::getTableStructureFromData(String("Parquet"), s3_uri, access_key_, secret_access_key_, "", false, {}, context_);
+        storage_metadata.setColumns(columns_);
     }
     else
         storage_metadata.setColumns(columns_);
@@ -76,30 +69,21 @@ StorageHudi::StorageHudi(
         constraints_,
         comment,
         context_,
-        std::nullopt
-    );
+        std::nullopt);
 }
 
 Pipe StorageHudi::read(
-        const Names & column_names,
-        const StorageSnapshotPtr & storage_snapshot,
-        SelectQueryInfo & query_info,
-        ContextPtr context,
-        QueryProcessingStage::Enum processed_stage,
-        size_t max_block_size,
-        unsigned num_streams)
+    const Names & column_names,
+    const StorageSnapshotPtr & storage_snapshot,
+    SelectQueryInfo & query_info,
+    ContextPtr context,
+    QueryProcessingStage::Enum processed_stage,
+    size_t max_block_size,
+    unsigned num_streams)
 {
     updateS3Configuration(context, base_configuration);
 
-    //auto keys = getKeysFromS3();
-
-    //auto new_uri = base_configuration.uri.uri.toString() + "/" + generateQueryFromKeys(std::forward(keys));
-    //s3_configuration.uri = S3::URI(Poco::URI(new_uri));
-
-    return s3engine->read(column_names, storage_snapshot,
-        query_info, context, processed_stage,
-        max_block_size, num_streams);
-
+    return s3engine->read(column_names, storage_snapshot, query_info, context, processed_stage, max_block_size, num_streams);
 }
 
 void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configuration & upd)
@@ -128,7 +112,8 @@ void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurati
 
     S3::PocoHTTPClientConfiguration client_configuration = S3::ClientFactory::instance().createClientConfiguration(
         settings.auth_settings.region,
-        ctx->getRemoteHostFilter(), ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
+        ctx->getRemoteHostFilter(),
+        ctx->getGlobalContext()->getSettingsRef().s3_max_redirects,
         ctx->getGlobalContext()->getSettingsRef().enable_s3_requests_logging);
 
     client_configuration.endpointOverride = upd.uri.endpoint;
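
As an aside, createClientConfiguration above is ClickHouse's wrapper around the AWS SDK's client setup. A minimal standalone sketch of the same idea against the plain AWS C++ SDK follows; the region, endpoint, and credentials are placeholder assumptions, not values from this commit.

#include <aws/core/Aws.h>
#include <aws/core/auth/AWSCredentials.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/s3/S3Client.h>

int main()
{
    Aws::SDKOptions options;
    Aws::InitAPI(options);
    {
        // Region and endpoint override mirror the knobs the commit
        // threads through createClientConfiguration().
        Aws::Client::ClientConfiguration config;
        config.region = "us-east-1";                // assumed region
        config.endpointOverride = "localhost:9000"; // e.g. a local MinIO endpoint

        Aws::Auth::AWSCredentials credentials("ACCESS_KEY", "SECRET_KEY"); // placeholders
        Aws::S3::S3Client client(credentials, config);

        // The client is now ready for ListObjectsV2 / GetObject calls.
    }
    Aws::ShutdownAPI(options);
    return 0;
}
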
@@ -147,11 +132,12 @@ void StorageHudi::updateS3Configuration(ContextPtr ctx, StorageS3::S3Configurati
     upd.auth_settings = std::move(settings.auth_settings);
 }
 
-std::vector<std::string> StorageHudi::getKeysFromS3() {
+std::vector<std::string> StorageHudi::getKeysFromS3()
+{
     std::vector<std::string> keys;
 
     const auto & client = base_configuration.client;
 
     Aws::S3::Model::ListObjectsV2Request request;
     Aws::S3::Model::ListObjectsV2Outcome outcome;
 
@@ -176,7 +162,7 @@ std::vector<std::string> StorageHudi::getKeysFromS3() {
         const auto & result_batch = outcome.GetResult().GetContents();
         for (const auto & obj : result_batch)
         {
-            const auto& filename = obj.GetKey();
+            const auto & filename = obj.GetKey();
             keys.push_back(filename);
             //LOG_DEBUG(log, "Found file: {}", filename);
         }
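
The hunk above sits inside the listing loop of getKeysFromS3, which pages through ListObjectsV2 results until the response is no longer truncated. A self-contained sketch of that pagination pattern with the AWS C++ SDK, assuming an already-constructed client (listAllKeys is an illustrative helper, not a function from the commit):

#include <aws/s3/S3Client.h>
#include <aws/s3/model/ListObjectsV2Request.h>
#include <string>
#include <vector>

// Collect every key in a bucket, following continuation tokens
// until the listing is no longer truncated.
std::vector<std::string> listAllKeys(const Aws::S3::S3Client & client, const std::string & bucket)
{
    std::vector<std::string> keys;
    Aws::S3::Model::ListObjectsV2Request request;
    request.SetBucket(bucket.c_str());

    bool truncated = true;
    while (truncated)
    {
        auto outcome = client.ListObjectsV2(request);
        if (!outcome.IsSuccess())
            break; // a real caller would surface S3_ERROR here

        const auto & result = outcome.GetResult();
        for (const auto & obj : result.GetContents())
            keys.push_back(std::string(obj.GetKey().c_str()));

        truncated = result.GetIsTruncated();
        if (truncated)
            request.SetContinuationToken(result.GetNextContinuationToken());
    }
    return keys;
}
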
@@ -188,49 +174,64 @@ std::vector<std::string> StorageHudi::getKeysFromS3() {
     return keys;
 }
 
-std::string StorageHudi::generateQueryFromKeys(std::vector<std::string>&& keys) {
+std::string StorageHudi::generateQueryFromKeys(std::vector<std::string> && keys)
+{
     // filter only .parquet files
-    std::erase_if(keys, [](const std::string& s) {
-        if (s.size() >= 8) {
-            return s.substr(s.size() - 8) != ".parquet";
-        }
-        return true;
-    });
+    std::erase_if(
+        keys,
+        [](const std::string & s)
+        {
+            if (s.size() >= 8)
+            {
+                return s.substr(s.size() - 8) != ".parquet";
+            }
+            return true;
+        });
 
     // for each partition path take only latest parquet file
-
 
     std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest_parquets;
 
-    for (const auto& key : keys) {
+    for (const auto & key : keys)
+    {
         auto slash = key.find_last_of("/");
         std::string path;
-        if (slash == std::string::npos) {
+        if (slash == std::string::npos)
+        {
             path = "";
-        } else {
+        }
+        else
+        {
             path = key.substr(0, slash);
         }
 
-        uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_")+1));
+        uint64_t timestamp = std::stoul(key.substr(key.find_last_of("_") + 1));
 
         auto it = latest_parquets.find(path);
 
-        if (it != latest_parquets.end()) {
-            if (it->second.second < timestamp) {
+        if (it != latest_parquets.end())
+        {
+            if (it->second.second < timestamp)
+            {
                 it->second = {key, timestamp};
             }
-        } else {
+        }
+        else
+        {
             latest_parquets[path] = {key, timestamp};
         }
     }
 
     std::vector<std::string> filtered_keys;
-    std::transform(latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto& kv){return kv.second.first;});
+    std::transform(
+        latest_parquets.begin(), latest_parquets.end(), std::back_inserter(filtered_keys), [](const auto & kv) { return kv.second.first; });
 
     std::string new_query;
 
-    for (auto&& key : filtered_keys) {
-        if (!new_query.empty()) {
+    for (auto && key : filtered_keys)
+    {
+        if (!new_query.empty())
+        {
             new_query += ",";
         }
         new_query += key;
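
To make the selection logic above concrete: the function drops every key that does not end in .parquet and then, for each partition directory, keeps only the file whose trailing _<number> suffix is largest, joining the survivors with commas. A pure-STL sketch with made-up keys (the file-name layout is an assumption inferred from the code, and selectLatestPerPartition is a hypothetical name):

#include <cstdint>
#include <iostream>
#include <string>
#include <unordered_map>
#include <vector>

// Keep only the newest .parquet file per partition path,
// mirroring the selection logic in generateQueryFromKeys.
std::string selectLatestPerPartition(std::vector<std::string> keys)
{
    std::erase_if(keys, [](const std::string & s)
                  { return s.size() < 8 || s.substr(s.size() - 8) != ".parquet"; });

    // partition path -> (key, number parsed from the "_<number>" suffix)
    std::unordered_map<std::string, std::pair<std::string, uint64_t>> latest;
    for (const auto & key : keys)
    {
        auto slash = key.find_last_of('/');
        std::string path = (slash == std::string::npos) ? "" : key.substr(0, slash);
        // std::stoul stops at the first non-digit, so ".parquet" is ignored.
        uint64_t timestamp = std::stoul(key.substr(key.find_last_of('_') + 1));

        auto it = latest.find(path);
        if (it == latest.end() || it->second.second < timestamp)
            latest[path] = {key, timestamp};
    }

    std::string joined;
    for (const auto & entry : latest)
    {
        if (!joined.empty())
            joined += ",";
        joined += entry.second.first;
    }
    return joined;
}

int main()
{
    // Hypothetical keys: two commits in partition "p1", one in "p2".
    std::string result = selectLatestPerPartition({
        "p1/part_1.parquet",
        "p1/part_2.parquet", // newer suffix wins within p1
        "p2/part_1.parquet",
        "p1/commits.log",    // filtered out: not .parquet
    });
    std::cout << result << '\n'; // "p1/part_2.parquet,p2/part_1.parquet" (map order unspecified)
    return 0;
}
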
@@ -243,7 +244,9 @@ std::string StorageHudi::generateQueryFromKeys(std::vector<std::string>&& keys)
 
 void registerStorageHudi(StorageFactory & factory)
 {
-    factory.registerStorage("Hudi", [](const StorageFactory::Arguments & args)
+    factory.registerStorage(
+        "Hudi",
+        [](const StorageFactory::Arguments & args)
     {
         auto & engine_args = args.engine_args;
         if (engine_args.empty())
@@ -269,11 +272,10 @@ void registerStorageHudi(StorageFactory & factory)
             args.getContext());
         },
         {
-        .supports_settings = true,
-        .supports_schema_inference = true,
-        .source_access_type = AccessType::S3,
-    });
+            .supports_settings = true,
+            .supports_schema_inference = true,
+            .source_access_type = AccessType::S3,
+        });
 }
 
 }
-

StorageHudi.h

@@ -5,30 +5,31 @@
 #include <Storages/IStorage.h>
 #include <Storages/StorageS3.h>
 
-namespace Poco {
-    class Logger;
+namespace Poco
+{
+class Logger;
 }
 
 namespace Aws::S3
 {
-    class S3Client;
+class S3Client;
 }
 
 namespace DB
 {
 
-class StorageHudi : public IStorage {
+class StorageHudi : public IStorage
+{
 public:
     StorageHudi(
-        const S3::URI& uri_,
-        const String& access_key_,
-        const String& secret_access_key_,
+        const S3::URI & uri_,
+        const String & access_key_,
+        const String & secret_access_key_,
         const StorageID & table_id_,
-        const ColumnsDescription & columns_,
+        ColumnsDescription columns_,
         const ConstraintsDescription & constraints_,
         const String & comment,
-        ContextPtr context_
-    );
+        ContextPtr context_);
 
     String getName() const override { return "Hudi"; }
 
@@ -46,7 +47,7 @@ private:
 
 private:
     std::vector<std::string> getKeysFromS3();
-    std::string generateQueryFromKeys(std::vector<std::string>&& keys);
+    std::string generateQueryFromKeys(std::vector<std::string> && keys);
 
 private:
     StorageS3::S3Configuration base_configuration;