Simplify StorageS3Cluster

commit ec35a878d3
parent 5aac762d9c
Author: Nikita Mikhaylov
Date: 2021-04-13 23:17:25 +03:00

4 changed files with 49 additions and 57 deletions

src/Storages/StorageS3.cpp

@@ -318,7 +318,8 @@ StorageS3::StorageS3(
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     ContextPtr context_,
-    const String & compression_method_)
+    const String & compression_method_,
+    bool distributed_processing_)
     : IStorage(table_id_)
     , client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
     , format_name(format_name_)
@@ -326,6 +327,7 @@ StorageS3::StorageS3(
     , max_single_part_upload_size(max_single_part_upload_size_)
     , compression_method(compression_method_)
     , name(uri_.storage_name)
+    , distributed_processing(distributed_processing_)
 {
     context_->getGlobalContext()->getRemoteHostFilter().checkURL(uri_.uri);
     StorageInMemoryMetadata storage_metadata;
@@ -358,12 +360,23 @@ Pipe StorageS3::read(
         need_file_column = true;
     }
 
-    /// Iterate through disclosed globs and make a source for each file
-    auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
-    auto iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
-    {
-        return glob_iterator->next();
-    });
+    std::shared_ptr<StorageS3Source::IteratorWrapper> iterator_wrapper{nullptr};
+    if (distributed_processing)
+    {
+        iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>(
+            [callback = local_context->getReadTaskCallback()]() -> String {
+                return callback();
+        });
+    }
+    else
+    {
+        /// Iterate through disclosed globs and make a source for each file
+        auto glob_iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
+        iterator_wrapper = std::make_shared<StorageS3Source::IteratorWrapper>([glob_iterator]()
+        {
+            return glob_iterator->next();
+        });
+    }
 
     for (size_t i = 0; i < num_streams; ++i)
     {
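Read as a whole, this hunk makes StorageS3::read() pick its file iterator by mode: with distributed_processing set, each next S3 key is pulled from the query initiator via the context's read-task callback; otherwise keys come from expanding the glob locally. The following standalone sketch models that dispatch; NextFileCallback and makeLocalIterator are invented names for illustration, not ClickHouse API.

#include <functional>
#include <iostream>
#include <memory>
#include <queue>
#include <string>
#include <utility>
#include <vector>

/// Toy stand-in for StorageS3Source::IteratorWrapper: a callable that yields
/// the next S3 key, or an empty string once the work is exhausted.
using NextFileCallback = std::function<std::string()>;

/// Local mode: hand out keys resolved from a glob (modelled as a fixed list).
NextFileCallback makeLocalIterator(std::vector<std::string> keys)
{
    auto queue = std::make_shared<std::queue<std::string>>();
    for (auto & key : keys)
        queue->push(std::move(key));
    return [queue]() -> std::string
    {
        if (queue->empty())
            return {};
        std::string next = queue->front();
        queue->pop();
        return next;
    };
}

int main()
{
    /// In the real code this flag is the storage's distributed_processing member.
    bool distributed_processing = false;

    NextFileCallback next_file;
    if (distributed_processing)
    {
        /// Worker mode: every call would go back to the initiator; here the
        /// "remote" side is faked with another local iterator.
        NextFileCallback remote = makeLocalIterator({"task-from-initiator.csv"});
        next_file = [remote]() { return remote(); };
    }
    else
    {
        next_file = makeLocalIterator({"data/part-0.csv", "data/part-1.csv"});
    }

    for (std::string key = next_file(); !key.empty(); key = next_file())
        std::cout << "reading s3://bucket/" << key << '\n';
}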

src/Storages/StorageS3.h

@@ -106,7 +106,8 @@ public:
     const ColumnsDescription & columns_,
     const ConstraintsDescription & constraints_,
     ContextPtr context_,
-    const String & compression_method_ = "");
+    const String & compression_method_ = "",
+    bool distributed_processing_ = false);
 
     String getName() const override
     {
@@ -130,7 +131,6 @@
     friend class StorageS3Cluster;
     friend class TableFunctionS3Cluster;
-    friend class StorageS3SequentialSource;
 
     struct ClientAuthentificaiton
     {
@@ -149,6 +149,7 @@
     size_t max_single_part_upload_size;
     String compression_method;
     String name;
+    const bool distributed_processing;
 
     static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
 };
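Both trailing constructor parameters are defaulted, so every pre-existing StorageS3 call site keeps compiling while only the new cluster path passes the flag. A toy illustration of that source-compatibility pattern (StorageS3Like is an invented stand-in, not the real class):

#include <string>
#include <utility>

/// Toy model of the widened constructor: the trailing flag is defaulted,
/// so call sites written before the flag existed still compile unchanged.
struct StorageS3Like
{
    explicit StorageS3Like(std::string compression_method = "",
                           bool distributed_processing = false)
        : compression(std::move(compression_method))
        , distributed(distributed_processing)
    {}

    std::string compression;
    bool distributed;
};

int main()
{
    StorageS3Like legacy("gzip");                                  /// old call site, flag defaults to false
    StorageS3Like worker("gzip", /*distributed_processing=*/true); /// new worker-side call site
    return (legacy.distributed || !worker.distributed) ? 1 : 0;
}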

src/Storages/StorageS3Cluster.cpp

@@ -79,58 +79,18 @@ StorageS3Cluster::StorageS3Cluster(
     StorageS3::updateClientAndAuthSettings(context_, client_auth);
 }
 
+/// The code executes on the initiator
 Pipe StorageS3Cluster::read(
     const Names & column_names,
     const StorageMetadataPtr & metadata_snapshot,
     SelectQueryInfo & query_info,
     ContextPtr context,
     QueryProcessingStage::Enum processed_stage,
-    size_t max_block_size,
-    unsigned num_streams)
+    size_t /*max_block_size*/,
+    unsigned /*num_streams*/)
 {
     StorageS3::updateClientAndAuthSettings(context, client_auth);
 
-    /// Secondary query, need to read from S3
-    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
-    {
-        bool need_path_column = false;
-        bool need_file_column = false;
-        for (const auto & column : column_names)
-        {
-            if (column == "_path")
-                need_path_column = true;
-            if (column == "_file")
-                need_file_column = true;
-        }
-
-        /// Save the callback so as not to capture the context by reference or copy it.
-        auto file_iterator = std::make_shared<StorageS3Source::IteratorWrapper>(
-            [callback = context->getReadTaskCallback()]() -> String {
-                return callback();
-        });
-
-        Pipes pipes;
-        for (size_t i = 0; i < num_streams; ++i)
-        {
-            pipes.emplace_back(std::make_shared<StorageS3Source>(
-                need_path_column, need_file_column, format_name, getName(),
-                metadata_snapshot->getSampleBlock(), context,
-                metadata_snapshot->getColumns(), max_block_size,
-                compression_method,
-                client_auth.client,
-                client_auth.uri.bucket,
-                file_iterator
-            ));
-        }
-
-        auto pipe = Pipe::unitePipes(std::move(pipes));
-        narrowPipe(pipe, num_streams);
-        return pipe;
-    }
-
-    /// The code from here and below executes on the initiator
     auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettings());
     S3::URI s3_uri(Poco::URI{filename});
     StorageS3::updateClientAndAuthSettings(context, client_auth);
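With the worker branch gone, StorageS3Cluster::read() only ever builds the distributed query over the cluster's replicas, which is why max_block_size and num_streams are now unused; the pipe construction that lived here moved into StorageS3::read() behind the distributed_processing flag. A compressed before/after sketch of the control flow (QueryKind and both functions are illustrative stand-ins, not ClickHouse code):

#include <iostream>

enum class QueryKind { InitialQuery, SecondaryQuery };

/// Before: StorageS3Cluster::read() handled both roles itself.
void readBefore(QueryKind kind)
{
    if (kind == QueryKind::SecondaryQuery)
    {
        std::cout << "worker: build StorageS3Source pipes from the task callback\n";
        return;
    }
    std::cout << "initiator: fan the query out to every replica\n";
}

/// After: the worker branch is gone; workers never reach this function
/// because they are handed a plain StorageS3 with distributed_processing=true.
void readAfter()
{
    std::cout << "initiator: fan the query out to every replica\n";
}

int main()
{
    readBefore(QueryKind::SecondaryQuery);
    readAfter();
}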

src/TableFunctions/TableFunctionS3Cluster.cpp

@@ -102,11 +102,29 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
     const ASTPtr & /*function*/, ContextPtr context,
     const std::string & table_name, ColumnsDescription /*cached_columns*/) const
 {
-    StoragePtr storage = StorageS3Cluster::create(
-        filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
-        cluster_name, format, context->getSettingsRef().s3_max_connections,
-        getActualTableStructure(context), ConstraintsDescription{},
-        context, compression_method);
+    StoragePtr storage;
+    if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
+    {
+        /// On a worker node this filename won't contain globs
+        Poco::URI uri(filename);
+        S3::URI s3_uri(uri);
+        /// Actually these parameters are not used
+        UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
+        UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
+        UInt64 max_connections = context->getSettingsRef().s3_max_connections;
+        storage = StorageS3::create(
+            s3_uri, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
+            format, min_upload_part_size, max_single_part_upload_size, max_connections,
+            getActualTableStructure(context), ConstraintsDescription{},
+            context, compression_method, /*distributed_processing=*/true);
+    }
+    else
+    {
+        storage = StorageS3Cluster::create(
+            filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
+            cluster_name, format, context->getSettingsRef().s3_max_connections,
+            getActualTableStructure(context), ConstraintsDescription{},
+            context, compression_method);
+    }
 
     storage->startup();
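This dispatch is the core of the simplification: a worker executing a SECONDARY_QUERY now gets a plain StorageS3 in distributed-processing mode, while the initiator still gets StorageS3Cluster to coordinate the fan-out. A self-contained model of the factory decision (the *Like classes and makeStorage are stand-ins, not the ClickHouse classes):

#include <iostream>
#include <memory>
#include <string>

enum class QueryKind { InitialQuery, SecondaryQuery };

/// Minimal stand-ins for the two storage classes.
struct IStorageLike
{
    virtual ~IStorageLike() = default;
    virtual std::string name() const = 0;
};

struct S3Like : IStorageLike
{
    explicit S3Like(bool distributed) : distributed_processing(distributed) {}
    std::string name() const override { return "StorageS3"; }
    bool distributed_processing;
};

struct S3ClusterLike : IStorageLike
{
    std::string name() const override { return "StorageS3Cluster"; }
};

/// The dispatch this hunk introduces: workers read S3 directly and pull task
/// keys from the initiator; the initiator coordinates the cluster.
std::unique_ptr<IStorageLike> makeStorage(QueryKind kind)
{
    if (kind == QueryKind::SecondaryQuery)
        return std::make_unique<S3Like>(/*distributed_processing=*/true);
    return std::make_unique<S3ClusterLike>();
}

int main()
{
    std::cout << makeStorage(QueryKind::InitialQuery)->name() << '\n';   // StorageS3Cluster
    std::cout << makeStorage(QueryKind::SecondaryQuery)->name() << '\n'; // StorageS3
}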