dbms: development.

This commit is contained in:
Alexey Milovidov 2011-08-19 19:18:15 +00:00
parent c972f291cb
commit ba52b98e0a
5 changed files with 278 additions and 0 deletions

View File

@ -0,0 +1,29 @@
#pragma once
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
/** Десериализует поток блоков из родного бинарного формата (с именами и типами столбцов).
* Предназначено для взаимодействия между серверами.
*/
class NativeBlockInputStream : public IBlockInputStream
{
public:
NativeBlockInputStream(ReadBuffer & istr_, DataTypeFactory & data_type_factory_)
: istr(istr_), data_type_factory(data_type_factory_) {}
/** Прочитать следующий блок.
* Если блоков больше нет - вернуть пустой блок (для которого operator bool возвращает false).
*/
Block read();
private:
ReadBuffer & istr;
DataTypeFactory & data_type_factory;
};
}

View File

@ -0,0 +1,25 @@
#pragma once
#include <DB/DataStreams/IBlockOutputStream.h>
namespace DB
{
/** Сериализует поток блоков в родном бинарном формате (с именами и типами столбцов).
* Предназначено для взаимодействия между серверами.
*/
class NativeBlockOutputStream : public IBlockOutputStream
{
public:
NativeBlockOutputStream(WriteBuffer & ostr_) : ostr(ostr_) {}
/** Записать блок.
*/
void write(const Block & block);
private:
WriteBuffer & ostr;
};
}

View File

@ -0,0 +1,48 @@
#include <DB/IO/ReadHelpers.h>
#include <DB/IO/VarInt.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
namespace DB
{
Block NativeBlockInputStream::read()
{
Block res;
if (istr.eof())
return res;
/// Размеры
size_t columns = 0;
size_t rows = 0;
readVarUInt(columns, istr);
readVarUInt(rows, istr);
for (size_t i = 0; i < columns; ++i)
{
ColumnWithNameAndType column;
/// Имя
readStringBinary(column.name, istr);
/// Тип
String type_name;
readStringBinary(type_name, istr);
column.type = data_type_factory.get(type_name);
/// Данные
column.column = column.type->createColumn();
column.type->deserializeBinary(*column.column, istr, rows);
if (column.column->size() != rows)
throw Exception("Cannot read all data in NativeBlockInputStream.", ErrorCodes::CANNOT_READ_ALL_DATA);
res.insert(column);
}
return res;
}
}

View File

@ -0,0 +1,33 @@
#include <DB/IO/WriteHelpers.h>
#include <DB/IO/VarInt.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
namespace DB
{
void NativeBlockOutputStream::write(const Block & block)
{
/// Размеры
size_t columns = block.columns();
size_t rows = block.rows();
writeVarUInt(columns, ostr);
writeVarUInt(rows, ostr);
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
/// Имя
writeStringBinary(column.name, ostr);
/// Тип
writeStringBinary(column.type->getName(), ostr);
/// Данные
column.type->serializeBinary(*column.column, ostr);
}
}
}

View File

