This commit is contained in:
Alexey Milovidov 2016-04-15 22:09:42 +03:00
parent 3923c7abce
commit 2bb5a4749b
8 changed files with 234 additions and 18 deletions

View File

@ -193,6 +193,7 @@ add_library (dbms
include/DB/DataStreams/MergingAggregatedBlockInputStream.h
include/DB/DataStreams/MergingAggregatedMemoryEfficientBlockInputStream.h
include/DB/DataStreams/SummingSortedBlockInputStream.h
include/DB/DataStreams/ReplacingSortedBlockInputStream.h
include/DB/DataStreams/AddingConstColumnBlockInputStream.h
include/DB/DataStreams/DistinctBlockInputStream.h
include/DB/DataStreams/BlockOutputStreamFromRowOutputStream.h
@ -705,6 +706,7 @@ add_library (dbms
src/DataStreams/CSVRowOutputStream.cpp
src/DataStreams/CSVRowInputStream.cpp
src/DataStreams/SummingSortedBlockInputStream.cpp
src/DataStreams/ReplacingSortedBlockInputStream.cpp
src/DataStreams/TotalsHavingBlockInputStream.cpp
src/DataStreams/CreatingSetsBlockInputStream.cpp
src/DataStreams/DistinctBlockInputStream.cpp

View File

@ -83,7 +83,7 @@ private:
template<class TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат строки для текущего идентификатора "визита".
/// Вставить в результат строки для текущего первичного ключа.
void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false);
void reportIncorrectData();

View File

@ -0,0 +1,71 @@
#pragma once
#include <common/logger_useful.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
namespace DB
{
/** Соединяет несколько сортированных потоков в один.
* При этом, для каждой группы идущих подряд одинаковых значений первичного ключа (столбцов, по которым сортируются данные),
* оставляет
*/
class ReplacingSortedBlockInputStream : public MergingSortedBlockInputStream
{
public:
ReplacingSortedBlockInputStream(BlockInputStreams inputs_, const SortDescription & description_,
const String & version_column_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
version_column(version_column_)
{
}
String getName() const override { return "ReplacingSorted"; }
String getID() const override
{
std::stringstream res;
res << "ReplacingSorted(inputs";
for (size_t i = 0; i < children.size(); ++i)
res << ", " << children[i]->getID();
res << ", description";
for (size_t i = 0; i < description.size(); ++i)
res << ", " << description[i].getID();
res << ", version_column, " << version_column << ")";
return res.str();
}
protected:
/// Может возвращаться на 1 больше записей, чем max_block_size.
Block readImpl() override;
private:
String version_column;
ssize_t version_column_number = -1;
Logger * log = &Logger::get("ReplacingSortedBlockInputStream");
/// Прочитали до конца.
bool finished = false;
RowRef current_key; /// Текущий первичный ключ.
RowRef next_key; /// Первичный ключ следующей строки.
RowRef selected_row; /// Последняя строка с максимальной версией для текущего первичного ключа.
UInt64 max_version = 0; /// Максимальная версия для текущего первичного ключа.
template<class TSortCursor>
void merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат строки для текущего первичного ключа.
void insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows);
};
}

View File

@ -151,7 +151,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
if (key_differs)
{
/// Запишем данные для предыдущего визита.
/// Запишем данные для предыдущего первичного ключа.
insertRows(merged_columns, merged_rows);
current_key.swap(next_key);
@ -193,7 +193,7 @@ void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, s
}
}
/// Запишем данные для последнего визита.
/// Запишем данные для последнего первичного ключа.
insertRows(merged_columns, merged_rows, true);
finished = true;

View File

