Fix poor distributed insertion scalability (#481)

* Fix poor scalability of INSERT into Distributed table [#CLICKHOUSE-2791].

* Fix poor scalability of INSERT into Distributed table (continued) [#CLICKHOUSE-2791].

* Misc [#CLICKHOUSE-2791].

* Fixed error [#CLICKHOUSE-2791].
This commit is contained in:
alexey-milovidov 2017-02-12 00:20:57 +04:00 committed by GitHub
parent 6a24e21f77
commit 4593d363d1
19 changed files with 339 additions and 189 deletions

View File

@ -238,6 +238,29 @@ public:
throw Exception("Method replicate is not supported for ColumnAggregateFunction.", ErrorCodes::NOT_IMPLEMENTED);
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
/// Columns with scattered values will point to this column as the owner of values.
Columns columns(num_columns);
for (auto & column : columns)
column = std::make_shared<ColumnAggregateFunction>(*this);
size_t num_rows = size();
{
size_t reserve_size = num_rows / num_columns * 1.1; /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<ColumnAggregateFunction &>(*columns[selector[i]]).data.push_back(data[i]);
return columns;
}
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override
{
return 0;

View File

@ -308,6 +308,10 @@ public:
ColumnPtr replicate(const Offsets_t & replicate_offsets) const override;
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnArray>(num_columns, selector);
}
ColumnPtr convertToFullColumnIfConst() const override
{

View File

@ -158,6 +158,22 @@ public:
return std::make_shared<Derived>(replicated_size, data, data_type);
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
if (s != selector.size())
throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
std::vector<size_t> counts(num_columns);
for (auto idx : selector)
++counts[idx];
Columns res(num_columns);
for (size_t i = 0; i < num_columns; ++i)
res[i] = cloneResized(counts[i]);
return res;
}
size_t byteSize() const override { return sizeof(data) + sizeof(s); }
size_t allocatedSize() const override { return byteSize(); }

View File

@ -309,6 +309,11 @@ public:
return res;
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnFixedString>(num_columns, selector);
}
void reserve(size_t size) override
{
chars.reserve(n * size);

View File

@ -60,6 +60,11 @@ public:
void updateHashWithValue(size_t n, SipHash & hash) const override;
void getExtremes(Field & min, Field & max) const override;
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnNullable>(num_columns, selector);
}
/// Return the column that represents values.
ColumnPtr & getNestedColumn() { return nested_column; }
const ColumnPtr & getNestedColumn() const { return nested_column; }

View File

@ -445,6 +445,11 @@ public:
return res;
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return scatterImpl<ColumnString>(num_columns, selector);
}
void reserve(size_t n) override
{
offsets.reserve(n);

View File

@ -172,6 +172,27 @@ public:
return std::make_shared<ColumnTuple>(res_block);
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
size_t num_tuple_elements = columns.size();
std::vector<Columns> scattered_tuple_elements(num_tuple_elements);
for (size_t tuple_element_idx = 0; tuple_element_idx < num_tuple_elements; ++tuple_element_idx)
scattered_tuple_elements[tuple_element_idx] = data.getByPosition(tuple_element_idx).column->scatter(num_columns, selector);
Columns res(num_columns);
for (size_t scattered_idx = 0; scattered_idx < num_columns; ++scattered_idx)
{
Block res_block = data.cloneEmpty();
for (size_t tuple_element_idx = 0; tuple_element_idx < num_tuple_elements; ++tuple_element_idx)
res_block.getByPosition(tuple_element_idx).column = scattered_tuple_elements[tuple_element_idx][scattered_idx];
res[scattered_idx] = std::make_shared<ColumnTuple>(res_block);
}
return res;
}
int compareAt(size_t n, size_t m, const IColumn & rhs, int nan_direction_hint) const override
{
size_t size = columns.size();

View File

@ -497,7 +497,13 @@ public:
}
/** Более эффективные методы манипуляции */
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
return this->scatterImpl<Self>(num_columns, selector);
}
/** More efficient methods of manipulation - to manipulate with data directly. */
Container_t & getData()
{
return data;

View File

@ -20,6 +20,7 @@ namespace ErrorCodes
{
extern const int CANNOT_GET_SIZE_OF_FIELD;
extern const int NOT_IMPLEMENTED;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
}
class IColumn;
@ -245,6 +246,14 @@ public:
using Offsets_t = PaddedPODArray<Offset_t>;
virtual ColumnPtr replicate(const Offsets_t & offsets) const = 0;
/** Split column to smaller columns. Each value goes to column index, selected by corresponding element of 'selector'.
* Selector must contain values from 0 to num_columns - 1.
* For default implementation, see scatterImpl.
*/
using ColumnIndex = UInt64;
using Selector = PaddedPODArray<ColumnIndex>;
virtual Columns scatter(ColumnIndex num_columns, const Selector & selector) const = 0;
/** Посчитать минимум и максимум по столбцу.
* Функция должна быть реализована полноценно только для числовых столбцов, а также дат/дат-с-временем.
* Для строк и массивов функция должна возвращать значения по-умолчанию
@ -269,6 +278,36 @@ public:
virtual size_t allocatedSize() const = 0;
virtual ~IColumn() {}
protected:
/// Template is to devirtualize calls to insertFrom method.
/// In derived classes (that use final keyword), implement scatter method as call to scatterImpl.
template <typename Derived>
Columns scatterImpl(ColumnIndex num_columns, const Selector & selector) const
{
size_t num_rows = size();
if (num_rows != selector.size())
throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
Columns columns(num_columns);
for (auto & column : columns)
column = cloneEmpty();
{
size_t reserve_size = num_rows / num_columns * 1.1; /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);
return columns;
}
};

View File

@ -87,6 +87,22 @@ public:
return cloneDummy(s == 0 ? 0 : offsets.back());
}
Columns scatter(ColumnIndex num_columns, const Selector & selector) const override
{
if (s != selector.size())
throw Exception("Size of selector doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
std::vector<size_t> counts(num_columns);
for (auto idx : selector)
++counts[idx];
Columns res(num_columns);
for (size_t i = 0; i < num_columns; ++i)
res[i] = cloneResized(counts[i]);
return res;
}
void getExtremes(Field & min, Field & max) const override
{
throw Exception("Method getExtremes is not supported for " + getName(), ErrorCodes::NOT_IMPLEMENTED);

View File

@ -1,64 +0,0 @@
#pragma once
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnVector.h>
#include <type_traits>
#if __SSE2__
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
namespace DB
{
template <typename T>
struct BlockFilterCreator
{
static std::vector<IColumn::Filter> perform(const size_t num_rows, const IColumn * column,
size_t num_shards, const std::vector<size_t> & slots)
{
const auto total_weight = slots.size();
std::vector<IColumn::Filter> filters(num_shards);
/** Деление отрицательного числа с остатком на положительное, в C++ даёт отрицательный остаток.
* Для данной задачи это не подходит. Поэтому, будем обрабатывать знаковые типы как беззнаковые.
* Это даёт уже что-то совсем не похожее на деление с остатком, но подходящее для данной задачи.
*/
using UnsignedT = typename std::make_unsigned<T>::type;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column->isConst())
{
const auto data = typeid_cast<const ColumnConst<T> *>(column)->getData();
const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
for (size_t i = 0; i < num_shards; ++i)
filters[i].assign(num_rows, static_cast<UInt8>(shard_num == i));
}
else
{
/// libdivide поддерживает только UInt32 или UInt64.
using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
libdivide::divider<TUInt32Or64> divider(total_weight);
const auto & data = typeid_cast<const ColumnVector<T> *>(column)->getData();
/// NOTE Может быть, стоит поменять местами циклы.
for (size_t i = 0; i < num_shards; ++i)
{
filters[i].resize(num_rows);
for (size_t j = 0; j < num_rows; ++j)
filters[i][j] = slots[
static_cast<TUInt32Or64>(data[j]) - (static_cast<TUInt32Or64>(data[j]) / divider) * total_weight] == i;
}
}
return filters;
}
};
}

