This commit is contained in:
Alexey Milovidov 2013-12-09 00:29:24 +00:00
parent 489696281e
commit 9690a3003b
6 changed files with 82 additions and 58 deletions

View File

@ -73,12 +73,15 @@ public:
} }
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона. /// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
static MarkRanges markRangesFromPkRange(const String & path, static MarkRanges markRangesFromPkRange(
size_t marks_count, const StorageMergeTree::DataPart::Index & index,
StorageMergeTree & storage, StorageMergeTree & storage,
PKCondition & key_condition) PKCondition & key_condition)
{ {
MarkRanges res; MarkRanges res;
size_t key_size = storage.sort_descr.size();
size_t marks_count = index.size() / key_size;
/// Если индекс не используется. /// Если индекс не используется.
if (key_condition.alwaysTrue()) if (key_condition.alwaysTrue())
@ -87,31 +90,11 @@ public:
} }
else else
{ {
/// Читаем индекс. /** В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
typedef AutoArray<Row> Index; * На каждом шаге берем левый отрезок и проверяем, подходит ли он.
size_t key_size = storage.sort_descr.size(); * Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его.
Index index(marks_count); * Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем.
for (size_t i = 0; i < marks_count; ++i) */
index[i].resize(key_size);
{
String index_path = path + "primary.idx";
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
for (size_t i = 0; i < marks_count; ++i)
{
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i][j], index_file);
}
if (!index_file.eof())
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
}
/// В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
/// На каждом шаге берем левый отрезок и проверяем, подходит ли он.
/// Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его.
/// Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем.
std::vector<MarkRange> ranges_stack; std::vector<MarkRange> ranges_stack;
ranges_stack.push_back(MarkRange(0, marks_count)); ranges_stack.push_back(MarkRange(0, marks_count));
while (!ranges_stack.empty()) while (!ranges_stack.empty())
@ -121,9 +104,9 @@ public:
bool may_be_true; bool may_be_true;
if (range.end == marks_count) if (range.end == marks_count)
may_be_true = key_condition.mayBeTrueAfter(index[range.begin]); may_be_true = key_condition.mayBeTrueAfter(&index[range.begin * key_size]);
else else
may_be_true = key_condition.mayBeTrueInRange(index[range.begin], index[range.end]); may_be_true = key_condition.mayBeTrueInRange(&index[range.begin * key_size], &index[range.end * key_size]);
if (!may_be_true) if (!may_be_true)
continue; continue;
@ -132,23 +115,19 @@ public:
{ {
/// Увидели полезный промежуток между соседними засечками. Либо добавим его к последнему диапазону, либо начнем новый диапазон. /// Увидели полезный промежуток между соседними засечками. Либо добавим его к последнему диапазону, либо начнем новый диапазон.
if (res.empty() || range.begin - res.back().end > storage.min_marks_for_seek) if (res.empty() || range.begin - res.back().end > storage.min_marks_for_seek)
{
res.push_back(range); res.push_back(range);
}
else else
{
res.back().end = range.end; res.back().end = range.end;
}
} }
else else
{ {
/// Разбиваем отрезок и кладем результат в стек справа налево. /// Разбиваем отрезок и кладем результат в стек справа налево.
size_t step = (range.end - range.begin - 1) / storage.settings.coarse_index_granularity + 1; size_t step = (range.end - range.begin - 1) / storage.settings.coarse_index_granularity + 1;
size_t end; size_t end;
for (end = range.end; end > range.begin + step; end -= step) for (end = range.end; end > range.begin + step; end -= step)
{
ranges_stack.push_back(MarkRange(end - step, end)); ranges_stack.push_back(MarkRange(end - step, end));
}
ranges_stack.push_back(MarkRange(range.begin, end)); ranges_stack.push_back(MarkRange(range.begin, end));
} }
} }

View File

