dbms: removed dead code; healing corrupted nested columns [#METR-15896].

This commit is contained in:
Alexey Milovidov 2015-04-09 03:37:08 +03:00
parent 079615d43d
commit 543c4c92cb
9 changed files with 90 additions and 258 deletions

View File

@ -1139,7 +1139,7 @@ private:
template <typename T>
bool executeNumber(const ColumnArray * array, ColumnVector<UInt32>::Container_t & res_values)
{
const ColumnVector<T> * nested = typeid_cast<const ColumnVector<T> *>(&*array->getDataPtr());
const ColumnVector<T> * nested = typeid_cast<const ColumnVector<T> *>(&array->getData());
if (!nested)
return false;
const ColumnArray::Offsets_t & offsets = array->getOffsets();
@ -1165,7 +1165,7 @@ private:
bool executeString(const ColumnArray * array, ColumnVector<UInt32>::Container_t & res_values)
{
const ColumnString * nested = typeid_cast<const ColumnString *>(&*array->getDataPtr());
const ColumnString * nested = typeid_cast<const ColumnString *>(&array->getData());
if (!nested)
return false;
const ColumnArray::Offsets_t & offsets = array->getOffsets();

View File

@ -146,6 +146,11 @@ public:
}
}
/** Добавить столбец минимального размера.
* Используется в случае, когда ни один столбец не нужен, но нужно хотя бы знать количество строк.
* Добавляет в columns.
*/
void addMinimumSizeColumn()
{
const auto get_column_size = [this] (const String & name) {
@ -184,7 +189,7 @@ public:
addStream(minimum_size_column->name, *minimum_size_column->type, all_mark_ranges);
columns.emplace(std::begin(columns), *minimum_size_column);
added_column = &columns.front();
added_minimum_size_column = &columns.front();
}
@ -323,11 +328,15 @@ private:
const MergeTreeData::DataPartPtr & data_part;
String part_name;
FileStreams streams;
/// Запрашиваемые столбцы. Возможно, с добавлением minimum_size_column.
NamesAndTypesList columns;
const NameAndTypePair * added_minimum_size_column = nullptr;
bool use_uncompressed_cache;
MergeTreeData & storage;
const MarkRanges & all_mark_ranges;
const NameAndTypePair * added_column = nullptr;
void addStream(const String & name, const IDataType & type, const MarkRanges & all_mark_ranges, size_t level = 0)
{
@ -360,8 +369,9 @@ private:
streams[name].reset(new Stream(path + escaped_column_name, uncompressed_cache, mark_cache, all_mark_ranges));
}
void readData(const String & name, const IDataType & type, IColumn & column, size_t from_mark, size_t max_rows_to_read,
size_t level = 0, bool read_offsets = true)
size_t level = 0, bool read_offsets = true)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
@ -379,38 +389,36 @@ private:
if (column.size())
{
ColumnArray & array = typeid_cast<ColumnArray &>(column);
readData(
name,
*type_arr->getNestedType(),
array.getData(),
from_mark,
array.getOffsets()[column.size() - 1] - array.getData().size(),
level + 1);
}
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
Stream & stream = *streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)];
stream.seekToMark(from_mark);
type_nested->deserializeOffsets(
column,
*stream.data_buffer,
max_rows_to_read);
const size_t required_internal_size = array.getOffsets()[column.size() - 1];
if (column.size())
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
if (required_internal_size)
{
readData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
name,
*type_arr->getNestedType(),
array.getData(),
from_mark,
column_nested.getOffsets()[column.size() - 1] - column_nested.getData()[i]->size(),
required_internal_size - array.getData().size(),
level + 1);
/** Исправление для ошибочно записанных пустых файлов с данными массива.
* Такое бывает после ALTER с добавлением новых столбцов во вложенную структуру данных.
*/
size_t read_internal_size = array.getData().size();
if (required_internal_size != read_internal_size)
{
if (read_internal_size != 0)
LOG_ERROR("Internal size of array " + name + " doesn't match offsets: corrupted data, filling with default values.");
array.getDataPtr() = dynamic_cast<IColumnConst &>(
*type_arr->getNestedType()->createConstColumn(
required_internal_size,
type_arr->getNestedType()->getDefault())).convertToFullColumn();
/** NOTE Можно было бы занулять этот столбец, чтобы он не добавлялся в блок,
* а впоследствии создавался с более правильными (из определения таблицы) значениями по-умолчанию.
*/
}
}
}
}
@ -435,6 +443,7 @@ private:
}
}
void fillMissingColumnsImpl(Block & res, const Names & ordered_names, bool always_reorder)
{
try
@ -453,51 +462,56 @@ private:
if (const ColumnArray * array = typeid_cast<const ColumnArray *>(&*column.column))
{
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
offset_columns[offsets_name] = array->getOffsetsColumn();
auto & offsets_column = offset_columns[offsets_name];
/// Если почему-то есть разные столбцы смещений для одной вложенной структуры, то берём непустой.
if (!offsets_column || offsets_column->empty())
offsets_column = array->getOffsetsColumn();
}
}
auto should_evaluate_defaults = false;
auto should_sort = always_reorder;
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
for (const auto & requested_column : columns)
{
/// insert default values only for columns without default expressions
if (!res.has(it->name))
if (!res.has(requested_column.name))
{
should_sort = true;
if (storage.column_defaults.count(it->name) != 0)
if (storage.column_defaults.count(requested_column.name) != 0)
{
should_evaluate_defaults = true;
continue;
}
ColumnWithNameAndType column;
column.name = it->name;
column.type = it->type;
ColumnWithNameAndType column_to_add;
column_to_add.name = requested_column.name;
column_to_add.type = requested_column.type;
String offsets_name = DataTypeNested::extractNestedTableName(column.name);
String offsets_name = DataTypeNested::extractNestedTableName(column_to_add.name);
if (offset_columns.count(offsets_name))
{
ColumnPtr offsets_column = offset_columns[offsets_name];
DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column.type).getNestedType();
DataTypePtr nested_type = typeid_cast<DataTypeArray &>(*column_to_add.type).getNestedType();
size_t nested_rows = offsets_column->empty() ? 0
: typeid_cast<ColumnUInt64 &>(*offsets_column).getData().back();
ColumnPtr nested_column = dynamic_cast<IColumnConst &>(*nested_type->createConstColumn(
nested_rows, nested_type->getDefault())).convertToFullColumn();
column.column = new ColumnArray(nested_column, offsets_column);
column_to_add.column = new ColumnArray(nested_column, offsets_column);
}
else
{
/** Нужно превратить константный столбец в полноценный, так как в части блоков (из других кусков),
* он может быть полноценным (а то интерпретатор может посчитать, что он константный везде).
*/
column.column = dynamic_cast<IColumnConst &>(*column.type->createConstColumn(
res.rows(), column.type->getDefault())).convertToFullColumn();
column_to_add.column = dynamic_cast<IColumnConst &>(*column_to_add.type->createConstColumn(
res.rows(), column_to_add.type->getDefault())).convertToFullColumn();
}
res.insert(column);
res.insert(column_to_add);
}
}
@ -506,12 +520,12 @@ private:
evaluateMissingDefaults(res, columns, storage.column_defaults, storage.context);
/// remove added column to ensure same content among all blocks
if (added_column)
if (added_minimum_size_column)
{
res.erase(0);
streams.erase(added_column->name);
streams.erase(added_minimum_size_column->name);
columns.erase(std::begin(columns));
added_column = nullptr;
added_minimum_size_column = nullptr;
}
/// sort columns to ensure consistent order among all blocks