View File

@ -1073,8 +1073,8 @@ public:
*/
Block mergeBlocks(BlocksList & blocks, bool final);
/** Преобразовать (разрезать) блок частично-агрегированных данных на много блоков, как если бы использовался двухуровневый метод агрегации.
* Это нужно, чтобы потом было проще объединить результат с другими результатами, уже являющимися двухуровневыми.
/** Split block with partially-aggregated data to many blocks, as if two-level method of aggregation was used.
* This is needed to simplify merging of that data with other results, that are already two-level.
*/
std::vector<Block> convertBlockToTwoLevel(const Block & block);

View File

@ -0,0 +1,27 @@
#pragma once
#include <DB/Columns/IColumn.h>
namespace DB
{
/** Create a 'selector' to be used in IColumn::scatter method
* according to sharding scheme and values of column with sharding key.
*
* Each of num_shards has its weight. Weight must be small.
* 'slots' contains weight elements for each shard, in total - sum of all weight elements.
*
* Values of column get divided to sum_weight, and modulo of division
* will map to corresponding shard through 'slots' array.
*
* Column must have integer type.
* T is type of column elements.
*/
template <typename T>
IColumn::Selector createBlockSelector(
const IColumn & column,
size_t num_shards,
const std::vector<size_t> & slots);
}

