dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-08-27 05:13:14 +00:00
parent 312b007e6f
commit 44aa02f806
19 changed files with 403 additions and 22 deletions

View File

@ -24,8 +24,6 @@ class ColumnArray : public IColumn
{
public:
/** По индексу i находится смещение до начала i + 1 -го элемента. */
typedef UInt32 Offset_t;
typedef std::vector<Offset_t> Offsets_t;
typedef ColumnVector<Offset_t> ColumnOffsets_t;
/** Создать пустой столбец массивов, с типом значений, как в столбце nested_column */
@ -136,6 +134,11 @@ public:
tmp.swap(getOffsets());
}
void replicate(const Offsets_t & offsets)
{
throw Exception("Replication of column Array is not implemented.", ErrorCodes::NOT_IMPLEMENTED);
}
void permute(const Permutation & perm)
{
size_t size = getOffsets().size();

View File

@ -51,6 +51,9 @@ public:
void filter(const Filter & filt)
{
if (s != filt.size())
throw Exception("Size of filter doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
size_t new_size = 0;
for (Filter::const_iterator it = filt.begin(); it != filt.end(); ++it)
if (*it)
@ -58,6 +61,14 @@ public:
s = new_size;
}
void replicate(const Offsets_t & offsets)
{
if (s != offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
s = offsets.back();
}
size_t byteSize() const { return sizeof(data) + sizeof(s); }
void permute(const Permutation & perm)

View File

@ -96,6 +96,11 @@ public:
data->filter(nested_filt);
}
void replicate(const Offsets_t & offsets)
{
throw Exception("Replication of column FixedArray is not implemented.", ErrorCodes::NOT_IMPLEMENTED);
}
void permute(const Permutation & perm)
{
size_t size = this->size();

View File

@ -77,6 +77,29 @@ public:
std::sort(res.begin(), res.end(), less(*this));
return res;
}
void replicate(const Offsets_t & offsets)
{
size_t col_size = size();
if (col_size != offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
ColumnUInt8::Container_t tmp;
tmp.reserve(n * offsets.back());
Offset_t prev_offset = 0;
for (size_t i = 0; i < col_size; ++i)
{
size_t size_to_replicate = offsets[i] - prev_offset;
prev_offset = offsets[i];
for (size_t j = 0; j < size_to_replicate; ++j)
for (size_t k = 0; k < n; ++k)
tmp.push_back(char_data[i * n + k]);
}
tmp.swap(char_data);
}
};

View File

@ -50,6 +50,12 @@ public:
return res;
}
void replicate(const Offsets_t & offsets)
{
if (s != offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
}
SetPtr & getData() { return data; }
const SetPtr & getData() const { return data; }

View File

@ -91,6 +91,43 @@ public:
std::sort(res.begin(), res.end(), less(*this));
return res;
}
void replicate(const Offsets_t & replicate_offsets)
{
size_t col_size = size();
if (col_size != replicate_offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
ColumnUInt8::Container_t tmp_chars;
Offsets_t tmp_offsets;
tmp_chars.reserve(char_data.size() / col_size * replicate_offsets.back());
tmp_offsets.reserve(replicate_offsets.back());
Offset_t prev_replicate_offset = 0;
Offset_t prev_string_offset = 0;
Offset_t current_new_offset = 0;
for (size_t i = 0; i < col_size; ++i)
{
size_t size_to_replicate = replicate_offsets[i] - prev_replicate_offset;
size_t string_size = getOffsets()[i] - prev_string_offset;
for (size_t j = 0; j < size_to_replicate; ++j)
{
current_new_offset += string_size;
tmp_offsets.push_back(current_new_offset);
for (size_t k = 0; k < string_size; ++k)
tmp_chars.push_back(char_data[prev_string_offset + k]);
}
prev_replicate_offset = replicate_offsets[i];
prev_string_offset = getOffsets()[i];
}
tmp_chars.swap(char_data);
tmp_offsets.swap(getOffsets());
}
};

View File

@ -69,12 +69,13 @@ public:
}
/** В следующих трёх функциях ничего не делаем, так как столбцы - элементы tuple обычно содержатся в блоке вместе с tuple,
/** В следующих функциях ничего не делаем, так как столбцы - элементы tuple обычно содержатся в блоке вместе с tuple,
* и соответствующие операции применяются к ним также. То есть, операции будут применены к tuple автоматически.
*/
void cut(size_t start, size_t length) {}
void filter(const Filter & filt) {}
void permute(const Permutation & perm) {}
void replicate(const Offsets_t & offsets) {}
int compareAt(size_t n, size_t m, const IColumn & rhs) const
{

View File

@ -145,6 +145,28 @@ public:
return res;
}
void replicate(const Offsets_t & offsets)
{
size_t size = data.size();
if (size != offsets.size())
throw Exception("Size of offsets doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
Container_t tmp;
tmp.reserve(offsets.back());
Offset_t prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
size_t size_to_replicate = offsets[i] - prev_offset;
prev_offset = offsets[i];
for (size_t j = 0; j < size_to_replicate; ++j)
tmp.push_back(data[i]);
}
tmp.swap(data);
}
/** Более эффективные методы манипуляции */
Container_t & getData()
{

View File

@ -97,6 +97,13 @@ public:
*/
virtual Permutation getPermutation() const = 0;
/** Размножить все значения столько раз, сколько прописано в offsets.
* (i-е значение размножается в offsets[i] - offsets[i - 1] значений.)
*/
typedef UInt32 Offset_t;
typedef std::vector<Offset_t> Offsets_t;
virtual void replicate(const Offsets_t & offsets) = 0;
/** Очистить */
virtual void clear() = 0;

View File

@ -133,6 +133,7 @@ namespace ErrorCodes
CANNOT_GET_RETURN_TYPE,
ILLEGAL_INDEX,
TOO_LARGE_ARRAY_SIZE,
FUNCTION_IS_SPECIAL,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -0,0 +1,98 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Реализует операцию ARRAY JOIN.
* (Для удобства, эта операция записывается, как функция arrayJoin, применённая к массиву.)
* Эта операция размножает все строки столько раз, сколько элементов в массиве.
* Результат функции arrayJoin - столбец единичных значений соответствующих элементов.
*
* Например,
*
* name arr
* ------ ------
* 'вася' [1, 2]
* 'петя' []
*
* преобразуется в
*
* name arrayJoin(arr)
* ------ --------------
* 'вася' 1
* 'вася' 2
*/
class ArrayJoiningBlockInputStream : public IProfilingBlockInputStream
{
public:
ArrayJoiningBlockInputStream(BlockInputStreamPtr input_, ssize_t array_column_)
: input(input_), array_column(array_column_)
{
children.push_back(input);
}
ArrayJoiningBlockInputStream(BlockInputStreamPtr input_, const String & array_column_name_)
: input(input_), array_column(-1), array_column_name(array_column_name_)
{
children.push_back(input);
}
Block readImpl()
{
Block block = input->read();
if (!block)
return block;
if (-1 == array_column)
array_column = block.getPositionByName(array_column_name);
ColumnPtr & array = block.getByPosition(array_column).column;
if (array->isConst())
array = dynamic_cast<const IColumnConst &>(*array).convertToFullColumn();
size_t columns = block.columns();
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType & current = block.getByPosition(i);
if (static_cast<ssize_t>(i) == array_column)
{
ColumnWithNameAndType result;
result.column = dynamic_cast<const ColumnArray &>(*current.column).getDataPtr();
result.type = dynamic_cast<const DataTypeArray &>(*current.type).getNestedType();
result.name = "arrayJoin(" + current.name + ")";
block.erase(i);
block.insert(result);
}
else
current.column->replicate(dynamic_cast<const ColumnArray &>(*array).getOffsets());
}
return block;
}
String getName() const { return "ArrayJoiningBlockInputStream"; }
BlockInputStreamPtr clone() { return new ArrayJoiningBlockInputStream(input, array_column); }
private:
BlockInputStreamPtr input;
ssize_t array_column;
String array_column_name;
};
}

