dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2011-09-04 00:22:19 +00:00
parent f5f4599922
commit 971c98f03a
11 changed files with 411 additions and 7 deletions

View File

@ -134,6 +134,51 @@ public:
tmp.swap(offsets);
}
void permute(const Permutation & perm)
{
size_t size = offsets.size();
if (size != perm.size())
throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (size == 0)
return;
Permutation nested_perm(offsets.back());
Offsets_t tmp_offsets(size);
size_t current_offset = 0;
for (size_t i = 0; i < size; ++i)
{
for (size_t j = 0; j < sizeAt(perm[i]); ++j)
nested_perm[current_offset + j] = offsetAt(perm[i]) + j;
current_offset += sizeAt(perm[i]);
tmp_offsets[i] = current_offset;
}
data->permute(nested_perm);
tmp_offsets.swap(offsets);
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const
{
const ColumnArray & rhs = static_cast<const ColumnArray &>(rhs_);
/// Не оптимально
size_t lhs_size = sizeAt(n);
size_t rhs_size = rhs.sizeAt(m);
size_t min_size = std::min(lhs_size, rhs_size);
for (size_t i = 0; i < min_size; ++i)
if (int res = data->compareAt(offsetAt(n) + i, rhs.offsetAt(m) + i, *rhs.data))
return res;
return lhs_size < rhs_size
? -1
: (lhs_size == rhs_size
? 0
: 1);
}
size_t byteSize()
{
return data->byteSize() + offsets.size() * sizeof(offsets[0]);

View File

@ -49,6 +49,22 @@ public:
size_t byteSize() { return sizeof(data) + sizeof(s); }
void permute(const Permutation & perm)
{
if (s != perm.size())
throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const
{
const ColumnConst<T> & rhs = static_cast<const ColumnConst<T> &>(rhs_);
return data < rhs.data
? -1
: (data == rhs.data
? 0
: 1);
}
/** Более эффективные методы манипуляции */
T & getData() { return data; }
const T & getData() const { return data; }

View File

@ -96,6 +96,33 @@ public:
data->filter(nested_filt);
}
void permute(const Permutation & perm)
{
size_t size = this->size();
if (size != perm.size())
throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
if (size == 0)
return;
Permutation nested_perm(size * n);
for (size_t i = 0; i < size; ++i)
memset(&nested_perm[i * n], perm[i], n);
data->permute(nested_perm);
}
int compareAt(size_t p1, size_t p2, const IColumn & rhs_) const
{
const ColumnFixedArray & rhs = static_cast<const ColumnFixedArray &>(rhs_);
/// Не оптимально
for (size_t i = 0; i < n; ++i)
if (int res = data->compareAt(p1 * n + i, p2 * n + i, *rhs.data))
return res;
return 0;
}
size_t byteSize()
{
return data->byteSize() + sizeof(n);

View File

@ -50,6 +50,12 @@ public:
{
char_data.resize(char_data.size() + n);
}
int compareAt(size_t p1, size_t p2, const IColumn & rhs_) const
{
const ColumnFixedString & rhs = static_cast<const ColumnFixedString &>(rhs_);
return memcmp(&char_data[p1 * n], &rhs.char_data[p2 * n], n);
}
};

View File

@ -1,7 +1,6 @@
#ifndef DBMS_CORE_COLUMN_STRING_H
#define DBMS_CORE_COLUMN_STRING_H
#pragma once
#include <string.h> // memcpy
#include <string.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnsNumber.h>
@ -56,9 +55,25 @@ public:
char_data.push_back(0);
offsets.push_back(offsets.size() == 0 ? 1 : (offsets.back() + 1));
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const
{
const ColumnString & rhs = static_cast<const ColumnString &>(rhs_);
size_t lhs_size = sizeAt(n);
size_t rhs_size = rhs.sizeAt(m);
size_t min_size = std::min(lhs_size, rhs_size);
if (size_t res = memcmp(&char_data[offsetAt(n)], &rhs.char_data[rhs.offsetAt(m)], min_size))
return res;
return lhs_size < rhs_size
? -1
: (lhs_size == rhs_size
? 0
: 1);
return 0;
}
};
}
#endif

View File

