mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
ec2c7389a5
commit
cfbc15c0b4
@ -44,6 +44,10 @@ namespace DB
|
||||
|
||||
std::string dumpAddresses() const;
|
||||
|
||||
size_t size() const;
|
||||
|
||||
void sendExternalTablesData(std::vector<ExternalTablesData> & data);
|
||||
|
||||
private:
|
||||
using ConnectionHash = std::unordered_map<int, ConnectionInfo>;
|
||||
|
||||
|
@ -274,6 +274,7 @@ namespace ErrorCodes
|
||||
INCOMPATIBLE_TYPE_OF_JOIN,
|
||||
NO_AVAILABLE_REPLICA,
|
||||
UNEXPECTED_REPLICA,
|
||||
MISMATCH_REPLICAS_DATA_SOURCES,
|
||||
|
||||
POCO_EXCEPTION = 1000,
|
||||
STD_EXCEPTION,
|
||||
|
@ -122,25 +122,32 @@ protected:
|
||||
/// Отправить на удаленные сервера все временные таблицы
|
||||
void sendExternalTables()
|
||||
{
|
||||
ExternalTablesData res;
|
||||
for (const auto & table : external_tables)
|
||||
size_t count = use_many_replicas ? replicas_connections->size() : 1;
|
||||
|
||||
std::vector<ExternalTablesData> instances;
|
||||
instances.reserve(count);
|
||||
|
||||
for (size_t i = 0; i < count; ++i)
|
||||
{
|
||||
StoragePtr cur = table.second;
|
||||
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
|
||||
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
|
||||
stage, DEFAULT_BLOCK_SIZE, 1);
|
||||
if (input.size() == 0)
|
||||
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
|
||||
else
|
||||
res.push_back(std::make_pair(input[0], table.first));
|
||||
ExternalTablesData res;
|
||||
for (const auto & table : external_tables)
|
||||
{
|
||||
StoragePtr cur = table.second;
|
||||
QueryProcessingStage::Enum stage = QueryProcessingStage::Complete;
|
||||
DB::BlockInputStreams input = cur->read(cur->getColumnNamesList(), ASTPtr(), context, settings,
|
||||
stage, DEFAULT_BLOCK_SIZE, 1);
|
||||
if (input.size() == 0)
|
||||
res.push_back(std::make_pair(new OneBlockInputStream(cur->getSampleBlock()), table.first));
|
||||
else
|
||||
res.push_back(std::make_pair(input[0], table.first));
|
||||
}
|
||||
instances.push_back(std::move(res));
|
||||
}
|
||||
|
||||
if (use_many_replicas)
|
||||
{
|
||||
/// XXX Отправить res по всем соединениям.
|
||||
//replicas_connections->sendExternalTablesData(res);
|
||||
}
|
||||
replicas_connections->sendExternalTablesData(instances);
|
||||
else
|
||||
connection->sendExternalTablesData(res);
|
||||
connection->sendExternalTablesData(instances[0]);
|
||||
}
|
||||
|
||||
Block readImpl() override
|
||||
|
@ -236,4 +236,23 @@ namespace DB
|
||||
|
||||
return os.str();
|
||||
}
|
||||
|
||||
size_t ReplicasConnections::size() const
|
||||
{
|
||||
return connection_hash.size();
|
||||
}
|
||||
|
||||
void ReplicasConnections::sendExternalTablesData(std::vector<ExternalTablesData> & data)
|
||||
{
|
||||
if (data.size() != connection_hash.size())
|
||||
throw Exception("Mismatch between replicas and data sources", ErrorCodes::MISMATCH_REPLICAS_DATA_SOURCES);
|
||||
|
||||
auto it = data.begin();
|
||||
for (auto & e : connection_hash)
|
||||
{
|
||||
Connection * connection = e.second.connection;
|
||||
connection->sendExternalTablesData(*it);
|
||||
++it;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user