dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-08-27 22:43:31 +00:00
parent 557110c826
commit 8fec27c1f6
9 changed files with 315 additions and 4 deletions

View File

@ -132,6 +132,11 @@ public:
tmp.swap(offsets);
}
size_t byteSize()
{
return data->byteSize() + offsets.size() * sizeof(offsets[0]);
}
/** Более эффективные методы манипуляции */
IColumn & getData()
{

View File

@ -36,6 +36,8 @@ public:
void insertDefault() { ++s; }
void filter(const Filter & filt) {}
size_t byteSize() { return sizeof(data) + sizeof(s); }
/** Более эффективные методы манипуляции */
T & getData() { return data; }
const T & getData() const { return data; }

View File

@ -94,6 +94,11 @@ public:
data->filter(nested_filt);
}
size_t byteSize()
{
return data->byteSize() + sizeof(n);
}
const IColumn & getData() const
{
return *data;

View File

@ -96,6 +96,11 @@ public:
tmp.swap(data);
}
size_t byteSize()
{
return data.size() * sizeof(data[0]);
}
/** Более эффективные методы манипуляции */
Container_t & getData()
{

View File

@ -65,6 +65,9 @@ public:
/** Очистить */
virtual void clear() = 0;
/** Приблизительный размер столбца в оперативке в байтах - для профайлинга. 0 - если неизвестно. */
virtual size_t byteSize() = 0;
virtual ~IColumn() {}
};

View File

@ -0,0 +1,52 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <Poco/Stopwatch.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
using Poco::SharedPtr;
/// Информация для профайлинга.
struct BlockStreamProfileInfo
{
bool started;
Poco::Stopwatch work_stopwatch; /// Время вычислений (выполнения функции read())
Poco::Stopwatch total_stopwatch; /// Время с учётом ожидания
size_t rows;
size_t blocks;
size_t bytes;
BlockStreamProfileInfo() : started(false), rows(0), blocks(0), bytes(0) {}
void update(Block & block);
void print(std::ostream & ostr) const;
};
/** Смотрит за тем, как работает другой поток блоков.
* Позволяет получить информацию для профайлинга:
* строк в секунду, блоков в секунду, мегабайт в секунду и т. п.
*/
class ProfilingBlockInputStream : public IBlockInputStream
{
public:
ProfilingBlockInputStream(SharedPtr<IBlockInputStream> in_)
: in(in_) {}
Block read();
const BlockStreamProfileInfo & getInfo() const;
private:
SharedPtr<IBlockInputStream> in;
BlockStreamProfileInfo info;
};
}

View File

@ -1,5 +1,4 @@
#ifndef DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H
#define DBMS_STORAGES_STORAGE_SYSTEM_NUMBERS_H
#pragma once
#include <Poco/SharedPtr.h>
@ -48,5 +47,3 @@ private:
};
}
#endif

View File

@ -0,0 +1,53 @@
#include <iomanip>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
namespace DB
{
void BlockStreamProfileInfo::update(Block & block)
{
++blocks;
rows += block.rows();
for (size_t i = 0; i < block.columns(); ++i)
bytes += block.getByPosition(i).column->byteSize();
}
void BlockStreamProfileInfo::print(std::ostream & ostr) const
{
ostr << std::fixed << std::setprecision(2)
<< "Elapsed: " << work_stopwatch.elapsed() / 1000000.0 << " sec., " << std::endl
<< "Rows: " << rows << ", per second: " << rows * 1000000 / work_stopwatch.elapsed() << ", " << std::endl
<< "Blocks: " << blocks << ", per second: " << blocks * 1000000.0 / work_stopwatch.elapsed() << ", " << std::endl
<< bytes / 1000000.0 << " MB (memory), " << bytes / work_stopwatch.elapsed() << " MB/s (memory), " << std::endl
<< "Average block size: " << rows / blocks << "." << std::endl
<< "Idle time: " << (total_stopwatch.elapsed() - work_stopwatch.elapsed()) * 100.0 / total_stopwatch.elapsed() << "%" << std::endl;
}
Block ProfilingBlockInputStream::read()
{
if (!info.started)
info.total_stopwatch.start();
info.work_stopwatch.start();
Block res = in->read();
info.work_stopwatch.stop();
if (res)
info.update(res);
return res;
}
const BlockStreamProfileInfo & ProfilingBlockInputStream::getInfo() const
{
return info;
}
}

View File