@ -0,0 +1,143 @@
#include <string>
#include <iostream>
#include <fstream>
#include <boost/assign/list_inserter.hpp>
#include <Poco/Stopwatch.h>
#include <Poco/SharedPtr.h>
#include <DB/IO/ReadBufferFromIStream.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/Storages/StorageLog.h>
int main(int argc, char ** argv)
{
using Poco::SharedPtr;
try
{
typedef std::pair<std::string, SharedPtr<DB::IDataType> > NameAndTypePair;
typedef std::list<NameAndTypePair> NamesAndTypesList;
NamesAndTypesList names_and_types_list;
boost::assign::push_back(names_and_types_list)
("WatchID", new DB::DataTypeUInt64)
("JavaEnable", new DB::DataTypeUInt8)
("Title", new DB::DataTypeString)
("GoodEvent", new DB::DataTypeUInt32)
("EventTime", new DB::DataTypeDateTime)
("CounterID", new DB::DataTypeUInt32)
("ClientIP", new DB::DataTypeUInt32)
("RegionID", new DB::DataTypeUInt32)
("UniqID", new DB::DataTypeUInt64)
("CounterClass", new DB::DataTypeUInt8)
("OS", new DB::DataTypeUInt8)
("UserAgent", new DB::DataTypeUInt8)
("URL", new DB::DataTypeString)
("Referer", new DB::DataTypeString)
("Refresh", new DB::DataTypeUInt8)
("ResolutionWidth", new DB::DataTypeUInt16)
("ResolutionHeight", new DB::DataTypeUInt16)
("ResolutionDepth", new DB::DataTypeUInt8)
("FlashMajor", new DB::DataTypeUInt8)
("FlashMinor", new DB::DataTypeUInt8)
("FlashMinor2", new DB::DataTypeString)
("NetMajor", new DB::DataTypeUInt8)
("NetMinor", new DB::DataTypeUInt8)
("UserAgentMajor", new DB::DataTypeUInt16)
("UserAgentMinor", new DB::DataTypeFixedString(2))
("CookieEnable", new DB::DataTypeUInt8)
("JavascriptEnable", new DB::DataTypeUInt8)
("IsMobile", new DB::DataTypeUInt8)
("MobilePhone", new DB::DataTypeUInt8)
("MobilePhoneModel", new DB::DataTypeString)
("Params", new DB::DataTypeString)
("IPNetworkID", new DB::DataTypeUInt32)
("TraficSourceID", new DB::DataTypeInt8)
("SearchEngineID", new DB::DataTypeUInt16)
("SearchPhrase", new DB::DataTypeString)
("AdvEngineID", new DB::DataTypeUInt8)
("IsArtifical", new DB::DataTypeUInt8)
("WindowClientWidth", new DB::DataTypeUInt16)
("WindowClientHeight", new DB::DataTypeUInt16)
("ClientTimeZone", new DB::DataTypeInt16)
("ClientEventTime", new DB::DataTypeDateTime)
("SilverlightVersion1", new DB::DataTypeUInt8)
("SilverlightVersion2", new DB::DataTypeUInt8)
("SilverlightVersion3", new DB::DataTypeUInt32)
("SilverlightVersion4", new DB::DataTypeUInt16)
("PageCharset", new DB::DataTypeString)
("CodeVersion", new DB::DataTypeUInt32)
("IsLink", new DB::DataTypeUInt8)
("IsDownload", new DB::DataTypeUInt8)
("IsNotBounce", new DB::DataTypeUInt8)
("FUniqID", new DB::DataTypeUInt64)
("OriginalURL", new DB::DataTypeString)
("HID", new DB::DataTypeUInt32)
("IsOldCounter", new DB::DataTypeUInt8)
("IsEvent", new DB::DataTypeUInt8)
("IsParameter", new DB::DataTypeUInt8)
("DontCountHits", new DB::DataTypeUInt8)
("WithHash", new DB::DataTypeUInt8)
;
SharedPtr<DB::NamesAndTypes> names_and_types_map = new DB::NamesAndTypes;
SharedPtr<DB::DataTypes> data_types = new DB::DataTypes;
DB::Names column_names;
for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it)
{
names_and_types_map->insert(*it);
data_types->push_back(it->second);
column_names.push_back(it->first);
}
/// создаём объект существующей таблицы хит лога
DB::StorageLog table("./", "HitLog", names_and_types_map, ".bin");
/// читаем из неё
if (argc == 2 && 0 == strcmp(argv[1], "read"))
{
SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
DB::WriteBufferFromOStream out1(std::cout);
DB::CompressedWriteBuffer out2(out1);
DB::NativeBlockOutputStream out3(out2);
DB::copyData(*in, out3);
}
/// читаем данные из native файла и одновременно пишем в таблицу
if (argc == 2 && 0 == strcmp(argv[1], "write"))
{
DB::DataTypeFactory factory;
DB::ReadBufferFromIStream in1(std::cin);
DB::CompressedReadBuffer in2(in1);
DB::NativeBlockInputStream in3(in2, factory);
SharedPtr<DB::IBlockOutputStream> out = table.write(0);
DB::copyData(in3, *out);
}
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}