From c899cd3c0d5444c1e6dda7b80d2d309236d4d1be Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 30 May 2012 03:30:29 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- .../DB/Columns/ColumnAggregateFunction.h | 2 ++ .../DataStreams/NativeBlockInputStream.cpp | 4 --- .../DataStreams/NativeBlockOutputStream.cpp | 4 --- dbms/src/DataTypes/DataTypeFactory.cpp | 32 ++++++++++--------- dbms/src/Interpreters/Aggregator.cpp | 16 ++++++---- dbms/src/Server/TCPHandler.cpp | 5 ++- 6 files changed, 30 insertions(+), 33 deletions(-) diff --git a/dbms/include/DB/Columns/ColumnAggregateFunction.h b/dbms/include/DB/Columns/ColumnAggregateFunction.h index 6606ce09302..b78a4756845 100644 --- a/dbms/include/DB/Columns/ColumnAggregateFunction.h +++ b/dbms/include/DB/Columns/ColumnAggregateFunction.h @@ -15,6 +15,8 @@ class ColumnAggregateFunction : public ColumnVector public: std::string getName() const { return "ColumnAggregateFunction"; } + ColumnPtr cloneEmpty() const { return new ColumnAggregateFunction; }; + bool isNumeric() const { return false; } Field operator[](size_t n) const diff --git a/dbms/src/DataStreams/NativeBlockInputStream.cpp b/dbms/src/DataStreams/NativeBlockInputStream.cpp index 54d9feba457..fd1a50cb824 100644 --- a/dbms/src/DataStreams/NativeBlockInputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockInputStream.cpp @@ -32,14 +32,10 @@ Block NativeBlockInputStream::readImpl() readStringBinary(type_name, istr); column.type = data_type_factory.get(type_name); - std::cerr << "read: " << column.type->getName() << std::endl; - /// Данные column.column = column.type->createColumn(); column.type->deserializeBinary(*column.column, istr, rows); - std::cerr << "read: done" << std::endl; - if (column.column->size() != rows) throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA); diff --git a/dbms/src/DataStreams/NativeBlockOutputStream.cpp b/dbms/src/DataStreams/NativeBlockOutputStream.cpp index f2d38e912c8..83b6c92114f 100644 --- a/dbms/src/DataStreams/NativeBlockOutputStream.cpp +++ b/dbms/src/DataStreams/NativeBlockOutputStream.cpp @@ -27,8 +27,6 @@ void NativeBlockOutputStream::write(const Block & block) /// Тип writeStringBinary(column.type->getName(), ostr); - std::cerr << "write: " << column.type->getName() << std::endl; - /// Данные if (column.column->isConst()) { @@ -40,8 +38,6 @@ void NativeBlockOutputStream::write(const Block & block) } else column.type->serializeBinary(*column.column, ostr); - - std::cerr << "write: done" << std::endl; } } diff --git a/dbms/src/DataTypes/DataTypeFactory.cpp b/dbms/src/DataTypes/DataTypeFactory.cpp index e3e4230c811..cc316a925ad 100644 --- a/dbms/src/DataTypes/DataTypeFactory.cpp +++ b/dbms/src/DataTypes/DataTypeFactory.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #include @@ -19,21 +20,22 @@ DataTypeFactory::DataTypeFactory() : fixed_string_regexp("^FixedString\\s*\\(\\s*(\\d+)\\s*\\)$") { boost::assign::insert(non_parametric_data_types) - ("UInt8", new DataTypeUInt8) - ("UInt16", new DataTypeUInt16) - ("UInt32", new DataTypeUInt32) - ("UInt64", new DataTypeUInt64) - ("Int8", new DataTypeInt8) - ("Int16", new DataTypeInt16) - ("Int32", new DataTypeInt32) - ("Int64", new DataTypeInt64) - ("Float32", new DataTypeFloat32) - ("Float64", new DataTypeFloat64) - ("VarUInt", new DataTypeVarUInt) - ("VarInt", new DataTypeVarInt) - ("Date", new DataTypeDate) - ("DateTime", new DataTypeDateTime) - ("String", new DataTypeString) + ("UInt8", new DataTypeUInt8) + ("UInt16", new DataTypeUInt16) + ("UInt32", new DataTypeUInt32) + ("UInt64", new DataTypeUInt64) + ("Int8", new DataTypeInt8) + ("Int16", new DataTypeInt16) + ("Int32", new DataTypeInt32) + ("Int64", new DataTypeInt64) + ("Float32", new DataTypeFloat32) + ("Float64", new DataTypeFloat64) + ("VarUInt", new DataTypeVarUInt) + ("VarInt", new DataTypeVarInt) + ("Date", new DataTypeDate) + ("DateTime", new DataTypeDateTime) + ("String", new DataTypeString) + ("AggregateFunction", new DataTypeAggregateFunction) ; } diff --git a/dbms/src/Interpreters/Aggregator.cpp b/dbms/src/Interpreters/Aggregator.cpp index 77b52bcb014..e95ce6557c9 100644 --- a/dbms/src/Interpreters/Aggregator.cpp +++ b/dbms/src/Interpreters/Aggregator.cpp @@ -669,6 +669,10 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu /// Читаем все данные while (Block block = stream->read()) { + if (!sample) + for (size_t i = 0; i < keys_size + aggregates_size; ++i) + sample.insert(block.getByPosition(i).cloneEmpty()); + /// Запоминаем столбцы, с которыми будем работать for (size_t i = 0; i < keys_size; ++i) key_columns[i] = block.getByPosition(i).column; @@ -690,14 +694,12 @@ void Aggregator::merge(BlockInputStreamPtr stream, AggregatedDataVariants & resu { res.resize(aggregates_size); for (size_t i = 0; i < aggregates_size; ++i) - res[i] = (*aggregate_columns[i])[0]; - } - else - { - /// Добавляем значения - for (size_t i = 0; i < aggregates_size; ++i) - res[i]->merge(*(*aggregate_columns[i])[0]); + res[i] = aggregates[i].function->cloneEmpty(); } + + /// Добавляем значения + for (size_t i = 0; i < aggregates_size; ++i) + res[i]->merge(*(*aggregate_columns[i])[0]); } else if (result.type == AggregatedDataVariants::KEY_64) { diff --git a/dbms/src/Server/TCPHandler.cpp b/dbms/src/Server/TCPHandler.cpp index 4de1e6e1fa1..b9b28b6065c 100644 --- a/dbms/src/Server/TCPHandler.cpp +++ b/dbms/src/Server/TCPHandler.cpp @@ -117,14 +117,13 @@ void TCPHandler::processOrdinaryQuery() { profiling_in->setIsCancelledCallback(boost::bind(&TCPHandler::isQueryCancelled, this)); profiling_in->setProgressCallback(boost::bind(&TCPHandler::sendProgress, this, _1, _2)); + + profiling_in->dumpTree(std::cerr); } while (true) { - std::cerr << Poco::ThreadNumber::get() << "!" << std::endl; - dynamic_cast(*state.io.in).dumpTree(std::cerr); Block block = state.io.in->read(); - std::cerr << Poco::ThreadNumber::get() << "!!" << std::endl; sendData(block); if (!block) break;