@ -0,0 +1,189 @@
#include <iostream>
#include <iomanip>
#include <boost/assign/list_inserter.hpp>
#include <Poco/SharedPtr.h>
#include <Poco/Stopwatch.h>
#include <Poco/NumberParser.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageLog.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/ExpressionBlockInputStream.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Functions/FunctionsArithmetic.h>
#include <DB/Functions/FunctionsComparison.h>
#include <DB/Functions/FunctionsLogical.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/formatAST.h>
using Poco::SharedPtr;
int main(int argc, char ** argv)
{
try
{
typedef std::pair<std::string, SharedPtr<DB::IDataType> > NameAndTypePair;
typedef std::list<NameAndTypePair> NamesAndTypesList;
NamesAndTypesList names_and_types_list;
boost::assign::push_back(names_and_types_list)
("WatchID", new DB::DataTypeUInt64)
("JavaEnable", new DB::DataTypeUInt8)
("Title", new DB::DataTypeString)
("GoodEvent", new DB::DataTypeUInt32)
("EventTime", new DB::DataTypeDateTime)
("CounterID", new DB::DataTypeUInt32)
("ClientIP", new DB::DataTypeUInt32)
("RegionID", new DB::DataTypeUInt32)
("UniqID", new DB::DataTypeUInt64)
("CounterClass", new DB::DataTypeUInt8)
("OS", new DB::DataTypeUInt8)
("UserAgent", new DB::DataTypeUInt8)
("URL", new DB::DataTypeString)
("Referer", new DB::DataTypeString)
("Refresh", new DB::DataTypeUInt8)
("ResolutionWidth", new DB::DataTypeUInt16)
("ResolutionHeight", new DB::DataTypeUInt16)
("ResolutionDepth", new DB::DataTypeUInt8)
("FlashMajor", new DB::DataTypeUInt8)
("FlashMinor", new DB::DataTypeUInt8)
("FlashMinor2", new DB::DataTypeString)
("NetMajor", new DB::DataTypeUInt8)
("NetMinor", new DB::DataTypeUInt8)
("UserAgentMajor", new DB::DataTypeUInt16)
("UserAgentMinor", new DB::DataTypeFixedString(2))
("CookieEnable", new DB::DataTypeUInt8)
("JavascriptEnable", new DB::DataTypeUInt8)
("IsMobile", new DB::DataTypeUInt8)
("MobilePhone", new DB::DataTypeUInt8)
("MobilePhoneModel", new DB::DataTypeString)
("Params", new DB::DataTypeString)
("IPNetworkID", new DB::DataTypeUInt32)
("TraficSourceID", new DB::DataTypeInt8)
("SearchEngineID", new DB::DataTypeUInt16)
("SearchPhrase", new DB::DataTypeString)
("AdvEngineID", new DB::DataTypeUInt8)
("IsArtifical", new DB::DataTypeUInt8)
("WindowClientWidth", new DB::DataTypeUInt16)
("WindowClientHeight", new DB::DataTypeUInt16)
("ClientTimeZone", new DB::DataTypeInt16)
("ClientEventTime", new DB::DataTypeDateTime)
("SilverlightVersion1", new DB::DataTypeUInt8)
("SilverlightVersion2", new DB::DataTypeUInt8)
("SilverlightVersion3", new DB::DataTypeUInt32)
("SilverlightVersion4", new DB::DataTypeUInt16)
("PageCharset", new DB::DataTypeString)
("CodeVersion", new DB::DataTypeUInt32)
("IsLink", new DB::DataTypeUInt8)
("IsDownload", new DB::DataTypeUInt8)
("IsNotBounce", new DB::DataTypeUInt8)
("FUniqID", new DB::DataTypeUInt64)
("OriginalURL", new DB::DataTypeString)
("HID", new DB::DataTypeUInt32)
("IsOldCounter", new DB::DataTypeUInt8)
("IsEvent", new DB::DataTypeUInt8)
("IsParameter", new DB::DataTypeUInt8)
("DontCountHits", new DB::DataTypeUInt8)
("WithHash", new DB::DataTypeUInt8)
;
SharedPtr<DB::NamesAndTypes> names_and_types_map = new DB::NamesAndTypes;
DB::Context context;
(*context.functions)["plus"] = new DB::FunctionPlus;
(*context.functions)["minus"] = new DB::FunctionMinus;
(*context.functions)["multiply"] = new DB::FunctionMultiply;
(*context.functions)["divide"] = new DB::FunctionDivideFloating;
(*context.functions)["intDiv"] = new DB::FunctionDivideIntegral;
(*context.functions)["modulo"] = new DB::FunctionModulo;
(*context.functions)["equals"] = new DB::FunctionEquals;
(*context.functions)["notEquals"] = new DB::FunctionNotEquals;
(*context.functions)["less"] = new DB::FunctionLess;
(*context.functions)["greater"] = new DB::FunctionGreater;
(*context.functions)["lessOrEquals"] = new DB::FunctionLessOrEquals;
(*context.functions)["greaterOrEquals"] = new DB::FunctionGreaterOrEquals;
(*context.functions)["and"] = new DB::FunctionAnd;
(*context.functions)["or"] = new DB::FunctionOr;
(*context.functions)["xor"] = new DB::FunctionXor;
(*context.functions)["not"] = new DB::FunctionNot;
for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it)
{
names_and_types_map->insert(*it);
context.columns[it->first] = it->second;
}
DB::ParserSelectQuery parser;
DB::ASTPtr ast;
std::string input = "SELECT UniqID, URL, CounterID, IsLink, URL = 'http://mail.yandex.ru/neo2/#inbox'";
std::string expected;
const char * begin = input.data();
const char * end = begin + input.size();
const char * pos = begin;
if (!parser.parse(pos, end, ast, expected))
{
std::cout << "Failed at position " << (pos - begin) << ": "
<< mysqlxx::quote << input.substr(pos - begin, 10)
<< ", expected " << expected << "." << std::endl;
}
DB::formatAST(*ast, std::cerr);
std::cerr << std::endl;
std::cerr << ast->getTreeID() << std::endl;
/// создаём объект существующей таблицы хит лога
DB::StorageLog table("./", "HitLog", names_and_types_map, ".bin");
/// читаем из неё, применяем выражение, фильтруем, и пишем в tsv виде в консоль
Poco::SharedPtr<DB::Expression> expression = new DB::Expression(ast, context);
DB::Names column_names;
boost::assign::push_back(column_names)
("UniqID")
("URL")
("CounterID")
("IsLink")
;
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling = new DB::ProfilingBlockInputStream(in);
in = new DB::ExpressionBlockInputStream(profiling, expression);
in = new DB::FilterBlockInputStream(in, 4);
//in = new DB::LimitBlockInputStream(in, 10, std::max(static_cast<Int64>(0), static_cast<Int64>(n) - 10));
DB::WriteBufferFromOStream ob(std::cout);
DB::TabSeparatedRowOutputStream out(ob, new DB::DataTypes(expression->getReturnTypes()));
DB::copyData(*in, out);
profiling->getInfo().print(std::cerr);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}