View File

@ -43,18 +43,26 @@ public:
void deserializeTextQuoted(Field & field, ReadBuffer & istr) const;
/** Потоковая сериализация массивов устроена по-особенному:
* - запиваются/читаются элементы, уложенные подряд, без смещений;
* - смещения записываются/читаются в отдельный столбец,
* и о записи/чтении смещений должна позаботиться вызывающая сторона.
* Это нужно, так как при реализации вложенных структур, несколько массивов могут иметь общие смещения.
* - запиваются/читаются элементы, уложенные подряд, без размеров массивов;
* - размеры записываются/читаются в отдельный столбец,
* и о записи/чтении размеров должна позаботиться вызывающая сторона.
* Это нужно, так как при реализации вложенных структур, несколько массивов могут иметь общие размеры.
*/
/** Записать только значения, без смещений. Вызывающая сторона также должна куда-нибудь записать смещения. */
/** Записать только значения, без размеров. Вызывающая сторона также должна куда-нибудь записать смещения. */
void serializeBinary(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
/** Прочитать только значения, без смещений. При этом, в column уже заранее должны быть считаны все смещения. */
/** Прочитать только значения, без размеров.
* При этом, в column уже заранее должны быть считаны все размеры.
*/
void deserializeBinary(IColumn & column, ReadBuffer & istr, size_t limit) const;
/** Записать размеры. */
void serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback = WriteCallback()) const;
/** Прочитать размеры. Вызывайте этот метод перед чтением значений. */
void deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const;
ColumnPtr createColumn() const;
ColumnPtr createConstColumn(size_t size, const Field & field) const;

