dbms: development.

This commit is contained in:
Alexey Milovidov 2010-03-18 20:52:28 +00:00
parent fb5ca4d182
commit 8c8fe0439f
4 changed files with 27 additions and 22 deletions

View File

@ -8,6 +8,8 @@
#include <Poco/FileStream.h>
#include <DB/Core/NamesAndTypes.h>
#include <DB/Common/CompressedInputStream.h>
#include <DB/Common/CompressedOutputStream.h>
#include <DB/Storages/IStorage.h>
@ -27,7 +29,16 @@ private:
const ColumnNames & column_names;
StorageLog & storage;
typedef std::map<std::string, SharedPtr<Poco::FileInputStream> > FileStreams;
struct Stream
{
Stream(const std::string & path)
: plain(path, std::ios::in | std::ios::binary), compressed(plain) {}
Poco::FileInputStream plain;
CompressedInputStream compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
FileStreams streams;
};
@ -40,7 +51,16 @@ public:
private:
StorageLog & storage;
typedef std::map<std::string, SharedPtr<Poco::FileOutputStream> > FileStreams;
struct Stream
{
Stream(const std::string & path)
: plain(path, std::ios::out | std::ios::ate | std::ios::binary), compressed(plain) {}
Poco::FileOutputStream plain;
CompressedOutputStream compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
FileStreams streams;
};

View File

@ -22,19 +22,6 @@ CompressingStreamBuf::~CompressingStreamBuf()
}
void CompressingStreamBuf::writeCompressedChunk()
{
size_t compressed_size = qlz_compress(
&uncompressed_buffer[0],
&compressed_buffer[0],
pos_in_buffer,
&scratch[0]);
p_ostr->write(&compressed_buffer[0], compressed_size);
pos_in_buffer = 0;
}
int CompressingStreamBuf::close()
{
sync();

View File

@ -28,8 +28,7 @@ Block LogBlockInputStream::read()
throw Exception("There is no column with name " + *it + " in table.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
streams.insert(std::make_pair(*it,
new Poco::FileInputStream(storage.files[*it].path(), std::ios::in | std::ios::binary)));
streams.insert(std::make_pair(*it, new Stream(storage.files[*it].path())));
}
for (ColumnNames::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
@ -38,7 +37,7 @@ Block LogBlockInputStream::read()
column.name = *it;
column.type = (*storage.columns)[*it];
column.column = column.type->createColumn();
column.type->deserializeBinary(*column.column, *streams[column.name], block_size);
column.type->deserializeBinary(*column.column, streams[column.name]->compressed, block_size);
res.insert(column);
}
@ -62,14 +61,13 @@ void LogBlockOutputStream::write(const Block & block)
throw Exception("There is no column with name " + name + " in table.",
ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
streams.insert(std::make_pair(name,
new Poco::FileOutputStream(storage.files[name].path(), std::ios::out | std::ios::ate | std::ios::binary)));
streams.insert(std::make_pair(name, new Stream(storage.files[name].path())));
}
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
column.type->serializeBinary(*column.column, *streams[column.name]);
column.type->serializeBinary(*column.column, streams[column.name]->compressed);
}
}

View File

@ -70,7 +70,7 @@ int main(int argc, char ** argv)
data_types->push_back(new DB::DataTypeUInt64);
data_types->push_back(new DB::DataTypeUInt8);
DB::LimitBlockInputStream in_limit(in, 1000000);
DB::LimitBlockInputStream in_limit(in, 10);
DB::TabSeparatedRowOutputStream output(std::cout, data_types);
DB::copyData(in_limit, output);