dbms: addition to prev. revision [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2013-05-03 10:34:36 +00:00
parent 85664affbf
commit cefc1fcc67
3 changed files with 6 additions and 79 deletions

View File

@ -17,36 +17,12 @@
#include <DB/DataStreams/AggregatingBlockInputStream.h>
#include <DB/DataStreams/FinalizingAggregatedBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
class OneBlockInputStream : public DB::IBlockInputStream
{
private:
const DB::Block & block;
bool has_been_read;
public:
OneBlockInputStream(const DB::Block & block_) : block(block_), has_been_read(false) {}
DB::Block read()
{
if (!has_been_read)
{
has_been_read = true;
return block;
}
else
return DB::Block();
}
DB::String getName() const { return "OneBlockInputStream"; }
DB::BlockInputStreamPtr clone() { return new OneBlockInputStream(block); }
};
int main(int argc, char ** argv)
{
try
@ -116,7 +92,7 @@ int main(int argc, char ** argv)
sample.insert(col);
}
DB::BlockInputStreamPtr stream = new OneBlockInputStream(block);
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
stream = new DB::AggregatingBlockInputStream(stream, key_column_numbers, aggregate_descriptions, 0, DB::Limits::THROW);
stream = new DB::FinalizingAggregatedBlockInputStream(stream);

View File

@ -9,38 +9,13 @@
#include <DB/Columns/ColumnsNumber.h>
#include <DB/Columns/ColumnString.h>
#include <DB/DataStreams/IBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/Interpreters/Aggregator.h>
#include <DB/AggregateFunctions/AggregateFunctionFactory.h>
class OneBlockInputStream : public DB::IBlockInputStream
{
private:
const DB::Block & block;
bool has_been_read;
public:
OneBlockInputStream(const DB::Block & block_) : block(block_), has_been_read(false) {}
DB::Block read()
{
if (!has_been_read)
{
has_been_read = true;
return block;
}
else
return DB::Block();
}
DB::String getName() const { return "OneBlockInputStream"; }
DB::BlockInputStreamPtr clone() { return new OneBlockInputStream(block); }
};
int main(int argc, char ** argv)
{
try
@ -84,7 +59,7 @@ int main(int argc, char ** argv)
block.insert(column_s2);
DB::BlockInputStreamPtr stream = new OneBlockInputStream(block);
DB::BlockInputStreamPtr stream = new DB::OneBlockInputStream(block);
DB::AggregatedDataVariants aggregated_data_variants;
DB::ColumnNumbers key_column_numbers;

View File

@ -16,6 +16,7 @@
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/OneBlockInputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Interpreters/Expression.h>
@ -46,31 +47,6 @@ void dump(DB::IAST & ast, int level = 0)
}
class OneBlockInputStream : public DB::IBlockInputStream
{
private:
const DB::Block & block;
bool has_been_read;
public:
OneBlockInputStream(const DB::Block & block_) : block(block_), has_been_read(false) {}
DB::Block read()
{
if (!has_been_read)
{
has_been_read = true;
return block;
}
else
return DB::Block();
}
DB::String getName() const { return "OneBlockInputStream"; }
DB::BlockInputStreamPtr clone() { return new OneBlockInputStream(block); }
};
int main(int argc, char ** argv)
{
try
@ -168,7 +144,7 @@ int main(int argc, char ** argv)
<< std::endl;
}
OneBlockInputStream * is = new OneBlockInputStream(block);
DB::OneBlockInputStream * is = new DB::OneBlockInputStream(block);
DB::LimitBlockInputStream lis(is, 20, std::max(0, static_cast<int>(n) - 20));
DB::WriteBufferFromOStream out_buf(std::cout);
DB::TabSeparatedRowOutputStream os(out_buf, block);