mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 02:21:59 +00:00
dbms: development [#CONV-2944].
This commit is contained in:
parent
b2a826727e
commit
987784ed80
@ -62,6 +62,9 @@ public:
|
||||
|
||||
operator bool() const { return !data.empty(); }
|
||||
bool operator!() const { return data.empty(); }
|
||||
|
||||
/** Получить список имён столбцов через запятую. */
|
||||
std::string dumpNames() const;
|
||||
};
|
||||
|
||||
}
|
||||
|
41
dbms/include/DB/DataStreams/ExpressionBlockInputStream.h
Normal file
41
dbms/include/DB/DataStreams/ExpressionBlockInputStream.h
Normal file
@ -0,0 +1,41 @@
|
||||
#pragma once
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
|
||||
#include <DB/Interpreters/Expression.h>
|
||||
#include <DB/DataStreams/IBlockInputStream.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
using Poco::SharedPtr;
|
||||
|
||||
|
||||
/** Выполняет над блоком вычисление некоторого выражения.
|
||||
* Выражение состоит из идентификаторов столбцов из блока, констант, обычных функций.
|
||||
* Например: hits * 2 + 3, instr("yandex", url)
|
||||
* Выражение не меняет количество строк в потоке, и обрабатывает каждую строку независимо от других.
|
||||
*/
|
||||
class ExpressionBlockInputStream : public IBlockInputStream
|
||||
{
|
||||
public:
|
||||
ExpressionBlockInputStream(SharedPtr<IBlockInputStream> input_, SharedPtr<Expression> expression_)
|
||||
: input(input_), expression(expression_) {}
|
||||
|
||||
Block read()
|
||||
{
|
||||
Block res = input->read();
|
||||
if (!res)
|
||||
return res;
|
||||
|
||||
expression->execute(res);
|
||||
return res;
|
||||
}
|
||||
|
||||
private:
|
||||
SharedPtr<IBlockInputStream> input;
|
||||
SharedPtr<Expression> expression;
|
||||
};
|
||||
|
||||
}
|
@ -29,6 +29,10 @@ public:
|
||||
*/
|
||||
void execute(Block & block);
|
||||
|
||||
/** Получить список типов столбцов результата.
|
||||
*/
|
||||
DataTypes getReturnTypes();
|
||||
|
||||
private:
|
||||
ASTPtr ast;
|
||||
const Context & context;
|
||||
@ -67,6 +71,8 @@ private:
|
||||
Block projectResult(ASTPtr ast, Block & block);
|
||||
|
||||
void collectFinalColumns(ASTPtr ast, Block & src, Block & dst);
|
||||
|
||||
void getReturnTypesImpl(ASTPtr ast, DataTypes & res);
|
||||
};
|
||||
|
||||
|
||||
|
@ -44,7 +44,8 @@ void Block::rebuildIndexByPosition()
|
||||
void Block::insert(size_t position, const ColumnWithNameAndType & elem)
|
||||
{
|
||||
if (position >= index_by_position.size())
|
||||
throw Exception("Position out of bound in Block::insert()", ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
throw Exception("Position out of bound in Block::insert(), max position = "
|
||||
+ Poco::NumberFormatter::format(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
Container_t::iterator it = data.insert(index_by_position[position], elem);
|
||||
rebuildIndexByPosition();
|
||||
@ -63,7 +64,8 @@ void Block::insert(const ColumnWithNameAndType & elem)
|
||||
void Block::erase(size_t position)
|
||||
{
|
||||
if (position >= index_by_position.size())
|
||||
throw Exception("Position out of bound in Block::erase()", ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
throw Exception("Position out of bound in Block::erase(), max position = "
|
||||
+ Poco::NumberFormatter::format(index_by_position.size()), ErrorCodes::POSITION_OUT_OF_BOUND);
|
||||
|
||||
Container_t::iterator it = index_by_position[position];
|
||||
index_by_name.erase(index_by_name.find(it->name));
|
||||
@ -94,7 +96,8 @@ const ColumnWithNameAndType & Block::getByName(const std::string & name) const
|
||||
{
|
||||
IndexByName_t::const_iterator it = index_by_name.find(name);
|
||||
if (index_by_name.end() == it)
|
||||
throw Exception("Not found column " + name + " in block.", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||
throw Exception("Not found column " + name + " in block. There are only columns: " + dumpNames()
|
||||
, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||
|
||||
return *it->second;
|
||||
}
|
||||
@ -104,7 +107,8 @@ size_t Block::getPositionByName(const std::string & name) const
|
||||
{
|
||||
IndexByName_t::const_iterator it = index_by_name.find(name);
|
||||
if (index_by_name.end() == it)
|
||||
throw Exception("Not found column " + name + " in block.", ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||
throw Exception("Not found column " + name + " in block. There are only columns: " + dumpNames()
|
||||
, ErrorCodes::NOT_FOUND_COLUMN_IN_BLOCK);
|
||||
|
||||
return std::distance(const_cast<Container_t &>(data).begin(), it->second);
|
||||
}
|
||||
@ -118,10 +122,13 @@ size_t Block::rows() const
|
||||
size_t size = it->column->size();
|
||||
|
||||
if (size == 0)
|
||||
throw Exception("Empty column in block.", ErrorCodes::EMPTY_COLUMN_IN_BLOCK);
|
||||
throw Exception("Empty column " + it->name + " in block.", ErrorCodes::EMPTY_COLUMN_IN_BLOCK);
|
||||
|
||||
if (res != 0 && size != res)
|
||||
throw Exception("Sizes of columns doesn't match.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
throw Exception("Sizes of columns doesn't match: "
|
||||
+ data.begin()->name + ": " + Poco::NumberFormatter::format(res)
|
||||
+ ", " + it->name + ": " + Poco::NumberFormatter::format(size)
|
||||
, ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
|
||||
|
||||
res = size;
|
||||
}
|
||||
@ -135,4 +142,18 @@ size_t Block::columns() const
|
||||
return data.size();
|
||||
}
|
||||
|
||||
|
||||
std::string Block::dumpNames() const
|
||||
{
|
||||
std::stringstream res;
|
||||
for (Container_t::const_iterator it = data.begin(); it != data.end(); ++it)
|
||||
{
|
||||
if (it != data.begin())
|
||||
res << ", ";
|
||||
res << it->name;
|
||||
}
|
||||
return res.str();
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -36,8 +36,10 @@ Block LimitBlockInputStream::read()
|
||||
return res;
|
||||
|
||||
/// отдать кусок блока
|
||||
size_t start = std::max(0, static_cast<int>(offset) + static_cast<int>(rows) - static_cast<int>(pos));
|
||||
size_t length = std::min(rows - start, limit + offset + rows - pos);
|
||||
size_t start = std::max(0, static_cast<int>(offset) - static_cast<int>(pos) + static_cast<int>(rows));
|
||||
size_t length = std::min(static_cast<int>(limit), std::min(
|
||||
static_cast<int>(pos) - static_cast<int>(offset),
|
||||
static_cast<int>(limit) + static_cast<int>(offset) - static_cast<int>(pos) + static_cast<int>(rows)));
|
||||
|
||||
for (size_t i = 0; i < res.columns(); ++i)
|
||||
res.getByPosition(i).column->cut(start, length);
|
||||
|
87
dbms/src/DataStreams/tests/expression_stream.cpp
Normal file
87
dbms/src/DataStreams/tests/expression_stream.cpp
Normal file
@ -0,0 +1,87 @@
|
||||
#include <iostream>
|
||||
#include <iomanip>
|
||||
|
||||
#include <Poco/SharedPtr.h>
|
||||
#include <Poco/Stopwatch.h>
|
||||
|
||||
#include <DB/IO/WriteBufferFromOStream.h>
|
||||
|
||||
#include <DB/Storages/StorageSystemNumbers.h>
|
||||
|
||||
#include <DB/DataStreams/LimitBlockInputStream.h>
|
||||
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
||||
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
|
||||
#include <DB/DataStreams/copyData.h>
|
||||
|
||||
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
||||
|
||||
#include <DB/Functions/FunctionsArithmetic.h>
|
||||
|
||||
#include <DB/Parsers/ParserSelectQuery.h>
|
||||
|
||||
using Poco::SharedPtr;
|
||||
|
||||
|
||||
int main(int argc, char ** argv)
|
||||
{
|
||||
try
|
||||
{
|
||||
size_t n = argc == 2 ? atoi(argv[1]) : 10;
|
||||
|
||||
DB::StorageSystemNumbers table;
|
||||
|
||||
DB::Names column_names;
|
||||
column_names.push_back("number");
|
||||
|
||||
DB::ParserSelectQuery parser;
|
||||
DB::ASTPtr ast;
|
||||
std::string input = "SELECT number, number + 1, number * 2, number * 2 + 1";
|
||||
std::string expected;
|
||||
|
||||
const char * begin = input.data();
|
||||
const char * end = begin + input.size();
|
||||
const char * pos = begin;
|
||||
|
||||
if (!parser.parse(pos, end, ast, expected))
|
||||
{
|
||||
std::cout << "Failed at position " << (pos - begin) << ": "
|
||||
<< mysqlxx::quote << input.substr(pos - begin, 10)
|
||||
<< ", expected " << expected << "." << std::endl;
|
||||
}
|
||||
|
||||
DB::Context context;
|
||||
context.columns["number"] = new DB::DataTypeUInt64;
|
||||
context.functions["plus"] = new DB::FunctionPlus;
|
||||
context.functions["multiply"] = new DB::FunctionMultiply;
|
||||
|
||||
Poco::SharedPtr<DB::Expression> expression = new DB::Expression(ast, context);
|
||||
|
||||
Poco::SharedPtr<DB::IBlockInputStream> in1(table.read(column_names, 0));
|
||||
|
||||
Poco::SharedPtr<DB::ExpressionBlockInputStream> in2 = new DB::ExpressionBlockInputStream(in1, expression);
|
||||
DB::LimitBlockInputStream in3(in2, 10, std::max(0, static_cast<int>(n) - 10));
|
||||
|
||||
DB::WriteBufferFromOStream out1(std::cout);
|
||||
DB::TabSeparatedRowOutputStream out2(out1, new DB::DataTypes(expression->getReturnTypes()));
|
||||
|
||||
{
|
||||
Poco::Stopwatch stopwatch;
|
||||
stopwatch.start();
|
||||
|
||||
DB::copyData(in3, out2);
|
||||
|
||||
stopwatch.stop();
|
||||
std::cout << std::fixed << std::setprecision(2)
|
||||
<< "Elapsed " << stopwatch.elapsed() / 1000000.0 << " sec."
|
||||
<< ", " << n * 1000000 / stopwatch.elapsed() << " rows/sec."
|
||||
<< std::endl;
|
||||
}
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
std::cerr << e.what() << ", " << e.message() << std::endl;
|
||||
return 1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
@ -136,7 +136,7 @@ void Expression::executeImpl(ASTPtr ast, Block & block)
|
||||
|
||||
if (ASTFunction * node = dynamic_cast<ASTFunction *>(&*ast))
|
||||
{
|
||||
std::cerr << node->getTreeID() << std::endl;
|
||||
//std::cerr << node->getTreeID() << std::endl;
|
||||
|
||||
/// Вставляем в блок столбцы - результаты вычисления функции
|
||||
ColumnNumbers argument_numbers;
|
||||
@ -170,7 +170,7 @@ void Expression::executeImpl(ASTPtr ast, Block & block)
|
||||
}
|
||||
else if (ASTLiteral * node = dynamic_cast<ASTLiteral *>(&*ast))
|
||||
{
|
||||
std::cerr << node->getTreeID() << std::endl;
|
||||
//std::cerr << node->getTreeID() << std::endl;
|
||||
|
||||
/// Вставляем в блок столбец - константу
|
||||
|
||||
@ -215,4 +215,32 @@ void Expression::collectFinalColumns(ASTPtr ast, Block & src, Block & dst)
|
||||
}
|
||||
|
||||
|
||||
DataTypes Expression::getReturnTypes()
|
||||
{
|
||||
DataTypes res;
|
||||
getReturnTypesImpl(ast, res);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
void Expression::getReturnTypesImpl(ASTPtr ast, DataTypes & res)
|
||||
{
|
||||
if (ASTExpressionList * node = dynamic_cast<ASTExpressionList *>(&*ast))
|
||||
{
|
||||
for (ASTs::iterator it = node->children.begin(); it != node->children.end(); ++it)
|
||||
{
|
||||
if (ASTIdentifier * ident = dynamic_cast<ASTIdentifier *>(&**it))
|
||||
res.push_back(ident->type);
|
||||
else if (ASTFunction * func = dynamic_cast<ASTFunction *>(&**it))
|
||||
res.insert(res.end(), func->return_types.begin(), func->return_types.end());
|
||||
else if (ASTLiteral * lit = dynamic_cast<ASTLiteral *>(&**it))
|
||||
res.push_back(lit->type);
|
||||
}
|
||||
}
|
||||
else
|
||||
for (ASTs::iterator it = ast->children.begin(); it != ast->children.end(); ++it)
|
||||
getReturnTypesImpl(*it, res);
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
@ -21,9 +21,8 @@ NumbersBlockInputStream::NumbersBlockInputStream(size_t block_size_) : block_siz
|
||||
Block NumbersBlockInputStream::read()
|
||||
{
|
||||
Block res;
|
||||
res.insert(ColumnWithNameAndType());
|
||||
|
||||
ColumnWithNameAndType & column_with_name_and_type = res.getByPosition(0);
|
||||
ColumnWithNameAndType column_with_name_and_type;
|
||||
|
||||
column_with_name_and_type.name = "number";
|
||||
column_with_name_and_type.type = new DataTypeUInt64();
|
||||
@ -34,6 +33,8 @@ Block NumbersBlockInputStream::read()
|
||||
for (size_t i = 0; i < block_size; ++i)
|
||||
vec[i] = next++;
|
||||
|
||||
res.insert(column_with_name_and_type);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user