added nested arrays offsets check [#CONV-7967]

This commit is contained in:
Vyacheslav Alipov 2013-08-02 14:26:04 +00:00
parent e2e6032020
commit adf9e506f4
4 changed files with 69 additions and 25 deletions

View File

@ -89,6 +89,13 @@ public:
/** Получить такой же блок, но пустой. */
Block cloneEmpty() const;
/** Заменяет столбцы смещений внутри вложенных таблиц на один общий для таблицы.
* Кидает исключение, если эти смещения вдруг оказались неодинаковы.
*/
void optimizeNestedArraysOffsets();
/** Тоже самое, только без замены смещений. */
void checkNestedArraysOffsets() const;
};
typedef std::vector<Block> Blocks;

View File

@ -243,6 +243,7 @@ void Connection::sendData(const Block & block)
}
writeVarUInt(Protocol::Client::Data, *out);
block.checkNestedArraysOffsets();
block_out->write(block);
maybe_compressed_out->next();
out->next();

View File

@ -5,6 +5,9 @@
#include <DB/Core/Block.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/DataTypes/DataTypeNested.h>
namespace DB
{
@ -251,6 +254,63 @@ NamesAndTypesList Block::getColumnsList() const
}
void Block::checkNestedArraysOffsets() const
{
/// Указатели на столбцы-массивы, для проверки равенства столбцов смещений во вложенных структурах данных
typedef std::map<String, const ColumnArray *> ArrayColumns;
ArrayColumns array_columns;
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
{
const ColumnWithNameAndType & column = *it;
if (const ColumnArray * column_array = dynamic_cast<const ColumnArray *>(&*column.column))
{
String name = DataTypeNested::extractNestedTableName(column.name);
ArrayColumns::const_iterator it = array_columns.find(name);
if (array_columns.end() == it)
array_columns[name] = column_array;
else
{
if (!it->second->hasEqualOffsets(*column_array))
throw Exception("Sizes of nested arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
}
}
}
void Block::optimizeNestedArraysOffsets()
{
/// Указатели на столбцы-массивы, для проверки равенства столбцов смещений во вложенных структурах данных
typedef std::map<String, ColumnArray *> ArrayColumns;
ArrayColumns array_columns;
for (Container_t::iterator it = data.begin(); it != data.end(); ++it)
{
ColumnWithNameAndType & column = *it;
if (ColumnArray * column_array = dynamic_cast<ColumnArray *>(&*column.column))
{
String name = DataTypeNested::extractNestedTableName(column.name);
ArrayColumns::const_iterator it = array_columns.find(name);
if (array_columns.end() == it)
array_columns[name] = column_array;
else
{
if (!it->second->hasEqualOffsets(*column_array))
throw Exception("Sizes of nested arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
/// делаем так, чтобы столбцы смещений массивов внутри одной вложенной таблицы указывали в одно место
column_array->getOffsetsColumn() = it->second->getOffsetsColumn();
}
}
}
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
size_t columns = lhs.columns();

View File

@ -53,32 +53,8 @@ Block BlockInputStreamFromRowInputStream::readImpl()
throw DB::Exception(e.message() + " (at row " + toString(total_rows + 1) + ")", e, e.code());
}
/// Указатели на столбцы-массивы, для проверки равенства столбцов смещений во вложенных структурах данных
typedef std::map<String, ColumnArray *> ArrayColumns;
ArrayColumns array_columns;
res.optimizeNestedArraysOffsets();
for (size_t i = 0; i < res.columns(); ++i)
{
ColumnWithNameAndType & column = res.getByPosition(i);
if (ColumnArray * column_array = dynamic_cast<ColumnArray *>(&*column.column))
{
String name = DataTypeNested::extractNestedTableName(column.name);
ArrayColumns::const_iterator it = array_columns.find(name);
if (array_columns.end() == it)
array_columns[name] = column_array;
else
{
if (!it->second->hasEqualOffsets(*column_array))
throw Exception("Sizes of nested arrays do not match", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
/// делаем так, чтобы столбцы смещений массивов внутри одной вложенной таблицы указывали в одно место
column_array->getOffsetsColumn() = it->second->getOffsetsColumn();
}
}
}
return res;
}