dbms: development [#CONV-2944].

This commit is contained in:
Alexey Milovidov 2012-08-13 19:13:11 +00:00
parent cf9279b789
commit 598614b624
2 changed files with 113 additions and 101 deletions

View File

@ -94,7 +94,7 @@ public:
*/ */
bool optimize() bool optimize()
{ {
return merge(false); return merge(1, false);
} }
// void drop(); // void drop();
@ -224,9 +224,10 @@ private:
void getIndexRanges(ASTPtr & query, Range & date_range, Row & primary_prefix, Range & primary_range); void getIndexRanges(ASTPtr & query, Range & date_range, Row & primary_prefix, Range & primary_range);
/// Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке. /// Определяет, какие куски нужно объединять, и запускает их слияние в отдельном потоке.
bool merge(bool async = true); bool merge(size_t iterations = 1, bool async = true);
void mergeThread(size_t iterations, bool async, bool & merged_any);
bool selectPartsToMerge(DataPartPtr & left, DataPartPtr & right); bool selectPartsToMerge(DataPartPtr & left, DataPartPtr & right);
void mergeImpl(DataPartPtr left, DataPartPtr right); void mergeParts(DataPartPtr left, DataPartPtr right);
boost::thread merge_thread; boost::thread merge_thread;
Poco::Mutex merge_mutex; Poco::Mutex merge_mutex;

View File

@ -108,7 +108,8 @@ public:
void writeSuffix() void writeSuffix()
{ {
storage.merge(); /// Если на каждую запись делать по две итерации слияния, то дерево будет максимально компактно.
storage.merge(2);
} }
BlockOutputStreamPtr clone() { return new MergeTreeBlockOutputStream(storage); } BlockOutputStreamPtr clone() { return new MergeTreeBlockOutputStream(storage); }
@ -1058,7 +1059,7 @@ void StorageMergeTree::clearOldParts()
} }
bool StorageMergeTree::merge(bool async) bool StorageMergeTree::merge(size_t iterations, bool async)
{ {
Poco::ScopedTry<Poco::Mutex> lock; Poco::ScopedTry<Poco::Mutex> lock;
@ -1069,23 +1070,60 @@ bool StorageMergeTree::merge(bool async)
return false; return false;
} }
DataPartPtr left;
DataPartPtr right;
if (merge_thread.joinable()) if (merge_thread.joinable())
merge_thread.join(); merge_thread.join();
if (selectPartsToMerge(left, right)) bool merged_any = false;
if (async)
merge_thread = boost::thread(boost::bind(&StorageMergeTree::mergeThread, this, iterations, async, boost::ref(merged_any)));
else
mergeThread(iterations, async, merged_any);
return merged_any;
}
void StorageMergeTree::mergeThread(size_t iterations, bool async, bool & merged_any)
{
Poco::ScopedLock<Poco::Mutex> lock(merge_mutex);
try
{ {
if (async) DataPartPtr left;
merge_thread = boost::thread(boost::bind(&StorageMergeTree::mergeImpl, this, left, right)); DataPartPtr right;
else
mergeImpl(left, right);
return true; merged_any = false;
for (size_t i = 0; i < iterations && selectPartsToMerge(left, right); ++i)
{
mergeParts(left, right);
merged_any = true;
/// Удаляем старые куски.
left = NULL;
right = NULL;
clearOldParts();
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception");
} }
return false;
} }
@ -1184,95 +1222,68 @@ bool StorageMergeTree::selectPartsToMerge(DataPartPtr & left, DataPartPtr & righ
} }
void StorageMergeTree::mergeImpl(DataPartPtr left, DataPartPtr right) void StorageMergeTree::mergeParts(DataPartPtr left, DataPartPtr right)
{ {
Poco::ScopedLock<Poco::Mutex> lock(merge_mutex); LOG_DEBUG(log, "Merging parts " << left->name << " with " << right->name);
Names all_column_names;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
all_column_names.push_back(it->first);
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this);
new_data_part->left_date = left->left_date;
new_data_part->right_date = right->right_date;
new_data_part->left = left->left;
new_data_part->right = right->right;
new_data_part->level = 1 + std::max(left->level, right->level);
new_data_part->name = getPartName(
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
new_data_part->size = left->size + right->size;
new_data_part->left_month = date_lut.toFirstDayOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayOfMonth(new_data_part->right_date);
/** Читаем из левого и правого куска, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
*/
BlockInputStreams src_streams;
Row empty_prefix;
Range empty_range;
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + left->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, left, empty_prefix, empty_range), primary_expr));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + right->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, right, empty_prefix, empty_range), primary_expr));
BlockInputStreamPtr merged_stream = new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE);
BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
left->left_date, right->right_date, left->left, right->right, 1 + std::max(left->level, right->level));
copyData(*merged_stream, *to);
new_data_part->modification_time = time(0);
try
{ {
LOG_DEBUG(log, "Merging parts " << left->name << " with " << right->name); Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
Names all_column_names; /// Добавляем новый кусок в набор.
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it) if (data_parts.end() == data_parts.find(left))
all_column_names.push_back(it->first); throw Exception("Logical error: cannot find data part " + left->name + " in list", ErrorCodes::LOGICAL_ERROR);
if (data_parts.end() == data_parts.find(right))
throw Exception("Logical error: cannot find data part " + right->name + " in list", ErrorCodes::LOGICAL_ERROR);
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance(); data_parts.insert(new_data_part);
data_parts.erase(data_parts.find(left));
data_parts.erase(data_parts.find(right));
StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this); all_data_parts.insert(new_data_part);
new_data_part->left_date = left->left_date;
new_data_part->right_date = right->right_date;
new_data_part->left = left->left;
new_data_part->right = right->right;
new_data_part->level = 1 + std::max(left->level, right->level);
new_data_part->name = getPartName(
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
new_data_part->size = left->size + right->size;
new_data_part->left_month = date_lut.toFirstDayOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayOfMonth(new_data_part->right_date);
/** Читаем из левого и правого куска, сливаем и пишем в новый.
* Попутно вычисляем выражение для сортировки.
*/
BlockInputStreams src_streams;
Row empty_prefix;
Range empty_range;
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + left->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, left, empty_prefix, empty_range), primary_expr));
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
full_path + right->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, right, empty_prefix, empty_range), primary_expr));
BlockInputStreamPtr merged_stream = new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE);
BlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
left->left_date, right->right_date, left->left, right->right, 1 + std::max(left->level, right->level));
copyData(*merged_stream, *to);
new_data_part->modification_time = time(0);
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
/// Добавляем новый кусок в набор.
if (data_parts.end() == data_parts.find(left))
throw Exception("Logical error: cannot find data part " + left->name + " in list", ErrorCodes::LOGICAL_ERROR);
if (data_parts.end() == data_parts.find(right))
throw Exception("Logical error: cannot find data part " + right->name + " in list", ErrorCodes::LOGICAL_ERROR);
data_parts.insert(new_data_part);
data_parts.erase(data_parts.find(left));
data_parts.erase(data_parts.find(right));
all_data_parts.insert(new_data_part);
}
LOG_TRACE(log, "Merged parts " << left->name << " with " << right->name);
/// Удаляем старые куски.
clearOldParts();
}
catch (const Exception & e)
{
LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception");
} }
LOG_TRACE(log, "Merged parts " << left->name << " with " << right->name);
} }
} }