My latest changes.

REVIEW:new
This commit is contained in:
Sergey Fedorov 2013-10-25 14:56:47 +00:00
parent 99041b3173
commit 8fd6963212
5 changed files with 234 additions and 10 deletions

View File

@ -0,0 +1,134 @@
#pragma once
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/ReadHelpers.h>
#include <DB/AggregateFunctions/IAggregateFunction.h>
namespace DB
{
struct AggregateFunctionAgrMinTraits
{
static bool better(const Field & lhs, const Field & rhs) { return lhs < rhs; }
static String name() { return "agrMin"; }
};
struct AggregateFunctionAgrMaxTraits
{
static bool better(const Field & lhs, const Field & rhs) { return lhs > rhs; }
static String name() { return "agrMax"; }
};
struct AggregateFunctionsAgrMinMaxData
{
Field value;
Field result;
};
/// Возвращает первое попавшееся значение arg для минимального/максимального value
template <typename Traits>
class AggregateFunctionsAgrMinMax : public IAggregateFunctionHelper<AggregateFunctionsAgrMinMaxData>
{
private:
DataTypePtr typeRes;
DataTypePtr typeVal;
public:
String getName() const { return Traits::name(); }
DataTypePtr getReturnType() const
{
return typeRes;
}
void setArguments(const DataTypes & arguments)
{
if (arguments.size() != 2)
throw Exception("Aggregate function " + getName() + " requires exactly two arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
typeRes = arguments[0];
typeVal = arguments[1];
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num) const
{
Field value, result;
columns[0]->get(row_num, result);
columns[1]->get(row_num, value);
Data & d = data(place);
if (!d.value.isNull())
{
if (Traits::better(value, d.value)) {
d.value = value;
d.result = result;
}
} else {
d.value = value;
d.result = result;
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs) const
{
Data & d = data(place);
const Data & d_rhs = data(rhs);
if (!d.value.isNull())
{
if (Traits::better(d_rhs.value, d.value)){
d.value = d_rhs.value;
d.result = d_rhs.result;
}
} else {
d.value = d_rhs.value;
d.result = d_rhs.result;
}
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const
{
typeVal->serializeBinary(data(place).value, buf);
typeRes->serializeBinary(data(place).result, buf);
}
void deserializeMerge(AggregateDataPtr place, ReadBuffer & buf) const
{
Data & d = data(place);
if (!d.value.isNull())
{
Field value_, result_;
typeRes->deserializeBinary(result_, buf);
typeVal->deserializeBinary(value_, buf);
if (Traits::better(value_, d.value)) {
d.value = value_;
d.result = result_;
}
} else {
typeRes->deserializeBinary(d.result, buf);
typeVal->deserializeBinary(d.value, buf);
}
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const
{
if (unlikely(data(place).value.isNull()))
to.insertDefault();
else
to.insert(data(place).result);
}
};
typedef AggregateFunctionsAgrMinMax<AggregateFunctionAgrMinTraits> AggregateFunctionAgrMin;
typedef AggregateFunctionsAgrMinMax<AggregateFunctionAgrMaxTraits> AggregateFunctionAgrMax;
}

View File

@ -0,0 +1,51 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Columns/ColumnConst.h>
namespace DB
{
/** Добавляет в блок недостающие столбцы со значениями по-умолчанию.
* Эти столбцы - материалированные (не константы).
*/
class AddingDefaultBlockOutputStream : public IBlockOutputStream
{
public:
AddingDefaultBlockOutputStream(
BlockOutputStreamPtr output_,
NamesAndTypesListPtr required_columns_)
: output(output_), required_columns(required_columns_)
{
}
String getName() const { return "AddingDefaultBlockOutputStream"; }
void write(const Block & block) {
Block res = block;
for (NamesAndTypesList::const_iterator it = required_columns->begin(); it != required_columns->end(); ++it)
{
if (!res.has(it->first))
{
ColumnWithNameAndType col;
col.name = it->first;
col.type = it->second;
col.column = dynamic_cast<IColumnConst &>(*it->second->createConstColumn(
res.rows(), it->second->getDefault())).convertToFullColumn();
res.insert(col);
}
}
output->write(res);
}
private:
BlockOutputStreamPtr output;
NamesAndTypesListPtr required_columns;
};
}

View File

@ -4,6 +4,7 @@
#include <DB/AggregateFunctions/AggregateFunctionAny.h>
#include <DB/AggregateFunctions/AggregateFunctionAnyLast.h>
#include <DB/AggregateFunctions/AggregateFunctionsMinMax.h>
#include <DB/AggregateFunctions/AggregateFunctionsAgrMinMax.h>
#include <DB/AggregateFunctions/AggregateFunctionUniq.h>
#include <DB/AggregateFunctions/AggregateFunctionGroupArray.h>
#include <DB/AggregateFunctions/AggregateFunctionGroupUniqArray.h>
@ -77,6 +78,10 @@ AggregateFunctionPtr AggregateFunctionFactory::get(const String & name, const Da
return new AggregateFunctionMin;
else if (name == "max")
return new AggregateFunctionMax;
else if (name == "agrMin")
return new AggregateFunctionAgrMin;
else if (name == "agrMax")
return new AggregateFunctionAgrMax;
else if (name == "groupArray")
return new AggregateFunctionGroupArray;
else if (name == "groupUniqArray")
@ -301,6 +306,8 @@ bool AggregateFunctionFactory::isAggregateFunctionName(const String & name, int
"anyLast",
"min",
"max",
"agrMin",
"agrMax",
"sum",
"avg",
"uniq",

View File

@ -1,5 +1,6 @@
#include <DB/IO/ConcatReadBuffer.h>
#include <DB/DataStreams/AddingDefaultBlockOutputStream.h>
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
@ -10,7 +11,6 @@
#include <DB/Interpreters/InterpreterSelectQuery.h>
#include <DB/Interpreters/InterpreterInsertQuery.h>
namespace DB
{
@ -29,22 +29,49 @@ StoragePtr InterpreterInsertQuery::getTable()
return context.getTable(query.database, query.table);
}
Block InterpreterInsertQuery::getSampleBlock()
{
return getTable()->getSampleBlock();
}
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
Block dbSample = getTable()->getSampleBlock();
/// Если в запросе не указана информация о столбцах
if (!query.columns)
return dbSample;
/// Строим мап из имени столбца в его тип
const NamesAndTypesList & names_and_types = dbSample.getColumnsList();
std::map<std::string, DataTypePtr> nameToType(names_and_types.begin(), names_and_types.end());
/// Формируем блок, основываясь на именах столбцов из запроса
Block res;
for (ASTs::iterator it = query.columns->children.begin(); it != query.columns->children.end(); it ++)
{
std::string currentName = (*it)->getColumnName();
/// В таблице нет столбца с таким именем
if (nameToType.count(currentName) == 0)
throw Exception("No such column in table: " + currentName, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
ColumnWithNameAndType col;
col.name = currentName;
col.type = nameToType[col.name];
col.column = col.type->createColumn();
res.insert(col);
}
return res;
}
void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
{
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
StoragePtr table = getTable();
/// TODO - если указаны не все столбцы, то дополнить поток недостающими столбцами со значениями по-умолчанию.
BlockInputStreamPtr in;
BlockOutputStreamPtr out = table->write(query_ptr);
NamesAndTypesListPtr required_columns = new NamesAndTypesList;
*required_columns = table->getSampleBlock().getColumnsList();
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(table->write(query_ptr), required_columns);
/// Какой тип запроса: INSERT VALUES | INSERT FORMAT | INSERT SELECT?
if (!query.select)
@ -53,6 +80,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
if (format.empty())
format = "Values";
/// Данные могут содержаться в распарсенной (query.data) и ещё не распарсенной (remaining_data_istr) части запроса.
/// Если данных нет.
@ -80,6 +108,7 @@ void InterpreterInsertQuery::execute(ReadBuffer * remaining_data_istr)
InterpreterSelectQuery interpreter_select(query.select, context);
in = interpreter_select.execute();
in = new MaterializingBlockInputStream(in);
copyData(*in, *out);
}
}
@ -90,8 +119,9 @@ BlockOutputStreamPtr InterpreterInsertQuery::execute()
ASTInsertQuery & query = dynamic_cast<ASTInsertQuery &>(*query_ptr);
StoragePtr table = getTable();
/// TODO - если указаны не все столбцы, то дополнить поток недостающими столбцами со значениями по-умолчанию.
BlockOutputStreamPtr out = table->write(query_ptr);
NamesAndTypesListPtr required_columns = new NamesAndTypesList;
*required_columns = table->getSampleBlock().getColumnsList();
BlockOutputStreamPtr out = new AddingDefaultBlockOutputStream(table->write(query_ptr), required_columns);
/// Какой тип запроса: INSERT или INSERT SELECT?
if (!query.select)

View File

@ -66,9 +66,11 @@ bool ParserInsertQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
/// Есть ли список столбцов
if (s_lparen.ignore(pos, end, expected)
&& (!columns_p.parse(pos, end, columns, expected)
|| !ws.ignore(pos, end)
|| (!ws.ignore(pos, end) && ws.ignore(pos, end))
|| !s_rparen.ignore(pos, end, expected)))
{
return false;
}
ws.ignore(pos, end);