@ -104,10 +104,11 @@ private:
size_t rows = block.rows(); size_t rows = block.rows();
size_t columns = block.columns(); size_t columns = block.columns();
UInt64 part_id = storage.increment.get(false); UInt64 part_id = storage.increment.get(false);
size_t part_size = (rows + storage.index_granularity - 1) / storage.index_granularity;
String part_name = storage.getPartName( String part_name = storage.getPartName(
DayNum_t(min_date), DayNum_t(max_date), DayNum_t(min_date), DayNum_t(max_date),
part_id, part_id, 0); part_id, part_id, 0);
String part_tmp_path = storage.full_path + "tmp_" + part_name + "/"; String part_tmp_path = storage.full_path + "tmp_" + part_name + "/";
String part_res_path = storage.full_path + part_name + "/"; String part_res_path = storage.full_path + part_name + "/";
@ -128,6 +129,9 @@ private:
LOG_TRACE(storage.log, "Writing index."); LOG_TRACE(storage.log, "Writing index.");
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки. /// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
StorageMergeTree::DataPart::Index index_vec;
index_vec.reserve(part_size * storage.sort_descr.size());
{ {
WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags); WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
@ -141,8 +145,13 @@ private:
: &block.getByPosition(storage.sort_descr[i].column_number)); : &block.getByPosition(storage.sort_descr[i].column_number));
for (size_t i = 0; i < rows; i += storage.index_granularity) for (size_t i = 0; i < rows; i += storage.index_granularity)
{
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it) for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
(*it)->type->serializeBinary((*(*it)->column)[i], index); {
index_vec.push_back((*(*it)->column)[i]);
(*it)->type->serializeBinary(index_vec.back(), index);
}
}
index.next(); index.next();
} }
@ -175,10 +184,11 @@ private:
new_data_part->right = part_id; new_data_part->right = part_id;
new_data_part->level = 0; new_data_part->level = 0;
new_data_part->name = part_name; new_data_part->name = part_name;
new_data_part->size = (rows + storage.index_granularity - 1) / storage.index_granularity; new_data_part->size = part_size;
new_data_part->modification_time = time(0); new_data_part->modification_time = time(0);
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date); new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date); new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
new_data_part->index.swap(index_vec);
storage.data_parts.insert(new_data_part); storage.data_parts.insert(new_data_part);
storage.all_data_parts.insert(new_data_part); storage.all_data_parts.insert(new_data_part);

View File

@ -158,11 +158,11 @@ public:
/// Выполнимо ли условие в диапазоне ключей. /// Выполнимо ли условие в диапазоне ключей.
/// left_pk и right_pk должны содержать все поля из sort_descr в соответствующем порядке. /// left_pk и right_pk должны содержать все поля из sort_descr в соответствующем порядке.
bool mayBeTrueInRange(const Row & left_pk, const Row & right_pk); bool mayBeTrueInRange(const Field * left_pk, const Field * right_pk);
/// Выполнимо ли условие в полубесконечном (не ограниченном справа) диапазоне ключей. /// Выполнимо ли условие в полубесконечном (не ограниченном справа) диапазоне ключей.
/// left_pk должен содержать все поля из sort_descr в соответствующем порядке. /// left_pk должен содержать все поля из sort_descr в соответствующем порядке.
bool mayBeTrueAfter(const Row & left_pk); bool mayBeTrueAfter(const Field * left_pk);
bool alwaysTrue() bool alwaysTrue()
{ {
@ -230,7 +230,7 @@ private:
typedef std::vector<RPNElement> RPN; typedef std::vector<RPNElement> RPN;
typedef std::map<String, size_t> ColumnIndices; typedef std::map<String, size_t> ColumnIndices;
bool mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bool right_bounded); bool mayBeTrueInRange(const Field * left_pk, const Field * right_pk, bool right_bounded);
void traverseAST(ASTPtr & node, Block & block_with_constants); void traverseAST(ASTPtr & node, Block & block_with_constants);
bool atomFromAST(ASTPtr & node, Block & block_with_constants, RPNElement & out); bool atomFromAST(ASTPtr & node, Block & block_with_constants, RPNElement & out);

View File

@ -260,7 +260,11 @@ private:
/// Смотреть и изменять это поле следует под залоченным data_parts_mutex. /// Смотреть и изменять это поле следует под залоченным data_parts_mutex.
bool currently_merging; bool currently_merging;
/// NOTE можно загружать индекс и засечки в оперативку /// Первичный ключ. Всегда загружается в оперативку.
typedef std::vector<Field> Index;
Index index;
/// NOTE можно загружать засечки тоже в оперативку
void remove() const void remove() const
{ {
@ -318,6 +322,23 @@ private:
&& left <= rhs.left && left <= rhs.left
&& right >= rhs.right; && right >= rhs.right;
} }
/// Загрузить индекс.
void loadIndex()
{
size_t key_size = storage.sort_descr.size();
index.resize(key_size * size);
String index_path = storage.full_path + name + "/primary.idx";
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
for (size_t i = 0; i < size; ++i)
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i * key_size + j], index_file);
if (!index_file.eof())
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
}
}; };
typedef SharedPtr<DataPart> DataPartPtr; typedef SharedPtr<DataPart> DataPartPtr;
@ -370,6 +391,7 @@ private:
bool use_uncompressed_cache, bool use_uncompressed_cache,
ExpressionActionsPtr prewhere_actions, ExpressionActionsPtr prewhere_actions,
const String & prewhere_column); const String & prewhere_column);
BlockInputStreams spreadMarkRangesAmongThreadsFinal( BlockInputStreams spreadMarkRangesAmongThreadsFinal(
RangesInDataParts parts, RangesInDataParts parts,
size_t threads, size_t threads,

View File

@ -241,7 +241,7 @@ struct BoolMask
} }
}; };
bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bool right_bounded) bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk, bool right_bounded)
{ {
/// Найдем диапазоны элементов ключа. /// Найдем диапазоны элементов ключа.
std::vector<Range> key_ranges(sort_descr.size(), Range()); std::vector<Range> key_ranges(sort_descr.size(), Range());
@ -311,14 +311,14 @@ bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk, bo
return rpn_stack[0].can_be_true; return rpn_stack[0].can_be_true;
} }
bool PKCondition::mayBeTrueInRange(const Row & left_pk, const Row & right_pk) bool PKCondition::mayBeTrueInRange(const Field * left_pk, const Field * right_pk)
{ {
return mayBeTrueInRange(left_pk, right_pk, true); return mayBeTrueInRange(left_pk, right_pk, true);
} }
bool PKCondition::mayBeTrueAfter(const Row & left_pk) bool PKCondition::mayBeTrueAfter(const Field * left_pk)
{ {
return mayBeTrueInRange(left_pk, Row(), false); return mayBeTrueInRange(left_pk, NULL, false);
} }
} }

