mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-09-21 09:10:48 +00:00
added collation [#CONV-7651]
This commit is contained in:
parent
8cc5acc6af
commit
f1c161842b
@ -5,6 +5,7 @@
|
||||
#include <DB/Core/Defines.h>
|
||||
|
||||
#include <DB/Columns/IColumn.h>
|
||||
#include <DB/Common/Collator.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -237,6 +238,16 @@ public:
|
||||
reinterpret_cast<const char *>(&chars[offsetAt(n)]),
|
||||
reinterpret_cast<const char *>(&rhs.chars[rhs.offsetAt(m)]));
|
||||
}
|
||||
|
||||
/// Версия compareAt для locale-sensitive сравнения строк
|
||||
int compareAt(size_t n, size_t m, const IColumn & rhs_, const Collator & collator) const
|
||||
{
|
||||
const ColumnString & rhs = static_cast<const ColumnString &>(rhs_);
|
||||
|
||||
return collator.compare(
|
||||
reinterpret_cast<const char *>(&chars[offsetAt(n)]), sizeAt(n),
|
||||
reinterpret_cast<const char *>(&rhs.chars[rhs.offsetAt(m)]), rhs.sizeAt(m));
|
||||
}
|
||||
|
||||
struct less
|
||||
{
|
||||
@ -260,6 +271,33 @@ public:
|
||||
std::sort(res.begin(), res.end(), less(*this));
|
||||
return res;
|
||||
}
|
||||
|
||||
struct lessWithCollation
|
||||
{
|
||||
const ColumnString & parent;
|
||||
const Collator & collator;
|
||||
|
||||
lessWithCollation(const ColumnString & parent_, const Collator & collator_) : parent(parent_), collator(collator_) {}
|
||||
|
||||
bool operator()(size_t lhs, size_t rhs) const
|
||||
{
|
||||
return collator.compare(
|
||||
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(lhs)]), parent.sizeAt(lhs),
|
||||
reinterpret_cast<const char *>(&parent.chars[parent.offsetAt(rhs)]), parent.sizeAt(rhs));
|
||||
}
|
||||
};
|
||||
|
||||
/// Сортировка с учетом Collation
|
||||
Permutation getPermutation(const Collator & collator) const
|
||||
{
|
||||
size_t s = offsets.size();
|
||||
Permutation res(s);
|
||||
for (size_t i = 0; i < s; ++i)
|
||||
res[i] = i;
|
||||
|
||||
std::sort(res.begin(), res.end(), lessWithCollation(*this, collator));
|
||||
return res;
|
||||
}
|
||||
|
||||
ColumnPtr replicate(const Offsets_t & replicate_offsets) const
|
||||
{
|
||||
|
60
dbms/include/DB/Common/Collator.h
Normal file
60
dbms/include/DB/Common/Collator.h
Normal file
@ -0,0 +1,60 @@
|
||||
#pragma once
|
||||
|
||||
#include <unicode/ucol.h>
|
||||
|
||||
#include <DB/Core/ErrorCodes.h>
|
||||
#include <DB/Core/Exception.h>
|
||||
#include <Yandex/Common.h>
|
||||
#include <Poco/String.h>
|
||||
#include <Poco/NumberFormatter.h>
|
||||
|
||||
class Collator
|
||||
{
|
||||
public:
|
||||
Collator(const std::string & locale_) : locale(Poco::toLower(locale_))
|
||||
{
|
||||
UErrorCode status = U_ZERO_ERROR;
|
||||
|
||||
collator = ucol_open(locale.c_str(), &status);
|
||||
if (status != U_ZERO_ERROR)
|
||||
{
|
||||
ucol_close(collator);
|
||||
throw DB::Exception("Unsupported collation locale: " + locale, DB::ErrorCodes::UNSUPPORTED_COLLATION_LOCALE);
|
||||
}
|
||||
}
|
||||
|
||||
~Collator()
|
||||
{
|
||||
ucol_close(collator);
|
||||
}
|
||||
|
||||
int compare(const char * str1, size_t length1, const char * str2, size_t length2) const
|
||||
{
|
||||
UCharIterator iter1, iter2;
|
||||
uiter_setUTF8(&iter1, str1, length1);
|
||||
uiter_setUTF8(&iter2, str2, length2);
|
||||
|
||||
UErrorCode status = U_ZERO_ERROR;
|
||||
UCollationResult compare_result = ucol_strcollIter(collator, &iter1, &iter2, &status);
|
||||
|
||||
if (status != U_ZERO_ERROR)
|
||||
throw DB::Exception("ICU collation comparison failed with error code: " + Poco::NumberFormatter::format(status),
|
||||
DB::ErrorCodes::COLLATION_COMPARISON_FAILED);
|
||||
|
||||
/** Значения enum UCollationResult совпадают с нужными нам:
|
||||
* UCOL_EQUAL = 0
|
||||
* UCOL_GREATER = 1
|
||||
* UCOL_LESS = -1
|
||||
*/
|
||||
return compare_result;
|
||||
}
|
||||
|
||||
const std::string & getLocale() const
|
||||
{
|
||||
return locale;
|
||||
}
|
||||
|
||||
private:
|
||||
std::string locale;
|
||||
UCollator * collator;
|
||||
};
|
@ -191,6 +191,8 @@ namespace ErrorCodes
|
||||
UNEXPECTED_EXPRESSION,
|
||||
ILLEGAL_AGGREGATION,
|
||||
UNSUPPORTED_MYISAM_BLOCK_TYPE,
|
||||
UNSUPPORTED_COLLATION_LOCALE,
|
||||
COLLATION_COMPARISON_FAILED,
|
||||
UNKNOWN_ACTION,
|
||||
|
||||
POCO_EXCEPTION = 1000,
|
||||
|
@ -5,6 +5,8 @@
|
||||
#include <DB/Core/Types.h>
|
||||
#include <DB/Core/Block.h>
|
||||
#include <DB/Columns/IColumn.h>
|
||||
#include <DB/Columns/ColumnString.h>
|
||||
#include <DB/Common/Collator.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -13,21 +15,24 @@ namespace DB
|
||||
/// Описание правила сортировки по одному столбцу.
|
||||
struct SortColumnDescription
|
||||
{
|
||||
String column_name; /// Имя столбца.
|
||||
size_t column_number; /// Номер столбца (используется, если не задано имя).
|
||||
int direction; /// 1 - по возрастанию, -1 - по убыванию.
|
||||
String column_name; /// Имя столбца.
|
||||
size_t column_number; /// Номер столбца (используется, если не задано имя).
|
||||
int direction; /// 1 - по возрастанию, -1 - по убыванию.
|
||||
Poco::SharedPtr<Collator> collator; /// Collator для locale-specific сортировки строк
|
||||
|
||||
SortColumnDescription(size_t column_number_, int direction_)
|
||||
: column_number(column_number_), direction(direction_) {}
|
||||
SortColumnDescription(size_t column_number_, int direction_, const Poco::SharedPtr<Collator> & collator_ = NULL)
|
||||
: column_number(column_number_), direction(direction_), collator(collator_) {}
|
||||
|
||||
SortColumnDescription(String column_name_, int direction_)
|
||||
: column_name(column_name_), column_number(0), direction(direction_) {}
|
||||
SortColumnDescription(String column_name_, int direction_, const Poco::SharedPtr<Collator> & collator_ = NULL)
|
||||
: column_name(column_name_), column_number(0), direction(direction_), collator(collator_) {}
|
||||
|
||||
/// Для IBlockInputStream.
|
||||
String getID() const
|
||||
{
|
||||
std::stringstream res;
|
||||
res << column_name << ", " << column_number << ", " << direction;
|
||||
if (!collator.isNull())
|
||||
res << ", collation locale: " << collator->getLocale();
|
||||
return res.str();
|
||||
}
|
||||
};
|
||||
@ -54,10 +59,18 @@ struct SortCursorImpl
|
||||
*/
|
||||
size_t order;
|
||||
|
||||
typedef std::vector<UInt8> NeedCollationFlags;
|
||||
|
||||
/** Нужно ли использовать Collator для сортировки столбца */
|
||||
NeedCollationFlags need_collation;
|
||||
|
||||
/** Есть ли хотя бы один столбец с Collator. */
|
||||
bool has_collation;
|
||||
|
||||
SortCursorImpl() {}
|
||||
|
||||
SortCursorImpl(const Block & block, const SortDescription & desc_, size_t order_ = 0)
|
||||
: desc(desc_), sort_columns_size(desc.size()), order(order_)
|
||||
: desc(desc_), sort_columns_size(desc.size()), order(order_), need_collation(desc.size()), has_collation(false)
|
||||
{
|
||||
reset(block);
|
||||
}
|
||||
@ -80,6 +93,9 @@ struct SortCursorImpl
|
||||
: desc[j].column_number;
|
||||
|
||||
sort_columns.push_back(&*block.getByPosition(column_number).column);
|
||||
|
||||
need_collation[j] = !desc[j].collator.isNull() && sort_columns.back()->getName() == "ColumnString";
|
||||
has_collation |= need_collation[j];
|
||||
}
|
||||
|
||||
pos = 0;
|
||||
@ -115,5 +131,39 @@ struct SortCursor
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
/// Отдельный компаратор для locale-sensitive сравнения строк
|
||||
struct SortCursorWithCollation
|
||||
{
|
||||
SortCursorImpl * impl;
|
||||
|
||||
SortCursorWithCollation(SortCursorImpl * impl_) : impl(impl_) {}
|
||||
SortCursorImpl * operator-> () { return impl; }
|
||||
const SortCursorImpl * operator-> () const { return impl; }
|
||||
|
||||
/// Инвертировано, чтобы из priority queue элементы вынимались в нужном порядке.
|
||||
bool operator< (const SortCursorWithCollation & rhs) const
|
||||
{
|
||||
for (size_t i = 0; i < impl->sort_columns_size; ++i)
|
||||
{
|
||||
int res;
|
||||
if (impl->need_collation[i])
|
||||
{
|
||||
const ColumnString & column_string = dynamic_cast<const ColumnString &>(*impl->sort_columns[i]);
|
||||
res = column_string.compareAt(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]), *impl->desc[i].collator);
|
||||
}
|
||||
else
|
||||
res = impl->sort_columns[i]->compareAt(impl->pos, rhs.impl->pos, *(rhs.impl->sort_columns[i]));
|
||||
|
||||
res *= impl->desc[i].direction;
|
||||
if (res > 0)
|
||||
return true;
|
||||
if (res < 0)
|
||||
return false;
|
||||
}
|
||||
return impl->order > rhs.impl->order;
|
||||
}
|
||||
};
|
||||
|
||||
}
|
||||
|
||||
|
@ -70,16 +70,23 @@ private:
|
||||
size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
|
||||
size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа.
|
||||
|
||||
/** Делаем поддержку двух разных курсоров - с Collation и без.
|
||||
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
|
||||
*/
|
||||
template<class TSortCursor>
|
||||
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
|
||||
|
||||
/// Сохранить строчку, на которую указывает cursor в row.
|
||||
void setRow(Row & row, SortCursor & cursor)
|
||||
template<class TSortCursor>
|
||||
void setRow(Row & row, TSortCursor & cursor)
|
||||
{
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
cursor->all_columns[i]->get(cursor->pos, row[i]);
|
||||
}
|
||||
|
||||
/// Сохранить первичный ключ, на который указывает cursor в row.
|
||||
void setPrimaryKey(Row & row, SortCursor & cursor)
|
||||
template<class TSortCursor>
|
||||
void setPrimaryKey(Row & row, TSortCursor & cursor)
|
||||
{
|
||||
for (size_t i = 0; i < cursor->sort_columns_size; ++i)
|
||||
cursor->sort_columns[i]->get(cursor->pos, row[i]);
|
||||
|
@ -49,6 +49,13 @@ private:
|
||||
/** Слить сразу много блоков с помощью priority queue.
|
||||
*/
|
||||
Block merge(Blocks & blocks);
|
||||
|
||||
typedef std::vector<SortCursorImpl> CursorImpls;
|
||||
|
||||
/** Делаем поддержку двух разных курсоров - с Collation и без.
|
||||
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
|
||||
*/
|
||||
template<class TSortCursor> Block mergeImpl(Blocks & block, CursorImpls & cursors);
|
||||
};
|
||||
|
||||
}
|
||||
|
@ -18,7 +18,7 @@ class MergingSortedBlockInputStream : public IProfilingBlockInputStream
|
||||
{
|
||||
public:
|
||||
MergingSortedBlockInputStream(BlockInputStreams inputs_, SortDescription & description_, size_t max_block_size_)
|
||||
: description(description_), max_block_size(max_block_size_), first(true),
|
||||
: description(description_), max_block_size(max_block_size_), first(true), has_collation(false),
|
||||
num_columns(0), source_blocks(inputs_.size()), cursors(inputs_.size()), log(&Logger::get("MergingSortedBlockInputStream"))
|
||||
{
|
||||
children.insert(children.end(), inputs_.begin(), inputs_.end());
|
||||
@ -57,13 +57,16 @@ protected:
|
||||
void init(Block & merged_block, ColumnPlainPtrs & merged_columns);
|
||||
|
||||
/// Достаёт из источника, соответствующего current следующий блок.
|
||||
void fetchNextBlock(const SortCursor & current);
|
||||
template<class TSortCursor>
|
||||
void fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue);
|
||||
|
||||
|
||||
SortDescription description;
|
||||
size_t max_block_size;
|
||||
|
||||
bool first;
|
||||
|
||||
bool has_collation;
|
||||
|
||||
/// Текущие сливаемые блоки.
|
||||
size_t num_columns;
|
||||
@ -74,8 +77,21 @@ protected:
|
||||
|
||||
typedef std::priority_queue<SortCursor> Queue;
|
||||
Queue queue;
|
||||
|
||||
typedef std::priority_queue<SortCursorWithCollation> QueueWithCollation;
|
||||
QueueWithCollation queue_with_collation;
|
||||
|
||||
private:
|
||||
|
||||
/** Делаем поддержку двух разных курсоров - с Collation и без.
|
||||
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
|
||||
*/
|
||||
template<class TSortCursor>
|
||||
void initQueue(std::priority_queue<TSortCursor> & queue);
|
||||
|
||||
template<class TSortCursor>
|
||||
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
|
||||
|
||||
Logger * log;
|
||||
};
|
||||
|
||||
|
@ -1,7 +1,7 @@
|
||||
#pragma once
|
||||
|
||||
#include <DB/Parsers/IAST.h>
|
||||
|
||||
#include <DB/Common/Collator.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -16,8 +16,14 @@ class ASTOrderByElement : public IAST
|
||||
public:
|
||||
int direction; /// 1, если ASC, -1, если DESC
|
||||
|
||||
/** Collator для locale-specific сортировки строк.
|
||||
* Если NULL, то производится сортировка по байтам.
|
||||
*/
|
||||
Poco::SharedPtr<Collator> collator;
|
||||
|
||||
ASTOrderByElement() {}
|
||||
ASTOrderByElement(StringRange range_, int direction_) : IAST(range_), direction(direction_) {}
|
||||
ASTOrderByElement(StringRange range_, int direction_, const Poco::SharedPtr<Collator> & collator_ = NULL)
|
||||
: IAST(range_), direction(direction_), collator(collator_) {}
|
||||
|
||||
/** Получить текст, который идентифицирует этот элемент. */
|
||||
String getID() const { return "OrderByElement"; }
|
||||
|
@ -60,7 +60,6 @@ Block CollapsingSortedBlockInputStream::readImpl()
|
||||
if (children.size() == 1)
|
||||
return children[0]->read();
|
||||
|
||||
size_t merged_rows = 0;
|
||||
Block merged_block;
|
||||
ColumnPlainPtrs merged_columns;
|
||||
|
||||
@ -78,11 +77,24 @@ Block CollapsingSortedBlockInputStream::readImpl()
|
||||
|
||||
sign_column_number = merged_block.getPositionByName(sign_column);
|
||||
}
|
||||
|
||||
if (has_collation)
|
||||
merge(merged_block, merged_columns, queue_with_collation);
|
||||
else
|
||||
merge(merged_block, merged_columns, queue);
|
||||
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
template<class TSortCursor>
|
||||
void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
size_t merged_rows = 0;
|
||||
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
|
||||
while (!queue.empty())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
|
||||
Int8 sign = get<Int64>((*current->all_columns[sign_column_number])[current->pos]);
|
||||
@ -125,18 +137,17 @@ Block CollapsingSortedBlockInputStream::readImpl()
|
||||
else
|
||||
{
|
||||
/// Достаём из соответствующего источника следующий блок, если есть.
|
||||
fetchNextBlock(current);
|
||||
fetchNextBlock(current, queue);
|
||||
}
|
||||
|
||||
if (merged_rows >= max_block_size)
|
||||
return merged_block;
|
||||
return;;
|
||||
}
|
||||
|
||||
/// Запишем данные для последнего визита.
|
||||
insertRows(merged_columns, merged_rows);
|
||||
|
||||
children.clear();
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -31,39 +31,59 @@ Block MergeSortingBlockInputStream::readImpl()
|
||||
return merge(blocks);
|
||||
}
|
||||
|
||||
|
||||
Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
{
|
||||
Stopwatch watch;
|
||||
Block merged;
|
||||
|
||||
if (blocks.empty())
|
||||
return merged;
|
||||
return Block();
|
||||
|
||||
if (blocks.size() == 1)
|
||||
return blocks[0];
|
||||
|
||||
Stopwatch watch;
|
||||
|
||||
LOG_DEBUG(log, "Merge sorting");
|
||||
|
||||
merged = blocks[0].cloneEmpty();
|
||||
|
||||
typedef std::priority_queue<SortCursor> Queue;
|
||||
Queue queue;
|
||||
|
||||
typedef std::vector<SortCursorImpl> CursorImpls;
|
||||
|
||||
CursorImpls cursors(blocks.size());
|
||||
|
||||
bool has_collation = false;
|
||||
|
||||
size_t i = 0;
|
||||
size_t num_columns = blocks[0].columns();
|
||||
for (Blocks::const_iterator it = blocks.begin(); it != blocks.end(); ++it, ++i)
|
||||
{
|
||||
if (!*it)
|
||||
continue;
|
||||
|
||||
cursors[i] = SortCursorImpl(*it, description);
|
||||
queue.push(SortCursor(&cursors[i]));
|
||||
has_collation |= cursors[i].has_collation;
|
||||
}
|
||||
|
||||
Block merged;
|
||||
|
||||
if (has_collation)
|
||||
merged = mergeImpl<SortCursorWithCollation>(blocks, cursors);
|
||||
else
|
||||
merged = mergeImpl<SortCursor>(blocks, cursors);
|
||||
|
||||
LOG_DEBUG(log, std::fixed << std::setprecision(2)
|
||||
<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows"
|
||||
<< " in " << watch.elapsedSeconds() << " sec., "
|
||||
<< merged.rows() / watch.elapsedSeconds() << " rows/sec., "
|
||||
<< merged.bytes() / 1000000.0 / watch.elapsedSeconds() << " MiB/sec.");
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
template<class TSortCursor> Block MergeSortingBlockInputStream::mergeImpl(Blocks & blocks, CursorImpls & cursors)
|
||||
{
|
||||
Block merged = blocks[0].cloneEmpty();
|
||||
size_t num_columns = blocks[0].columns();
|
||||
|
||||
typedef std::priority_queue<TSortCursor> Queue;
|
||||
Queue queue;
|
||||
|
||||
for (size_t i = 0; i < cursors.size(); ++i)
|
||||
queue.push(TSortCursor(&cursors[i]));
|
||||
|
||||
ColumnPlainPtrs merged_columns;
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns.push_back(&*merged.getByPosition(i).column);
|
||||
@ -71,7 +91,7 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged.
|
||||
while (!queue.empty())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
@ -84,12 +104,6 @@ Block MergeSortingBlockInputStream::merge(Blocks & blocks)
|
||||
}
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, std::fixed << std::setprecision(2)
|
||||
<< "Merge sorted " << blocks.size() << " blocks, " << merged.rows() << " rows"
|
||||
<< " in " << watch.elapsedSeconds() << " sec., "
|
||||
<< merged.rows() / watch.elapsedSeconds() << " rows/sec., "
|
||||
<< merged.bytes() / 1000000.0 / watch.elapsedSeconds() << " MiB/sec.");
|
||||
|
||||
return merged;
|
||||
}
|
||||
|
||||
|
@ -29,8 +29,13 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
|
||||
num_columns = source_blocks[0].columns();
|
||||
|
||||
cursors[i] = SortCursorImpl(*it, description, i);
|
||||
queue.push(SortCursor(&cursors[i]));
|
||||
has_collation |= cursors[i].has_collation;
|
||||
}
|
||||
|
||||
if (has_collation)
|
||||
initQueue(queue_with_collation);
|
||||
else
|
||||
initQueue(queue);
|
||||
}
|
||||
|
||||
/// Инициализируем результат.
|
||||
@ -74,6 +79,14 @@ void MergingSortedBlockInputStream::init(Block & merged_block, ColumnPlainPtrs &
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
merged_columns.push_back(&*merged_block.getByPosition(i).column);
|
||||
}
|
||||
|
||||
|
||||
template<class TSortCursor>
|
||||
void MergingSortedBlockInputStream::initQueue(std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
for (size_t i = 0; i < cursors.size(); ++i)
|
||||
queue.push(TSortCursor(&cursors[i]));
|
||||
}
|
||||
|
||||
|
||||
Block MergingSortedBlockInputStream::readImpl()
|
||||
@ -84,18 +97,30 @@ Block MergingSortedBlockInputStream::readImpl()
|
||||
if (children.size() == 1)
|
||||
return children[0]->read();
|
||||
|
||||
size_t merged_rows = 0;
|
||||
Block merged_block;
|
||||
ColumnPlainPtrs merged_columns;
|
||||
|
||||
init(merged_block, merged_columns);
|
||||
if (merged_columns.empty())
|
||||
return Block();
|
||||
|
||||
if (has_collation)
|
||||
merge(merged_block, merged_columns, queue_with_collation);
|
||||
else
|
||||
merge(merged_block, merged_columns, queue);
|
||||
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
|
||||
template<class TSortCursor>
|
||||
void MergingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
size_t merged_rows = 0;
|
||||
|
||||
/// Вынимаем строки в нужном порядке и кладём в merged_block, пока строк не больше max_block_size
|
||||
while (!queue.empty())
|
||||
{
|
||||
SortCursor current = queue.top();
|
||||
TSortCursor current = queue.top();
|
||||
queue.pop();
|
||||
|
||||
for (size_t i = 0; i < num_columns; ++i)
|
||||
@ -109,20 +134,20 @@ Block MergingSortedBlockInputStream::readImpl()
|
||||
else
|
||||
{
|
||||
/// Достаём из соответствующего источника следующий блок, если есть.
|
||||
fetchNextBlock(current);
|
||||
fetchNextBlock(current, queue);
|
||||
}
|
||||
|
||||
++merged_rows;
|
||||
if (merged_rows == max_block_size)
|
||||
return merged_block;
|
||||
return;
|
||||
}
|
||||
|
||||
children.clear();
|
||||
return merged_block;
|
||||
}
|
||||
|
||||
|
||||
void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current)
|
||||
template<class TSortCursor>
|
||||
void MergingSortedBlockInputStream::fetchNextBlock(const TSortCursor & current, std::priority_queue<TSortCursor> & queue)
|
||||
{
|
||||
size_t i = 0;
|
||||
size_t size = cursors.size();
|
||||
@ -134,7 +159,7 @@ void MergingSortedBlockInputStream::fetchNextBlock(const SortCursor & current)
|
||||
if (source_blocks[i])
|
||||
{
|
||||
cursors[i].reset(source_blocks[i]);
|
||||
queue.push(SortCursor(&cursors[i]));
|
||||
queue.push(TSortCursor(&cursors[i]));
|
||||
}
|
||||
|
||||
break;
|
||||
|
@ -3,27 +3,56 @@
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
typedef std::vector<std::pair<const IColumn *, SortColumnDescription> > ColumnsWithSortDescriptions;
|
||||
|
||||
|
||||
static inline bool needCollation(const IColumn * column, const SortColumnDescription & description)
|
||||
{
|
||||
return !description.collator.isNull() && column->getName() == "ColumnString";
|
||||
}
|
||||
|
||||
|
||||
struct PartialSortingLess
|
||||
{
|
||||
typedef std::vector<std::pair<const IColumn *, int> > Columns;
|
||||
Columns columns;
|
||||
|
||||
PartialSortingLess(const Block & block, const SortDescription & description)
|
||||
{
|
||||
for (size_t i = 0, size = description.size(); i < size; ++i)
|
||||
columns.push_back(std::make_pair(
|
||||
!description[i].column_name.empty()
|
||||
? &*block.getByName(description[i].column_name).column
|
||||
: &*block.getByPosition(description[i].column_number).column,
|
||||
description[i].direction));
|
||||
}
|
||||
const ColumnsWithSortDescriptions & columns;
|
||||
|
||||
PartialSortingLess(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {}
|
||||
|
||||
bool operator() (size_t a, size_t b) const
|
||||
{
|
||||
for (Columns::const_iterator it = columns.begin(); it != columns.end(); ++it)
|
||||
for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it)
|
||||
{
|
||||
int res = it->second * it->first->compareAt(a, b, *it->first);
|
||||
int res = it->second.direction * it->first->compareAt(a, b, *it->first);
|
||||
if (res < 0)
|
||||
return true;
|
||||
else if (res > 0)
|
||||
return false;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
struct PartialSortingLessWithCollation
|
||||
{
|
||||
const ColumnsWithSortDescriptions & columns;
|
||||
|
||||
PartialSortingLessWithCollation(const ColumnsWithSortDescriptions & columns_) : columns(columns_) {}
|
||||
|
||||
bool operator() (size_t a, size_t b) const
|
||||
{
|
||||
for (ColumnsWithSortDescriptions::const_iterator it = columns.begin(); it != columns.end(); ++it)
|
||||
{
|
||||
int res;
|
||||
if (needCollation(it->first, it->second))
|
||||
{
|
||||
const ColumnString & column_string = dynamic_cast<const ColumnString &>(*it->first);
|
||||
res = column_string.compareAt(a, b, *it->first, *it->second.collator);
|
||||
}
|
||||
else
|
||||
res = it->first->compareAt(a, b, *it->first);
|
||||
|
||||
res *= it->second.direction;
|
||||
if (res < 0)
|
||||
return true;
|
||||
else if (res > 0)
|
||||
@ -42,9 +71,18 @@ void sortBlock(Block & block, const SortDescription & description)
|
||||
/// Если столбец сортировки один
|
||||
if (description.size() == 1)
|
||||
{
|
||||
IColumn::Permutation perm = (!description[0].column_name.empty()
|
||||
IColumn * column = !description[0].column_name.empty()
|
||||
? block.getByName(description[0].column_name).column
|
||||
: block.getByPosition(description[0].column_number).column)->getPermutation();
|
||||
: block.getByPosition(description[0].column_number).column;
|
||||
|
||||
IColumn::Permutation perm;
|
||||
if (needCollation(column, description[0]))
|
||||
{
|
||||
const ColumnString & column_string = dynamic_cast<const ColumnString &>(*column);
|
||||
perm = column_string.getPermutation(*description[0].collator);
|
||||
}
|
||||
else
|
||||
perm = column->getPermutation();
|
||||
|
||||
if (description[0].direction == -1)
|
||||
for (size_t i = 0, size = perm.size(); i < size / 2; ++i)
|
||||
@ -60,9 +98,32 @@ void sortBlock(Block & block, const SortDescription & description)
|
||||
IColumn::Permutation perm(size);
|
||||
for (size_t i = 0; i < size; ++i)
|
||||
perm[i] = i;
|
||||
|
||||
bool need_collation = false;
|
||||
ColumnsWithSortDescriptions columns_with_sort_desc;
|
||||
|
||||
for (size_t i = 0, size = description.size(); i < size; ++i)
|
||||
{
|
||||
IColumn * column = !description[i].column_name.empty()
|
||||
? block.getByName(description[i].column_name).column
|
||||
: block.getByPosition(description[i].column_number).column;
|
||||
|
||||
columns_with_sort_desc.push_back(std::make_pair(column, description[i]));
|
||||
|
||||
if (needCollation(column, description[i]))
|
||||
need_collation = true;
|
||||
}
|
||||
|
||||
PartialSortingLess less(block, description);
|
||||
std::sort(perm.begin(), perm.end(), less);
|
||||
if (need_collation)
|
||||
{
|
||||
PartialSortingLessWithCollation less_with_collation(columns_with_sort_desc);
|
||||
std::sort(perm.begin(), perm.end(), less_with_collation);
|
||||
}
|
||||
else
|
||||
{
|
||||
PartialSortingLess less(columns_with_sort_desc);
|
||||
std::sort(perm.begin(), perm.end(), less);
|
||||
}
|
||||
|
||||
size_t columns = block.columns();
|
||||
for (size_t i = 0; i < columns; ++i)
|
||||
|
@ -448,6 +448,8 @@ bool ParserOrderByElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, String &
|
||||
ParserString descending("DESCENDING", true, true);
|
||||
ParserString asc("ASC", true, true);
|
||||
ParserString desc("DESC", true, true);
|
||||
ParserString collate("COLLATE", true, true);
|
||||
ParserStringLiteral collate_locale_parser;
|
||||
|
||||
ASTPtr expr_elem;
|
||||
if (!elem_p.parse(pos, end, expr_elem, expected))
|
||||
@ -460,8 +462,27 @@ bool ParserOrderByElement::parseImpl(Pos & pos, Pos end, ASTPtr & node, String &
|
||||
direction = -1;
|
||||
else
|
||||
ascending.ignore(pos, end) || asc.ignore(pos, end);
|
||||
|
||||
Poco::SharedPtr<Collator> collator = NULL;
|
||||
if (collate.ignore(pos, end))
|
||||
{
|
||||
ASTPtr locale_node;
|
||||
if (!collate_locale_parser.parse(pos, end, locale_node, expected))
|
||||
return false;
|
||||
|
||||
const String & locale = dynamic_cast<const ASTLiteral &>(*locale_node).value.safeGet<String>();
|
||||
try
|
||||
{
|
||||
collator = new Collator(locale);
|
||||
}
|
||||
catch (const DB::Exception & e)
|
||||
{
|
||||
expected = "unsupported collation locale";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
node = new ASTOrderByElement(StringRange(begin, pos), direction);
|
||||
node = new ASTOrderByElement(StringRange(begin, pos), direction, collator);
|
||||
node->children.push_back(expr_elem);
|
||||
return true;
|
||||
}
|
||||
|
@ -30,6 +30,7 @@ bool ParserSelectQuery::parseImpl(Pos & pos, Pos end, ASTPtr & node, String & ex
|
||||
ParserString s_totals("TOTALS", true, true);
|
||||
ParserString s_having("HAVING", true, true);
|
||||
ParserString s_order("ORDER", true, true);
|
||||
ParserString s_collate("COLLATE", true, true);
|
||||
ParserString s_limit("LIMIT", true, true);
|
||||
ParserString s_format("FORMAT", true, true);
|
||||
ParserNotEmptyExpressionList exp_list;
|
||||
|
Loading…
Reference in New Issue
Block a user