ClickHouse/dbms/include/DB/DataStreams/ArrayJoiningBlockInputStream.h

107 lines
2.9 KiB
C
Raw Normal View History

2012-08-27 05:13:14 +00:00
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Реализует операцию ARRAY JOIN.
* (Для удобства, эта операция записывается, как функция arrayJoin, применённая к массиву.)
* Эта операция размножает все строки столько раз, сколько элементов в массиве.
* Результат функции arrayJoin - столбец единичных значений соответствующих элементов.
*
* Например,
*
* name arr
* ------ ------
* 'вася' [1, 2]
* 'петя' []
*
* преобразуется в
*
* name arrayJoin(arr)
* ------ --------------
* 'вася' 1
* 'вася' 2
*/
class ArrayJoiningBlockInputStream : public IProfilingBlockInputStream
{
public:
ArrayJoiningBlockInputStream(BlockInputStreamPtr input_, ssize_t array_column_)
2013-05-04 04:05:15 +00:00
: array_column(array_column_)
2012-08-27 05:13:14 +00:00
{
2013-05-04 04:05:15 +00:00
children.push_back(input_);
2012-08-27 05:13:14 +00:00
}
ArrayJoiningBlockInputStream(BlockInputStreamPtr input_, const String & array_column_name_)
2013-05-04 04:05:15 +00:00
: array_column(-1), array_column_name(array_column_name_)
2012-08-27 05:13:14 +00:00
{
2013-05-04 04:05:15 +00:00
children.push_back(input_);
2012-08-27 05:13:14 +00:00
}
2012-10-20 02:10:47 +00:00
String getName() const { return "ArrayJoiningBlockInputStream"; }
String getID() const
{
std::stringstream res;
2013-05-04 05:20:07 +00:00
res << "ArrayJoining(" << children.back()->getID() << ", " << array_column << ", " << array_column_name << ")";
return res.str();
}
2012-10-20 02:10:47 +00:00
protected:
2012-08-27 05:13:14 +00:00
Block readImpl()
{
2013-05-04 05:20:07 +00:00
Block block = children.back()->read();
2012-08-27 05:13:14 +00:00
if (!block)
return block;
if (-1 == array_column)
array_column = block.getPositionByName(array_column_name);
2012-09-19 19:30:18 +00:00
ColumnPtr array = block.getByPosition(array_column).column;
2012-08-27 05:13:14 +00:00
if (array->isConst())
array = dynamic_cast<const IColumnConst &>(*array).convertToFullColumn();
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current = block.getByPosition(i);
2012-10-20 02:10:47 +00:00
2012-08-27 05:13:14 +00:00
if (static_cast<ssize_t>(i) == array_column)
{
ColumnWithNameAndType result;
2013-05-29 18:37:28 +00:00
result.column = dynamic_cast<const ColumnArray &>(*array).getDataPtr();
2012-08-27 05:13:14 +00:00
result.type = dynamic_cast<const DataTypeArray &>(*current.type).getNestedType();
result.name = "arrayJoin(" + current.name + ")";
block.erase(i);
2012-09-19 19:30:18 +00:00
block.insert(i, result);
2012-08-27 05:13:14 +00:00
}
else
2013-05-03 05:23:14 +00:00
current.column = current.column->replicate(dynamic_cast<const ColumnArray &>(*array).getOffsets());
2012-08-27 05:13:14 +00:00
}
2013-06-01 07:43:57 +00:00
/// TODO: баг - если все массивы в данном блоке пустые, то вернётся пустой блок, и это будет ошибочно воспринято, как конец потока.
2012-08-27 05:13:14 +00:00
return block;
}
private:
ssize_t array_column;
String array_column_name;
};
}