dbms: Server: queries with several replicas: development [#METR-14410]
This commit is contained in:
parent 2201c073a8
commit a5dec52f9a
@@ -6,14 +6,11 @@
namespace DB
{

class PartsWithRangesSplitter;

/** Executes SELECT queries on data from a merge tree.
  */
class MergeTreeDataSelectExecutor
{
	friend class PartsWithRangesSplitter;

public:
	MergeTreeDataSelectExecutor(MergeTreeData & data_);
@@ -30,11 +27,7 @@ public:
		unsigned threads = 1,
		size_t * inout_part_index = nullptr);

private:
	MergeTreeData & data;

	Logger * log;

public:
	struct RangesInDataPart
	{
		MergeTreeData::DataPartPtr data_part;

@@ -49,10 +42,13 @@ private:
		}
	};

public:
	typedef std::vector<RangesInDataPart> RangesInDataParts;

private:
	MergeTreeData & data;

	Logger * log;

	size_t min_marks_for_seek;
	size_t min_marks_for_concurrent_read;
	size_t max_marks_to_use_cache;
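
For orientation, a minimal standalone sketch (not ClickHouse code) of the layout that RangesInDataPart / RangesInDataParts above describe: a flat list of data parts, each carrying the mark ranges selected from it. MarkRange and the data-part type are stubbed out here as assumptions for illustration.

#include <cstddef>
#include <memory>
#include <string>
#include <vector>

// Stand-ins for DB::MarkRange and MergeTreeData::DataPartPtr (illustrative only).
struct MarkRange { std::size_t begin = 0; std::size_t end = 0; };	// half-open range of index marks
struct DataPart { std::string name; };

struct RangesInDataPart
{
	std::shared_ptr<const DataPart> data_part;	// which part to read
	std::vector<MarkRange> ranges;				// which mark ranges inside that part
};

typedef std::vector<RangesInDataPart> RangesInDataParts;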
@@ -1,61 +1,63 @@
#pragma once

#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>

namespace DB
{

/// Split parts_with_ranges into n parts.
/// The following conditions must hold:
/// - 1 <= n <= settings.max_clusters_count;
/// - each of the n parts has >= min_cluster_size entries.
/// 3 levels: cluster / part / range
class PartsWithRangesSplitter
{
public:
	typedef MergeTreeDataSelectExecutor::RangesInDataParts Cluster;

public:
	PartsWithRangesSplitter(const Cluster & input_cluster_,
		size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_);

	~PartsWithRangesSplitter() = default;
	PartsWithRangesSplitter(const PartsWithRangesSplitter &) = delete;
	PartsWithRangesSplitter & operator=(const PartsWithRangesSplitter &) = delete;

	std::vector<Cluster> perform();

private:
	void init();
	bool emit();
	bool updateCluster();
	bool updateRange(bool add_part);
	void addPart();
	void initRangeInfo();
	void initClusterInfo();
	bool isRangeConsumed() const { return range_begin == range_end; }
	bool isClusterConsumed() const { return cluster_begin == cluster_end; }

private:
	// Input data.
	const Cluster & input_cluster;
	Cluster::const_iterator input_part;
	std::vector<MarkRange>::const_iterator input_range;

	// Output data.
	std::vector<Cluster> output_clusters;
	std::vector<Cluster>::iterator current_output_cluster;
	MergeTreeDataSelectExecutor::RangesInDataPart * current_output_part;

	size_t total_size;
	size_t remaining_size;
	size_t min_cluster_size;
	size_t max_clusters_count;
	size_t cluster_size;

	size_t range_begin;
	size_t range_end;

	size_t cluster_begin;
	size_t cluster_end;
};

}
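
Given only the interface declared above, a hypothetical call site could look like the sketch below. The variable names (parts_with_ranges, sum_marks) and the numeric limits are assumptions for illustration, not values taken from this commit.

// Hypothetical usage of PartsWithRangesSplitter as declared above; all values are illustrative.
PartsWithRangesSplitter splitter(
	parts_with_ranges,				// RangesInDataParts selected for the query
	sum_marks,						// total number of marks covered by all ranges
	/* min_cluster_size_ */ 1000,	// each cluster must cover at least this many marks
	/* max_clusters_count_ */ 8);	// upper bound on the number of clusters produced

// Each returned Cluster is itself a RangesInDataParts covering a consecutive slice of the input.
std::vector<PartsWithRangesSplitter::Cluster> clusters = splitter.perform();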
@@ -14,27 +14,29 @@

namespace
{

std::pair<UInt64, UInt64> computeHash(const DB::MergeTreeDataSelectExecutor::RangesInDataParts & cluster)
{
	SipHash hash;
	for (const auto & part_with_ranges : cluster)
	{
		const auto & part = *(part_with_ranges.data_part);
		hash.update(part.name.c_str(), part.name.length());
		const auto & ranges = part_with_ranges.ranges;
		for (const auto & range : ranges)
		{
			hash.update(reinterpret_cast<const char *>(&range.begin), sizeof(range.begin));
			hash.update(reinterpret_cast<const char *>(&range.end), sizeof(range.end));
		}
	}

	UInt64 lo;
	UInt64 hi;

	hash.get128(lo, hi);
	return std::make_pair(lo, hi);
}

}

namespace DB
@@ -3,116 +3,116 @@

namespace DB
{

PartsWithRangesSplitter::PartsWithRangesSplitter(const Cluster & input_cluster_,
	size_t total_size_, size_t min_cluster_size_, size_t max_clusters_count_)
	: input_cluster(input_cluster_),
	total_size(total_size_),
	remaining_size(total_size_),
	min_cluster_size(min_cluster_size_),
	max_clusters_count(max_clusters_count_)
{
}

std::vector<PartsWithRangesSplitter::Cluster> PartsWithRangesSplitter::perform()
{
	init();
	while (emit()) {}
	return output_clusters;
}

void PartsWithRangesSplitter::init()
{
	size_t clusters_count = max_clusters_count;
	while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count)))
		--clusters_count;

	cluster_size = total_size / clusters_count;

	output_clusters.resize(clusters_count);

	// Initialize range reader.
	input_part = input_cluster.begin();
	input_range = input_part->ranges.begin();
	initRangeInfo();

	// Initialize output writer.
	current_output_cluster = output_clusters.begin();
	addPart();
	initClusterInfo();
}

bool PartsWithRangesSplitter::emit()
{
	size_t new_size = std::min(range_end - range_begin, cluster_end - cluster_begin);
	current_output_part->ranges.push_back(MarkRange(range_begin, range_begin + new_size));

	range_begin += new_size;
	cluster_begin += new_size;

	if (isClusterConsumed())
		return updateCluster();
	else if (isRangeConsumed())
		return updateRange(true);
	else
		return false;
}

bool PartsWithRangesSplitter::updateCluster()
{
	++current_output_cluster;
	if (current_output_cluster == output_clusters.end())
		return false;

	if (isRangeConsumed())
		if (!updateRange(false))
			return false;

	addPart();
	initClusterInfo();
	return true;
}

bool PartsWithRangesSplitter::updateRange(bool add_part)
{
	++input_range;
	if (input_range == input_part->ranges.end())
	{
		++input_part;
		if (input_part == input_cluster.end())
			return false;

		input_range = input_part->ranges.begin();

		if (add_part)
			addPart();
	}

	initRangeInfo();
	return true;
}

void PartsWithRangesSplitter::addPart()
{
	MergeTreeDataSelectExecutor::RangesInDataPart new_part;
	new_part.data_part = input_part->data_part;
	current_output_cluster->push_back(new_part);
	current_output_part = &(current_output_cluster->back());
}

void PartsWithRangesSplitter::initRangeInfo()
{
	range_begin = 0;
	range_end = input_range->end - input_range->begin;
}

void PartsWithRangesSplitter::initClusterInfo()
{
	cluster_begin = 0;
	cluster_end = cluster_size;

	remaining_size -= cluster_size;
	if (remaining_size < cluster_size)
		cluster_end += remaining_size;
}

}
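
As a sanity check on the arithmetic in init() and initClusterInfo() above, here is a small standalone program with made-up inputs; it reproduces only the cluster-count and cluster-size computation shown in this commit.

#include <cstddef>
#include <iostream>

int main()
{
	// Made-up inputs: 100 marks in total, clusters of at least 30 marks, at most 5 clusters.
	std::size_t total_size = 100;
	std::size_t min_cluster_size = 30;
	std::size_t max_clusters_count = 5;

	// Same loop as PartsWithRangesSplitter::init(): shrink the cluster count until
	// every cluster can reach min_cluster_size.
	std::size_t clusters_count = max_clusters_count;
	while ((clusters_count > 0) && (total_size < (min_cluster_size * clusters_count)))
		--clusters_count;

	std::size_t cluster_size = total_size / clusters_count;

	// Prints "clusters_count = 3, cluster_size = 33"; the remaining 1 mark is absorbed
	// by the last cluster via the remaining_size check in initClusterInfo().
	std::cout << "clusters_count = " << clusters_count
		<< ", cluster_size = " << cluster_size << std::endl;
	return 0;
}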