Nikita Mikhaylov 2021-04-08 03:09:15 +03:00
parent 2555ae5d3f
commit 7276b40556
8 changed files with 168 additions and 163 deletions

View File

@@ -200,6 +200,8 @@ void RemoteQueryExecutor::sendQuery()
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
}
std::cout << "RemoteQueryExecutor " << toString(context.getClientInfo().task_identifier) << std::endl;
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false;

View File

@@ -88,6 +88,8 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
writeBinary(uint8_t(0), out);
}
}
writeBinary(task_identifier, out);
}
@@ -163,6 +165,8 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(client_trace_context.trace_flags, in);
}
}
readBinary(task_identifier, in);
}
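
Note that the neighboring ClientInfo fields above are serialized only when the peer's protocol revision supports them, while task_identifier is written and read unconditionally in this hunk. For comparison, a revision guard would look roughly like the following sketch; the revision constant is hypothetical and not part of this commit:

if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_TASK_IDENTIFIER) /// hypothetical constant
    writeBinary(task_identifier, out);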

View File

@@ -1,5 +1,6 @@
#pragma once
#include <Core/UUID.h>
#include <Poco/Net/SocketAddress.h>
#include <Common/UInt128.h>
#include <common/types.h>
@@ -97,6 +98,8 @@ public:
String quota_key;
UInt64 distributed_depth = 0;
/// For distributed file processing (e.g. s3Distributed)
String task_identifier;
bool empty() const { return query_kind == QueryKind::NO_QUERY; }

View File

@@ -149,28 +149,62 @@ Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool
StorageS3Source::StorageS3Source(
bool need_path,
bool need_file,
const String & format,
const String & format_,
String name_,
const Block & sample_block,
const Context & context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const String & bucket,
const String & key)
: SourceWithProgress(getHeader(sample_block, need_path, need_file))
const Block & sample_block_,
const Context & context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket_,
std::shared_ptr<FileIterator> file_iterator_)
: SourceWithProgress(getHeader(sample_block_, need_path, need_file))
, name(std::move(name_))
, bucket(bucket_)
, format(format_)
, context(context_)
, columns_desc(columns_)
, max_block_size(max_block_size_)
, compression_hint(compression_hint_)
, client(client_)
, sample_block(sample_block_)
, with_file_column(need_file)
, with_path_column(need_path)
, file_path(bucket + "/" + key)
, file_iterator(file_iterator_)
{
read_buf = wrapReadBufferWithCompressionMethod(std::make_unique<ReadBufferFromS3>(client, bucket, key), compression_method);
initialize();
}
bool StorageS3Source::initialize()
{
String current_key;
if (auto result = file_iterator->next())
{
current_key = result.value();
if (current_key.empty())
return false;
file_path = bucket + "/" + current_key;
std::cout << "StorageS3Source " << file_path << std::endl;
}
else
{
/// Do not initialize read_buffer and stream.
return false;
}
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key), chooseCompressionMethod(current_key, compression_hint));
auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size);
reader = std::make_shared<InputStreamFromInputFormat>(input_format);
if (columns.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns, context);
if (columns_desc.hasDefaults())
reader = std::make_shared<AddingDefaultsBlockInputStream>(reader, columns_desc, context);
initialized = false;
return true;
}
String StorageS3Source::getName() const
@@ -206,9 +240,14 @@ Chunk StorageS3Source::generate()
return Chunk(std::move(columns), num_rows);
}
reader->readSuffix();
reader.reset();
read_buf.reset();
if (!initialize())
return {};
return generate();
}
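
A side note on control flow: generate() above restarts itself with a tail call once a file is exhausted and initialize() has switched to the next key. The same drain-then-advance loop, written iteratively as a minimal standalone model (all names are simplified stand-ins, not the ClickHouse types):

#include <cstddef>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

int main()
{
    std::vector<std::string> keys{"a.csv", "b.csv"};
    std::size_t pos = 0;
    int chunks_left = 0;

    /// Stand-in for StorageS3Source::initialize(): advance to the next key.
    auto initialize = [&]() -> bool
    {
        if (pos == keys.size())
            return false;
        chunks_left = 2; /// pretend every file yields two chunks
        ++pos;
        return true;
    };

    /// Stand-in for reader->read(): one chunk, or nothing when the file is done.
    auto read_chunk = [&]() -> std::optional<std::string>
    {
        if (chunks_left == 0)
            return std::nullopt;
        --chunks_left;
        return "chunk from " + keys[pos - 1];
    };

    while (initialize())
        while (auto chunk = read_chunk())
            std::cout << *chunk << '\n';
}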
namespace
@@ -322,9 +361,9 @@ Pipe StorageS3::read(
/// Iterate through disclosed globs and make a source for each file
StorageS3Source::DisclosedGlobIterator glob_iterator(*client_auth.client, client_auth.uri);
/// TODO: better to put the first num_streams keys into the pipeline
/// and add the others dynamically at runtime
while (auto key = glob_iterator.next())
auto file_iterator = std::make_shared<LocalFileIterator>(glob_iterator);
for (size_t i = 0; i < num_streams; ++i)
{
pipes.emplace_back(std::make_shared<StorageS3Source>(
need_path_column,
@@ -335,16 +374,12 @@ Pipe StorageS3::read(
local_context,
metadata_snapshot->getColumns(),
max_block_size,
chooseCompressionMethod(client_auth.uri.key, compression_method),
compression_method,
client_auth.client,
client_auth.uri.bucket,
key.value()));
file_iterator));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
// It's possible to have many buckets read from s3; resize(num_streams) might open too many handles at the same time.
// Using narrowPipe instead.
narrowPipe(pipe, num_streams);
return pipe;
return Pipe::unitePipes(std::move(pipes));
}
BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context)
@@ -402,7 +437,8 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory)
if (engine_args.size() < 2 || engine_args.size() > 5)
throw Exception(
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & engine_arg : engine_args)
engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext());

View File

@@ -17,6 +17,7 @@
#include <ext/shared_ptr_helper.h>
#include <IO/S3Common.h>
#include <IO/CompressionMethod.h>
#include <Interpreters/Context.h>
namespace Aws::S3
{
@@ -30,7 +31,6 @@ class StorageS3SequentialSource;
class StorageS3Source : public SourceWithProgress
{
public:
class DisclosedGlobIterator
{
public:
@@ -42,6 +42,13 @@ public:
std::shared_ptr<Impl> pimpl;
};
struct FileIterator
{
virtual ~FileIterator() = default;
virtual std::optional<String> next() = 0;
};
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);
StorageS3Source(
@@ -50,13 +57,13 @@ public:
const String & format,
String name_,
const Block & sample_block,
const Context & context,
const ColumnsDescription & columns,
UInt64 max_block_size,
const CompressionMethod compression_method,
const std::shared_ptr<Aws::S3::S3Client> & client,
const Context & context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const String compression_hint_,
const std::shared_ptr<Aws::S3::S3Client> & client_,
const String & bucket,
const String & key);
std::shared_ptr<FileIterator> file_iterator_);
String getName() const override;
@@ -64,12 +71,26 @@ public:
private:
String name;
String bucket;
String file_path;
String format;
Context context;
ColumnsDescription columns_desc;
UInt64 max_block_size;
String compression_hint;
std::shared_ptr<Aws::S3::S3Client> client;
Block sample_block;
std::unique_ptr<ReadBuffer> read_buf;
BlockInputStreamPtr reader;
bool initialized = false;
bool with_file_column = false;
bool with_path_column = false;
String file_path;
std::shared_ptr<FileIterator> file_iterator;
/// Recreate ReadBuffer and BlockInputStream for each file.
bool initialize();
};
/**
@@ -136,6 +157,23 @@ private:
String compression_method;
String name;
struct LocalFileIterator : public StorageS3Source::FileIterator
{
explicit LocalFileIterator(StorageS3Source::DisclosedGlobIterator glob_iterator_)
: glob_iterator(glob_iterator_) {}
StorageS3Source::DisclosedGlobIterator glob_iterator;
/// Several files could be processed in parallel
/// from different sources
std::mutex iterator_mutex;
std::optional<String> next() override
{
std::lock_guard lock(iterator_mutex);
return glob_iterator.next();
}
};
static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &);
};
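
The mutex in LocalFileIterator is what allows the num_streams sources created in StorageS3::read() to share one glob iterator. A standalone model of that interaction, with threads standing in for the pipeline sources (illustrative names only, not the ClickHouse types):

#include <cstddef>
#include <iostream>
#include <mutex>
#include <optional>
#include <string>
#include <thread>
#include <vector>

/// Several consumers pull keys from a single iterator, so next() is
/// serialized with a mutex, exactly as in LocalFileIterator above.
struct SharedKeyIterator
{
    std::vector<std::string> keys;
    std::size_t pos = 0;
    std::mutex mutex;

    std::optional<std::string> next()
    {
        std::lock_guard lock(mutex);
        if (pos == keys.size())
            return std::nullopt;
        return keys[pos++];
    }
};

int main()
{
    SharedKeyIterator iterator{{"k1", "k2", "k3", "k4", "k5"}};

    std::vector<std::thread> streams;
    for (int i = 0; i < 3; ++i) /// num_streams = 3
        streams.emplace_back([&]
        {
            while (auto key = iterator.next())
                std::cout << "stream got " + *key + "\n";
        });

    for (auto & stream : streams)
        stream.join();
}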

View File

@@ -9,6 +9,7 @@
#include <Common/Throttler.h>
#include "Client/Connection.h"
#include "Core/QueryProcessingStage.h"
#include <Core/UUID.h>
#include "DataStreams/RemoteBlockInputStream.h"
#include <Columns/ColumnsNumber.h>
#include <DataTypes/DataTypesNumber.h>
@@ -34,7 +35,6 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Storages/IStorage.h>
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageS3.h>
#include <common/logger_useful.h>
#include <aws/core/auth/AWSCredentials.h>
@@ -56,119 +56,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
struct StorageS3SourceBuilder
{
bool need_path;
bool need_file;
String format;
String name;
Block sample_block;
const Context & context;
const ColumnsDescription & columns;
UInt64 max_block_size;
String compression_method;
};
class StorageS3SequentialSource : public SourceWithProgress
{
public:
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column)
{
if (with_path_column)
sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_path"});
if (with_file_column)
sample_block.insert({DataTypeString().createColumn(), std::make_shared<DataTypeString>(), "_file"});
return sample_block;
}
StorageS3SequentialSource(
String initial_query_id_,
NextTaskCallback read_task_callback_,
const ClientAuthentificationBuilder & client_auth_builder_,
const StorageS3SourceBuilder & s3_builder_)
: SourceWithProgress(getHeader(s3_builder_.sample_block, s3_builder_.need_path, s3_builder_.need_file))
, initial_query_id(initial_query_id_)
, s3b(s3_builder_)
, cab(client_auth_builder_)
, read_task_callback(read_task_callback_)
{
createOrUpdateInnerSource();
}
String getName() const override
{
return "StorageS3SequentialSource";
}
Chunk generate() override
{
if (!inner)
return {};
auto chunk = inner->generate();
if (!chunk)
{
if (!createOrUpdateInnerSource())
return {};
else
chunk = inner->generate();
}
return chunk;
}
private:
String askAboutNextKey()
{
try
{
return read_task_callback(initial_query_id);
}
catch (...)
{
tryLogCurrentException(&Poco::Logger::get(getName()));
throw;
}
}
bool createOrUpdateInnerSource()
{
auto next_string = askAboutNextKey();
if (next_string.empty())
return false;
auto next_uri = S3::URI(Poco::URI(next_string));
auto client_auth = StorageS3::ClientAuthentificaiton{
next_uri,
cab.access_key_id,
cab.secret_access_key,
cab.max_connections,
{}, {}};
StorageS3::updateClientAndAuthSettings(s3b.context, client_auth);
inner = std::make_unique<StorageS3Source>(
s3b.need_path, s3b.need_file, s3b.format, s3b.name,
s3b.sample_block, s3b.context, s3b.columns, s3b.max_block_size,
chooseCompressionMethod(client_auth.uri.key, ""),
client_auth.client,
client_auth.uri.bucket,
next_uri.key
);
return true;
}
String initial_query_id;
StorageS3SourceBuilder s3b;
ClientAuthentificationBuilder cab;
std::unique_ptr<StorageS3Source> inner;
NextTaskCallback read_task_callback;
};
StorageS3Distributed::StorageS3Distributed(
const String & filename_,
@@ -183,17 +70,18 @@ StorageS3Distributed::StorageS3Distributed(
const Context & context_,
const String & compression_method_)
: IStorage(table_id_)
, client_auth{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later
, filename(filename_)
, cluster_name(cluster_name_)
, cluster(context_.getCluster(cluster_name)->getClusterWithReplicasAsShards(context_.getSettings()))
, format_name(format_name_)
, compression_method(compression_method_)
, cli_builder{access_key_id_, secret_access_key_, max_connections_}
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
StorageS3::updateClientAndAuthSettings(context_, client_auth);
}
@@ -206,6 +94,8 @@ Pipe StorageS3Distributed::read(
size_t max_block_size,
unsigned /*num_streams*/)
{
StorageS3::updateClientAndAuthSettings(context, client_auth);
/// Secondary query, need to read from S3
if (context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
@@ -219,16 +109,21 @@ Pipe StorageS3Distributed::read(
need_file_column = true;
}
StorageS3SourceBuilder s3builder{
need_path_column, need_file_column,
format_name, getName(),
std::cout << "Got UUID on worker " << toString(context.getClientInfo().task_identifier) << std::endl;
auto file_iterator = std::make_shared<DistributedFileIterator>(
context.getNextTaskCallback(),
context.getInitialQueryId());
return Pipe(std::make_shared<StorageS3Source>(
need_path_column, need_file_column, format_name, getName(),
metadata_snapshot->getSampleBlock(), context,
metadata_snapshot->getColumns(), max_block_size,
compression_method
};
return Pipe(std::make_shared<StorageS3SequentialSource>(
context.getInitialQueryId(), context.getNextTaskCallback(), cli_builder, s3builder));
compression_method,
client_auth.client,
client_auth.uri.bucket,
file_iterator
));
}
/// The code from here on executes on the initiator
@@ -254,6 +149,8 @@ Pipe StorageS3Distributed::read(
node.secure
));
std::cout << "S3Distributed initiator " << toString(context.getClientInfo().task_identifier) << std::endl;
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), queryToString(query_info.query), header, context, /*throttler=*/nullptr, scalars, Tables(), processed_stage);

View File

@@ -5,9 +5,9 @@
#if USE_AWS_S3
#include "Client/Connection.h"
#include "Interpreters/Cluster.h"
#include "Storages/IStorage.h"
#include "Storages/StorageS3.h"
#include <Interpreters/Cluster.h>
#include <IO/S3Common.h>
#include <Storages/StorageS3.h>
#include <memory>
#include <optional>
@@ -67,13 +67,33 @@ protected:
private:
/// Connections from initiator to other nodes
std::vector<std::shared_ptr<Connection>> connections;
StorageS3::ClientAuthentificaiton client_auth;
String filename;
std::string cluster_name;
ClusterPtr cluster;
String format_name;
String compression_method;
ClientAuthentificationBuilder cli_builder;
struct DistributedFileIterator : public StorageS3Source::FileIterator
{
DistributedFileIterator(NextTaskCallback callback_, String identifier_)
: callback(callback_), identifier(identifier_) {}
NextTaskCallback callback;
String identifier;
std::optional<String> next() override
{
std::cout << "DistributedFileIterator" << std::endl;
std::cout << identifier << std::endl;
auto answer = callback(identifier);
std::cout << answer << std::endl;
return answer;
}
};
};
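
To summarize how the pieces added in this commit fit together, the task identifier travels as follows (a reading aid, not code from the commit):

/// 1. TableFunctionS3Distributed::executeImpl generates a UUID and stores it
///    in ClientInfo::task_identifier on the initiator.
/// 2. ClientInfo::write/read serialize the field, so every worker receives
///    the same identifier along with the query.
/// 3. On a worker, DistributedFileIterator::next() sends the identifier back
///    through NextTaskCallback, letting the initiator match the request to
///    the glob iterator that hands out the next S3 key.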

View File

@@ -115,10 +115,15 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
StorageS3::updateClientAndAuthSettings(context, client_auth);
StorageS3Source::DisclosedGlobIterator iterator(*client_auth.client, client_auth.uri);
auto callback = [endpoint = client_auth.uri.endpoint, bucket = client_auth.uri.bucket, iterator = std::move(iterator)]() mutable -> String
auto task_identifier = UUIDHelpers::generateV4();
const_cast<Context &>(context).getClientInfo().task_identifier = toString(task_identifier);
std::cout << "Created UUID: " << toString(context.getClientInfo().task_identifier) << std::endl;
auto callback = [iterator = std::move(iterator)]() mutable -> String
{
if (auto value = iterator.next())
return endpoint + '/' + bucket + '/' + *value;
return *value;
return {};
};
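
Note that the callback now returns the bare key rather than a full endpoint/bucket/key URI: StorageS3Source::initialize() prepends the bucket itself, and each worker builds its S3 client from its own table definition. The resulting contract, sketched with a hypothetical key:

/// One call per task request from a worker:
///   next task -> "data/part-0001.csv" /// hypothetical key: more work to do
///   next task -> ""                   /// empty string: keys are exhausted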