@ -0,0 +1,115 @@
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
#include <DB/Columns/ColumnsNumber.h>
namespace DB
{
void ReplacingSortedBlockInputStream::insertRow(ColumnPlainPtrs & merged_columns, size_t & merged_rows)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insertFrom(*selected_row.columns[i], selected_row.row_num);
}
Block ReplacingSortedBlockInputStream::readImpl()
{
if (finished)
return Block();
if (children.size() == 1)
return children[0]->read();
Block merged_block;
ColumnPlainPtrs merged_columns;
init(merged_block, merged_columns);
if (merged_columns.empty())
return Block();
/// Дополнительная инициализация.
if (selected_row.empty())
{
selected_row.columns.resize(num_columns);
if (!version_column.empty())
version_column_number = merged_block.getPositionByName(version_column);
}
if (has_collation)
merge(merged_columns, queue_with_collation);
else
merge(merged_columns, queue);
return merged_block;
}
template<class TSortCursor>
void ReplacingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
size_t merged_rows = 0;
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
while (!queue.empty())
{
TSortCursor current = queue.top();
if (current_key.empty())
{
current_key.columns.resize(description.size());
next_key.columns.resize(description.size());
setPrimaryKeyRef(current_key, current);
}
UInt64 version = version_column_number != -1
? current->all_columns[version_column_number]->get64(current->pos)
: 0;
setPrimaryKeyRef(next_key, current);
bool key_differs = next_key != current_key;
/// если накопилось достаточно строк и последняя посчитана полностью
if (key_differs && merged_rows >= max_block_size)
return;
queue.pop();
if (key_differs)
{
max_version = 0;
/// Запишем данные для предыдущего первичного ключа.
insertRow(merged_columns, merged_rows);
current_key.swap(next_key);
}
/// Нестрогое сравнение, так как мы выбираем последнюю строку для одинаковых значений версий.
if (version >= max_version)
{
max_version = version;
setRowRef(selected_row, current);
}
if (!current->isLast())
{
current->next();
queue.push(current);
}
else
{
/// Достаём из соответствующего источника следующий блок, если есть.
fetchNextBlock(current, queue);
}
}
/// Запишем данные для последнего первичного ключа.
insertRow(merged_columns, merged_rows);
finished = true;
}
}

View File

