mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-23 10:10:50 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
b419f838d3
commit
046122cbef
@ -11,7 +11,7 @@ namespace DB
|
||||
class ShardReplicas final
|
||||
{
|
||||
public:
|
||||
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, Settings * settings_);
|
||||
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_);
|
||||
|
||||
~ShardReplicas() = default;
|
||||
|
||||
@ -22,8 +22,8 @@ namespace DB
|
||||
Connection::Packet receivePacket();
|
||||
|
||||
/// Отправить запрос ко всем репликам.
|
||||
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
|
||||
const Settings * settings_ = nullptr, bool with_pending_data = false);
|
||||
void sendQuery(const String & query, const String & query_id = "",
|
||||
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
|
||||
|
||||
/// Разорвать соединения ко всем репликам
|
||||
void disconnect();
|
||||
@ -76,7 +76,7 @@ namespace DB
|
||||
int waitForReadEvent();
|
||||
|
||||
private:
|
||||
Settings * settings;
|
||||
const Settings & settings;
|
||||
|
||||
ReplicaHash replica_hash;
|
||||
size_t valid_replicas_count;
|
||||
|
@ -274,6 +274,7 @@ namespace ErrorCodes
|
||||
NO_AVAILABLE_REPLICA,
|
||||
UNEXPECTED_REPLICA,
|
||||
MISMATCH_REPLICAS_DATA_SOURCES,
|
||||
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
|
||||
|
||||
POCO_EXCEPTION = 1000,
|
||||
STD_EXCEPTION,
|
||||
|
@ -158,7 +158,7 @@ protected:
|
||||
{
|
||||
auto entries = pool->getMany(&settings);
|
||||
if (entries.size() > 1)
|
||||
shard_replicas.reset(new ShardReplicas(entries, &settings));
|
||||
shard_replicas.reset(new ShardReplicas(entries, settings));
|
||||
else if (entries.size() == 1)
|
||||
{
|
||||
use_many_replicas = false;
|
||||
@ -176,7 +176,7 @@ protected:
|
||||
}
|
||||
|
||||
if (use_many_replicas)
|
||||
shard_replicas->sendQuery(query, "", stage, &settings, true);
|
||||
shard_replicas->sendQuery(query, "", stage, true);
|
||||
else
|
||||
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);
|
||||
|
||||
|
@ -68,6 +68,10 @@ public:
|
||||
*/
|
||||
virtual bool supportsPrewhere() const { return false; }
|
||||
|
||||
/** Возвращает true, если хранилище поддерживает несколько реплик.
|
||||
*/
|
||||
virtual bool supportsParallelReplicas() const { return false; }
|
||||
|
||||
/** Не дает изменять описание таблицы (в том числе переименовывать и удалять таблицу).
|
||||
* Если в течение какой-то операции структура таблицы должна оставаться неизменной, нужно держать такой лок на все ее время.
|
||||
* Например, нужно держать такой лок на время всего запроса SELECT или INSERT и на все время слияния набора кусков
|
||||
|
@ -33,6 +33,8 @@ public:
|
||||
std::string getName() const override { return "ChunkMerger"; }
|
||||
std::string getTableName() const override { return name; }
|
||||
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
|
||||
NameAndTypePair getColumn(const String & column_name) const override;
|
||||
bool hasColumn(const String & column_name) const override;
|
||||
|
@ -22,6 +22,8 @@ public:
|
||||
NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); };
|
||||
bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); };
|
||||
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
|
@ -48,6 +48,7 @@ public:
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsFinal() const override { return true; }
|
||||
bool supportsPrewhere() const override { return true; }
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
|
||||
NameAndTypePair getColumn(const String & column_name) const override;
|
||||
|
@ -42,6 +42,8 @@ public:
|
||||
/// Проверка откладывается до метода read. Там проверяется поддержка PREWHERE у использующихся таблиц.
|
||||
bool supportsPrewhere() const override { return true; }
|
||||
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
|
||||
NameAndTypePair getColumn(const String &column_name) const override;
|
||||
bool hasColumn(const String &column_name) const override;
|
||||
|
@ -56,6 +56,7 @@ public:
|
||||
bool supportsSampling() const override { return data.supportsSampling(); }
|
||||
bool supportsFinal() const override { return data.supportsFinal(); }
|
||||
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); }
|
||||
|
||||
|
@ -30,6 +30,8 @@ public:
|
||||
bool supportsSampling() const override { return true; }
|
||||
bool supportsFinal() const override { return true; }
|
||||
|
||||
bool supportsParallelReplicas() const override { return true; }
|
||||
|
||||
BlockInputStreams read(
|
||||
const Names & column_names,
|
||||
ASTPtr query,
|
||||
|
@ -2,7 +2,7 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, Settings * settings_) :
|
||||
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
|
||||
settings(settings_)
|
||||
{
|
||||
valid_replicas_count = entries.size();
|
||||
@ -34,7 +34,7 @@ namespace DB
|
||||
read_list.push_back(replica.connection->socket);
|
||||
}
|
||||
|
||||
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
|
||||
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
|
||||
|
||||
for (const auto & socket : read_list)
|
||||
{
|
||||
@ -129,13 +129,18 @@ namespace DB
|
||||
}
|
||||
}
|
||||
|
||||
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage,
|
||||
const Settings * settings_, bool with_pending_data)
|
||||
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
|
||||
{
|
||||
Settings query_settings = settings;
|
||||
query_settings.parallel_replicas_count = replica_hash.size();
|
||||
UInt64 offset = 0;
|
||||
|
||||
for (auto & e : replica_hash)
|
||||
{
|
||||
Connection * connection = e.second.connection;
|
||||
connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
|
||||
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
|
||||
query_settings.parallel_replica_offset = offset;
|
||||
++offset;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,6 +98,11 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
|
||||
storage = context.getTable(database_name, table_name);
|
||||
}
|
||||
|
||||
if (!storage->supportsParallelReplicas() && (settings.parallel_replicas_count > 0))
|
||||
throw Exception("Storage engine " + storage->getName()
|
||||
+ " does not support parallel execution on several replicas",
|
||||
ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
|
||||
|
||||
table_lock = storage->lockStructure(false);
|
||||
if (table_column_names.empty())
|
||||
context.setColumns(storage->getColumnsListNonMaterialized());
|
||||
|
Loading…
Reference in New Issue
Block a user