dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-08-13 20:16:06 +00:00
parent 598614b624
commit 0d5710f04a
8 changed files with 91 additions and 5 deletions

View File

@ -120,6 +120,7 @@ namespace ErrorCodes
THERE_IS_NO_SESSION,
CANNOT_CLOCK_GETTIME,
UNKNOWN_SETTING,
THERE_IS_NO_DEFAULT_VALUE,
POCO_EXCEPTION = 1000,
STD_EXCEPTION,

View File

@ -0,0 +1,60 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/Interpreters/Expression.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/Columns/ColumnConst.h>
namespace DB
{
/** Добавляет в блок недостающие столбцы со значениями по-умолчанию.
* Эти столбцы - материалированные (не константы).
*/
class AddingDefaultBlockInputStream : public IProfilingBlockInputStream
{
public:
AddingDefaultBlockInputStream(
BlockInputStreamPtr input_,
NamesAndTypesListPtr required_columns_)
: input(input_), required_columns(required_columns_)
{
children.push_back(input);
}
Block readImpl()
{
Block res = input->read();
if (!res)
return res;
for (NamesAndTypesList::const_iterator it = required_columns->begin(); it != required_columns->end(); ++it)
{
if (!res.has(it->first))
{
ColumnWithNameAndType col;
col.name = it->first;
col.type = it->second;
col.column = dynamic_cast<IColumnConst &>(*it->second->createConstColumn(
res.rows(), it->second->getDefault())).convertToFullColumn();
res.insert(col);
}
}
return res;
}
String getName() const { return "AddingDefaultBlockInputStream"; }
BlockInputStreamPtr clone() { return new AddingDefaultBlockInputStream(input, required_columns); }
private:
BlockInputStreamPtr input;
NamesAndTypesListPtr required_columns;
};
}

View File

@ -38,6 +38,11 @@ public:
ColumnPtr createColumn() const;
ColumnPtr createConstColumn(size_t size, const Field & field) const;
Field getDefault() const
{
throw Exception("There is no default value for AggregateFunction data type", ErrorCodes::THERE_IS_NO_DEFAULT_VALUE);
}
};

View File

@ -48,6 +48,11 @@ public:
ColumnPtr createColumn() const;
ColumnPtr createConstColumn(size_t size, const Field & field) const;
Field getDefault() const
{
return String("");
}
};
}

View File

@ -1,5 +1,4 @@
#ifndef DBMS_DATA_TYPES_DATATYPE_STRING_H
#define DBMS_DATA_TYPES_DATATYPE_STRING_H
#pragma once
#include <ostream>
@ -43,8 +42,11 @@ public:
ColumnPtr createColumn() const;
ColumnPtr createConstColumn(size_t size, const Field & field) const;
Field getDefault() const
{
return String("");
}
};
}
#endif

View File

@ -76,6 +76,10 @@ public:
*/
virtual SharedPtr<IColumn> createConstColumn(size_t size, const Field & field) const = 0;
/** Получить значение "по-умолчанию".
*/
virtual Field getDefault() const = 0;
/// Вернуть приблизительный (оценочный) размер значения.
virtual size_t getSizeOfField() const
{

View File

@ -54,6 +54,11 @@ public:
}
size_t getSizeOfField() const { return sizeof(FieldType); }
Field getDefault() const
{
return typename NearestFieldType<FieldType>::Type();
}
};
}

View File

@ -22,6 +22,7 @@
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/AddingDefaultBlockInputStream.h>
#include <DB/DataStreams/narrowBlockInputStreams.h>
#include <DB/DataStreams/copyData.h>
@ -1246,6 +1247,7 @@ void StorageMergeTree::mergeParts(DataPartPtr left, DataPartPtr right)
/** Читаем из левого и правого куска, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
* Также, если в одном из кусков есть не все столбцы, то добавляем столбцы с значениями по-умолчанию.
*/
BlockInputStreams src_streams;
@ -1258,7 +1260,9 @@ void StorageMergeTree::mergeParts(DataPartPtr left, DataPartPtr right)
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + right->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, right, empty_prefix, empty_range), primary_expr));
BlockInputStreamPtr merged_stream = new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE);
BlockInputStreamPtr merged_stream = new AddingDefaultBlockInputStream(
new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE), columns);
BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
left->left_date, right->right_date, left->left, right->right, 1 + std::max(left->level, right->level));