dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-05-30 03:30:29 +00:00
parent f018ea4aac
commit c899cd3c0d
6 changed files with 30 additions and 33 deletions

View File

@ -15,6 +15,8 @@ class ColumnAggregateFunction : public ColumnVector<AggregateFunctionPtr>
public: public:
std::string getName() const { return "ColumnAggregateFunction"; } std::string getName() const { return "ColumnAggregateFunction"; }
ColumnPtr cloneEmpty() const { return new ColumnAggregateFunction; };
bool isNumeric() const { return false; } bool isNumeric() const { return false; }
Field operator[](size_t n) const Field operator[](size_t n) const

View File

@ -32,14 +32,10 @@ Block NativeBlockInputStream::readImpl()
readStringBinary(type_name, istr); readStringBinary(type_name, istr);
column.type = data_type_factory.get(type_name); column.type = data_type_factory.get(type_name);
std::cerr << "read: " << column.type->getName() << std::endl;
/// Данные /// Данные
column.column = column.type->createColumn(); column.column = column.type->createColumn();
column.type->deserializeBinary(*column.column, istr, rows); column.type->deserializeBinary(*column.column, istr, rows);
std::cerr << "read: done" << std::endl;
if (column.column->size() != rows) if (column.column->size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);

View File

@ -27,8 +27,6 @@ void NativeBlockOutputStream::write(const Block & block)
/// Тип /// Тип
writeStringBinary(column.type->getName(), ostr); writeStringBinary(column.type->getName(), ostr);
std::cerr << "write: " << column.type->getName() << std::endl;
/// Данные /// Данные
if (column.column->isConst()) if (column.column->isConst())
{ {
@ -40,8 +38,6 @@ void NativeBlockOutputStream::write(const Block & block)
} }
else else
column.type->serializeBinary(*column.column, ostr); column.type->serializeBinary(*column.column, ostr);
std::cerr << "write: done" << std::endl;
} }
} }

View File

@ -8,6 +8,7 @@
#include <DB/DataTypes/DataTypeDateTime.h> #include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeString.h> #include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h> #include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeAggregateFunction.h>
#include <DB/DataTypes/DataTypeFactory.h> #include <DB/DataTypes/DataTypeFactory.h>
@ -19,21 +20,22 @@ DataTypeFactory::DataTypeFactory()
: fixed_string_regexp("^FixedString\\s*\\(\\s*(\\d+)\\s*\\)$") : fixed_string_regexp("^FixedString\\s*\\(\\s*(\\d+)\\s*\\)$")
{ {
boost::assign::insert(non_parametric_data_types) boost::assign::insert(non_parametric_data_types)
("UInt8", new DataTypeUInt8) ("UInt8", new DataTypeUInt8)
("UInt16", new DataTypeUInt16) ("UInt16", new DataTypeUInt16)
("UInt32", new DataTypeUInt32) ("UInt32", new DataTypeUInt32)
("UInt64", new DataTypeUInt64) ("UInt64", new DataTypeUInt64)
("Int8", new DataTypeInt8) ("Int8", new DataTypeInt8)
("Int16", new DataTypeInt16) ("Int16", new DataTypeInt16)
("Int32", new DataTypeInt32) ("Int32", new DataTypeInt32)
("Int64", new DataTypeInt64) ("Int64", new DataTypeInt64)
("Float32", new DataTypeFloat32) ("Float32", new DataTypeFloat32)
("Float64", new DataTypeFloat64) ("Float64", new DataTypeFloat64)
("VarUInt", new DataTypeVarUInt) ("VarUInt", new DataTypeVarUInt)
("VarInt", new DataTypeVarInt) ("VarInt", new DataTypeVarInt)
("Date", new DataTypeDate) ("Date", new DataTypeDate)
("DateTime", new DataTypeDateTime) ("DateTime", new DataTypeDateTime)
("String", new DataTypeString) ("String", new DataTypeString)
("AggregateFunction", new DataTypeAggregateFunction)
; ;
} }

View File

@ -669,6 +669,10 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
/// Читаем все данные /// Читаем все данные
while (Block block = stream->read()) while (Block block = stream->read())
{ {
if (!sample)
for (size_t i = 0; i < keys_size + aggregates_size; ++i)
sample.insert(block.getByPosition(i).cloneEmpty());
/// Запоминаем столбцы, с которыми будем работать /// Запоминаем столбцы, с которыми будем работать
for (size_t i = 0; i < keys_size; ++i) for (size_t i = 0; i < keys_size; ++i)
key_columns[i] = block.getByPosition(i).column; key_columns[i] = block.getByPosition(i).column;
@ -690,14 +694,12 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu
{ {
res.resize(aggregates_size); res.resize(aggregates_size);
for (size_t i = 0; i < aggregates_size; ++i) for (size_t i = 0; i < aggregates_size; ++i)
res[i] = (*aggregate_columns[i])[0]; res[i] = aggregates[i].function->cloneEmpty();
}
else
{
/// Добавляем значения
for (size_t i = 0; i < aggregates_size; ++i)
res[i]->merge(*(*aggregate_columns[i])[0]);
} }
/// Добавляем значения
for (size_t i = 0; i < aggregates_size; ++i)
res[i]->merge(*(*aggregate_columns[i])[0]);
} }
else if (result.type == AggregatedDataVariants::KEY_64) else if (result.type == AggregatedDataVariants::KEY_64)
{ {

View File

@ -117,14 +117,13 @@ void TCPHandler::processOrdinaryQuery()
{ {
profiling_in->setIsCancelledCallback(boost::bind(&TCPHandler::isQueryCancelled, this)); profiling_in->setIsCancelledCallback(boost::bind(&TCPHandler::isQueryCancelled, this));
profiling_in->setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2)); profiling_in->setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2));
profiling_in->dumpTree(std::cerr);
} }
while (true) while (true)
{ {
std::cerr << Poco::ThreadNumber::get() << "!" << std::endl;
dynamic_cast<IProfilingBlockInputStream &>(*state.io.in).dumpTree(std::cerr);
Block block = state.io.in->read(); Block block = state.io.in->read();
std::cerr << Poco::ThreadNumber::get() << "!!" << std::endl;
sendData(block); sendData(block);
if (!block) if (!block)
break; break;