@ -9,6 +9,7 @@
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
@ -438,9 +439,9 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Unsorted:
@ -698,9 +699,9 @@ MergeTreeData::PerShardDataParts MergeTreeDataMerger::reshardPartition(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO
merged_stream = std::make_unique<MergingSortedBlockInputStream>(
src_streams, data.getSortDescription(), DEFAULT_MERGE_BLOCK_SIZE);
case MergeTreeData::MergingParams::Replacing:
merged_stream = std::make_unique<ReplacingSortedBlockInputStream>(
src_streams, data.getSortDescription(), data.merging_params.version_column, DEFAULT_MERGE_BLOCK_SIZE);
break;
case MergeTreeData::MergingParams::Unsorted:

View File

@ -29,6 +29,7 @@
#include <DB/DataStreams/CreatingSetsBlockInputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeDate.h>
@ -834,8 +835,9 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
merged = new AggregatingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO
merged = new MergingSortedBlockInputStream(to_merge, data.getSortDescription(), max_block_size);
case MergeTreeData::MergingParams::Replacing: /// TODO Сделать ReplacingFinalBlockInputStream
merged = new ReplacingSortedBlockInputStream(to_merge,
data.getSortDescription(), data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::Unsorted:

View File

@ -361,15 +361,17 @@ StoragePtr StorageFactory::get(
* (запрос с SAMPLE x будет выбирать строки, у которых в этом столбце значение меньше, чем x * UINT32_MAX);
* - выражение для сортировки (либо скалярное выражение, либо tuple из нескольких);
* - index_granularity;
* - (для Collapsing) имя столбца, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* - (для Collapsing) имя столбца типа Int8, содержащего тип строчки с изменением "визита" (принимающего значения 1 и -1).
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
* - (для Summing, не обязательно) кортеж столбцов, которых следует суммировать. Если не задано - используются все числовые столбцы, не входящие в первичный ключ.
* - (для Replacing, не обязательно) имя столбца одного из UInt типов, обозначающего "версию"
* Например: ENGINE = ReplicatedCollapsingMergeTree('/tables/mytable', 'rep02', EventDate, (CounterID, EventDate, intHash32(UniqID), VisitID), 8192, Sign).
*
* MergeTree(date, [sample_key], primary_key, index_granularity)
* CollapsingMergeTree(date, [sample_key], primary_key, index_granularity, sign)
* SummingMergeTree(date, [sample_key], primary_key, index_granularity, [columns_to_sum])
* AggregatingMergeTree(date, [sample_key], primary_key, index_granularity)
* ReplacingMergeTree(date, [sample_key], primary_key, index_granularity, [version_column])
* UnsortedMergeTree(date, index_granularity) TODO Добавить описание ниже.
*/
@ -381,9 +383,9 @@ MergeTrees is different in two ways:
- it may be replicated and non-replicated;
- it may do different actions on merge: nothing; sign collapse; sum; apply aggregete functions.
So we have 8 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree,
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree.
So we have 12 combinations:
MergeTree, CollapsingMergeTree, SummingMergeTree, AggregatingMergeTree, ReplacingMergeTree, UnsortedMergeTree
ReplicatedMergeTree, ReplicatedCollapsingMergeTree, ReplicatedSummingMergeTree, ReplicatedAggregatingMergeTree, ReplicatedReplacingMergeTree, ReplicatedUnsortedMergeTree
In most of cases, you need MergeTree or ReplicatedMergeTree.
@ -416,6 +418,8 @@ For Collapsing mode, last parameter is name of sign column - special column that
For Summing mode, last parameter is optional list of columns to sum while merge. List is passed in round brackets, like (PageViews, Cost).
If this parameter is omitted, storage will sum all numeric columns except columns participated in primary key.
For Replacing mode, last parameter is optional name of 'version' column. While merging, for all rows with same primary key, only one row is selected: last row, if version column was not specified, or last row with maximum version value, if specified.
Examples:
@ -452,6 +456,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
merging_params.mode = MergeTreeData::MergingParams::Aggregating;
else if (name_part == "Unsorted")
merging_params.mode = MergeTreeData::MergingParams::Unsorted;
else if (name_part == "Replacing")
merging_params.mode = MergeTreeData::MergingParams::Replacing;
else if (!name_part.empty())
throw Exception("Unknown storage " + name + verbose_help, ErrorCodes::UNKNOWN_STORAGE);
@ -485,6 +491,7 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
}
if (merging_params.mode != MergeTreeData::MergingParams::Summing
&& merging_params.mode != MergeTreeData::MergingParams::Replacing
&& merging_params.mode != MergeTreeData::MergingParams::Unsorted
&& args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4)
@ -511,7 +518,8 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
if (merging_params.mode == MergeTreeData::MergingParams::Summing
if ((merging_params.mode == MergeTreeData::MergingParams::Summing
|| merging_params.mode == MergeTreeData::MergingParams::Replacing)
&& args.size() != num_additional_params + 3
&& args.size() != num_additional_params + 4
&& args.size() != num_additional_params + 5)
@ -527,8 +535,12 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
"\nname of column with date,"
"\n[sampling element of primary key],"
"\nprimary key expression,"
"\nindex granularity,"
"\n[list of columns to sum]\n";
"\nindex granularity,";
if (merging_params.mode == MergeTreeData::MergingParams::Summing)
params += "\n[list of columns to sum]\n";
else
params += "\n[version]\n";
throw Exception("Storage " + name + " requires "
+ toString(num_additional_params + 3) + " or "
@ -576,6 +588,19 @@ For further info please read the documentation: https://clickhouse.yandex-team.r
args.pop_back();
}
else if (merging_params.mode == MergeTreeData::MergingParams::Replacing)
{
/// Если последний элемент - не index granularity (литерал), то это - имя столбца-версии.
if (!typeid_cast<const ASTLiteral *>(&*args.back()))
{
if (auto ast = typeid_cast<ASTIdentifier *>(&*args.back()))
merging_params.version_column = ast->name;
else
throw Exception(String("Version column name must be an unquoted string") + verbose_help, ErrorCodes::BAD_ARGUMENTS);
args.pop_back();
}
}
else if (merging_params.mode == MergeTreeData::MergingParams::Summing)
{
/// Если последний элемент - не index granularity (литерал), то это - список суммируемых столбцов.