From 7276b40556ec657b077e21b83dbd545768d23b48 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Thu, 8 Apr 2021 03:09:15 +0300 Subject: [PATCH] better --- src/DataStreams/RemoteQueryExecutor.cpp | 2 + src/Interpreters/ClientInfo.cpp | 4 + src/Interpreters/ClientInfo.h | 3 + src/Storages/StorageS3.cpp | 88 +++++++---- src/Storages/StorageS3.h | 54 ++++++- src/Storages/StorageS3Distributed.cpp | 143 +++--------------- src/Storages/StorageS3Distributed.h | 28 +++- .../TableFunctionS3Distributed.cpp | 9 +- 8 files changed, 168 insertions(+), 163 deletions(-) diff --git a/src/DataStreams/RemoteQueryExecutor.cpp b/src/DataStreams/RemoteQueryExecutor.cpp index dc161a52ac3..bb41a460e2b 100644 --- a/src/DataStreams/RemoteQueryExecutor.cpp +++ b/src/DataStreams/RemoteQueryExecutor.cpp @@ -200,6 +200,8 @@ void RemoteQueryExecutor::sendQuery() connections->sendIgnoredPartUUIDs(duplicated_part_uuids); } + std::cout << "RemoteQueryExecutor " << toString(context.getClientInfo().task_identifier) << std::endl; + connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true); established = false; diff --git a/src/Interpreters/ClientInfo.cpp b/src/Interpreters/ClientInfo.cpp index 223837aaf3d..45b3e8aeb28 100644 --- a/src/Interpreters/ClientInfo.cpp +++ b/src/Interpreters/ClientInfo.cpp @@ -88,6 +88,8 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision) writeBinary(uint8_t(0), out); } } + + writeBinary(task_identifier, out); } @@ -163,6 +165,8 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision) readBinary(client_trace_context.trace_flags, in); } } + + readBinary(task_identifier, in); } diff --git a/src/Interpreters/ClientInfo.h b/src/Interpreters/ClientInfo.h index 21aae45bfab..127d50706fc 100644 --- a/src/Interpreters/ClientInfo.h +++ b/src/Interpreters/ClientInfo.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -97,6 +98,8 @@ public: String quota_key; UInt64 distributed_depth = 0; + /// For distributed file processing (e.g. s3Distributed) + String task_identifier; bool empty() const { return query_kind == QueryKind::NO_QUERY; } diff --git a/src/Storages/StorageS3.cpp b/src/Storages/StorageS3.cpp index 7d77d420584..71a3bcdf3a9 100644 --- a/src/Storages/StorageS3.cpp +++ b/src/Storages/StorageS3.cpp @@ -149,28 +149,62 @@ Block StorageS3Source::getHeader(Block sample_block, bool with_path_column, bool StorageS3Source::StorageS3Source( bool need_path, bool need_file, - const String & format, + const String & format_, String name_, - const Block & sample_block, - const Context & context, - const ColumnsDescription & columns, - UInt64 max_block_size, - const CompressionMethod compression_method, - const std::shared_ptr & client, - const String & bucket, - const String & key) - : SourceWithProgress(getHeader(sample_block, need_path, need_file)) + const Block & sample_block_, + const Context & context_, + const ColumnsDescription & columns_, + UInt64 max_block_size_, + const String compression_hint_, + const std::shared_ptr & client_, + const String & bucket_, + std::shared_ptr file_iterator_) + : SourceWithProgress(getHeader(sample_block_, need_path, need_file)) , name(std::move(name_)) + , bucket(bucket_) + , format(format_) + , context(context_) + , columns_desc(columns_) + , max_block_size(max_block_size_) + , compression_hint(compression_hint_) + , client(client_) + , sample_block(sample_block_) , with_file_column(need_file) , with_path_column(need_path) - , file_path(bucket + "/" + key) + , file_iterator(file_iterator_) { - read_buf = wrapReadBufferWithCompressionMethod(std::make_unique(client, bucket, key), compression_method); + initialize(); +} + + +bool StorageS3Source::initialize() +{ + String current_key; + if (auto result = file_iterator->next()) + { + current_key = result.value(); + if (current_key.empty()) { + return false; + } + file_path = bucket + "/" + current_key; + std::cout << "StorageS3Source " << file_path << std::endl; + } + else + { + /// Do not initialize read_buffer and stream. + return false; + } + + read_buf = wrapReadBufferWithCompressionMethod( + std::make_unique(client, bucket, current_key), chooseCompressionMethod(current_key, compression_hint)); auto input_format = FormatFactory::instance().getInput(format, *read_buf, sample_block, context, max_block_size); reader = std::make_shared(input_format); - if (columns.hasDefaults()) - reader = std::make_shared(reader, columns, context); + if (columns_desc.hasDefaults()) + reader = std::make_shared(reader, columns_desc, context); + + initialized = false; + return true; } String StorageS3Source::getName() const @@ -206,9 +240,14 @@ Chunk StorageS3Source::generate() return Chunk(std::move(columns), num_rows); } + reader->readSuffix(); reader.reset(); + read_buf.reset(); - return {}; + if (!initialize()) + return {}; + + return generate(); } namespace @@ -322,9 +361,9 @@ Pipe StorageS3::read( /// Iterate through disclosed globs and make a source for each file StorageS3Source::DisclosedGlobIterator glob_iterator(*client_auth.client, client_auth.uri); - /// TODO: better to put first num_streams keys into pipeline - /// and put others dynamically in runtime - while (auto key = glob_iterator.next()) + + auto file_iterator = std::make_shared(glob_iterator); + for (size_t i = 0; i < num_streams; ++i) { pipes.emplace_back(std::make_shared( need_path_column, @@ -335,16 +374,12 @@ Pipe StorageS3::read( local_context, metadata_snapshot->getColumns(), max_block_size, - chooseCompressionMethod(client_auth.uri.key, compression_method), + compression_method, client_auth.client, client_auth.uri.bucket, - key.value())); + file_iterator)); } - auto pipe = Pipe::unitePipes(std::move(pipes)); - // It's possible to have many buckets read from s3, resize(num_streams) might open too many handles at the same time. - // Using narrowPipe instead. - narrowPipe(pipe, num_streams); - return pipe; + return Pipe::unitePipes(std::move(pipes)); } BlockOutputStreamPtr StorageS3::write(const ASTPtr & /*query*/, const StorageMetadataPtr & metadata_snapshot, ContextPtr local_context) @@ -402,7 +437,8 @@ void registerStorageS3Impl(const String & name, StorageFactory & factory) if (engine_args.size() < 2 || engine_args.size() > 5) throw Exception( - "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); + "Storage S3 requires 2 to 5 arguments: url, [access_key_id, secret_access_key], name of used format and [compression_method].", + ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); for (auto & engine_arg : engine_args) engine_arg = evaluateConstantExpressionOrIdentifierAsLiteral(engine_arg, args.getLocalContext()); diff --git a/src/Storages/StorageS3.h b/src/Storages/StorageS3.h index 6e9202abb6f..1cb26470c51 100644 --- a/src/Storages/StorageS3.h +++ b/src/Storages/StorageS3.h @@ -17,6 +17,7 @@ #include #include #include +#include namespace Aws::S3 { @@ -30,7 +31,6 @@ class StorageS3SequentialSource; class StorageS3Source : public SourceWithProgress { public: - class DisclosedGlobIterator { public: @@ -42,6 +42,13 @@ public: std::shared_ptr pimpl; }; + struct FileIterator + { + virtual ~FileIterator() = default; + virtual std::optional next() = 0; + }; + + static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column); StorageS3Source( @@ -50,13 +57,13 @@ public: const String & format, String name_, const Block & sample_block, - const Context & context, - const ColumnsDescription & columns, - UInt64 max_block_size, - const CompressionMethod compression_method, - const std::shared_ptr & client, + const Context & context_, + const ColumnsDescription & columns_, + UInt64 max_block_size_, + const String compression_hint_, + const std::shared_ptr & client_, const String & bucket, - const String & key); + std::shared_ptr file_iterator_); String getName() const override; @@ -64,12 +71,26 @@ public: private: String name; + String bucket; + String file_path; + String format; + Context context; + ColumnsDescription columns_desc; + UInt64 max_block_size; + String compression_hint; + std::shared_ptr client; + Block sample_block; + + std::unique_ptr read_buf; BlockInputStreamPtr reader; bool initialized = false; bool with_file_column = false; bool with_path_column = false; - String file_path; + std::shared_ptr file_iterator; + + /// Recreate ReadBuffer and BlockInputStream for each file. + bool initialize(); }; /** @@ -136,6 +157,23 @@ private: String compression_method; String name; + struct LocalFileIterator : public StorageS3Source::FileIterator + { + explicit LocalFileIterator(StorageS3Source::DisclosedGlobIterator glob_iterator_) + : glob_iterator(glob_iterator_) {} + + StorageS3Source::DisclosedGlobIterator glob_iterator; + /// Several files could be processed in parallel + /// from different sources + std::mutex iterator_mutex; + + std::optional next() override + { + std::lock_guard lock(iterator_mutex); + return glob_iterator.next(); + } + }; + static void updateClientAndAuthSettings(ContextPtr, ClientAuthentificaiton &); }; diff --git a/src/Storages/StorageS3Distributed.cpp b/src/Storages/StorageS3Distributed.cpp index 604750e4e9a..9a7707cb418 100644 --- a/src/Storages/StorageS3Distributed.cpp +++ b/src/Storages/StorageS3Distributed.cpp @@ -9,6 +9,7 @@ #include #include "Client/Connection.h" #include "Core/QueryProcessingStage.h" +#include #include "DataStreams/RemoteBlockInputStream.h" #include #include @@ -34,7 +35,6 @@ #include #include #include -#include #include #include @@ -56,119 +56,6 @@ namespace ErrorCodes extern const int LOGICAL_ERROR; } -struct StorageS3SourceBuilder -{ - bool need_path; - bool need_file; - String format; - String name; - Block sample_block; - const Context & context; - const ColumnsDescription & columns; - UInt64 max_block_size; - String compression_method; -}; - -class StorageS3SequentialSource : public SourceWithProgress -{ -public: - - static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column) - { - if (with_path_column) - sample_block.insert({DataTypeString().createColumn(), std::make_shared(), "_path"}); - if (with_file_column) - sample_block.insert({DataTypeString().createColumn(), std::make_shared(), "_file"}); - - return sample_block; - } - - StorageS3SequentialSource( - String initial_query_id_, - NextTaskCallback read_task_callback_, - const ClientAuthentificationBuilder & client_auth_builder_, - const StorageS3SourceBuilder & s3_builder_) - : SourceWithProgress(getHeader(s3_builder_.sample_block, s3_builder_.need_path, s3_builder_.need_file)) - , initial_query_id(initial_query_id_) - , s3b(s3_builder_) - , cab(client_auth_builder_) - , read_task_callback(read_task_callback_) - { - createOrUpdateInnerSource(); - } - - String getName() const override - { - return "StorageS3SequentialSource"; - } - - Chunk generate() override - { - if (!inner) - return {}; - - auto chunk = inner->generate(); - if (!chunk) - { - if (!createOrUpdateInnerSource()) - return {}; - else - chunk = inner->generate(); - } - return chunk; - } - -private: - - String askAboutNextKey() - { - try - { - return read_task_callback(initial_query_id); - } - catch (...) - { - tryLogCurrentException(&Poco::Logger::get(getName())); - throw; - } - } - - bool createOrUpdateInnerSource() - { - auto next_string = askAboutNextKey(); - if (next_string.empty()) - return false; - - auto next_uri = S3::URI(Poco::URI(next_string)); - - auto client_auth = StorageS3::ClientAuthentificaiton{ - next_uri, - cab.access_key_id, - cab.secret_access_key, - cab.max_connections, - {}, {}}; - StorageS3::updateClientAndAuthSettings(s3b.context, client_auth); - - inner = std::make_unique( - s3b.need_path, s3b.need_file, s3b.format, s3b.name, - s3b.sample_block, s3b.context, s3b.columns, s3b.max_block_size, - chooseCompressionMethod(client_auth.uri.key, ""), - client_auth.client, - client_auth.uri.bucket, - next_uri.key - ); - - return true; - } - - String initial_query_id; - StorageS3SourceBuilder s3b; - ClientAuthentificationBuilder cab; - std::unique_ptr inner; - NextTaskCallback read_task_callback; -}; - - StorageS3Distributed::StorageS3Distributed( const String & filename_, @@ -183,17 +70,18 @@ StorageS3Distributed::StorageS3Distributed( const Context & context_, const String & compression_method_) : IStorage(table_id_) + , client_auth{S3::URI{Poco::URI{filename_}}, access_key_id_, secret_access_key_, max_connections_, {}, {}} /// Client and settings will be updated later , filename(filename_) , cluster_name(cluster_name_) , cluster(context_.getCluster(cluster_name)->getClusterWithReplicasAsShards(context_.getSettings())) , format_name(format_name_) , compression_method(compression_method_) - , cli_builder{access_key_id_, secret_access_key_, max_connections_} { StorageInMemoryMetadata storage_metadata; storage_metadata.setColumns(columns_); storage_metadata.setConstraints(constraints_); setInMemoryMetadata(storage_metadata); + StorageS3::updateClientAndAuthSettings(context_, client_auth); } @@ -206,6 +94,8 @@ Pipe StorageS3Distributed::read( size_t max_block_size, unsigned /*num_streams*/) { + StorageS3::updateClientAndAuthSettings(context, client_auth); + /// Secondary query, need to read from S3 if (context.getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY) { @@ -219,16 +109,21 @@ Pipe StorageS3Distributed::read( need_file_column = true; } - StorageS3SourceBuilder s3builder{ - need_path_column, need_file_column, - format_name, getName(), + std::cout << "Got UUID on worker " << toString(context.getClientInfo().task_identifier) << std::endl; + + auto file_iterator = std::make_shared( + context.getNextTaskCallback(), + context.getInitialQueryId()); + + return Pipe(std::make_shared( + need_path_column, need_file_column, format_name, getName(), metadata_snapshot->getSampleBlock(), context, metadata_snapshot->getColumns(), max_block_size, - compression_method - }; - - return Pipe(std::make_shared( - context.getInitialQueryId(), context.getNextTaskCallback(), cli_builder, s3builder)); + compression_method, + client_auth.client, + client_auth.uri.bucket, + file_iterator + )); } /// The code from here and below executes on initiator @@ -254,6 +149,8 @@ Pipe StorageS3Distributed::read( node.secure )); + std::cout << "S3Distributed initiator " << toString(context.getClientInfo().task_identifier) << std::endl; + auto remote_query_executor = std::make_shared( *connections.back(), queryToString(query_info.query), header, context, /*throttler=*/nullptr, scalars, Tables(), processed_stage); diff --git a/src/Storages/StorageS3Distributed.h b/src/Storages/StorageS3Distributed.h index 9bfb792766d..4a811f4f84f 100644 --- a/src/Storages/StorageS3Distributed.h +++ b/src/Storages/StorageS3Distributed.h @@ -5,9 +5,9 @@ #if USE_AWS_S3 #include "Client/Connection.h" -#include "Interpreters/Cluster.h" -#include "Storages/IStorage.h" -#include "Storages/StorageS3.h" +#include +#include +#include #include #include @@ -67,13 +67,33 @@ protected: private: /// Connections from initiator to other nodes std::vector> connections; + StorageS3::ClientAuthentificaiton client_auth; + String filename; std::string cluster_name; ClusterPtr cluster; String format_name; String compression_method; - ClientAuthentificationBuilder cli_builder; + + + struct DistributedFileIterator : public StorageS3Source::FileIterator + { + DistributedFileIterator(NextTaskCallback callback_, String identifier_) + : callback(callback_), identifier(identifier_) {} + + NextTaskCallback callback; + String identifier; + + std::optional next() override + { + std::cout << "DistributedFileIterator" << std::endl; + std::cout << identifier << std::endl; + auto answer = callback(identifier); + std::cout << answer << std::endl; + return answer; + } + }; }; diff --git a/src/TableFunctions/TableFunctionS3Distributed.cpp b/src/TableFunctions/TableFunctionS3Distributed.cpp index 814b2586242..36f16e12d4d 100644 --- a/src/TableFunctions/TableFunctionS3Distributed.cpp +++ b/src/TableFunctions/TableFunctionS3Distributed.cpp @@ -115,10 +115,15 @@ StoragePtr TableFunctionS3Distributed::executeImpl( StorageS3::updateClientAndAuthSettings(context, client_auth); StorageS3Source::DisclosedGlobIterator iterator(*client_auth.client, client_auth.uri); - auto callback = [endpoint = client_auth.uri.endpoint, bucket = client_auth.uri.bucket, iterator = std::move(iterator)]() mutable -> String + auto task_identifier = UUIDHelpers::generateV4(); + const_cast(context).getClientInfo().task_identifier = toString(task_identifier); + + std::cout << "Created UUID: " << toString(context.getClientInfo().task_identifier) << std::endl; + + auto callback = [iterator = std::move(iterator)]() mutable -> String { if (auto value = iterator.next()) - return endpoint + '/' + bucket + '/' + *value; + return *value; return {}; };