View File

@ -38,6 +38,9 @@ namespace DB
*
* tuple(x, y, ...) - функция, позволяющая сгруппировать несколько столбцов
* tupleElement(tuple, n) - функция, позволяющая достать столбец из tuple.
*
* arrayJoin(arr) - особая функция - выполнить её напрямую нельзя;
* используется только чтобы получить тип результата соответствующего выражения.
*/
@ -561,4 +564,34 @@ public:
}
};
class FunctionArrayJoin : public IFunction
{
public:
/// Получить имя функции.
String getName() const
{
return "arrayJoin";
}
/// Получить тип результата по типам аргументов. Если функция неприменима для данных аргументов - кинуть исключение.
DataTypePtr getReturnType(const DataTypes & arguments) const
{
if (arguments.size() != 1)
throw Exception("Function arrayJoin requires exactly one argument.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const DataTypeArray * arr = dynamic_cast<const DataTypeArray *>(&*arguments[0]);
if (!arr)
throw Exception("Argument for function arrayJoin must be Array.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return arr->getNestedType()->clone();
}
/// Выполнить функцию над блоком.
void execute(Block & block, const ColumnNumbers & arguments, size_t result)
{
throw Exception("Function arrayJoin must not be executed directly.", ErrorCodes::FUNCTION_IS_SPECIAL);
}
};
}

View File

@ -75,7 +75,16 @@ public:
/** Пометить то, что должно быть вычислено до агрегирования одним part_id,
* а то, что должно быть вычислено после агрегирования, а также сами агрегатные функции - другим part_id.
*/
void markBeforeAndAfterAggregation(unsigned before_part_id, unsigned after_part_id);
void markBeforeAggregation(unsigned before_part_id);
/** Получить информацию об операции arrayJoin, если она есть.
* Если есть - в column_name будет записано имя столбца, находящегося внутри arrayJoin.
*/
bool getArrayJoinInfo(String & column_name);
/** Пометить то, что должно быть вычислено до применения операции arrayJoin.
*/
void markBeforeArrayJoin(unsigned part_id);
private:
ASTPtr ast;
@ -125,12 +134,16 @@ private:
bool hasAggregatesImpl(ASTPtr ast);
void markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below = false);
void markBeforeAggregationImpl(ASTPtr ast, unsigned before_part_id, bool below = false);
void makeSetsImpl(ASTPtr ast);
void resolveScalarSubqueriesImpl(ASTPtr & ast);
bool getArrayJoinInfoImpl(ASTPtr ast, String & column_name);
void markBeforeArrayJoinImpl(ASTPtr ast, unsigned part_id, bool below = false);
/// Получить тип у функции, идентификатора или литерала.
DataTypePtr getType(ASTPtr ast);
};

