dbms: development.

This commit is contained in:
Alexey Milovidov 2010-03-12 20:44:25 +00:00
parent f25307c662
commit c474d064a8
6 changed files with 49 additions and 11 deletions

View File

@ -33,6 +33,12 @@ private:
void rebuildIndexByPosition();
public:
Block() {}
/// нужны, чтобы правильно скопировались индексы
Block(const Block & other);
Block & operator= (const Block & other);
/// вставить столбец в заданную позицию
void insert(size_t position, const ColumnWithNameAndType & elem);
/// вставить столбец в конец

View File

@ -1,3 +1,5 @@
#include <iterator>
#include <DB/Core/Exception.h>
#include <DB/Core/ErrorCodes.h>
@ -7,6 +9,29 @@
namespace DB
{
Block::Block(const Block & other)
{
*this = other;
}
Block & Block::operator= (const Block & other)
{
data = other.data;
rebuildIndexByPosition();
for (IndexByName_t::const_iterator it = other.index_by_name.begin(); it != other.index_by_name.end(); ++it)
{
Container_t::iterator value = data.begin();
std::advance(value, std::distance(const_cast<Block&>(other).data.begin(), it->second));
index_by_name[it->first] = value;
}
return *this;
}
void Block::rebuildIndexByPosition()
{
index_by_position.resize(data.size());

View File

@ -19,26 +19,30 @@ Block LimitBlockInputStream::read()
Block res;
size_t rows = 0;
/// pos - сколько строк было прочитано, включая последний прочитанный блок
if (pos >= offset + limit)
return res;
while (pos + rows <= offset)
do
{
res = input->read();
res.getByPosition(0);
rows = res.rows();
pos += rows;
}
} while (pos <= offset);
if (pos >= offset && pos + rows <= offset + limit)
{
pos += rows;
/// отдать целый блок
if (pos >= offset + rows && pos <= offset + limit)
return res;
}
/// блок, от которого надо выбрать кусок
/// отдать кусок блока
size_t start = std::max(0, static_cast<int>(offset) + static_cast<int>(rows) - static_cast<int>(pos));
size_t length = std::min(rows - start, limit + offset + rows - pos);
for (size_t i = 0; i < res.columns(); ++i)
res.getByPosition(i).column->cut(std::max(0, static_cast<int>(offset) - static_cast<int>(pos)), limit);
pos += rows;
res.getByPosition(i).column->cut(start, length);
return res;
}

View File

@ -1,5 +1,6 @@
#include <DB/DataStreams/RowInputStreamFromBlockInputStream.h>
namespace DB
{

View File

@ -22,7 +22,9 @@ Block NumbersBlockInputStream::read()
{
Block res;
res.insert(ColumnWithNameAndType());
ColumnWithNameAndType & column_with_name_and_type = res.getByPosition(0);
column_with_name_and_type.name = "number";
column_with_name_and_type.type = new DataTypeUInt64();
ColumnUInt64 * column = new ColumnUInt64(block_size);
@ -31,7 +33,7 @@ Block NumbersBlockInputStream::read()
for (size_t i = 0; i < block_size; ++i)
vec[i] = next++;
return res;
}

View File

@ -23,7 +23,7 @@ int main(int argc, char ** argv)
Poco::SharedPtr<DB::DataTypes> column_types = new DB::DataTypes;
column_types->push_back(new DB::DataTypeUInt64);
DB::LimitBlockInputStream input(table.read(column_names, 0, 10), 100, 100);
DB::LimitBlockInputStream input(table.read(column_names, 0, 10), 10, 96);
DB::TabSeparatedRowOutputStream output(std::cout, column_types);
DB::copyData(input, output);