added hash of initiator address

Nikita Mikhaylov 2021-03-24 21:36:31 +03:00
parent 2549468c14
commit 4e4b383214
11 changed files with 192 additions and 153 deletions

View File

@@ -114,7 +114,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
/** If we receive a block with slightly different column types, or with excessive columns,
* we will adapt it to expected structure.
*/
[[maybe_unused]] static Block adaptBlockStructure(const Block & block, const Block & header)
static Block adaptBlockStructure(const Block & block, const Block & header)
{
/// Special case when reader doesn't care about result structure. Deprecated and used only in Benchmark, PerformanceTest.
if (!header)
@@ -123,9 +123,6 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
Block res;
res.info = block.info;
std::cout << "block " << block.dumpStructure() << std::endl;
std::cout << "header " << header.dumpStructure() << std::endl;
for (const auto & elem : header)
{
ColumnPtr column;
@@ -156,17 +153,7 @@ RemoteQueryExecutor::~RemoteQueryExecutor()
column = elem.column->cloneResized(block.rows());
}
else
{
// if (!block.has(elem.name))
// {
// column = elem.type->createColumn();
// }
// else
// {
// column = castColumn(block.getByName(elem.name), elem.type);
// }
column = castColumn(block.getByName(elem.name), elem.type);
}
res.insert({column, elem.type, elem.name});
}
@@ -327,12 +314,7 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
case Protocol::Server::Data:
/// If the block is not empty and is not a header block
if (packet.block && (packet.block.rows() > 0))
{
// return packet.block;
Block anime = adaptBlockStructure(packet.block, header);
std::cout << "RemoteQueryExecutor " << anime.dumpStructure() << std::endl;
return anime;
}
return adaptBlockStructure(packet.block, header);
break; /// If the block is empty - we will receive other packets before EndOfStream.
case Protocol::Server::Exception:

View File

@@ -399,7 +399,6 @@ namespace S3
else
throw Exception("Bucket or key name are invalid in S3 URI: " + uri.toString(), ErrorCodes::BAD_ARGUMENTS);
}
}
}

View File

@@ -138,6 +138,17 @@ String Cluster::Address::toString() const
return toString(host_name, port);
}
String Cluster::Address::getHash() const
{
SipHash hash;
hash.update(host_name);
hash.update(std::to_string(port));
hash.update(user);
hash.update(password);
return std::to_string(hash.get64());
}
String Cluster::Address::toString(const String & host_name, UInt16 port)
{
return escapeForFileName(host_name) + ':' + DB::toString(port);
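The new Cluster::Address::getHash() above gives every node a stable identifier derived from host name, port, user, and password. A worker that receives this hash can locate the initiator by scanning its own cluster configuration, which is what StorageS3Distributed::read() does later in this commit. A minimal sketch of that lookup (the helper name findInitiator is hypothetical; the types are from this codebase); returning from a function also sidesteps the fact that a bare break only leaves the inner of the two loops:
std::optional<Cluster::Address> findInitiator(const ClusterPtr & cluster, const String & address_hash)
{
    /// Scan every replica of every shard for a matching address hash.
    for (const auto & replicas : cluster->getShardsAddresses())
        for (const auto & node : replicas)
            if (node.getHash() == address_hash)
                return node;
    return std::nullopt; /// No node in this cluster produced the given hash.
}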

View File

@@ -122,6 +122,9 @@ public:
/// Returns 'escaped_host_name:port'
String toString() const;
/// Returns hash of all fields
String getHash() const;
/// Returns 'host_name:port'
String readableString() const;

View File

@@ -26,9 +26,9 @@ struct DatabaseAndTableWithAlias
UUID uuid = UUIDHelpers::Nil;
DatabaseAndTableWithAlias() = default;
DatabaseAndTableWithAlias(const ASTPtr & identifier_node, const String & current_database = "");
DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database = "");
DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = "");
explicit DatabaseAndTableWithAlias(const ASTPtr & identifier_node, const String & current_database = "");
explicit DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database = "");
explicit DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = "");
/// "alias." or "table." if alias is empty
String getQualifiedNamePrefix(bool with_dot = true) const;
@@ -80,7 +80,7 @@ private:
void addAdditionalColumns(NamesAndTypesList & target, const NamesAndTypesList & addition)
{
target.insert(target.end(), addition.begin(), addition.end());
for (auto & col : addition)
for (const auto & col : addition)
names.insert(col.name);
}

View File

@@ -20,7 +20,7 @@ public:
bool second_with_brackets;
public:
ASTPair(bool second_with_brackets_)
explicit ASTPair(bool second_with_brackets_)
: second_with_brackets(second_with_brackets_)
{
}
@@ -49,7 +49,7 @@ public:
/// Has brackets around arguments
bool has_brackets;
ASTFunctionWithKeyValueArguments(bool has_brackets_ = true)
explicit ASTFunctionWithKeyValueArguments(bool has_brackets_ = true)
: has_brackets(has_brackets_)
{
}

View File

@@ -45,7 +45,7 @@ protected:
class ParserIdentifier : public IParserBase
{
public:
ParserIdentifier(bool allow_query_parameter_ = false) : allow_query_parameter(allow_query_parameter_) {}
explicit ParserIdentifier(bool allow_query_parameter_ = false) : allow_query_parameter(allow_query_parameter_) {}
protected:
const char * getName() const override { return "identifier"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
@@ -59,7 +59,7 @@ protected:
class ParserCompoundIdentifier : public IParserBase
{
public:
ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false)
explicit ParserCompoundIdentifier(bool table_name_with_optional_uuid_ = false, bool allow_query_parameter_ = false)
: table_name_with_optional_uuid(table_name_with_optional_uuid_), allow_query_parameter(allow_query_parameter_)
{
}
@@ -85,7 +85,7 @@ public:
using ColumnTransformers = MultiEnum<ColumnTransformer, UInt8>;
static constexpr auto AllTransformers = ColumnTransformers{ColumnTransformer::APPLY, ColumnTransformer::EXCEPT, ColumnTransformer::REPLACE};
ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false)
explicit ParserColumnsTransformers(ColumnTransformers allowed_transformers_ = AllTransformers, bool is_strict_ = false)
: allowed_transformers(allowed_transformers_)
, is_strict(is_strict_)
{}
@@ -103,7 +103,7 @@ class ParserAsterisk : public IParserBase
{
public:
using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers;
ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
explicit ParserAsterisk(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
: allowed_transformers(allowed_transformers_)
{}
@@ -129,7 +129,7 @@ class ParserColumnsMatcher : public IParserBase
{
public:
using ColumnTransformers = ParserColumnsTransformers::ColumnTransformers;
ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
explicit ParserColumnsMatcher(ColumnTransformers allowed_transformers_ = ParserColumnsTransformers::AllTransformers)
: allowed_transformers(allowed_transformers_)
{}
@@ -149,7 +149,7 @@ protected:
class ParserFunction : public IParserBase
{
public:
ParserFunction(bool allow_function_parameters_ = true, bool is_table_function_ = false)
explicit ParserFunction(bool allow_function_parameters_ = true, bool is_table_function_ = false)
: allow_function_parameters(allow_function_parameters_), is_table_function(is_table_function_)
{
}

View File

@@ -31,10 +31,12 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageS3.h>
#include <Parsers/queryToString.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Interpreters/getHeaderForProcessingStage.h>
#include <Interpreters/SelectQueryOptions.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/getTableExpressions.h>
#include <Poco/Logger.h>
#include <Poco/Net/TCPServerConnection.h>
@@ -61,6 +63,19 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
struct StorageS3SourceBuilder
{
bool need_path;
bool need_file;
String format;
String name;
Block sample_block;
const Context & context;
const ColumnsDescription & columns;
UInt64 max_block_size;
String compression_method;
};
class StorageS3SequentialSource : public SourceWithProgress
{
public:
@@ -77,37 +92,23 @@ public:
StorageS3SequentialSource(
String initial_query_id_,
bool need_path_,
bool need_file_,
const String & format_,
String name_,
const Block & sample_block_,
const Context & context_,
const ColumnsDescription & columns_,
UInt64 max_block_size_,
const CompressionMethod compression_method_,
StorageS3::ClientAuthentificaiton & client_auth_)
: SourceWithProgress(getHeader(sample_block_, need_path_, need_file_))
, need_path(need_path_)
, need_file(need_file_)
, format(format_)
, name(name_)
, sample_block(sample_block_)
, context(context_)
, columns(columns_)
, max_block_size(max_block_size_)
, compression_method(compression_method_)
, client_auth(client_auth_)
Cluster::Address initiator,
const ClientAuthentificationBuilder & client_auth_builder_,
const StorageS3SourceBuilder & s3_builder_)
: SourceWithProgress(getHeader(s3_builder_.sample_block, s3_builder_.need_path, s3_builder_.need_file))
, initial_query_id(initial_query_id_)
, s3_source_builder(s3_builder_)
, cli_builder(client_auth_builder_)
{
initiator_connection = std::make_shared<Connection>(
/*host*/"127.0.0.1",
/*port*/9000,
/*default_database=*/context.getGlobalContext().getCurrentDatabase(),
/*user=*/context.getClientInfo().initial_user,
/*password=*/"",
/*cluster=*/"",
/*cluster_secret=*/""
connections = std::make_shared<ConnectionPool>(
/*max_connections*/3,
/*host*/initiator.host_name,
/*port*/initiator.port,
/*default_database=*/s3_builder_.context.getGlobalContext().getCurrentDatabase(),
/*user=*/s3_builder_.context.getClientInfo().initial_user,
/*password=*/initiator.password,
/*cluster=*/initiator.cluster,
/*cluster_secret=*/initiator.cluster_secret
);
createOrUpdateInnerSource();
@@ -115,7 +116,7 @@ public:
String getName() const override
{
return name;
return "StorageS3SequentialSource";
}
Chunk generate() override
@@ -131,7 +132,6 @@ public:
else
chunk = inner->generate();
}
std::cout << "generate() " << chunk.dumpStructure() << std::endl;
return chunk;
}
@@ -141,9 +141,9 @@ private:
{
try
{
initiator_connection->connect(timeouts);
initiator_connection->sendNextTaskRequest(initial_query_id);
auto packet = initiator_connection->receivePacket();
auto connection = connections->get(timeouts);
connection->sendNextTaskRequest(initial_query_id);
auto packet = connection->receivePacket();
assert(packet.type == Protocol::Server::NextTaskReply);
LOG_TRACE(&Poco::Logger::get("StorageS3SequentialSource"), "Got new task {}", packet.next_task);
return packet.next_task;
@@ -155,28 +155,32 @@ private:
}
}
bool createOrUpdateInnerSource()
{
auto next_string = askAboutNextKey();
std::cout << "createOrUpdateInnerSource " << next_string << std::endl;
if (next_string.empty())
return false;
auto next_uri = S3::URI(Poco::URI(next_string));
assert(next_uri.bucket == client_auth.uri.bucket);
auto client_auth = StorageS3::ClientAuthentificaiton{
next_uri,
cli_builder.access_key_id,
cli_builder.secret_access_key,
cli_builder.max_connections,
{}, {}};
StorageS3::updateClientAndAuthSettings(s3_source_builder.context, client_auth);
inner = std::make_unique<StorageS3Source>(
need_path,
need_file,
format,
name,
sample_block,
context,
columns,
max_block_size,
compression_method,
s3_source_builder.need_path,
s3_source_builder.need_file,
s3_source_builder.format,
s3_source_builder.name,
s3_source_builder.sample_block,
s3_source_builder.context,
s3_source_builder.columns,
s3_source_builder.max_block_size,
chooseCompressionMethod(client_auth.uri.key, s3_source_builder.compression_method),
client_auth.client,
client_auth.uri.bucket,
next_uri.key
@@ -184,30 +188,24 @@ private:
return true;
}
bool need_path;
bool need_file;
String format;
String name;
Block sample_block;
const Context & context;
const ColumnsDescription & columns;
UInt64 max_block_size;
const CompressionMethod compression_method;
/// This is used to ask about the next task
String initial_query_id;
StorageS3SourceBuilder s3_source_builder;
ClientAuthentificationBuilder cli_builder;
std::unique_ptr<StorageS3Source> inner;
StorageS3::ClientAuthentificaiton client_auth;
/// One second just in case
ConnectionTimeouts timeouts{{1, 0}, {1, 0}, {1, 0}};
std::shared_ptr<Connection> initiator_connection;
/// This is used to ask about the next task
String initial_query_id;
std::shared_ptr<ConnectionPool> connections;
};
StorageS3Distributed::StorageS3Distributed(
const S3::URI & uri_,
IAST::Hash tree_hash_,
const String & address_hash_or_filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageID & table_id_,
@@ -219,17 +217,18 @@ StorageS3Distributed::StorageS3Distributed(
const Context & context_,
const String & compression_method_)
: IStorage(table_id_)
, tree_hash(tree_hash_)
, address_hash_or_filename(address_hash_or_filename_)
, cluster_name(cluster_name_)
, cluster(context_.getCluster(cluster_name)->getClusterWithReplicasAsShards(context_.getSettings()))
, client_auth{uri_, access_key_id_, secret_access_key_, max_connections_, {}, {}}
, format_name(format_name_)
, compression_method(compression_method_)
, cli_builder{access_key_id_, secret_access_key_, max_connections_}
{
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
StorageS3::updateClientAndAuthSettings(context_, client_auth);
}
@@ -246,7 +245,16 @@ Pipe StorageS3Distributed::read(
/// Secondary query, need to read from S3
if (context.getCurrentQueryId() != context.getInitialQueryId())
{
StorageS3::updateClientAndAuthSettings(context, client_auth);
/// Find initiator in cluster
Cluster::Address initiator;
for (const auto & replicas : cluster->getShardsAddresses())
for (const auto & node : replicas)
if (node.getHash() == address_hash_or_filename)
{
initiator = node;
break;
}
bool need_path_column = false;
bool need_file_column = false;
@@ -258,13 +266,8 @@ Pipe StorageS3Distributed::read(
need_file_column = true;
}
std::cout << need_file_column << std::boolalpha << need_file_column << std::endl;
std::cout << need_path_column << std::boolalpha << need_path_column << std::endl;
std::cout << "metadata_snapshot->getSampleBlock().dumpStructure() " << metadata_snapshot->getSampleBlock().dumpStructure() << std::endl;
return Pipe(std::make_shared<StorageS3SequentialSource>(
context.getInitialQueryId(),
StorageS3SourceBuilder s3builder
{
need_path_column,
need_file_column,
format_name,
@@ -273,24 +276,65 @@ Pipe StorageS3Distributed::read(
context,
metadata_snapshot->getColumns(),
max_block_size,
chooseCompressionMethod(client_auth.uri.key, compression_method),
client_auth
compression_method
};
return Pipe(std::make_shared<StorageS3SequentialSource>(
context.getInitialQueryId(),
/*initiator*/initiator,
cli_builder,
s3builder
));
}
Pipes pipes;
connections.reserve(cluster->getShardCount());
/// This part of code executes on initiator
std::cout << "StorageS3Distributed::read" << std::endl;
std::cout << "QueryProcessingStage " << processed_stage << std::endl;
String hash_of_address;
for (const auto & replicas : cluster->getShardsAddresses())
for (const auto & node : replicas)
if (node.is_local && node.port == context.getTCPPort())
{
hash_of_address = node.getHash();
break;
}
/// FIXME: better exception
if (hash_of_address.empty())
throw Exception(fmt::format("Could not find ourselves in cluster {}", ""), ErrorCodes::LOGICAL_ERROR);
auto remote_query_ast = query_info.query->clone();
auto table_expressions_from_whole_query = getTableExpressions(remote_query_ast->as<ASTSelectQuery &>());
String remote_query;
for (const auto & table_expression : table_expressions_from_whole_query)
{
const auto & table_function_ast = table_expression->table_function;
if (table_function_ast->getTreeHash() == tree_hash)
{
std::cout << table_function_ast->dumpTree() << std::endl;
auto & arguments = table_function_ast->children.at(0)->children;
auto & bucket = arguments[1]->as<ASTLiteral &>().value.safeGet<String>();
/// We rewrite the query and insert the hash of the initiator's address
/// in place of the filename argument. So we write hash_of_address here
/// as the bucket name, which lets the remote replica find the initiator
/// node in its cluster config.
bucket = hash_of_address;
remote_query = queryToString(remote_query_ast);
break;
}
}
if (remote_query.empty())
throw Exception("No table function", ErrorCodes::LOGICAL_ERROR);
Block header =
InterpreterSelectQuery(query_info.query, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
InterpreterSelectQuery(remote_query_ast, context, SelectQueryOptions(processed_stage).analyze()).getSampleBlock();
const Scalars & scalars = context.hasQueryContext() ? context.getQueryContext().getScalars() : Scalars{};
Pipes pipes;
connections.reserve(cluster->getShardCount());
for (const auto & replicas : cluster->getShardsAddresses()) {
/// There will be only one replica, because we consider each replica as a shard
for (const auto & node : replicas)
@@ -306,13 +350,13 @@ Pipe StorageS3Distributed::read(
));
auto stream = std::make_shared<RemoteBlockInputStream>(
/*connection=*/*connections.back(),
/*query=*/queryToString(query_info.query),
/*query=*/remote_query,
/*header=*/header,
/*context=*/context,
nullptr,
scalars,
Tables(),
QueryProcessingStage::FetchColumns
processed_stage
);
pipes.emplace_back(std::make_shared<SourceFromInputStream>(std::move(stream)));
}
@@ -322,11 +366,5 @@ Pipe StorageS3Distributed::read(
metadata_snapshot->check(column_names, getVirtuals(), getStorageID());
return Pipe::unitePipes(std::move(pipes));
}
}

View File

@@ -20,6 +20,12 @@ namespace ErrorCodes
class Context;
struct ClientAuthentificationBuilder
{
String access_key_id;
String secret_access_key;
UInt64 max_connections;
};
class StorageS3Distributed : public ext::shared_ptr_helper<StorageS3Distributed>, public IStorage
{
@@ -39,7 +45,8 @@ public:
protected:
StorageS3Distributed(
const S3::URI & uri_,
IAST::Hash tree_hash_,
const String & address_hash_or_filename_,
const String & access_key_id_,
const String & secret_access_key_,
const StorageID & table_id_,
@@ -49,21 +56,19 @@ protected:
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
const Context & context_,
const String & compression_method_ = "");
const String & compression_method_);
private:
/// Connections from initiator to other nodes
std::vector<std::shared_ptr<Connection>> connections;
IAST::Hash tree_hash;
String address_hash_or_filename;
std::string cluster_name;
ClusterPtr cluster;
/// This will be used on non-initiator nodes.
std::optional<Cluster::Address> initiator;
std::shared_ptr<Connection> initiator_connection;
StorageS3::ClientAuthentificaiton client_auth;
String format_name;
String compression_method;
ClientAuthentificationBuilder cli_builder;
};
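ClientAuthentificationBuilder above deliberately carries only the credentials and the connection limit, with no URI: on a worker the bucket and key are not known until a task arrives, so a full StorageS3::ClientAuthentificaiton is constructed per task. A sketch of that expansion (the helper name makeClientAuth is hypothetical; the body mirrors createOrUpdateInnerSource() in StorageS3Distributed.cpp):
StorageS3::ClientAuthentificaiton makeClientAuth(
    const S3::URI & next_uri,
    const ClientAuthentificationBuilder & builder,
    const Context & context)
{
    /// The URI comes from the task, the credentials from the stored builder.
    StorageS3::ClientAuthentificaiton client_auth{
        next_uri,
        builder.access_key_id,
        builder.secret_access_key,
        builder.max_connections,
        {}, {}};
    StorageS3::updateClientAndAuthSettings(context, client_auth);
    return client_auth;
}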

View File

@@ -1,6 +1,7 @@
#include <thread>
#include <Common/config.h>
#include "DataStreams/RemoteBlockInputStream.h"
#include "Parsers/ASTExpressionList.h"
#include "Parsers/ASTFunction.h"
#include "Parsers/IAST_fwd.h"
#include "Processors/Sources/SourceFromInputStream.h"
@@ -49,7 +50,7 @@ void TableFunctionS3Distributed::parseArguments(const ASTPtr & ast_function, con
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
filename = args[1]->as<ASTLiteral &>().value.safeGet<String>();
filename_or_initiator_hash = args[1]->as<ASTLiteral &>().value.safeGet<String>();
if (args.size() < 5)
{
@@ -78,38 +79,38 @@ StoragePtr TableFunctionS3Distributed::executeImpl(
const ASTPtr & ast_function, const Context & context,
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri (filename);
S3::URI s3_uri (uri);
// UInt64 min_upload_part_size = context.getSettingsRef().s3_min_upload_part_size;
// UInt64 max_single_part_upload_size = context.getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context.getSettingsRef().s3_max_connections;
StorageS3::ClientAuthentificaiton client_auth{s3_uri, access_key_id, secret_access_key, max_connections, {}, {}};
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto lists = StorageS3::listFilesWithRegexpMatching(*client_auth.client, client_auth.uri);
Strings tasks;
tasks.reserve(lists.size());
for (auto & value : lists)
/// Initiator-specific logic
while (context.getInitialQueryId() == context.getCurrentQueryId())
{
tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value);
std::cout << tasks.back() << std::endl;
auto poco_uri = Poco::URI{filename_or_initiator_hash};
/// This is needed because a secondary query on the local replica has the same query id
if (poco_uri.getHost().empty() || poco_uri.getPort() == 0)
break;
S3::URI s3_uri(poco_uri);
StorageS3::ClientAuthentificaiton client_auth{s3_uri, access_key_id, secret_access_key, max_connections, {}, {}};
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto lists = StorageS3::listFilesWithRegexpMatching(*client_auth.client, client_auth.uri);
Strings tasks;
tasks.reserve(lists.size());
for (auto & value : lists)
tasks.emplace_back(client_auth.uri.endpoint + '/' + client_auth.uri.bucket + '/' + value);
/// Register resolver, which will give other nodes a task to execute
TaskSupervisor::instance().registerNextTaskResolver(
std::make_unique<S3NextTaskResolver>(context.getCurrentQueryId(), std::move(tasks)));
break;
}
std::cout << "query_id " << context.getCurrentQueryId() << std::endl;
std::cout << ast_function->dumpTree() << std::endl;
auto * func = ast_function->as<ASTFunction>();
std::cout << func->arguments->dumpTree() << std::endl;
/// Register resolver, which will give other nodes a task to execute
TaskSupervisor::instance().registerNextTaskResolver(
std::make_unique<S3NextTaskResolver>(context.getCurrentQueryId(), std::move(tasks)));
StoragePtr storage = StorageS3Distributed::create(
s3_uri,
ast_function->getTreeHash(),
filename_or_initiator_hash,
access_key_id,
secret_access_key,
StorageID(getDatabaseName(), table_name),

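Taken together, the initiator logic above and the worker-side source in StorageS3Distributed.cpp form a simple pull protocol: the initiator enumerates the S3 keys once and publishes them under its query id, and each worker then requests keys one at a time until it gets an empty reply. A condensed sketch of the two halves (the function names publishTasks and pullNextKey are hypothetical; the calls are the ones this commit uses):
/// Initiator: publish the listed S3 keys under the current query id,
/// so TaskSupervisor can hand them out to workers on request.
void publishTasks(const Context & context, Strings tasks)
{
    TaskSupervisor::instance().registerNextTaskResolver(
        std::make_unique<S3NextTaskResolver>(context.getCurrentQueryId(), std::move(tasks)));
}

/// Worker: ask the initiator for the next key; an empty string means
/// all keys have been handed out and the source should finish.
String pullNextKey(ConnectionPool & pool, const ConnectionTimeouts & timeouts, const String & initial_query_id)
{
    auto connection = pool.get(timeouts);
    connection->sendNextTaskRequest(initial_query_id);
    Packet packet = connection->receivePacket();
    assert(packet.type == Protocol::Server::NextTaskReply);
    return packet.next_task;
}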
View File

@@ -37,7 +37,7 @@ protected:
void parseArguments(const ASTPtr & ast_function, const Context & context) override;
String cluster_name;
String filename;
String filename_or_initiator_hash;
String format;
String structure;
String access_key_id;