View File

@ -4,10 +4,7 @@
#include <DB/IO/VarInt.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
@ -35,28 +32,6 @@ static void readData(const IDataType & type, IColumn & column, ReadBuffer & istr
istr,
typeid_cast<const ColumnArray &>(column).getOffsets()[rows - 1]);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
IColumn & offsets_column = *column_nested.getOffsetsColumn();
type_nested->getOffsetsType()->deserializeBinary(offsets_column, istr, rows, 0);
if (offsets_column.size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
if (rows)
{
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
*it->type,
*column_nested.getData()[i],
istr,
column_nested.getOffsets()[rows - 1]);
}
}
}
else
type.deserializeBinary(column, istr, rows, 0); /// TODO Использовать avg_value_size_hint.

View File

@ -5,10 +5,8 @@
#include <DB/Columns/ColumnConst.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
@ -28,20 +26,6 @@ static void writeData(const IDataType & type, const IColumn & column, WriteBuffe
if (!typeid_cast<const ColumnArray &>(column).getData().empty())
writeData(*type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), ostr);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
const ColumnNested & column_nested = typeid_cast<const ColumnNested &>(column);
type_nested->getOffsetsType()->serializeBinary(*column_nested.getOffsetsColumn(), ostr);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
if (column_nested.getData()[i]->empty())
break;
writeData(*it->type, *column_nested.getData()[i], ostr);
}
}
else
type.serializeBinary(column, ostr);
}

