mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-22 15:42:02 +00:00
dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent
60d4b1209c
commit
278cff6259
@ -6,11 +6,14 @@
|
|||||||
namespace DB
|
namespace DB
|
||||||
{
|
{
|
||||||
|
|
||||||
|
class PartsWithRangesSplitter;
|
||||||
|
|
||||||
/** Выполняет запросы SELECT на данных из merge-дерева.
|
/** Выполняет запросы SELECT на данных из merge-дерева.
|
||||||
*/
|
*/
|
||||||
class MergeTreeDataSelectExecutor
|
class MergeTreeDataSelectExecutor
|
||||||
{
|
{
|
||||||
|
friend class PartsWithRangesSplitter;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
MergeTreeDataSelectExecutor(MergeTreeData & data_);
|
MergeTreeDataSelectExecutor(MergeTreeData & data_);
|
||||||
|
|
||||||
|
61
dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h
Normal file
61
dbms/include/DB/Storages/MergeTree/PartsWithRangesSplitter.h
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
/// Разбиваем parts_with_ranges на n частей.
|
||||||
|
/// Следующие условия должны быть выполнены:
|
||||||
|
/// - 1 <= n <= settings.max_clusters_count;
|
||||||
|
/// - каждая из n частей имеет >= min_cluster_size записей.
|
||||||
|
/// 3 levels: cluster / part / range
|
||||||
|
|
||||||
|
class PartsWithRangesSplitter
|
||||||
|
{
|
||||||
|
public:
|
||||||
|
typedef MergeTreeDataSelectExecutor::RangesInDataParts Cluster;
|
||||||
|
|
||||||
|
public:
|
||||||
|
PartsWithRangesSplitter(const Cluster & input_cluster_,
|
||||||
|
size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_);
|
||||||
|
|
||||||
|
~PartsWithRangesSplitter() = default;
|
||||||
|
PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
|
||||||
|
PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete;
|
||||||
|
|
||||||
|
std::vector<Cluster> perform();
|
||||||
|
|
||||||
|
private:
|
||||||
|
void init();
|
||||||
|
bool emit();
|
||||||
|
bool updateCluster();
|
||||||
|
bool updateRange(bool add_part);
|
||||||
|
void addPart();
|
||||||
|
void initRangeInfo();
|
||||||
|
void initClusterInfo();
|
||||||
|
bool isRangeConsumed() const { return range_begin == range_end; }
|
||||||
|
bool isClusterConsumed() const { return cluster_begin == cluster_end; }
|
||||||
|
|
||||||
|
private:
|
||||||
|
// Input data.
|
||||||
|
const Cluster & input_cluster;
|
||||||
|
Cluster::const_iterator input_part;
|
||||||
|
std::vector<MarkRange>::const_iterator input_range;
|
||||||
|
|
||||||
|
// Output data.
|
||||||
|
std::vector<Cluster> output_clusters;
|
||||||
|
std::vector<Cluster>::iterator current_output_cluster;
|
||||||
|
MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;
|
||||||
|
|
||||||
|
size_t total_size;
|
||||||
|
size_t remaining_size;
|
||||||
|
size_t min_cluster_size;
|
||||||
|
size_t max_clusters_count;
|
||||||
|
size_t cluster_size;
|
||||||
|
|
||||||
|
size_t range_begin;
|
||||||
|
size_t range_end;
|
||||||
|
|
||||||
|
size_t cluster_begin;
|
||||||
|
size_t cluster_end;
|
||||||
|
};
|
||||||
|
}
|
@ -1,5 +1,6 @@
|
|||||||
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
||||||
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
|
#include <DB/Storages/MergeTree/MergeTreeWhereOptimizer.h>
|
||||||
|
#include <DB/Storages/MergeTree/PartsWithRangesSplitter.h>
|
||||||
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
#include <DB/Interpreters/ExpressionAnalyzer.h>
|
||||||
#include <DB/Parsers/ASTIdentifier.h>
|
#include <DB/Parsers/ASTIdentifier.h>
|
||||||
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
||||||
@ -226,6 +227,18 @@ BlockInputStreams MergeTreeDataSelectExecutor::read(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (settings.parallel_replicas_count > 0)
|
||||||
|
{
|
||||||
|
PartsWithRangesSplitter splitter(parts_with_ranges, sum_marks, data.settings.min_rows_for_seek,
|
||||||
|
settings.parallel_replicas_count);
|
||||||
|
auto per_replica_parts_with_ranges = splitter.perform();
|
||||||
|
|
||||||
|
/// Для каждого элемента per_replica_parts_with_ranges[k], вычисляем хэш от RangesInDataParts
|
||||||
|
/// Сортируем per_replica_parts_with_ranges по хэшу
|
||||||
|
/// Выбираем per_replica_parts_with_ranges[settings.parallel_replica_offset]
|
||||||
|
/// Если settings.parallel_replica_offset > (n - 1), то ничего не делаем.
|
||||||
|
}
|
||||||
|
|
||||||
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
|
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
|
||||||
<< sum_marks << " marks to read from " << sum_ranges << " ranges");
|
<< sum_marks << " marks to read from " << sum_ranges << " ranges");
|
||||||
|
|
||||||
|
118
dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp
Normal file
118
dbms/src/Storages/MergeTree/PartsWithRangesSplitter.cpp
Normal file
@ -0,0 +1,118 @@
|
|||||||
|
#include <DB/Storages/MergeTree/PartsWithRangesSplitter.h>
|
||||||
|
|
||||||
|
namespace DB
|
||||||
|
{
|
||||||
|
|
||||||
|
PartsWithRangesSplitter::PartsWithRangesSplitter(const Cluster & input_cluster_,
|
||||||
|
size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_)
|
||||||
|
: input_cluster(input_cluster_),
|
||||||
|
total_size(total_size_),
|
||||||
|
remaining_size(total_size_),
|
||||||
|
min_cluster_size(min_cluster_size_),
|
||||||
|
max_clusters_count(max_clusters_count_)
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
|
std::vector<PartsWithRangesSplitter::Cluster> PartsWithRangesSplitter::perform()
|
||||||
|
{
|
||||||
|
init();
|
||||||
|
while (emit()) {}
|
||||||
|
return output_clusters;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PartsWithRangesSplitter::init()
|
||||||
|
{
|
||||||
|
size_t clusters_count = max_clusters_count;
|
||||||
|
while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count)))
|
||||||
|
--clusters_count;
|
||||||
|
|
||||||
|
cluster_size = total_size / clusters_count;
|
||||||
|
|
||||||
|
output_clusters.resize(clusters_count);
|
||||||
|
|
||||||
|
// Initialize range reader.
|
||||||
|
input_part = input_cluster.begin();
|
||||||
|
input_range = input_part->ranges.begin();
|
||||||
|
initRangeInfo();
|
||||||
|
|
||||||
|
// Initialize output writer.
|
||||||
|
current_output_cluster = output_clusters.begin();
|
||||||
|
addPart();
|
||||||
|
initClusterInfo();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PartsWithRangesSplitter::emit()
|
||||||
|
{
|
||||||
|
size_t new_size = std::min(range_end - range_begin, cluster_end - cluster_begin);
|
||||||
|
current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size));
|
||||||
|
|
||||||
|
range_begin += new_size;
|
||||||
|
cluster_begin += new_size;
|
||||||
|
|
||||||
|
if (isClusterConsumed())
|
||||||
|
return updateCluster();
|
||||||
|
else if (isRangeConsumed())
|
||||||
|
return updateRange(true);
|
||||||
|
else
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PartsWithRangesSplitter::updateCluster()
|
||||||
|
{
|
||||||
|
++current_output_cluster;
|
||||||
|
if (current_output_cluster == output_clusters.end())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
if (isRangeConsumed())
|
||||||
|
if (!updateRange(false))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
addPart();
|
||||||
|
initClusterInfo();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool PartsWithRangesSplitter::updateRange(bool add_part)
|
||||||
|
{
|
||||||
|
++input_range;
|
||||||
|
if (input_range == input_part->ranges.end())
|
||||||
|
{
|
||||||
|
++input_part;
|
||||||
|
if (input_part == input_cluster.end())
|
||||||
|
return false;
|
||||||
|
|
||||||
|
input_range = input_part->ranges.begin();
|
||||||
|
|
||||||
|
if (add_part)
|
||||||
|
addPart();
|
||||||
|
}
|
||||||
|
|
||||||
|
initRangeInfo();
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PartsWithRangesSplitter::addPart()
|
||||||
|
{
|
||||||
|
MergeTreeDataSelectExecutor::RangesInDataPart new_part;
|
||||||
|
new_part.data_part = input_part->data_part;
|
||||||
|
current_output_cluster->push_back(new_part);
|
||||||
|
current_output_part = &(current_output_cluster->back());
|
||||||
|
}
|
||||||
|
|
||||||
|
void PartsWithRangesSplitter::initRangeInfo()
|
||||||
|
{
|
||||||
|
range_begin = 0;
|
||||||
|
range_end = input_range->end - input_range->begin;
|
||||||
|
}
|
||||||
|
|
||||||
|
void PartsWithRangesSplitter::initClusterInfo()
|
||||||
|
{
|
||||||
|
cluster_begin = 0;
|
||||||
|
cluster_end = cluster_size;
|
||||||
|
|
||||||
|
remaining_size -= cluster_size;
|
||||||
|
if (remaining_size < cluster_size)
|
||||||
|
cluster_end += remaining_size;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user