dbms: Server: queries with several replicas: development [#METR-14410]

This commit is contained in:
Alexey Arno 2015-01-16 18:22:12 +03:00
parent 0ba5c640b0
commit e86abf0311
12 changed files with 36 additions and 11 deletions

View File

@ -11,7 +11,7 @@ namespace DB
class ShardReplicas final
{
public:
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, Settings * settings_);
ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_);
~ShardReplicas() = default;
@ -22,8 +22,8 @@ namespace DB
Connection::Packet receivePacket();
/// Отправить запрос ко всем репликам.
void sendQuery(const String & query, const String & query_id = "", UInt64 stage = QueryProcessingStage::Complete,
const Settings * settings_ = nullptr, bool with_pending_data = false);
void sendQuery(const String & query, const String & query_id = "",
UInt64 stage = QueryProcessingStage::Complete, bool with_pending_data = false);
/// Разорвать соединения ко всем репликам
void disconnect();
@ -76,7 +76,7 @@ namespace DB
int waitForReadEvent();
private:
Settings * settings;
const Settings & settings;
ReplicaHash replica_hash;
size_t valid_replicas_count;

View File

@ -275,6 +275,7 @@ namespace ErrorCodes
NO_AVAILABLE_REPLICA,
UNEXPECTED_REPLICA,
MISMATCH_REPLICAS_DATA_SOURCES,
STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -158,7 +158,7 @@ protected:
{
auto entries = pool->getMany(&settings);
if (entries.size() > 1)
shard_replicas.reset(new ShardReplicas(entries, &settings));
shard_replicas.reset(new ShardReplicas(entries, settings));
else if (entries.size() == 1)
{
use_many_replicas = false;
@ -176,7 +176,7 @@ protected:
}
if (use_many_replicas)
shard_replicas->sendQuery(query, "", stage, &settings, true);
shard_replicas->sendQuery(query, "", stage, true);
else
connection->sendQuery(query, "", stage, send_settings ? &settings : nullptr, true);

View File

@ -68,6 +68,10 @@ public:
*/
virtual bool supportsPrewhere() const { return false; }
/** Возвращает true, если хранилище поддерживает несколько реплик.
*/
virtual bool supportsParallelReplicas() const { return false; }
/** Не дает изменять описание таблицы (в том числе переименовывать и удалять таблицу).
* Если в течение какой-то операции структура таблицы должна оставаться неизменной, нужно держать такой лок на все ее время.
* Например, нужно держать такой лок на время всего запроса SELECT или INSERT и на все время слияния набора кусков

View File

@ -33,6 +33,8 @@ public:
std::string getName() const override { return "ChunkMerger"; }
std::string getTableName() const override { return name; }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
NameAndTypePair getColumn(const String & column_name) const override;
bool hasColumn(const String & column_name) const override;

View File

@ -22,6 +22,8 @@ public:
NameAndTypePair getColumn(const String & column_name) const override { return getSource().getColumn(column_name); };
bool hasColumn(const String & column_name) const override { return getSource().hasColumn(column_name); };
bool supportsParallelReplicas() const override { return true; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,

View File

@ -48,6 +48,7 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
bool supportsPrewhere() const override { return true; }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
NameAndTypePair getColumn(const String & column_name) const override;

View File

@ -42,6 +42,8 @@ public:
/// Проверка откладывается до метода read. Там проверяется поддержка PREWHERE у использующихся таблиц.
bool supportsPrewhere() const override { return true; }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return *columns; }
NameAndTypePair getColumn(const String &column_name) const override;
bool hasColumn(const String &column_name) const override;

View File

@ -56,6 +56,7 @@ public:
bool supportsSampling() const override { return data.supportsSampling(); }
bool supportsFinal() const override { return data.supportsFinal(); }
bool supportsPrewhere() const override { return data.supportsPrewhere(); }
bool supportsParallelReplicas() const override { return true; }
const NamesAndTypesList & getColumnsListImpl() const override { return data.getColumnsListNonMaterialized(); }

View File

@ -30,6 +30,8 @@ public:
bool supportsSampling() const override { return true; }
bool supportsFinal() const override { return true; }
bool supportsParallelReplicas() const override { return true; }
BlockInputStreams read(
const Names & column_names,
ASTPtr query,

View File

@ -2,7 +2,7 @@
namespace DB
{
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, Settings * settings_) :
ShardReplicas::ShardReplicas(std::vector<ConnectionPool::Entry> & entries, const Settings & settings_) :
settings(settings_)
{
valid_replicas_count = entries.size();
@ -34,7 +34,7 @@ namespace DB
read_list.push_back(replica.connection->socket);
}
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings->poll_interval * 1000000);
int n = Poco::Net::Socket::select(read_list, write_list, except_list, settings.poll_interval * 1000000);
for (const auto & socket : read_list)
{
@ -129,13 +129,18 @@ namespace DB
}
}
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage,
const Settings * settings_, bool with_pending_data)
void ShardReplicas::sendQuery(const String & query, const String & query_id, UInt64 stage, bool with_pending_data)
{
Settings query_settings = settings;
query_settings.parallel_replicas_count = replica_hash.size();
UInt64 offset = 0;
for (auto & e : replica_hash)
{
Connection * connection = e.second.connection;
connection->sendQuery(query, query_id, stage, settings_, with_pending_data);
connection->sendQuery(query, query_id, stage, &query_settings, with_pending_data);
query_settings.parallel_replica_offset = offset;
++offset;
}
}

View File

@ -99,6 +99,11 @@ void InterpreterSelectQuery::basicInit(BlockInputStreamPtr input_, const NamesAn
storage = context.getTable(database_name, table_name);
}
if (!storage->supportsParallelReplicas() && (settings.parallel_replicas_count > 0))
throw Exception("Storage engine " + storage->getName()
+ " does not support parallel execution on several replicas",
ErrorCodes::STORAGE_DOESNT_SUPPORT_PARALLEL_REPLICAS);
table_lock = storage->lockStructure(false);
if (table_column_names.empty())
context.setColumns(storage->getColumnsListNonMaterialized());