fixed uuid

This commit is contained in:
Nikita Mikhaylov 2021-04-08 17:22:19 +03:00
parent 7276b40556
commit 8a4b5a586e
6 changed files with 53 additions and 69 deletions

View File

@ -74,6 +74,9 @@
/// Minimum revision supporting OpenTelemetry
#define DBMS_MIN_REVISION_WITH_OPENTELEMETRY 54442
/// Minimum revision supporting task processing on a cluster.
/// ClientInfo::write() and ClientInfo::read() (de)serialize `task_identifier`
/// only when the peer's protocol revision is at least this value.
#define DBMS_MIN_REVISION_WITH_CLUSTER_PROCESSING 54443
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441

View File

@ -29,14 +29,11 @@ namespace ErrorCodes
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_,
const Block & header_,
ContextPtr context_,
ThrottlerPtr throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<String> task_identifier_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_identifier(task_identifier_)
{
create_connections = [this, &connection, throttler]()
{
@ -46,14 +43,11 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<String> task_identifier_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_identifier(task_identifier_)
{
create_connections = [this, connections_, throttler]() mutable {
return std::make_unique<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
@ -62,14 +56,11 @@ RemoteQueryExecutor::RemoteQueryExecutor(
RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_,
const Block & header_,
ContextPtr context_,
const ThrottlerPtr & throttler,
const Scalars & scalars_,
const Tables & external_tables_,
QueryProcessingStage::Enum stage_)
: header(header_), query(query_), context(context_), scalars(scalars_), external_tables(external_tables_), stage(stage_)
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::optional<String> task_identifier_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_identifier(task_identifier_)
{
create_connections = [this, pool, throttler]()->std::unique_ptr<IConnections>
{
@ -189,6 +180,7 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
modified_client_info.task_identifier = task_identifier ? *task_identifier : "";
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;
@ -200,8 +192,6 @@ void RemoteQueryExecutor::sendQuery()
connections->sendIgnoredPartUUIDs(duplicated_part_uuids);
}
std::cout << "RemoteQueryExecutor " << toString(context.getClientInfo().task_identifier) << std::endl;
connections->sendQuery(timeouts, query, query_id, stage, modified_client_info, true);
established = false;

View File

@ -37,21 +37,21 @@ public:
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<String> task_identifier_ = {});
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<String> task_identifier_ = {});
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<String> task_identifier_ = {});
~RemoteQueryExecutor();
@ -119,6 +119,8 @@ private:
/// Temporary tables needed to be sent to remote servers
Tables external_tables;
QueryProcessingStage::Enum stage;
/// Initiator identifier for distributed task processing
std::optional<String> task_identifier;
/// Streams for reading from temporary tables and following sending of data
/// to remote servers for GLOBAL-subqueries

View File

@ -89,6 +89,7 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
}
}
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_CLUSTER_PROCESSING)
writeBinary(task_identifier, out);
}
@ -166,6 +167,7 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
}
}
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_CLUSTER_PROCESSING)
readBinary(task_identifier, in);
}

View File

@ -109,11 +109,9 @@ Pipe StorageS3Distributed::read(
need_file_column = true;
}
std::cout << "Got UUID on worker " << toString(context.getClientInfo().task_identifier) << std::endl;
auto file_iterator = std::make_shared<DistributedFileIterator>(
context.getNextTaskCallback(),
context.getInitialQueryId());
context.getClientInfo().task_identifier);
return Pipe(std::make_shared<StorageS3Source>(
need_path_column, need_file_column, format_name, getName(),
@ -127,6 +125,23 @@ Pipe StorageS3Distributed::read(
}
/// The code from here and below executes on initiator
S3::URI s3_uri(Poco::URI{filename});
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto callback = [iterator = StorageS3Source::DisclosedGlobIterator(*client_auth.client, client_auth.uri)]() mutable -> String
{
if (auto value = iterator.next())
return *value;
return {};
};
auto task_identifier = toString(UUIDHelpers::generateV4());
std::cout << "Generated UUID : " << task_identifier << std::endl;
/// Register a resolver that will hand out the next task to other nodes on request.
context.getReadTaskSupervisor()->registerNextTaskResolver(
std::make_unique<ReadTaskResolver>(task_identifier, std::move(callback)));
/// Calculate the header. This matters because some columns may be dropped in certain cases, e.g. a query with count(*).
Block header =
@ -149,10 +164,11 @@ Pipe StorageS3Distributed::read(
node.secure
));
std::cout << "S3Distributed initiator " << toString(context.getClientInfo().task_identifier) << std::endl;
/// For an unknown reason the global context is passed to the IStorage::read() method,
/// so task_identifier is passed explicitly as a constructor argument, which is also more obvious.
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
*connections.back(), queryToString(query_info.query), header, context, /*throttler=*/nullptr, scalars, Tables(), processed_stage);
*connections.back(), queryToString(query_info.query), header, context,
/*throttler=*/nullptr, scalars, Tables(), processed_stage, task_identifier);
pipes.emplace_back(std::make_shared<RemoteSource>(remote_query_executor, false, false));
}
@ -162,7 +178,8 @@ Pipe StorageS3Distributed::read(
return Pipe::unitePipes(std::move(pipes));
}
QueryProcessingStage::Enum StorageS3Distributed::getQueryProcessingStage(const Context & context, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const
QueryProcessingStage::Enum StorageS3Distributed::getQueryProcessingStage(
const Context & context, QueryProcessingStage::Enum /*to_stage*/, SelectQueryInfo &) const
{
/// Initiator executes query on remote node.
if (context.getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY) {

View File

@ -104,36 +104,6 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
const ASTPtr & /*filename*/, const Context & context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
UInt64 max_connections = context.getSettingsRef().s3_max_connections;
/// Initiator specific logic
while (context.getClientInfo().query_kind == ClientInfo::QueryKind::INITIAL_QUERY)
{
auto poco_uri = Poco::URI{filename};
S3::URI s3_uri(poco_uri);
StorageS3::ClientAuthentificaiton client_auth{s3_uri, access_key_id, secret_access_key, max_connections, {}, {}};
StorageS3::updateClientAndAuthSettings(context, client_auth);
StorageS3Source::DisclosedGlobIterator iterator(*client_auth.client, client_auth.uri);
auto task_identifier = UUIDHelpers::generateV4();
const_cast<Context &>(context).getClientInfo().task_identifier = toString(task_identifier);
std::cout << "Created UUID: " << toString(context.getClientInfo().task_identifier) << std::endl;
auto callback = [iterator = std::move(iterator)]() mutable -> String
{
if (auto value = iterator.next())
return *value;
return {};
};
/// Register a resolver that will hand out the next task to other nodes on request.
context.getReadTaskSupervisor()->registerNextTaskResolver(
std::make_unique<ReadTaskResolver>(context.getCurrentQueryId(), std::move(callback)));
break;
}
StoragePtr storage = StorageS3Distributed::create(
filename,
access_key_id,
@ -141,7 +111,7 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
StorageID(getDatabaseName(), table_name),
cluster_name,
format,
max_connections,
context.getSettingsRef().s3_max_connections,
getActualTableStructure(context),
ConstraintsDescription{},
context,