View File

@ -2,7 +2,6 @@
#include <sparsehash/dense_hash_map>
#include <sparsehash/dense_hash_set>
#include <DB/Storages/ITableDeclaration.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/Parsers/ASTIdentifier.h>
#include <DB/Parsers/ASTNameTypePair.h>
#include <DB/Interpreters/Context.h>

View File

@ -282,19 +282,6 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(
storage.files[size_name].data_file.path(),
mark_number
? storage.files[size_name].marks[mark_number].offset
: 0));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(
storage.files[name].data_file.path(),
@ -326,29 +313,6 @@ void LogBlockInputStream::readData(const String & name, const IDataType & type,
typeid_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
max_rows_to_read);
if (column.size())
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read, 0); /// TODO Использовать avg_value_size_hint.
}
@ -407,15 +371,6 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size));
}
@ -445,33 +400,6 @@ void LogBlockOutputStream::writeData(const String & name, const IDataType & type
writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), out_marks, offset_columns, level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
Mark mark;
mark.rows = (storage.files[size_name].marks.empty() ? 0 : storage.files[size_name].marks.back().rows) + column.size();
mark.offset = streams[size_name]->plain_offset + streams[size_name]->plain.count();
out_marks.push_back(std::make_pair(storage.files[size_name].column_index, mark));
type_nested->serializeOffsets(column, streams[size_name]->compressed);
streams[size_name]->compressed.next();
const ColumnNested & column_nested = typeid_cast<const ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
writeData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
out_marks,
offset_columns,
level + 1);
}
}
else
{
Mark mark;
@ -590,21 +518,6 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnData & column_data = files.insert(std::make_pair(column_name + size_column_suffix, ColumnData())).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
column_names.push_back(column_name + size_column_suffix);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addFile(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
{
ColumnData & column_data = files.insert(std::make_pair(column_name, ColumnData())).first->second;
@ -701,10 +614,6 @@ const Marks & StorageLog::getMarksWithRealRowCount() const
{
file_name = DataTypeNested::extractNestedTableName(column_name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX "0";
}
else if (typeid_cast<const DataTypeNested *>(&column_type))
{
file_name = column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX "0";
}
else
{
file_name = column_name;

View File

@ -224,15 +224,6 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(storage.files[size_name].data_file.path()));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(storage.files[name].data_file.path()));
}
@ -261,29 +252,6 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
throw Exception("Cannot read array data for all offsets", ErrorCodes::CANNOT_READ_ALL_DATA);
}
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
type_nested->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level)]->compressed,
limit);
if (column.size())
{
ColumnNested & column_nested = typeid_cast<ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
readData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
column_nested.getOffsets()[column.size() - 1],
level + 1);
}
}
}
else
type.deserializeBinary(column, streams[name]->compressed, limit, 0); /// TODO Использовать avg_value_size_hint.
}
@ -300,15 +268,6 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
addStream(name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
streams[size_name].reset(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size));
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addStream(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
streams[name].reset(new Stream(storage.files[name].data_file.path(), storage.max_compress_block_size));
}
@ -332,25 +291,6 @@ void TinyLogBlockOutputStream::writeData(const String & name, const IDataType &
writeData(name, *type_arr->getNestedType(), typeid_cast<const ColumnArray &>(column).getData(), offset_columns, level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
type_nested->serializeOffsets(column, streams[size_name]->compressed);
const ColumnNested & column_nested = typeid_cast<const ColumnNested &>(column);
NamesAndTypesList::const_iterator it = type_nested->getNestedTypesList()->begin();
for (size_t i = 0; i < column_nested.getData().size(); ++i, ++it)
{
writeData(
DataTypeNested::concatenateNestedName(name, it->name),
*it->type,
*column_nested.getData()[i],
offset_columns,
level + 1);
}
}
else
type.serializeBinary(column, streams[name]->compressed);
}
@ -460,19 +400,6 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type,
addFile(column_name, *type_arr->getNestedType(), level + 1);
}
else if (const DataTypeNested * type_nested = typeid_cast<const DataTypeNested *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
ColumnData column_data;
files.insert(std::make_pair(column_name + size_column_suffix, column_data));
files[column_name + size_column_suffix].data_file = Poco::File(
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + size_column_suffix + DBMS_STORAGE_LOG_DATA_FILE_EXTENSION);
const NamesAndTypesList & columns = *type_nested->getNestedTypesList();
for (NamesAndTypesList::const_iterator it = columns.begin(); it != columns.end(); ++it)
addFile(DataTypeNested::concatenateNestedName(name, it->name), *it->type, level + 1);
}
else
{
ColumnData column_data;

View File

@ -0,0 +1,24 @@
2015-01-01 ['Hello','World']
2015-01-01 Hello
2015-01-01 World
2015-01-01 Hello
2015-01-01 ['Hello','World'] [0,0]
2015-01-01 Hello 0
2015-01-01 World 0
2015-01-01 Hello 0
2015-01-01 ['Hello','World'] [0,0]
2015-01-01 ['Hello2','World2'] [0,0]
2015-01-01 Hello 0
2015-01-01 Hello2 0
2015-01-01 World 0
2015-01-01 World2 0
2015-01-01 Hello 0
2015-01-01 Hello2 0
2015-01-01 ['Hello','World'] [0,0]
2015-01-01 ['Hello2','World2'] [0,0]
2015-01-01 Hello 0
2015-01-01 World 0
2015-01-01 Hello2 0
2015-01-01 World2 0
2015-01-01 Hello 0
2015-01-01 Hello2 0

View File

@ -14,11 +14,11 @@ SELECT * FROM test.alter;
SELECT * FROM test.alter ARRAY JOIN n;
SELECT * FROM test.alter ARRAY JOIN n WHERE n.x LIKE '%Hello%';
INSERT INTO test.alter (`n.x`, `n.y`) VALUES (['Hello2', 'World2'], [123, 456]);
INSERT INTO test.alter (`n.x`) VALUES (['Hello2', 'World2']);
SELECT * FROM test.alter;
SELECT * FROM test.alter ARRAY JOIN n;
SELECT * FROM test.alter ARRAY JOIN n WHERE n.x LIKE '%Hello%';
SELECT * FROM test.alter ORDER BY n.x;
SELECT * FROM test.alter ARRAY JOIN n ORDER BY n.x;
SELECT * FROM test.alter ARRAY JOIN n WHERE n.x LIKE '%Hello%' ORDER BY n.x;
OPTIMIZE TABLE test.alter;