From 971c98f03ac669ac641e487ab7c64b3b68a620f1 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Sun, 4 Sep 2011 00:22:19 +0000 Subject: [PATCH] dbms: development [#CONV-2944]. --- dbms/include/DB/Columns/ColumnArray.h | 45 +++++ dbms/include/DB/Columns/ColumnConst.h | 16 ++ dbms/include/DB/Columns/ColumnFixedArray.h | 27 +++ dbms/include/DB/Columns/ColumnFixedString.h | 6 + dbms/include/DB/Columns/ColumnString.h | 25 ++- dbms/include/DB/Columns/ColumnVector.h | 22 +++ dbms/include/DB/Columns/IColumn.h | 12 ++ dbms/include/DB/Core/Types.h | 4 +- .../PartialSortingBlockInputStream.h | 27 +++ .../PartialSortingBlockInputStream.cpp | 54 ++++++ .../tests/partial_sorting_stream.cpp | 180 ++++++++++++++++++ 11 files changed, 411 insertions(+), 7 deletions(-) create mode 100644 dbms/include/DB/DataStreams/PartialSortingBlockInputStream.h create mode 100644 dbms/src/DataStreams/PartialSortingBlockInputStream.cpp create mode 100644 dbms/src/DataStreams/tests/partial_sorting_stream.cpp diff --git a/dbms/include/DB/Columns/ColumnArray.h b/dbms/include/DB/Columns/ColumnArray.h index f482c454d3f..36b0f6a0c2e 100644 --- a/dbms/include/DB/Columns/ColumnArray.h +++ b/dbms/include/DB/Columns/ColumnArray.h @@ -134,6 +134,51 @@ public: tmp.swap(offsets); } + void permute(const Permutation & perm) + { + size_t size = offsets.size(); + if (size != perm.size()) + throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + if (size == 0) + return; + + Permutation nested_perm(offsets.back()); + + Offsets_t tmp_offsets(size); + size_t current_offset = 0; + + for (size_t i = 0; i < size; ++i) + { + for (size_t j = 0; j < sizeAt(perm[i]); ++j) + nested_perm[current_offset + j] = offsetAt(perm[i]) + j; + current_offset += sizeAt(perm[i]); + tmp_offsets[i] = current_offset; + } + + data->permute(nested_perm); + tmp_offsets.swap(offsets); + } + + int compareAt(size_t n, size_t m, const IColumn & rhs_) const + { + const ColumnArray & rhs = static_cast(rhs_); + + /// Не оптимально + size_t lhs_size = sizeAt(n); + size_t rhs_size = rhs.sizeAt(m); + size_t min_size = std::min(lhs_size, rhs_size); + for (size_t i = 0; i < min_size; ++i) + if (int res = data->compareAt(offsetAt(n) + i, rhs.offsetAt(m) + i, *rhs.data)) + return res; + + return lhs_size < rhs_size + ? -1 + : (lhs_size == rhs_size + ? 0 + : 1); + } + size_t byteSize() { return data->byteSize() + offsets.size() * sizeof(offsets[0]); diff --git a/dbms/include/DB/Columns/ColumnConst.h b/dbms/include/DB/Columns/ColumnConst.h index 016ca90f0c1..11b7ccb579f 100644 --- a/dbms/include/DB/Columns/ColumnConst.h +++ b/dbms/include/DB/Columns/ColumnConst.h @@ -49,6 +49,22 @@ public: size_t byteSize() { return sizeof(data) + sizeof(s); } + void permute(const Permutation & perm) + { + if (s != perm.size()) + throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + } + + int compareAt(size_t n, size_t m, const IColumn & rhs_) const + { + const ColumnConst & rhs = static_cast &>(rhs_); + return data < rhs.data + ? -1 + : (data == rhs.data + ? 0 + : 1); + } + /** Более эффективные методы манипуляции */ T & getData() { return data; } const T & getData() const { return data; } diff --git a/dbms/include/DB/Columns/ColumnFixedArray.h b/dbms/include/DB/Columns/ColumnFixedArray.h index d649792e289..62f375edefb 100644 --- a/dbms/include/DB/Columns/ColumnFixedArray.h +++ b/dbms/include/DB/Columns/ColumnFixedArray.h @@ -96,6 +96,33 @@ public: data->filter(nested_filt); } + void permute(const Permutation & perm) + { + size_t size = this->size(); + if (size != perm.size()) + throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + if (size == 0) + return; + + Permutation nested_perm(size * n); + for (size_t i = 0; i < size; ++i) + memset(&nested_perm[i * n], perm[i], n); + data->permute(nested_perm); + } + + int compareAt(size_t p1, size_t p2, const IColumn & rhs_) const + { + const ColumnFixedArray & rhs = static_cast(rhs_); + + /// Не оптимально + for (size_t i = 0; i < n; ++i) + if (int res = data->compareAt(p1 * n + i, p2 * n + i, *rhs.data)) + return res; + + return 0; + } + size_t byteSize() { return data->byteSize() + sizeof(n); diff --git a/dbms/include/DB/Columns/ColumnFixedString.h b/dbms/include/DB/Columns/ColumnFixedString.h index c23c68ec25b..3d3b1e0cedf 100644 --- a/dbms/include/DB/Columns/ColumnFixedString.h +++ b/dbms/include/DB/Columns/ColumnFixedString.h @@ -50,6 +50,12 @@ public: { char_data.resize(char_data.size() + n); } + + int compareAt(size_t p1, size_t p2, const IColumn & rhs_) const + { + const ColumnFixedString & rhs = static_cast(rhs_); + return memcmp(&char_data[p1 * n], &rhs.char_data[p2 * n], n); + } }; diff --git a/dbms/include/DB/Columns/ColumnString.h b/dbms/include/DB/Columns/ColumnString.h index fe4f68cd888..23d6338015b 100644 --- a/dbms/include/DB/Columns/ColumnString.h +++ b/dbms/include/DB/Columns/ColumnString.h @@ -1,7 +1,6 @@ -#ifndef DBMS_CORE_COLUMN_STRING_H -#define DBMS_CORE_COLUMN_STRING_H +#pragma once -#include // memcpy +#include #include #include @@ -56,9 +55,25 @@ public: char_data.push_back(0); offsets.push_back(offsets.size() == 0 ? 1 : (offsets.back() + 1)); } + + int compareAt(size_t n, size_t m, const IColumn & rhs_) const + { + const ColumnString & rhs = static_cast(rhs_); + + size_t lhs_size = sizeAt(n); + size_t rhs_size = rhs.sizeAt(m); + size_t min_size = std::min(lhs_size, rhs_size); + if (size_t res = memcmp(&char_data[offsetAt(n)], &rhs.char_data[rhs.offsetAt(m)], min_size)) + return res; + + return lhs_size < rhs_size + ? -1 + : (lhs_size == rhs_size + ? 0 + : 1); + return 0; + } }; } - -#endif diff --git a/dbms/include/DB/Columns/ColumnVector.h b/dbms/include/DB/Columns/ColumnVector.h index e140a603a4b..6a7eb773ffa 100644 --- a/dbms/include/DB/Columns/ColumnVector.h +++ b/dbms/include/DB/Columns/ColumnVector.h @@ -103,6 +103,28 @@ public: return data.size() * sizeof(data[0]); } + void permute(const Permutation & perm) + { + size_t size = data.size(); + if (size != perm.size()) + throw Exception("Size of permutation doesn't match size of column.", ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH); + + Container_t tmp(size); + for (size_t i = 0; i < size; ++i) + tmp[i] = data[perm[i]]; + tmp.swap(data); + } + + int compareAt(size_t n, size_t m, const IColumn & rhs_) const + { + const ColumnVector & rhs = static_cast &>(rhs_); + return data[n] < rhs.data[m] + ? -1 + : (data[n] == rhs.data[m] + ? 0 + : 1); + } + /** Более эффективные методы манипуляции */ Container_t & getData() { diff --git a/dbms/include/DB/Columns/IColumn.h b/dbms/include/DB/Columns/IColumn.h index 342d3678630..ca4410430d8 100644 --- a/dbms/include/DB/Columns/IColumn.h +++ b/dbms/include/DB/Columns/IColumn.h @@ -68,6 +68,18 @@ public: typedef std::vector Filter; virtual void filter(const Filter & filt) = 0; + /** Переставить значения местами, используя указанную перестановку. + * Используется при сортировке. + */ + typedef std::vector Permutation; + virtual void permute(const Permutation & perm) = 0; + + /** Сравнить (*this)[n] и rhs[m]. + * Вернуть отрицательное число, 0, или положительное число, если меньше, равно, или больше, соответственно. + * Используется при сортировке. + */ + virtual int compareAt(size_t n, size_t m, const IColumn & rhs) const = 0; + /** Очистить */ virtual void clear() = 0; diff --git a/dbms/include/DB/Core/Types.h b/dbms/include/DB/Core/Types.h index 4afcb5f092e..36a6c9202cf 100644 --- a/dbms/include/DB/Core/Types.h +++ b/dbms/include/DB/Core/Types.h @@ -1,7 +1,7 @@ #pragma once #include -#include +#include #include #include @@ -12,7 +12,7 @@ namespace DB /** Типы данных для представления значений из БД в оперативке. */ -typedef boost::none_t Null; +BOOST_STRONG_TYPEDEF(char, Null); typedef Poco::UInt8 UInt8; typedef Poco::UInt16 UInt16; diff --git a/dbms/include/DB/DataStreams/PartialSortingBlockInputStream.h b/dbms/include/DB/DataStreams/PartialSortingBlockInputStream.h new file mode 100644 index 00000000000..3e26e015449 --- /dev/null +++ b/dbms/include/DB/DataStreams/PartialSortingBlockInputStream.h @@ -0,0 +1,27 @@ +#pragma once + +#include + +#include + + +namespace DB +{ + +/** Сортирует каждый блок по отдельности по значениям указанных столбцов. + * На данный момент, используется сильно неоптимальный алгоритм. + */ +class PartialSortingBlockInputStream : public IBlockInputStream +{ +public: + PartialSortingBlockInputStream(BlockInputStreamPtr input_, ColumnNumbers & column_numbers_) + : input(input_), column_numbers(column_numbers_) {} + + Block read(); + +private: + BlockInputStreamPtr input; + ColumnNumbers column_numbers; +}; + +} diff --git a/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp b/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp new file mode 100644 index 00000000000..6aee9b887b1 --- /dev/null +++ b/dbms/src/DataStreams/PartialSortingBlockInputStream.cpp @@ -0,0 +1,54 @@ +#include + + +namespace DB +{ + +struct PartialSortingLess +{ + typedef std::vector Columns; + Columns columns; + + PartialSortingLess(const Block & block, const ColumnNumbers & column_numbers) + { + for (size_t i = 0, size = column_numbers.size(); i < size; ++i) + columns.push_back(&*block.getByPosition(column_numbers[i]).column); + } + + bool operator() (size_t a, size_t b) const + { + for (Columns::const_iterator it = columns.begin(); it != columns.end(); ++it) + { + int res = (*it)->compareAt(a, b, **it); + if (res < 0) + return true; + else if (res > 0) + return false; + } + return false; + } +}; + + +Block PartialSortingBlockInputStream::read() +{ + Block res = input->read(); + if (!res) + return res; + + size_t size = res.rows(); + IColumn::Permutation perm(size); + for (size_t i = 0; i < size; ++i) + perm[i] = i; + + PartialSortingLess less(res, column_numbers); + std::sort(perm.begin(), perm.end(), less); + + size_t columns = res.columns(); + for (size_t i = 0; i < columns; ++i) + res.getByPosition(i).column->permute(perm); + + return res; +} + +} diff --git a/dbms/src/DataStreams/tests/partial_sorting_stream.cpp b/dbms/src/DataStreams/tests/partial_sorting_stream.cpp new file mode 100644 index 00000000000..1c8f79ecb4d --- /dev/null +++ b/dbms/src/DataStreams/tests/partial_sorting_stream.cpp @@ -0,0 +1,180 @@ +#include +#include + +#include + +#include +#include +#include + +#include + +#include + +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include + +#include +#include + + +using Poco::SharedPtr; + + +int main(int argc, char ** argv) +{ + try + { + typedef std::pair > NameAndTypePair; + typedef std::list NamesAndTypesList; + + NamesAndTypesList names_and_types_list; + + boost::assign::push_back(names_and_types_list) + ("WatchID", new DB::DataTypeUInt64) + ("JavaEnable", new DB::DataTypeUInt8) + ("Title", new DB::DataTypeString) + ("GoodEvent", new DB::DataTypeUInt32) + ("EventTime", new DB::DataTypeDateTime) + ("CounterID", new DB::DataTypeUInt32) + ("ClientIP", new DB::DataTypeUInt32) + ("RegionID", new DB::DataTypeUInt32) + ("UniqID", new DB::DataTypeUInt64) + ("CounterClass", new DB::DataTypeUInt8) + ("OS", new DB::DataTypeUInt8) + ("UserAgent", new DB::DataTypeUInt8) + ("URL", new DB::DataTypeString) + ("Referer", new DB::DataTypeString) + ("Refresh", new DB::DataTypeUInt8) + ("ResolutionWidth", new DB::DataTypeUInt16) + ("ResolutionHeight", new DB::DataTypeUInt16) + ("ResolutionDepth", new DB::DataTypeUInt8) + ("FlashMajor", new DB::DataTypeUInt8) + ("FlashMinor", new DB::DataTypeUInt8) + ("FlashMinor2", new DB::DataTypeString) + ("NetMajor", new DB::DataTypeUInt8) + ("NetMinor", new DB::DataTypeUInt8) + ("UserAgentMajor", new DB::DataTypeUInt16) + ("UserAgentMinor", new DB::DataTypeFixedString(2)) + ("CookieEnable", new DB::DataTypeUInt8) + ("JavascriptEnable", new DB::DataTypeUInt8) + ("IsMobile", new DB::DataTypeUInt8) + ("MobilePhone", new DB::DataTypeUInt8) + ("MobilePhoneModel", new DB::DataTypeString) + ("Params", new DB::DataTypeString) + ("IPNetworkID", new DB::DataTypeUInt32) + ("TraficSourceID", new DB::DataTypeInt8) + ("SearchEngineID", new DB::DataTypeUInt16) + ("SearchPhrase", new DB::DataTypeString) + ("AdvEngineID", new DB::DataTypeUInt8) + ("IsArtifical", new DB::DataTypeUInt8) + ("WindowClientWidth", new DB::DataTypeUInt16) + ("WindowClientHeight", new DB::DataTypeUInt16) + ("ClientTimeZone", new DB::DataTypeInt16) + ("ClientEventTime", new DB::DataTypeDateTime) + ("SilverlightVersion1", new DB::DataTypeUInt8) + ("SilverlightVersion2", new DB::DataTypeUInt8) + ("SilverlightVersion3", new DB::DataTypeUInt32) + ("SilverlightVersion4", new DB::DataTypeUInt16) + ("PageCharset", new DB::DataTypeString) + ("CodeVersion", new DB::DataTypeUInt32) + ("IsLink", new DB::DataTypeUInt8) + ("IsDownload", new DB::DataTypeUInt8) + ("IsNotBounce", new DB::DataTypeUInt8) + ("FUniqID", new DB::DataTypeUInt64) + ("OriginalURL", new DB::DataTypeString) + ("HID", new DB::DataTypeUInt32) + ("IsOldCounter", new DB::DataTypeUInt8) + ("IsEvent", new DB::DataTypeUInt8) + ("IsParameter", new DB::DataTypeUInt8) + ("DontCountHits", new DB::DataTypeUInt8) + ("WithHash", new DB::DataTypeUInt8) + ; + + SharedPtr names_and_types_map = new DB::NamesAndTypes; + + for (NamesAndTypesList::const_iterator it = names_and_types_list.begin(); it != names_and_types_list.end(); ++it) + names_and_types_map->insert(*it); + + DB::ParserSelectQuery parser; + DB::ASTPtr ast; + std::string input = "SELECT UniqID, URL, CounterID, IsLink"; + std::string expected; + + const char * begin = input.data(); + const char * end = begin + input.size(); + const char * pos = begin; + + if (!parser.parse(pos, end, ast, expected)) + { + std::cout << "Failed at position " << (pos - begin) << ": " + << mysqlxx::quote << input.substr(pos - begin, 10) + << ", expected " << expected << "." << std::endl; + } + + DB::formatAST(*ast, std::cerr); + std::cerr << std::endl; + std::cerr << ast->getTreeID() << std::endl; + + /// создаём объект существующей таблицы хит лога + + DB::StorageLog table("./", "HitLog", names_and_types_map, ".bin"); + + /// читаем из неё, сортируем, и пишем в tsv виде в консоль + + DB::Names column_names; + boost::assign::push_back(column_names) + ("UniqID") + ("URL") + ("CounterID") + ("IsLink") + ; + + Poco::SharedPtr result_types = new DB::DataTypes; + boost::assign::push_back(*result_types) + ((*names_and_types_map)["UniqID"]) + ((*names_and_types_map)["URL"]) + ((*names_and_types_map)["CounterID"]) + ((*names_and_types_map)["IsLink"]) + ; + + DB::ColumnNumbers sort_columns; + sort_columns.push_back(1); + sort_columns.push_back(2); + + Poco::SharedPtr in = table.read(column_names, 0); + Poco::SharedPtr profiling1 = new DB::ProfilingBlockInputStream(in); + in = profiling1; + in = new DB::PartialSortingBlockInputStream(in, sort_columns); + Poco::SharedPtr profiling2 = new DB::ProfilingBlockInputStream(in); + in = profiling2; + //in = new DB::LimitBlockInputStream(in, 10); + + DB::WriteBufferFromOStream ob(std::cout); + DB::TabSeparatedRowOutputStream out(ob, result_types); + + DB::copyData(*in, out); + + std::cerr << std::endl << "Reading: " << std::endl; + profiling1->getInfo().print(std::cerr); + std::cerr << std::endl << "Sorting: " << std::endl; + profiling2->getInfo().print(std::cerr); + } + catch (const DB::Exception & e) + { + std::cerr << e.what() << ", " << e.message() << std::endl; + return 1; + } + + return 0; +}