review fixes

This commit is contained in:
Nikita Mikhaylov 2021-04-13 13:59:02 +03:00
parent f36a715c32
commit 024374a2ec
15 changed files with 29 additions and 37 deletions

View File

@ -552,11 +552,11 @@ void Connection::sendIgnoredPartUUIDs(const std::vector<UUID> & uuids)
}
void Connection::sendReadTaskResponse(const std::optional<String> & response)
void Connection::sendReadTaskResponse(const String & response)
{
writeVarUInt(Protocol::Client::ReadTaskResponse, *out);
writeVarUInt(DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION, *out);
writeStringBinary(response.has_value() ? String(*response) : "", *out);
writeStringBinary(response, *out);
out->next();
}

View File

@ -159,7 +159,7 @@ public:
/// Send parts' uuids to excluded them from query processing
void sendIgnoredPartUUIDs(const std::vector<UUID> & uuids);
void sendReadTaskResponse(const std::optional<String> &);
void sendReadTaskResponse(const String &);
/// Send prepared block of data (serialized and, if need, compressed), that will be read from 'input'.
/// You could pass size of serialized/compressed block.

View File

@ -90,7 +90,7 @@ public:
const ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponse(const std::optional<String> &) override
void sendReadTaskResponse(const String &) override
{
throw Exception("sendReadTaskResponse in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -24,7 +24,7 @@ public:
const ClientInfo & client_info,
bool with_pending_data) = 0;
virtual void sendReadTaskResponse(const std::optional<String> &) = 0;
virtual void sendReadTaskResponse(const String &) = 0;
/// Get packet from any replica.
virtual Packet receivePacket() = 0;

View File

@ -156,7 +156,7 @@ void MultiplexedConnections::sendIgnoredPartUUIDs(const std::vector<UUID> & uuid
}
void MultiplexedConnections::sendReadTaskResponse(const std::optional<String> & response)
void MultiplexedConnections::sendReadTaskResponse(const String & response)
{
std::lock_guard lock(cancel_mutex);
if (cancelled)

View File

@ -39,7 +39,7 @@ public:
const ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponse(const std::optional<String> &) override;
void sendReadTaskResponse(const String &) override;
Packet receivePacket() override;

View File

@ -74,8 +74,7 @@
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
/// Minimum revision supporting task processing on cluster
#define DBMS_MIN_REVISION_WITH_CLUSTER_PROCESSING 54443
#define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
/// Minimum revision supporting interserver secret.

View File

@ -27,7 +27,7 @@ using ProfileInfoCallback = std::function<void(const BlockStreamProfileInfo & in
class RemoteQueryExecutorReadContext;
/// This is the same type as StorageS3Source::IteratorWrapper
using TaskIterator = std::function<std::optional<String>()>;
using TaskIterator = std::function<String()>;
/// This class allows one to launch queries on remote replicas of one shard and get results
class RemoteQueryExecutor

View File

@ -129,7 +129,7 @@ using InputInitializer = std::function<void(ContextPtr, const StoragePtr &)>;
using InputBlocksReader = std::function<Block(ContextPtr)>;
/// Used in distributed task processing
using ReadTaskCallback = std::function<std::optional<String>()>;
using ReadTaskCallback = std::function<String()>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.

View File

@ -289,7 +289,7 @@ void TCPHandler::runImpl()
customizeContext(query_context);
/// This callback is needed for requesting read tasks inside pipeline for distributed processing
query_context->setReadTaskCallback([this]() -> std::optional<String>
query_context->setReadTaskCallback([this]() -> String
{
std::lock_guard lock(task_callback_mutex);
sendReadTaskRequestAssumeLocked();
@ -1037,7 +1037,7 @@ void TCPHandler::receiveIgnoredPartUUIDs()
}
std::optional<String> TCPHandler::receiveReadTaskResponseAssumeLocked()
String TCPHandler::receiveReadTaskResponseAssumeLocked()
{
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
@ -1060,8 +1060,6 @@ std::optional<String> TCPHandler::receiveReadTaskResponseAssumeLocked()
throw Exception("Protocol version for distributed processing mismatched", ErrorCodes::UNKNOWN_PROTOCOL);
String response;
readStringBinary(response, *in);
if (response.empty())
return std::nullopt;
return response;
}

View File

@ -170,7 +170,7 @@ private:
bool receivePacket();
void receiveQuery();
void receiveIgnoredPartUUIDs();
std::optional<String> receiveReadTaskResponseAssumeLocked();
String receiveReadTaskResponseAssumeLocked();
bool receiveData(bool scalar);
bool readDataNext(const size_t & poll_interval, const int & receive_timeout);
void readData(const Settings & connection_settings);

View File

@ -74,7 +74,7 @@ public:
fillInternalBufferAssumeLocked();
}
std::optional<String> next()
String next()
{
std::lock_guard lock(mutex);
return nextAssumeLocked();
@ -82,7 +82,7 @@ public:
private:
std::optional<String> nextAssumeLocked()
String nextAssumeLocked()
{
if (buffer_iter != buffer.end())
{
@ -92,7 +92,7 @@ private:
}
if (is_finished)
return std::nullopt;
return {};
fillInternalBufferAssumeLocked();
@ -141,7 +141,7 @@ private:
StorageS3Source::DisclosedGlobIterator::DisclosedGlobIterator(Aws::S3::S3Client & client_, const S3::URI & globbed_uri_)
: pimpl(std::make_shared<StorageS3Source::DisclosedGlobIterator::Impl>(client_, globbed_uri_)) {}
std::optional<String> StorageS3Source::DisclosedGlobIterator::next()
String StorageS3Source::DisclosedGlobIterator::next()
{
return pimpl->next();
}
@ -190,17 +190,11 @@ StorageS3Source::StorageS3Source(
bool StorageS3Source::initialize()
{
String current_key;
if (auto result = (*file_iterator)())
{
current_key = result.value();
file_path = bucket + "/" + current_key;
}
else
{
/// Do not initialize read_buffer and stream.
String current_key = (*file_iterator)();
if (current_key.empty())
return false;
}
file_path = bucket + "/" + current_key;
read_buf = wrapReadBufferWithCompressionMethod(
std::make_unique<ReadBufferFromS3>(client, bucket, current_key), chooseCompressionMethod(current_key, compression_hint));

View File

@ -35,14 +35,14 @@ public:
{
public:
DisclosedGlobIterator(Aws::S3::S3Client &, const S3::URI &);
std::optional<String> next();
String next();
private:
class Impl;
/// shared_ptr to have copy constructor
std::shared_ptr<Impl> pimpl;
};
using IteratorWrapper = std::function<std::optional<String>()>;
using IteratorWrapper = std::function<String()>;
static Block getHeader(Block sample_block, bool with_path_column, bool with_file_column);

View File

@ -107,7 +107,7 @@ Pipe StorageS3Cluster::read(
/// Save callback not to capture context by reference of copy it.
auto file_iterator = std::make_shared<StorageS3Source::IteratorWrapper>(
[callback = context->getReadTaskCallback()]() -> std::optional<String> {
[callback = context->getReadTaskCallback()]() -> String {
return callback();
});
@ -127,7 +127,7 @@ Pipe StorageS3Cluster::read(
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> std::optional<String>
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String
{
return iterator->next();
});
@ -141,6 +141,8 @@ Pipe StorageS3Cluster::read(
Pipes pipes;
connections.reserve(cluster->getShardCount());
const bool add_agg_info = processed_stage == QueryProcessingStage::WithMergeableState;
for (const auto & replicas : cluster->getShardsAddresses())
{
/// There will be only one replica, because we consider each replica as a shard
@ -160,7 +162,7 @@ Pipe StorageS3Cluster::read(
*connections.back(), queryToString(query_info.query), header, context,
/*throttler=*/nullptr, scalars, Tables(), processed_stage, callback);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, false, false));
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, add_agg_info, false));
}
}

View File

@ -16,8 +16,7 @@ class Context;
* s3Cluster(cluster_name, source, [access_key_id, secret_access_key,] format, structure)
* A table function, which allows to process many files from S3 on a specific cluster
* On initiator it creates a connection to _all_ nodes in cluster, discloses asterics
* in S3 file path and register all tasks (paths in S3) in NextTaskResolver to dispatch
* them dynamically.
* in S3 file path and dispatch each file dynamically.
* On worker node it asks initiator about next task to process, processes it.
* This is repeated until the tasks are finished.
*/