@ -103,6 +103,28 @@ public:
return data.size() * sizeof(data[0]);
}
void permute(const Permutation & perm)
{
size_t size = data.size();
if (size != perm.size())
throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
Container_t tmp(size);
for (size_t i = 0; i < size; ++i)
tmp[i] = data[perm[i]];
tmp.swap(data);
}
int compareAt(size_t n, size_t m, const IColumn & rhs_) const
{
const ColumnVector<T> & rhs = static_cast<const ColumnVector<T> &>(rhs_);
return data[n] < rhs.data[m]
? -1
: (data[n] == rhs.data[m]
? 0
: 1);
}
/** Более эффективные методы манипуляции */
Container_t & getData()
{

View File

@ -68,6 +68,18 @@ public:
typedef std::vector<UInt8> Filter;
virtual void filter(const Filter & filt) = 0;
/** Переставить значения местами, используя указанную перестановку.
* Используется при сортировке.
*/
typedef std::vector<size_t> Permutation;
virtual void permute(const Permutation & perm) = 0;
/** Сравнить (*this)[n] и rhs[m].
* Вернуть отрицательное число, 0, или положительное число, если меньше, равно, или больше, соответственно.
* Используется при сортировке.
*/
virtual int compareAt(size_t n, size_t m, const IColumn & rhs) const = 0;
/** Очистить */
virtual void clear() = 0;

View File

@ -1,7 +1,7 @@
#pragma once
#include <string>
#include <boost/none.hpp>
#include <boost/strong_typedef.hpp>
#include <Poco/Types.h>
#include <Poco/SharedPtr.h>
@ -12,7 +12,7 @@ namespace DB
/** Типы данных для представления значений из БД в оперативке.
*/
typedef boost::none_t Null;
BOOST_STRONG_TYPEDEF(char, Null);
typedef Poco::UInt8 UInt8;
typedef Poco::UInt16 UInt16;

View File

@ -0,0 +1,27 @@
#pragma once
#include <DB/Core/ColumnNumbers.h>
#include <DB/DataStreams/IBlockInputStream.h>
namespace DB
{
/** Сортирует каждый блок по отдельности по значениям указанных столбцов.
* На данный момент, используется сильно неоптимальный алгоритм.
*/
class PartialSortingBlockInputStream : public IBlockInputStream
{
public:
PartialSortingBlockInputStream(BlockInputStreamPtr input_, ColumnNumbers & column_numbers_)
: input(input_), column_numbers(column_numbers_) {}
Block read();
private:
BlockInputStreamPtr input;
ColumnNumbers column_numbers;
};
}

View File

@ -0,0 +1,54 @@
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
namespace DB
{
struct PartialSortingLess
{
typedef std::vector<const IColumn *> Columns;
Columns columns;
PartialSortingLess(const Block & block, const ColumnNumbers & column_numbers)
{
for (size_t i = 0, size = column_numbers.size(); i < size; ++i)
columns.push_back(&*block.getByPosition(column_numbers[i]).column);
}
bool operator() (size_t a, size_t b) const
{
for (Columns::const_iterator it = columns.begin(); it != columns.end(); ++it)
{
int res = (*it)->compareAt(a, b, **it);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
};
Block PartialSortingBlockInputStream::read()
{
Block res = input->read();
if (!res)
return res;
size_t size = res.rows();
IColumn::Permutation perm(size);
for (size_t i = 0; i < size; ++i)
perm[i] = i;
PartialSortingLess less(res, column_numbers);
std::sort(perm.begin(), perm.end(), less);
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
res.getByPosition(i).column->permute(perm);
return res;
}
}

View File

@ -0,0 +1,180 @@
#include <iostream>
#include <iomanip>
#include <boost/assign/list_inserter.hpp>
#include <Poco/SharedPtr.h>
#include <Poco/Stopwatch.h>
#include <Poco/NumberParser.h>
#include <DB/IO/WriteBufferFromOStream.h>
#include <DB/Storages/StorageLog.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeString.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataStreams/LimitBlockInputStream.h>
#include <DB/DataStreams/PartialSortingBlockInputStream.h>
#include <DB/DataStreams/ProfilingBlockInputStream.h>
#include <DB/DataStreams/TabSeparatedRowOutputStream.h>
#include <DB/DataStreams/copyData.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/Parsers/ParserSelectQuery.h>
#include <DB/Parsers/formatAST.h>
using Poco::SharedPtr;
int main(int argc, char ** argv)
{
try
{
typedef std::pair<std::string, SharedPtr<DB::IDataType> > NameAndTypePair;
typedef std::list<NameAndTypePair> NamesAndTypesList;
NamesAndTypesList names_and_types_list;
boost::assign::push_back(names_and_types_list)
("WatchID", new DB::DataTypeUInt64)
("JavaEnable", new DB::DataTypeUInt8)
("Title", new DB::DataTypeString)
("GoodEvent", new DB::DataTypeUInt32)
("EventTime", new DB::DataTypeDateTime)
("CounterID", new DB::DataTypeUInt32)
("ClientIP", new DB::DataTypeUInt32)
("RegionID", new DB::DataTypeUInt32)
("UniqID", new DB::DataTypeUInt64)
("CounterClass", new DB::DataTypeUInt8)
("OS", new DB::DataTypeUInt8)
("UserAgent", new DB::DataTypeUInt8)
("URL", new DB::DataTypeString)
("Referer", new DB::DataTypeString)
("Refresh", new DB::DataTypeUInt8)
("ResolutionWidth", new DB::DataTypeUInt16)
("ResolutionHeight", new DB::DataTypeUInt16)
("ResolutionDepth", new DB::DataTypeUInt8)
("FlashMajor", new DB::DataTypeUInt8)
("FlashMinor", new DB::DataTypeUInt8)
("FlashMinor2", new DB::DataTypeString)
("NetMajor", new DB::DataTypeUInt8)
("NetMinor", new DB::DataTypeUInt8)
("UserAgentMajor", new DB::DataTypeUInt16)
("UserAgentMinor", new DB::DataTypeFixedString(2))
("CookieEnable", new DB::DataTypeUInt8)
("JavascriptEnable", new DB::DataTypeUInt8)
("IsMobile", new DB::DataTypeUInt8)
("MobilePhone", new DB::DataTypeUInt8)
("MobilePhoneModel", new DB::DataTypeString)
("Params", new DB::DataTypeString)
("IPNetworkID", new DB::DataTypeUInt32)
("TraficSourceID", new DB::DataTypeInt8)
("SearchEngineID", new DB::DataTypeUInt16)
("SearchPhrase", new DB::DataTypeString)
("AdvEngineID", new DB::DataTypeUInt8)
("IsArtifical", new DB::DataTypeUInt8)
("WindowClientWidth", new DB::DataTypeUInt16)
("WindowClientHeight", new DB::DataTypeUInt16)
("ClientTimeZone", new DB::DataTypeInt16)
("ClientEventTime", new DB::DataTypeDateTime)
("SilverlightVersion1", new DB::DataTypeUInt8)
("SilverlightVersion2", new DB::DataTypeUInt8)
("SilverlightVersion3", new DB::DataTypeUInt32)
("SilverlightVersion4", new DB::DataTypeUInt16)
("PageCharset", new DB::DataTypeString)
("CodeVersion", new DB::DataTypeUInt32)
("IsLink", new DB::DataTypeUInt8)
("IsDownload", new DB::DataTypeUInt8)
("IsNotBounce", new DB::DataTypeUInt8)
("FUniqID", new DB::DataTypeUInt64)
("OriginalURL", new DB::DataTypeString)
("HID", new DB::DataTypeUInt32)
("IsOldCounter", new DB::DataTypeUInt8)
("IsEvent", new DB::DataTypeUInt8)
("IsParameter", new DB::DataTypeUInt8)
("DontCountHits", new DB::DataTypeUInt8)
("WithHash", new DB::DataTypeUInt8)
;
SharedPtr<DB::NamesAndTypes> names_and_types_map = new DB::NamesAndTypes;
for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it)
names_and_types_map->insert(*it);
DB::ParserSelectQuery parser;
DB::ASTPtr ast;
std::string input = "SELECT UniqID, URL, CounterID, IsLink";
std::string expected;
const char * begin = input.data();
const char * end = begin + input.size();
const char * pos = begin;
if (!parser.parse(pos, end, ast, expected))
{
std::cout << "Failed at position " << (pos - begin) << ": "
<< mysqlxx::quote << input.substr(pos - begin, 10)
<< ", expected " << expected << "." << std::endl;
}
DB::formatAST(*ast, std::cerr);
std::cerr << std::endl;
std::cerr << ast->getTreeID() << std::endl;
/// создаём объект существующей таблицы хит лога
DB::StorageLog table("./", "HitLog", names_and_types_map, ".bin");
/// читаем из неё, сортируем, и пишем в tsv виде в консоль
DB::Names column_names;
boost::assign::push_back(column_names)
("UniqID")
("URL")
("CounterID")
("IsLink")
;
Poco::SharedPtr<DB::DataTypes> result_types = new DB::DataTypes;
boost::assign::push_back(*result_types)
((*names_and_types_map)["UniqID"])
((*names_and_types_map)["URL"])
((*names_and_types_map)["CounterID"])
((*names_and_types_map)["IsLink"])
;
DB::ColumnNumbers sort_columns;
sort_columns.push_back(1);
sort_columns.push_back(2);
Poco::SharedPtr<DB::IBlockInputStream> in = table.read(column_names, 0);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling1 = new DB::ProfilingBlockInputStream(in);
in = profiling1;
in = new DB::PartialSortingBlockInputStream(in, sort_columns);
Poco::SharedPtr<DB::ProfilingBlockInputStream> profiling2 = new DB::ProfilingBlockInputStream(in);
in = profiling2;
//in = new DB::LimitBlockInputStream(in, 10);
DB::WriteBufferFromOStream ob(std::cout);
DB::TabSeparatedRowOutputStream out(ob, result_types);
DB::copyData(*in, out);
std::cerr << std::endl << "Reading: " << std::endl;
profiling1->getInfo().print(std::cerr);
std::cerr << std::endl << "Sorting: " << std::endl;
profiling2->getInfo().print(std::cerr);
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.message() << std::endl;
return 1;
}
return 0;
}