View File

@ -166,10 +166,15 @@ BlockInputStreams StorageMergeTree::read(
/// Выберем куски, в которых могут быть данные, удовлетворяющие date_condition. /// Выберем куски, в которых могут быть данные, удовлетворяющие date_condition.
{ {
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex); Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it) for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
if (date_condition.mayBeTrueInRange(Row(1, static_cast<UInt64>((*it)->left_date)),Row(1, static_cast<UInt64>((*it)->right_date)))) {
Field left = static_cast<UInt64>((*it)->left_date);
Field right = static_cast<UInt64>((*it)->right_date);
if (date_condition.mayBeTrueInRange(&left, &right))
parts.push_back(*it); parts.push_back(*it);
}
} }
/// Семплирование. /// Семплирование.
@ -196,10 +201,8 @@ BlockInputStreams StorageMergeTree::read(
for (size_t i = 0; i < parts.size(); ++i) for (size_t i = 0; i < parts.size(); ++i)
{ {
DataPartPtr & part = parts[i]; DataPartPtr & part = parts[i];
MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/', MarkRanges ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition);
part->size,
*this,
key_condition);
for (size_t j = 0; j < ranges.size(); ++j) for (size_t j = 0; j < ranges.size(); ++j)
total_count += ranges[j].end - ranges[j].begin; total_count += ranges[j].end - ranges[j].begin;
} }
@ -272,10 +275,8 @@ BlockInputStreams StorageMergeTree::read(
{ {
DataPartPtr & part = parts[i]; DataPartPtr & part = parts[i];
RangesInDataPart ranges(part); RangesInDataPart ranges(part);
ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/', ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(part->index, *this, key_condition);
part->size,
*this,
key_condition);
if (!ranges.ranges.empty()) if (!ranges.ranges.empty())
{ {
parts_with_ranges.push_back(ranges); parts_with_ranges.push_back(ranges);
@ -638,6 +639,17 @@ void StorageMergeTree::loadDataParts()
part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime(); part->modification_time = Poco::File(full_path + file_name).getLastModified().epochTime();
try
{
part->loadIndex();
}
catch (...)
{
/// Не будем вставлять в набор кусок с битым индексом. Пропустим кусок и позволим серверу запуститься.
tryLogCurrentException(__PRETTY_FUNCTION__);
continue;
}
data_parts.insert(part); data_parts.insert(part);
} }
@ -1070,6 +1082,7 @@ void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
new_data_part->size = to->marksCount(); new_data_part->size = to->marksCount();
new_data_part->modification_time = time(0); new_data_part->modification_time = time(0);
new_data_part->loadIndex(); /// NOTE Только что записанный индекс заново считывается с диска. Можно было бы формировать его сразу при записи.
{ {
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex); Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);