View File

@ -59,8 +59,8 @@ private:
PART_GROUP = 8,
PART_HAVING = 16,
PART_ORDER = 32,
PART_BEFORE_AGGREGATING = 64, /// Под агрегатной функцией, или в ветке, не содержащей агрегатных функций
PART_AFTER_AGGREGATING = 128,
PART_BEFORE_AGGREGATING = 64, /// Под агрегатной функцией
PART_BEFORE_ARRAY_JOIN = 128, /// Под функцией arrayJoin
};
@ -69,6 +69,7 @@ private:
/// Вынимает данные из таблицы. Возвращает стадию, до которой запрос был обработан в Storage.
QueryProcessingStage::Enum executeFetchColumns(BlockInputStreams & streams, ExpressionPtr & expression);
void executeArrayJoin( BlockInputStreams & streams, ExpressionPtr & expression);
void executeWhere( BlockInputStreams & streams, ExpressionPtr & expression);
void executeAggregation( BlockInputStreams & streams, ExpressionPtr & expression);
void executeMergeAggregated( BlockInputStreams & streams, ExpressionPtr & expression);

View File

@ -13,6 +13,7 @@
namespace DB
{
DataTypeArray::DataTypeArray(DataTypePtr nested_) : nested(nested_)
{
offsets = new DataTypeFromFieldType<ColumnArray::Offset_t>::Type;
@ -54,7 +55,52 @@ void DataTypeArray::deserializeBinary(IColumn & column, ReadBuffer & istr, size_
ColumnArray & column_array = dynamic_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offsets = column_array.getOffsets();
nested->deserializeBinary(column_array.getData(), istr, limit == 0 ? 0 : offsets[limit - 1]);
/// Должно быть считано согласнованное с offsets количество значений.
size_t nested_limit = offsets.empty() ? 0 : offsets.back();
nested->deserializeBinary(column_array.getData(), istr, nested_limit);
if (column_array.getData().size() != nested_limit)
throw Exception("Cannot read all array values", ErrorCodes::CANNOT_READ_ALL_DATA);
}
void DataTypeArray::serializeOffsets(const IColumn & column, WriteBuffer & ostr, WriteCallback callback) const
{
const ColumnArray & column_array = dynamic_cast<const ColumnArray &>(column);
const ColumnArray::Offsets_t & offsets = column_array.getOffsets();
size_t size = offsets.size();
size_t next_callback_point = callback ? callback() : 0;
writeIntBinary(offsets[0], ostr);
for (size_t i = 1; i < size; ++i)
{
if (next_callback_point && i == next_callback_point)
next_callback_point = callback();
writeIntBinary(offsets[i] - offsets[i - 1], ostr);
}
}
void DataTypeArray::deserializeOffsets(IColumn & column, ReadBuffer & istr, size_t limit) const
{
ColumnArray & column_array = dynamic_cast<ColumnArray &>(column);
ColumnArray::Offsets_t & offsets = column_array.getOffsets();
offsets.resize(limit);
size_t i = 0;
ColumnArray::Offset_t current_offset = 0;
while (i < limit && !istr.eof())
{
ColumnArray::Offset_t current_size = 0;
readIntBinary(current_size, istr);
current_offset += current_size;
offsets[i] = current_offset;
++i;
}
offsets.resize(i);
}

View File

@ -116,6 +116,7 @@ namespace FunctionsLibrary
("blockSize", new FunctionBlockSize)
("materialize", new FunctionMaterialize)
("ignore", new FunctionIgnore)
("arrayJoin", new FunctionArrayJoin)
("tuple", new FunctionTuple)
("tupleElement", new FunctionTupleElement)

View File

@ -472,7 +472,7 @@ bool Expression::hasAggregates()
}
void Expression::markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_part_id, unsigned after_part_id, bool below)
void Expression::markBeforeAggregationImpl(ASTPtr ast, unsigned before_part_id, bool below)
{
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
if (func->aggregate_function)
@ -480,18 +480,62 @@ void Expression::markBeforeAndAfterAggregationImpl(ASTPtr ast, unsigned before_p
if (below)
ast->part_id |= before_part_id;
else
ast->part_id |= after_part_id;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
if (!dynamic_cast<ASTSelectQuery *>(&**it))
markBeforeAndAfterAggregationImpl(*it, before_part_id, after_part_id, below);
markBeforeAggregationImpl(*it, before_part_id, below);
}
void Expression::markBeforeAndAfterAggregation(unsigned before_part_id, unsigned after_part_id)
void Expression::markBeforeAggregation(unsigned before_part_id)
{
markBeforeAndAfterAggregationImpl(ast, before_part_id, after_part_id);
markBeforeAggregationImpl(ast, before_part_id);
}
bool Expression::getArrayJoinInfoImpl(ASTPtr ast, String & column_name)
{
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
{
if (func->name == "arrayJoin")
{
column_name = func->arguments->children.at(0)->getColumnName();
return true;
}
}
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
if (!dynamic_cast<ASTSelectQuery *>(&**it) && getArrayJoinInfoImpl(*it, column_name))
return true;
return false;
}
void Expression::markBeforeArrayJoinImpl(ASTPtr ast, unsigned part_id, bool below)
{
if (below)
ast->part_id |= part_id;
if (ASTFunction * func = dynamic_cast<ASTFunction *>(&*ast))
if (func->name == "arrayJoin")
below = true;
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
if (!dynamic_cast<ASTSelectQuery *>(&**it))
markBeforeArrayJoinImpl(*it, part_id, below);
}
bool Expression::getArrayJoinInfo(String & column_name)
{
return getArrayJoinInfoImpl(ast, column_name);
}
void Expression::markBeforeArrayJoin(unsigned part_id)
{
markBeforeArrayJoinImpl(ast, part_id);
}

View File

@ -10,6 +10,7 @@
#include <DB/DataStreams/AsynchronousBlockInputStream.h>
#include <DB/DataStreams/UnionBlockInputStream.h>
#include <DB/DataStreams/ParallelAggregatingBlockInputStream.h>
#include <DB/DataStreams/ArrayJoiningBlockInputStream.h>
#include <DB/DataStreams/NullBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/copyData.h>
@ -152,6 +153,7 @@ BlockInputStreamPtr InterpreterSelectQuery::execute()
if (from_stage < QueryProcessingStage::WithMergeableState)
{
executeArrayJoin(streams, expression);
executeWhere(streams, expression);
if (need_aggregate)
@ -278,9 +280,28 @@ void InterpreterSelectQuery::executeWhere(BlockInputStreams & streams, Expressio
}
void InterpreterSelectQuery::executeArrayJoin(BlockInputStreams & streams, ExpressionPtr & expression)
{
/// Если есть ARRAY JOIN - сначала выполним часть выражения, необходимую для его вычисления
String array_join_column_name;
if (expression->getArrayJoinInfo(array_join_column_name))
{
expression->markBeforeArrayJoin(PART_BEFORE_ARRAY_JOIN);
bool is_async = settings.asynchronous && streams.size() <= settings.max_threads;
for (BlockInputStreams::iterator it = streams.begin(); it != streams.end(); ++it)
{
BlockInputStreamPtr & stream = *it;
stream = maybeAsynchronous(new ExpressionBlockInputStream(stream, expression, PART_BEFORE_ARRAY_JOIN), is_async);
stream = maybeAsynchronous(new ArrayJoiningBlockInputStream(stream, array_join_column_name), is_async);
}
}
}
void InterpreterSelectQuery::executeAggregation(BlockInputStreams & streams, ExpressionPtr & expression)
{
expression->markBeforeAndAfterAggregation(PART_BEFORE_AGGREGATING, PART_AFTER_AGGREGATING);
expression->markBeforeAggregation(PART_BEFORE_AGGREGATING);
if (query.group_expression_list)
setPartID(query.group_expression_list, PART_GROUP);