View File

@ -26,7 +26,7 @@ public:
void write(const Block & block) override;
private:
std::vector<IColumn::Filter> createFilters(Block block);
IColumn::Selector createSelector(Block block);
void writeSplit(const Block & block);

View File

@ -43,9 +43,8 @@ public:
ShardedBlocksWithDateIntervals shardBlock(const Block & block);
private:
std::vector<IColumn::Filter> createFilters(Block block);
IColumn::Selector createSelector(Block block);
private:
MergeTreeData & data;
const ReshardingJob & job;
Logger * log;

View File

@ -2305,49 +2305,42 @@ void NO_INLINE Aggregator::convertBlockToTwoLevelImpl(
size_t rows = source.rows();
size_t columns = source.columns();
/// Для каждого номера корзины создадим фильтр, где будут отмечены строки, относящиеся к этой корзине.
std::vector<IColumn::Filter> filters(destinations.size());
/// Create a 'selector' that will contain bucket index for every row. It will be used to scatter rows to buckets.
IColumn::Selector selector(rows);
/// Для всех строчек.
/// For every row.
for (size_t i = 0; i < rows; ++i)
{
/// Получаем ключ. Вычисляем на его основе номер корзины.
/// Obtain a key. Calculate bucket number from it.
typename Method::Key key = state.getKey(key_columns, params.keys_size, i, key_sizes, keys, *pool);
auto hash = method.data.hash(key);
auto bucket = method.data.getBucketFromHash(hash);
/// Этот ключ нам больше не нужен.
selector[i] = bucket;
/// We don't need to store this key in pool.
method.onExistingKey(key, keys, *pool);
auto & filter = filters[bucket];
if (unlikely(filter.empty()))
filter.resize_fill(rows);
filter[i] = 1;
}
ssize_t size_hint = ((source.rows() + method.data.NUM_BUCKETS - 1)
/ method.data.NUM_BUCKETS) * 1.1; /// Число 1.1 выбрано наугад.
size_t num_buckets = destinations.size();
for (size_t bucket = 0, size = destinations.size(); bucket < size; ++bucket)
for (size_t column_idx = 0; column_idx < columns; ++column_idx)
{
const auto & filter = filters[bucket];
const ColumnWithTypeAndName & src_col = source.getByPosition(column_idx);
Columns scattered_columns = src_col.column->scatter(num_buckets, selector);
if (filter.empty())
continue;
Block & dst = destinations[bucket];
dst.info.bucket_num = bucket;
for (size_t j = 0; j < columns; ++j)
for (size_t bucket = 0, size = num_buckets; bucket < size; ++bucket)
{
const ColumnWithTypeAndName & src_col = source.getByPosition(j);
dst.insert({src_col.column->filter(filter, size_hint), src_col.type, src_col.name});
if (!scattered_columns[bucket]->empty())
{
Block & dst = destinations[bucket];
dst.info.bucket_num = bucket;
dst.insert({scattered_columns[bucket], src_col.type, src_col.name});
}
/** Вставленные в блок столбцы типа ColumnAggregateFunction будут владеть состояниями агрегатных функций
* путём удержания shared_ptr-а на исходный столбец. См. ColumnAggregateFunction.h
/** Inserted columns of type ColumnAggregateFunction will own states of aggregate functions
* by holding shared_ptr to source column. See ColumnAggregateFunction.h
*/
}
}

View File

@ -0,0 +1,66 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnVector.h>
#include <type_traits>
#if __SSE2__
#define LIBDIVIDE_USE_SSE2 1
#endif
#include <libdivide.h>
namespace DB
{
template <typename T>
IColumn::Selector createBlockSelector(
const IColumn & column,
size_t num_shards,
const std::vector<size_t> & slots)
{
const auto total_weight = slots.size();
size_t num_rows = column.size();
IColumn::Selector selector(num_rows);
/** Modulo of division of negative numbers to positive number in C++11 is negative (so called truncated division).
* This is not suitable for our task. So we will process signed numbers as unsigned.
* It is not near like remainder of division, but is suitable for our task.
*/
using UnsignedT = typename std::make_unsigned<T>::type;
/// const columns contain only one value, therefore we do not need to read it at every iteration
if (column.isConst())
{
const auto data = typeid_cast<const ColumnConst<T> &>(column).getData();
const auto shard_num = slots[static_cast<UnsignedT>(data) % total_weight];
selector.assign(num_rows, shard_num);
}
else
{
/// libdivide support only UInt32 and UInt64.
using TUInt32Or64 = typename std::conditional<sizeof(UnsignedT) <= 4, UInt32, UInt64>::type;
libdivide::divider<TUInt32Or64> divider(total_weight);
const auto & data = typeid_cast<const ColumnVector<T> &>(column).getData();
for (size_t i = 0; i < num_rows; ++i)
selector[i] = slots[static_cast<TUInt32Or64>(data[i]) - (static_cast<TUInt32Or64>(data[i]) / divider) * total_weight];
}
return selector;
}
/// Explicit instantinations to avoid code bloat in headers.
template IColumn::Selector createBlockSelector<UInt8>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<UInt16>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<UInt32>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<UInt64>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<Int8>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<Int16>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<Int32>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
template IColumn::Selector createBlockSelector<Int64>(const IColumn & column, size_t num_shards, const std::vector<size_t> & slots);
}

View File

@ -9,33 +9,25 @@
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
#include <DB/Interpreters/Cluster.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Interpreters/createBlockSelector.h>
#include <DB/Common/Increment.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <memory>
#include <common/ClickHouseRevision.h>
#include <iostream>
namespace DB
{
namespace
{
template <typename T>
std::vector<IColumn::Filter> createFiltersImpl(const size_t num_rows, const IColumn * column, const Cluster & cluster)
{
return BlockFilterCreator<T>::perform(num_rows, column, cluster.getShardsInfo().size(), cluster.getSlotToShard());
}
}
DistributedBlockOutputStream::DistributedBlockOutputStream(StorageDistributed & storage, const ASTPtr & query_ast, const ClusterPtr & cluster_)
: storage(storage), query_ast(query_ast), cluster(cluster_)
{
}
void DistributedBlockOutputStream::write(const Block & block)
{
if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1))
@ -44,35 +36,33 @@ void DistributedBlockOutputStream::write(const Block & block)
writeImpl(block);
}
std::vector<IColumn::Filter> DistributedBlockOutputStream::createFilters(Block block)
IColumn::Selector DistributedBlockOutputStream::createSelector(Block block)
{
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, const Cluster &);
/// hashmap of pointers to functions corresponding to each integral type
static std::unordered_map<std::string, create_filters_sig *> creators{
{ TypeName<UInt8>::get(), &createFiltersImpl<UInt8> },
{ TypeName<UInt16>::get(), &createFiltersImpl<UInt16> },
{ TypeName<UInt32>::get(), &createFiltersImpl<UInt32> },
{ TypeName<UInt64>::get(), &createFiltersImpl<UInt64> },
{ TypeName<Int8>::get(), &createFiltersImpl<Int8> },
{ TypeName<Int16>::get(), &createFiltersImpl<Int16> },
{ TypeName<Int32>::get(), &createFiltersImpl<Int32> },
{ TypeName<Int64>::get(), &createFiltersImpl<Int64> },
};
storage.getShardingKeyExpr()->execute(block);
const auto & key_column = block.getByName(storage.getShardingKeyColumnName());
size_t num_shards = cluster->getShardsInfo().size();
const auto & slot_to_shard = cluster->getSlotToShard();
/// check that key column has valid type
const auto it = creators.find(key_column.type->getName());
#define CREATE_FOR_TYPE(TYPE) \
if (typeid_cast<const DataType ## TYPE *>(key_column.type.get())) \
return createBlockSelector<TYPE>(*key_column.column, num_shards, slot_to_shard);
if (it == std::end(creators))
throw Exception{"Sharding key expression does not evaluate to an integer type",
ErrorCodes::TYPE_MISMATCH};
CREATE_FOR_TYPE(UInt8)
CREATE_FOR_TYPE(UInt16)
CREATE_FOR_TYPE(UInt32)
CREATE_FOR_TYPE(UInt64)
CREATE_FOR_TYPE(Int8)
CREATE_FOR_TYPE(Int16)
CREATE_FOR_TYPE(Int32)
CREATE_FOR_TYPE(Int64)
return (*it->second)(block.rows(), key_column.column.get(), *cluster);
#undef CREATE_FOR_TYPE
throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
void DistributedBlockOutputStream::writeSplit(const Block & block)
{
const auto num_cols = block.columns();
@ -81,24 +71,30 @@ void DistributedBlockOutputStream::writeSplit(const Block & block)
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.safeGetByPosition(i).column.get();
auto filters = createFilters(block);
auto selector = createSelector(block);
const auto num_shards = cluster->getShardsInfo().size();
/// Split block to num_shard smaller block, using 'selector'.
ssize_t size_hint = ((block.rows() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад.
const size_t num_shards = cluster->getShardsInfo().size();
Blocks splitted_blocks(num_shards);
for (size_t i = 0; i < num_shards; ++i)
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx] = block.cloneEmpty();
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
auto target_block = block.cloneEmpty();
for (size_t col = 0; col < num_cols; ++col)
target_block.safeGetByPosition(col).column = columns[col]->filter(filters[i], size_hint);
if (target_block.rows())
writeImpl(target_block, i);
Columns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
}
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
if (splitted_blocks[shard_idx].rows())
writeImpl(splitted_blocks[shard_idx], shard_idx);
}
void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t shard_id)
{
const auto & shard_info = cluster->getShardsInfo()[shard_id];
@ -110,6 +106,7 @@ void DistributedBlockOutputStream::writeImpl(const Block & block, const size_t s
writeToShard(block, shard_info.dir_names);
}
void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_t repeats)
{
InterpreterInsertQuery interp{query_ast, storage.context};
@ -123,6 +120,7 @@ void DistributedBlockOutputStream::writeToLocal(const Block & block, const size_
block_io.out->writeSuffix();
}
void DistributedBlockOutputStream::writeToShard(const Block & block, const std::vector<std::string> & dir_names)
{
/** tmp directory is used to ensure atomicity of transactions

View File

@ -4,7 +4,7 @@
#include <DB/Common/escapeForFileName.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/IO/HashingWriteBuffer.h>
#include <DB/Common/BlockFilterCreator.h>
#include <DB/Interpreters/createBlockSelector.h>
#include <DB/Interpreters/ExpressionAnalyzer.h>
#include <ctime>
@ -47,71 +47,62 @@ ShardedBlocksWithDateIntervals MergeTreeSharder::shardBlock(const Block & block)
for (size_t i = 0; i < columns.size(); ++i)
columns[i] = block.safeGetByPosition(i).column.get();
auto filters = createFilters(block);
auto selector = createSelector(block);
/// Split block to num_shard smaller block, using 'selector'.
const auto num_shards = job.paths.size();
Blocks splitted_blocks(num_shards);
ssize_t size_hint = ((block.rows() + num_shards - 1) / num_shards) * 1.1; /// Число 1.1 выбрано наугад.
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx] = block.cloneEmpty();
for (size_t shard_no = 0; shard_no < num_shards; ++shard_no)
size_t columns_in_block = block.columns();
for (size_t col_idx_in_block = 0; col_idx_in_block < columns_in_block; ++col_idx_in_block)
{
auto target_block = block.cloneEmpty();
Columns splitted_columns = block.getByPosition(col_idx_in_block).column->scatter(num_shards, selector);
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
splitted_blocks[shard_idx].getByPosition(col_idx_in_block).column = std::move(splitted_columns[shard_idx]);
}
for (size_t col = 0; col < num_cols; ++col)
target_block.safeGetByPosition(col).column = columns[col]->filter(filters[shard_no], size_hint);
if (target_block.rows())
for (size_t shard_idx = 0; shard_idx < num_shards; ++shard_idx)
{
if (splitted_blocks[shard_idx].rows())
{
/// Достаём столбец с датой.
const ColumnUInt16::Container_t & dates =
typeid_cast<const ColumnUInt16 &>(*target_block.getByName(data.date_column_name).column).getData();
/// Минимальная и максимальная дата.
UInt16 min_date = std::numeric_limits<UInt16>::max();
UInt16 max_date = std::numeric_limits<UInt16>::min();
for (ColumnUInt16::Container_t::const_iterator it = dates.begin(); it != dates.end(); ++it)
{
if (*it < min_date)
min_date = *it;
if (*it > max_date)
max_date = *it;
}
res.emplace_back(target_block, shard_no, min_date, max_date);
/// Get min and max date.
Field min_date;
Field max_date;
typeid_cast<const ColumnUInt16 &>(*splitted_blocks[shard_idx].getByName(data.date_column_name).column).getExtremes(min_date, max_date);
res.emplace_back(splitted_blocks[shard_idx], shard_idx, get<UInt64>(min_date), get<UInt64>(max_date));
}
}
return res;
}
std::vector<IColumn::Filter> MergeTreeSharder::createFilters(Block block)
IColumn::Selector MergeTreeSharder::createSelector(Block block)
{
using create_filters_sig = std::vector<IColumn::Filter>(size_t, const IColumn *, size_t num_shards, const std::vector<size_t> & slots);
/// hashmap of pointers to functions corresponding to each integral type
static std::unordered_map<std::string, create_filters_sig *> creators{
{ TypeName<UInt8>::get(), &BlockFilterCreator<UInt8>::perform },
{ TypeName<UInt16>::get(), &BlockFilterCreator<UInt16>::perform },
{ TypeName<UInt32>::get(), &BlockFilterCreator<UInt32>::perform },
{ TypeName<UInt64>::get(), &BlockFilterCreator<UInt64>::perform },
{ TypeName<Int8>::get(), &BlockFilterCreator<Int8>::perform },
{ TypeName<Int16>::get(), &BlockFilterCreator<Int16>::perform },
{ TypeName<Int32>::get(), &BlockFilterCreator<Int32>::perform },
{ TypeName<Int64>::get(), &BlockFilterCreator<Int64>::perform },
};
sharding_key_expr->execute(block);
const auto & key_column = block.getByName(sharding_key_column_name);
size_t num_shards = job.paths.size();
/// check that key column has valid type
const auto it = creators.find(key_column.type->getName());
#define CREATE_FOR_TYPE(TYPE) \
if (typeid_cast<const DataType ## TYPE *>(key_column.type.get())) \
return createBlockSelector<TYPE>(*key_column.column, num_shards, slots);
return it != std::end(creators)
? (*it->second)(block.rows(), key_column.column.get(), job.paths.size(), slots)
: throw Exception{
"Sharding key expression does not evaluate to an integer type",
ErrorCodes::TYPE_MISMATCH
};
CREATE_FOR_TYPE(UInt8)
CREATE_FOR_TYPE(UInt16)
CREATE_FOR_TYPE(UInt32)
CREATE_FOR_TYPE(UInt64)
CREATE_FOR_TYPE(Int8)
CREATE_FOR_TYPE(Int16)
CREATE_FOR_TYPE(Int32)
CREATE_FOR_TYPE(Int64)
#undef CREATE_FOR_TYPE
throw Exception{"Sharding key expression does not evaluate to an integer type", ErrorCodes::TYPE_MISMATCH};
}
}