ClickHouse/dbms/src/Storages/StorageMergeTree.cpp

1476 lines
53 KiB
C++
Raw Normal View History

2012-07-18 19:16:16 +00:00
#include <boost/bind.hpp>
2012-11-28 08:52:15 +00:00
#include <numeric>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
#include <Poco/DirectoryIterator.h>
#include <Poco/NumberParser.h>
2012-11-28 08:52:15 +00:00
#include <Poco/Ext/scopedTry.h>
2012-07-19 20:32:10 +00:00
#include <Yandex/time2str.h>
2012-07-17 20:04:39 +00:00
#include <DB/Common/escapeForFileName.h>
#include <DB/IO/WriteBufferFromString.h>
#include <DB/IO/WriteBufferFromFile.h>
#include <DB/IO/CompressedWriteBuffer.h>
2012-07-21 03:45:48 +00:00
#include <DB/IO/ReadBufferFromString.h>
2012-07-19 20:32:10 +00:00
#include <DB/IO/ReadBufferFromFile.h>
#include <DB/IO/CompressedReadBuffer.h>
2012-07-17 20:04:39 +00:00
#include <DB/Columns/ColumnsNumber.h>
2012-08-30 17:43:31 +00:00
#include <DB/Columns/ColumnArray.h>
2012-07-17 20:04:39 +00:00
2012-07-21 03:45:48 +00:00
#include <DB/DataTypes/DataTypesNumberFixed.h>
2012-08-30 17:43:31 +00:00
#include <DB/DataTypes/DataTypeArray.h>
2012-07-21 03:45:48 +00:00
2012-07-19 20:32:10 +00:00
#include <DB/DataStreams/IProfilingBlockInputStream.h>
2012-07-30 20:32:36 +00:00
#include <DB/DataStreams/MergingSortedBlockInputStream.h>
2012-08-16 17:27:40 +00:00
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
2012-07-31 17:22:40 +00:00
#include <DB/DataStreams/ExpressionBlockInputStream.h>
2012-11-28 08:52:15 +00:00
#include <DB/DataStreams/ConcatBlockInputStream.h>
2012-07-21 06:47:17 +00:00
#include <DB/DataStreams/narrowBlockInputStreams.h>
2012-07-30 20:32:36 +00:00
#include <DB/DataStreams/copyData.h>
#include <DB/DataStreams/FilterBlockInputStream.h>
2012-07-19 20:32:10 +00:00
2012-07-21 03:45:48 +00:00
#include <DB/Parsers/ASTExpressionList.h>
#include <DB/Parsers/ASTSelectQuery.h>
#include <DB/Parsers/ASTFunction.h>
#include <DB/Parsers/ASTLiteral.h>
#include <DB/Parsers/ASTIdentifier.h>
2012-07-21 03:45:48 +00:00
2012-07-17 20:04:39 +00:00
#include <DB/Interpreters/sortBlock.h>
#include <DB/Storages/StorageMergeTree.h>
2012-12-05 12:44:55 +00:00
#include <DB/Storages/PkCondition.h>
2012-07-17 20:04:39 +00:00
2012-07-19 20:32:10 +00:00
#define MERGE_TREE_MARK_SIZE (2 * sizeof(size_t))
2012-07-17 20:04:39 +00:00
namespace DB
{
class MergeTreeBlockOutputStream : public IBlockOutputStream
{
public:
2012-08-30 17:43:31 +00:00
MergeTreeBlockOutputStream(StorageMergeTree & storage_) : storage(storage_), flags(O_TRUNC | O_CREAT | O_WRONLY)
2012-07-17 20:04:39 +00:00
{
}
void write(const Block & block)
{
storage.check(block);
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
size_t rows = block.rows();
size_t columns = block.columns();
/// Достаём столбец с датой.
2012-07-18 19:16:16 +00:00
const ColumnUInt16::Container_t & dates =
dynamic_cast<const ColumnUInt16 &>(*block.getByName(storage.date_column_name).column).getData();
2012-07-17 20:04:39 +00:00
/// Минимальная и максимальная дата.
UInt16 min_date = std::numeric_limits<UInt16>::max();
UInt16 max_date = std::numeric_limits<UInt16>::min();
for (ColumnUInt16::Container_t::const_iterator it = dates.begin(); it != dates.end(); ++it)
{
if (*it < min_date)
min_date = *it;
if (*it > max_date)
max_date = *it;
}
/// Разделяем на блоки по месяцам. Для каждого ещё посчитаем минимальную и максимальную дату.
typedef std::map<UInt16, BlockWithDateInterval> BlocksByMonth;
BlocksByMonth blocks_by_month;
2012-08-31 20:38:05 +00:00
UInt16 min_month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(min_date));
UInt16 max_month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(max_date));
2012-07-17 20:04:39 +00:00
/// Типичный случай - когда месяц один (ничего разделять не нужно).
if (min_month == max_month)
blocks_by_month[min_month] = BlockWithDateInterval(block, min_date, max_date);
else
{
for (size_t i = 0; i < rows; ++i)
{
2012-08-31 20:38:05 +00:00
UInt16 month = date_lut.toFirstDayNumOfMonth(Yandex::DayNum_t(dates[i]));
2012-08-31 18:40:21 +00:00
2012-07-17 20:04:39 +00:00
BlockWithDateInterval & block_for_month = blocks_by_month[month];
if (!block_for_month.block)
block_for_month.block = block.cloneEmpty();
if (dates[i] < block_for_month.min_date)
block_for_month.min_date = dates[i];
if (dates[i] > block_for_month.max_date)
block_for_month.max_date = dates[i];
for (size_t j = 0; j < columns; ++j)
block_for_month.block.getByPosition(j).column->insert((*block.getByPosition(j).column)[i]);
}
}
/// Для каждого месяца.
2012-07-18 19:16:16 +00:00
for (BlocksByMonth::iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it)
2012-07-17 20:04:39 +00:00
writePart(it->second.block, it->second.min_date, it->second.max_date);
}
BlockOutputStreamPtr clone() { return new MergeTreeBlockOutputStream(storage); }
private:
StorageMergeTree & storage;
2012-08-30 17:43:31 +00:00
const int flags;
2012-07-17 20:04:39 +00:00
struct BlockWithDateInterval
{
Block block;
UInt16 min_date;
UInt16 max_date;
BlockWithDateInterval() : min_date(std::numeric_limits<UInt16>::max()), max_date(0) {}
2012-07-18 19:16:16 +00:00
BlockWithDateInterval(const Block & block_, UInt16 min_date_, UInt16 max_date_)
: block(block_), min_date(min_date_), max_date(max_date_) {}
2012-07-17 20:04:39 +00:00
};
void writePart(Block & block, UInt16 min_date, UInt16 max_date)
{
2012-07-21 07:21:41 +00:00
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
2012-07-18 19:16:16 +00:00
size_t rows = block.rows();
2012-07-17 20:04:39 +00:00
size_t columns = block.columns();
UInt64 part_id = storage.increment.get(true);
2012-07-21 07:21:41 +00:00
String part_name = storage.getPartName(
Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
part_id, part_id, 0);
2012-07-18 19:16:16 +00:00
2012-07-21 07:21:41 +00:00
String part_tmp_path = storage.full_path + "tmp_" + part_name + "/";
String part_res_path = storage.full_path + part_name + "/";
2012-07-17 20:04:39 +00:00
Poco::File(part_tmp_path).createDirectories();
2012-07-31 16:58:37 +00:00
LOG_TRACE(storage.log, "Calculating primary expression.");
2012-07-17 20:04:39 +00:00
/// Если для сортировки надо вычислить некоторые столбцы - делаем это.
2012-07-18 20:14:41 +00:00
storage.primary_expr->execute(block);
2012-07-31 16:58:37 +00:00
LOG_TRACE(storage.log, "Sorting by primary key.");
2012-07-17 20:04:39 +00:00
/// Сортируем.
sortBlock(block, storage.sort_descr);
/// Наконец-то можно писать данные на диск.
2012-07-31 16:58:37 +00:00
LOG_TRACE(storage.log, "Writing index.");
2012-07-18 19:16:16 +00:00
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
{
WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
PrimaryColumns primary_columns;
for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
primary_columns.push_back(
!storage.sort_descr[i].column_name.empty()
? &block.getByName(storage.sort_descr[i].column_name)
: &block.getByPosition(storage.sort_descr[i].column_number));
for (size_t i = 0; i < rows; i += storage.index_granularity)
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
(*it)->type->serializeBinary((*(*it)->column)[i], index);
}
2012-07-31 16:58:37 +00:00
LOG_TRACE(storage.log, "Writing data.");
2012-07-17 20:04:39 +00:00
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithNameAndType & column = block.getByPosition(i);
2012-08-30 17:43:31 +00:00
writeData(part_tmp_path, column.name, *column.type, *column.column);
2012-07-18 19:16:16 +00:00
}
2012-07-31 16:58:37 +00:00
LOG_TRACE(storage.log, "Renaming.");
2012-07-17 20:04:39 +00:00
2012-07-18 19:16:16 +00:00
/// Переименовываем кусок.
Poco::File(part_tmp_path).renameTo(part_res_path);
2012-07-21 07:21:41 +00:00
/// Добавляем новый кусок в набор.
2012-07-23 06:23:29 +00:00
{
2012-08-10 20:04:34 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(storage.data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(storage.all_data_parts_mutex);
StorageMergeTree::DataPartPtr new_data_part = new StorageMergeTree::DataPart(storage);
new_data_part->left_date = Yandex::DayNum_t(min_date);
new_data_part->right_date = Yandex::DayNum_t(max_date);
new_data_part->left = part_id;
new_data_part->right = part_id;
new_data_part->level = 0;
new_data_part->name = part_name;
2012-11-29 17:04:12 +00:00
new_data_part->size = (rows + storage.index_granularity - 1) / storage.index_granularity;
2012-08-10 20:04:34 +00:00
new_data_part->modification_time = time(0);
2012-08-31 20:38:05 +00:00
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
2012-08-10 20:04:34 +00:00
storage.data_parts.insert(new_data_part);
2012-07-23 06:23:29 +00:00
storage.all_data_parts.insert(new_data_part);
}
2012-11-28 11:49:14 +00:00
/// Если на каждую запись делать по две итерации слияния, то дерево будет максимально компактно.
storage.merge(2);
2012-07-18 19:16:16 +00:00
}
2012-07-17 20:04:39 +00:00
2012-08-30 17:43:31 +00:00
/// Записать данные одного столбца.
void writeData(const String & path, const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
{
String escaped_column_name = escapeForFileName(name);
size_t size = column.size();
2012-08-30 17:43:31 +00:00
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
String size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
WriteBufferFromFile plain(path + size_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
WriteBufferFromFile marks(path + size_name + ".mrk", 4096, flags);
CompressedWriteBuffer compressed(plain);
size_t prev_mark = 0;
while (prev_mark < size)
{
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
type_arr->serializeOffsets(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
2012-08-30 17:43:31 +00:00
}
2012-09-22 07:30:40 +00:00
2012-08-30 17:43:31 +00:00
{
WriteBufferFromFile plain(path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
WriteBufferFromFile marks(path + escaped_column_name + ".mrk", 4096, flags);
CompressedWriteBuffer compressed(plain);
// TODO Для массивов здесь баг - засечки сериализуются неправильно.
2012-08-30 17:43:31 +00:00
size_t prev_mark = 0;
while (prev_mark < size)
{
writeIntBinary(plain.count(), marks);
writeIntBinary(compressed.offset(), marks);
type.serializeBinary(column, compressed, prev_mark, storage.index_granularity);
prev_mark += storage.index_granularity;
}
2012-08-30 17:43:31 +00:00
}
}
2012-07-17 20:04:39 +00:00
};
2012-07-30 20:32:36 +00:00
/** Для записи куска, полученного слиянием нескольких других.
* Данные уже отсортированы, относятся к одному месяцу, и пишутся в один кускок.
*/
class MergedBlockOutputStream : public IBlockOutputStream
{
public:
MergedBlockOutputStream(StorageMergeTree & storage_,
UInt16 min_date, UInt16 max_date, UInt64 min_part_id, UInt64 max_part_id, UInt32 level)
: storage(storage_), marks_count(0), index_offset(0)
2012-07-30 20:32:36 +00:00
{
part_name = storage.getPartName(
Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
min_part_id, max_part_id, level);
part_tmp_path = storage.full_path + "tmp_" + part_name + "/";
part_res_path = storage.full_path + part_name + "/";
Poco::File(part_tmp_path).createDirectories();
2012-08-21 15:37:29 +00:00
index_stream = new WriteBufferFromFile(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY);
2012-07-30 20:32:36 +00:00
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
2012-08-30 17:43:31 +00:00
addStream(it->first, *it->second);
2012-07-30 20:32:36 +00:00
}
void write(const Block & block)
{
size_t rows = block.rows();
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
PrimaryColumns primary_columns;
for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
primary_columns.push_back(
!storage.sort_descr[i].column_name.empty()
? &block.getByName(storage.sort_descr[i].column_name)
: &block.getByPosition(storage.sort_descr[i].column_number));
for (size_t i = index_offset; i < rows; i += storage.index_granularity)
2012-12-03 08:52:58 +00:00
{
2012-07-30 20:32:36 +00:00
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
2012-12-03 08:52:58 +00:00
{
2012-07-30 20:32:36 +00:00
(*it)->type->serializeBinary((*(*it)->column)[i], *index_stream);
2012-12-03 08:52:58 +00:00
}
++marks_count;
}
2012-07-30 20:32:36 +00:00
2012-08-30 17:43:31 +00:00
/// Теперь пишем данные.
2012-07-30 20:32:36 +00:00
for (NamesAndTypesList::const_iterator it = storage.columns->begin(); it != storage.columns->end(); ++it)
{
const ColumnWithNameAndType & column = block.getByName(it->first);
2012-08-30 17:43:31 +00:00
writeData(column.name, *column.type, *column.column);
2012-07-30 20:32:36 +00:00
}
2012-07-31 18:15:51 +00:00
index_offset = rows % storage.index_granularity
? (storage.index_granularity - rows % storage.index_granularity)
: 0;
2012-07-30 20:32:36 +00:00
}
void writeSuffix()
{
/// Заканчиваем запись.
index_stream = NULL;
column_streams.clear();
2012-12-03 08:52:58 +00:00
if (marks_count == 0)
throw Exception("Empty part", ErrorCodes::LOGICAL_ERROR);
2012-07-30 20:32:36 +00:00
/// Переименовываем кусок.
Poco::File(part_tmp_path).renameTo(part_res_path);
/// А добавление нового куска в набор (и удаление исходных кусков) сделает вызывающая сторона.
}
BlockOutputStreamPtr clone() { throw Exception("Cannot clone MergedBlockOutputStream", ErrorCodes::NOT_IMPLEMENTED); }
2012-12-03 08:52:58 +00:00
/// Сколько засечек уже записано.
size_t marksCount()
{
return marks_count;
}
2012-07-30 20:32:36 +00:00
private:
StorageMergeTree & storage;
String part_name;
String part_tmp_path;
String part_res_path;
2012-12-03 08:52:58 +00:00
size_t marks_count;
2012-07-30 20:32:36 +00:00
struct ColumnStream
{
ColumnStream(const String & data_path, const std::string & marks_path) :
2012-08-21 15:37:29 +00:00
plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_TRUNC | O_CREAT | O_WRONLY),
2012-07-30 20:32:36 +00:00
compressed(plain),
2012-08-21 15:37:29 +00:00
marks(marks_path, 4096, O_TRUNC | O_CREAT | O_WRONLY) {}
2012-07-30 20:32:36 +00:00
WriteBufferFromFile plain;
CompressedWriteBuffer compressed;
WriteBufferFromFile marks;
};
typedef std::map<String, SharedPtr<ColumnStream> > ColumnStreams;
ColumnStreams column_streams;
SharedPtr<WriteBuffer> index_stream;
/// Смещение до первой строчки блока, для которой надо записать индекс.
size_t index_offset;
2012-08-30 17:43:31 +00:00
void addStream(const String & name, const IDataType & type, size_t level = 0)
{
String escaped_column_name = escapeForFileName(name);
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
column_streams[size_name] = new ColumnStream(
part_tmp_path + escaped_size_name + ".bin",
part_tmp_path + escaped_size_name + ".mrk");
addStream(name, *type_arr->getNestedType(), level + 1);
}
else
column_streams[name] = new ColumnStream(
part_tmp_path + escaped_column_name + ".bin",
part_tmp_path + escaped_column_name + ".mrk");
}
/// Записать данные одного столбца.
void writeData(const String & name, const IDataType & type, const IColumn & column, size_t level = 0)
{
size_t size = column.size();
2012-08-30 17:43:31 +00:00
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
ColumnStream & stream = *column_streams[size_name];
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type_arr->serializeOffsets(column, stream.compressed, prev_mark, limit);
prev_mark += limit;
}
2012-08-30 17:43:31 +00:00
}
2012-09-22 07:30:40 +00:00
2012-08-30 17:43:31 +00:00
{
ColumnStream & stream = *column_streams[name];
// TODO Для массивов здесь баг - засечки сериализуются неправильно.
2012-08-30 17:43:31 +00:00
size_t prev_mark = 0;
while (prev_mark < size)
{
size_t limit = 0;
/// Если есть index_offset, то первая засечка идёт не сразу, а после этого количества строк.
if (prev_mark == 0 && index_offset != 0)
{
limit = index_offset;
}
else
{
limit = storage.index_granularity;
writeIntBinary(stream.plain.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
}
type.serializeBinary(column, stream.compressed, prev_mark, limit);
prev_mark += limit;
}
2012-07-30 20:32:36 +00:00
}
}
};
2012-12-03 08:52:58 +00:00
typedef Poco::SharedPtr<MergedBlockOutputStream> MergedBlockOutputStreamPtr;
2012-07-30 20:32:36 +00:00
2012-07-19 20:32:10 +00:00
/// Для чтения из одного куска. Для чтения сразу из многих, Storage использует сразу много таких объектов.
class MergeTreeBlockInputStream : public IProfilingBlockInputStream
{
public:
2012-07-21 06:47:17 +00:00
MergeTreeBlockInputStream(const String & path_, /// Путь к куску
2012-11-28 08:52:15 +00:00
size_t block_size_, const Names & column_names_,
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
2012-12-06 09:45:09 +00:00
const MarkRanges & mark_ranges_)
2012-07-21 06:47:17 +00:00
: path(path_), block_size(block_size_), column_names(column_names_),
2012-08-10 20:04:34 +00:00
storage(storage_), owned_data_part(owned_data_part_),
2012-12-06 09:45:09 +00:00
mark_ranges(mark_ranges_), current_range(-1), rows_left_in_current_range(0)
2012-11-28 08:52:15 +00:00
{
2012-12-06 09:45:09 +00:00
LOG_TRACE(storage.log, "Reading " << mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", up to " << (mark_ranges.back().end - mark_ranges.front().begin) * storage.index_granularity
<< " rows starting from " << mark_ranges.front().begin * storage.index_granularity);
2012-11-28 08:52:15 +00:00
}
String getName() const { return "MergeTreeBlockInputStream"; }
BlockInputStreamPtr clone()
{
2012-12-06 09:45:09 +00:00
return new MergeTreeBlockInputStream(path, block_size, column_names, storage, owned_data_part, mark_ranges);
2012-11-28 08:52:15 +00:00
}
2012-12-06 09:45:09 +00:00
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
static MarkRanges markRangesFromPkRange(const String & path,
2012-11-28 08:52:15 +00:00
size_t marks_count,
StorageMergeTree & storage,
2012-12-10 10:23:10 +00:00
PKCondition & key_condition)
2012-07-21 06:47:17 +00:00
{
2012-12-06 09:45:09 +00:00
MarkRanges res;
2012-11-28 08:52:15 +00:00
2012-07-21 06:47:17 +00:00
/// Если индекс не используется.
2012-12-05 12:44:55 +00:00
if (key_condition.alwaysTrue())
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
res.push_back(MarkRange(0, marks_count));
2012-07-21 06:47:17 +00:00
}
else
{
2012-12-10 10:23:10 +00:00
/// Читаем индекс.
typedef std::vector<Row> Index;
size_t key_size = storage.sort_descr.size();
Index index(marks_count, Row(key_size));
2012-11-28 08:52:15 +00:00
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
String index_path = path + "primary.idx";
ReadBufferFromFile index_file(index_path, std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(index_path).getSize()));
2012-12-06 09:45:09 +00:00
2012-12-10 10:23:10 +00:00
for (size_t i = 0; i < marks_count; ++i)
2012-12-06 09:45:09 +00:00
{
2012-12-10 10:23:10 +00:00
for (size_t j = 0; j < key_size; ++j)
storage.primary_key_sample.getByPosition(j).type->deserializeBinary(index[i][j], index_file);
2012-12-06 09:45:09 +00:00
}
2012-12-10 10:23:10 +00:00
if (!index_file.eof())
throw Exception("index file " + index_path + " is unexpectedly long", ErrorCodes::EXPECTED_END_OF_FILE);
}
/// В стеке всегда будут находиться непересекающиеся подозрительные отрезки, самый левый наверху (back).
/// На каждом шаге берем левый отрезок и проверяем, подходит ли он.
/// Если подходит, разбиваем его на более мелкие и кладем их в стек. Если нет - выбрасываем его.
/// Если отрезок уже длиной в одну засечку, добавляем его в ответ и выбрасываем.
std::vector<MarkRange> ranges_stack;
ranges_stack.push_back(MarkRange(0, marks_count));
while (!ranges_stack.empty())
{
MarkRange range = ranges_stack.back();
ranges_stack.pop_back();
bool may_be_true;
if (range.end == marks_count)
may_be_true = key_condition.mayBeTrueAfter(index[range.begin]);
2012-12-06 09:45:09 +00:00
else
2012-12-10 10:23:10 +00:00
may_be_true = key_condition.mayBeTrueInRange(index[range.begin], index[range.end]);
if (!may_be_true)
continue;
2012-11-28 08:52:15 +00:00
2012-12-10 10:23:10 +00:00
if (range.end == range.begin + 1)
2012-07-21 06:47:17 +00:00
{
2012-12-06 09:45:09 +00:00
/// Увидели полезный промежуток между соседними засечками. Либо добавим его к последнему диапазону, либо начнем новый диапазон.
2012-12-10 10:23:10 +00:00
if (res.empty() || range.begin - res.back().end > storage.min_marks_for_seek)
2012-07-21 06:47:17 +00:00
{
2012-12-10 10:23:10 +00:00
res.push_back(range);
2012-12-06 09:45:09 +00:00
}
else
{
2012-12-10 10:23:10 +00:00
res.back().end = range.end;
}
}
else
{
/// Разбиваем отрезок и кладем результат в стек справа налево.
size_t step = (range.end - range.begin - 1) / storage.settings.coarse_index_granularity + 1;
size_t end;
for (end = range.end; end > range.begin + step; end -= step)
{
ranges_stack.push_back(MarkRange(end - step, end));
2012-07-21 06:47:17 +00:00
}
2012-12-10 10:23:10 +00:00
ranges_stack.push_back(MarkRange(range.begin, end));
2012-11-20 22:48:38 +00:00
}
2012-07-21 06:47:17 +00:00
}
2012-07-31 19:08:49 +00:00
}
2012-12-06 09:45:09 +00:00
return res;
2012-07-21 06:47:17 +00:00
}
2012-10-20 02:10:47 +00:00
protected:
2012-07-19 20:32:10 +00:00
Block readImpl()
{
Block res;
2012-12-06 09:45:09 +00:00
/// Если нужно, переходим к следующему диапазону.
if (rows_left_in_current_range == 0)
{
++current_range;
2012-12-06 17:36:51 +00:00
if (static_cast<size_t>(current_range) == mark_ranges.size())
2012-12-06 09:45:09 +00:00
return res;
MarkRange & range = mark_ranges[current_range];
rows_left_in_current_range = (range.end - range.begin) * storage.index_granularity;
2012-07-19 20:32:10 +00:00
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
2012-12-06 09:45:09 +00:00
addStream(*it, *storage.getDataTypeByName(*it), range.begin);
}
2012-07-19 20:32:10 +00:00
/// Сколько строк читать для следующего блока.
2012-12-06 09:45:09 +00:00
size_t max_rows_to_read = std::min(block_size, rows_left_in_current_range);
2012-07-19 20:32:10 +00:00
2012-11-30 00:52:45 +00:00
/** Для некоторых столбцов файлы с данными могут отсутствовать.
* Это бывает для старых кусков, после добавления новых столбцов в структуру таблицы.
*/
bool has_missing_columns = false;
bool has_normal_columns = false;
2012-07-19 20:32:10 +00:00
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
2012-11-30 00:52:45 +00:00
if (streams.end() == streams.find(*it))
{
has_missing_columns = true;
continue;
}
has_normal_columns = true;
2012-07-19 20:32:10 +00:00
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
column.column = column.type->createColumn();
2012-08-30 17:43:31 +00:00
readData(*it, *column.type, *column.column, max_rows_to_read);
2012-07-19 20:32:10 +00:00
if (column.column->size())
res.insert(column);
}
2012-11-30 00:52:45 +00:00
if (has_missing_columns && !has_normal_columns)
throw Exception("All requested columns are missing", ErrorCodes::ALL_REQUESTED_COLUMNS_ARE_MISSING);
2012-07-19 20:32:10 +00:00
if (res)
2012-11-30 00:52:45 +00:00
{
2012-12-06 09:45:09 +00:00
rows_left_in_current_range -= res.rows();
2012-07-19 20:32:10 +00:00
2012-11-30 00:52:45 +00:00
/// Заполним столбцы, для которых нет файлов, значениями по-умолчанию.
if (has_missing_columns)
{
for (Names::const_iterator it = column_names.begin(); it != column_names.end(); ++it)
{
if (streams.end() == streams.find(*it))
{
ColumnWithNameAndType column;
column.name = *it;
column.type = storage.getDataTypeByName(*it);
/** Нужно превратить константный столбец в полноценный, так как в части блоков (из других кусков),
* он может быть полноценным (а то интерпретатор может посчитать, что он константный везде).
*/
column.column = dynamic_cast<IColumnConst &>(*column.type->createConstColumn(
res.rows(), column.type->getDefault())).convertToFullColumn();
res.insert(column);
}
}
}
}
2012-12-06 09:45:09 +00:00
if (!res || rows_left_in_current_range == 0)
2012-07-19 20:32:10 +00:00
{
2012-12-06 09:45:09 +00:00
rows_left_in_current_range = 0;
2012-07-19 20:32:10 +00:00
/** Закрываем файлы (ещё до уничтожения объекта).
* Чтобы при создании многих источников, но одновременном чтении только из нескольких,
* буферы не висели в памяти.
*/
streams.clear();
}
return res;
}
private:
const String path;
size_t block_size;
Names column_names;
StorageMergeTree & storage;
2012-08-10 20:04:34 +00:00
const StorageMergeTree::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
2012-12-06 09:45:09 +00:00
MarkRanges mark_ranges; /// В каких диапазонах засечек читать.
int current_range; /// Какой из mark_ranges сейчас читаем.
size_t rows_left_in_current_range; /// Сколько строк уже прочитали из текущего элемента mark_ranges.
2012-07-19 20:32:10 +00:00
struct Stream
{
Stream(const String & path_prefix, size_t mark_number)
: plain(path_prefix + ".bin", std::min(static_cast<size_t>(DBMS_DEFAULT_BUFFER_SIZE), Poco::File(path_prefix + ".bin").getSize())),
compressed(plain)
{
if (mark_number)
{
/// Прочитаем из файла с засечками смещение в файле с данными.
ReadBufferFromFile marks(path_prefix + ".mrk", MERGE_TREE_MARK_SIZE);
marks.seek(mark_number * MERGE_TREE_MARK_SIZE);
size_t offset_in_compressed_file = 0;
size_t offset_in_decompressed_block = 0;
readIntBinary(offset_in_compressed_file, marks);
readIntBinary(offset_in_decompressed_block, marks);
plain.seek(offset_in_compressed_file);
compressed.next();
compressed.position() += offset_in_decompressed_block;
}
}
ReadBufferFromFile plain;
CompressedReadBuffer compressed;
};
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
FileStreams streams;
2012-08-30 17:43:31 +00:00
2012-12-06 09:45:09 +00:00
void addStream(const String & name, const IDataType & type, size_t mark_number, size_t level = 0)
2012-08-30 17:43:31 +00:00
{
String escaped_column_name = escapeForFileName(name);
2012-11-30 00:52:45 +00:00
/** Если файла с данными нет - то не будем пытаться открыть его.
* Это нужно, чтобы можно было добавлять новые столбцы к структуре таблицы без создания файлов для старых кусков.
*/
if (!Poco::File(path + escaped_column_name + ".bin").exists())
return;
2012-08-30 17:43:31 +00:00
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
String size_name = name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
String escaped_size_name = escaped_column_name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level);
streams.insert(std::make_pair(size_name, new Stream(
path + escaped_size_name,
mark_number)));
2012-12-06 09:45:09 +00:00
addStream(name, *type_arr->getNestedType(), mark_number, level + 1);
2012-08-30 17:43:31 +00:00
}
else
streams.insert(std::make_pair(name, new Stream(
path + escaped_column_name,
mark_number)));
}
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = dynamic_cast<const DataTypeArray *>(&type))
{
type_arr->deserializeOffsets(
column,
streams[name + ARRAY_SIZES_COLUMN_NAME_SUFFIX + Poco::NumberFormatter::format(level)]->compressed,
max_rows_to_read);
2012-08-30 20:35:02 +00:00
if (column.size())
readData(
name,
*type_arr->getNestedType(),
dynamic_cast<ColumnArray &>(column).getData(),
dynamic_cast<const ColumnArray &>(column).getOffsets()[column.size() - 1],
level + 1);
2012-08-30 17:43:31 +00:00
}
else
type.deserializeBinary(column, streams[name]->compressed, max_rows_to_read);
}
2012-07-19 20:32:10 +00:00
};
2012-07-17 20:04:39 +00:00
StorageMergeTree::StorageMergeTree(
const String & path_, const String & name_, NamesAndTypesListPtr columns_,
Context & context_,
ASTPtr & primary_expr_ast_,
2012-12-12 15:45:08 +00:00
const String & date_column_name_, const ASTPtr & sampling_expression_,
2012-08-16 17:27:40 +00:00
size_t index_granularity_,
const String & sign_column_,
2012-08-29 20:23:19 +00:00
const StorageMergeTreeSettings & settings_)
2012-07-17 20:04:39 +00:00
: path(path_), name(name_), full_path(path + escapeForFileName(name) + '/'), columns(columns_),
context(context_), primary_expr_ast(primary_expr_ast_->clone()),
2012-12-12 15:45:08 +00:00
date_column_name(date_column_name_), sampling_expression(sampling_expression_),
index_granularity(index_granularity_),
sign_column(sign_column_),
2012-08-29 20:23:19 +00:00
settings(settings_),
2012-07-19 20:32:10 +00:00
increment(full_path + "increment.txt"), log(&Logger::get("StorageMergeTree: " + name))
2012-07-17 20:04:39 +00:00
{
2012-12-06 13:07:29 +00:00
min_marks_for_seek = (settings.min_rows_for_seek + index_granularity - 1) / index_granularity;
min_marks_for_concurrent_read = (settings.min_rows_for_concurrent_read + index_granularity - 1) / index_granularity;
2012-07-17 20:04:39 +00:00
/// создаём директорию, если её нет
Poco::File(full_path).createDirectories();
/// инициализируем описание сортировки
sort_descr.reserve(primary_expr_ast->children.size());
for (ASTs::iterator it = primary_expr_ast->children.begin();
it != primary_expr_ast->children.end();
++it)
{
2012-07-18 20:14:41 +00:00
String name = (*it)->getColumnName();
2012-07-17 20:04:39 +00:00
sort_descr.push_back(SortColumnDescription(name, 1));
}
2012-07-18 20:14:41 +00:00
2012-08-02 17:33:31 +00:00
context.setColumns(*columns);
2012-07-18 20:14:41 +00:00
primary_expr = new Expression(primary_expr_ast, context);
2012-07-21 07:02:55 +00:00
primary_key_sample = primary_expr->getSampleBlock();
2012-07-19 20:32:10 +00:00
2012-11-28 08:52:15 +00:00
merge_threads = new boost::threadpool::pool(settings.merging_threads);
2012-07-19 20:32:10 +00:00
loadDataParts();
2012-07-17 20:04:39 +00:00
}
2012-07-30 20:32:36 +00:00
StorageMergeTree::~StorageMergeTree()
{
2012-11-28 08:52:15 +00:00
joinMergeThreads();
2012-07-30 20:32:36 +00:00
}
2012-07-18 19:44:04 +00:00
BlockOutputStreamPtr StorageMergeTree::write(ASTPtr query)
{
return new MergeTreeBlockOutputStream(*this);
}
2012-07-21 05:07:14 +00:00
BlockInputStreams StorageMergeTree::read(
2012-12-12 16:11:27 +00:00
const Names & column_names_to_return,
2012-07-21 05:07:14 +00:00
ASTPtr query,
QueryProcessingStage::Enum & processed_stage,
size_t max_block_size,
unsigned threads)
{
2012-12-10 10:23:10 +00:00
PKCondition key_condition(query, context, sort_descr);
PKCondition date_condition(query, context, SortDescription(1, SortColumnDescription(date_column_name, 1)));
2012-07-21 03:45:48 +00:00
2012-12-12 16:11:27 +00:00
Names column_names_to_read = column_names_to_return;
size_t count_limit = std::numeric_limits<size_t>::max();
bool sample_by_value = false;
UInt64 sample_column_value_limit = 0;
ASTSelectQuery & select = *dynamic_cast<ASTSelectQuery*>(&*query);
if (select.sample_size)
{
double size = boost::apply_visitor(FieldVisitorConvertToNumber<double>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
if (size < 0)
throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);
if (size > 1)
{
count_limit = boost::apply_visitor(FieldVisitorConvertToNumber<UInt64>(), dynamic_cast<ASTLiteral*>(&*select.sample_size)->value);
}
else
{
sample_by_value = true;
sample_column_value_limit = static_cast<UInt64>(size * std::numeric_limits<UInt32>::max());
2012-12-12 16:11:27 +00:00
column_names_to_read.push_back(sampling_expression->getColumnName());
2012-12-12 15:45:08 +00:00
if (!key_condition.addCondition(sampling_expression->getColumnName(),
Range::RightBounded(sample_column_value_limit, true)))
throw Exception("Invalid sampling column in storage parameters", ErrorCodes::ILLEGAL_COLUMN);
}
}
2012-12-05 12:44:55 +00:00
LOG_DEBUG(log, "key condition: " << key_condition.toString());
LOG_DEBUG(log, "date condition: " << date_condition.toString());
2012-12-06 11:48:41 +00:00
typedef std::vector<DataPartPtr> PartsList;
PartsList parts;
2012-11-28 08:52:15 +00:00
2012-12-05 12:44:55 +00:00
/// Выберем куски, в которых могут быть данные, удовлетворяющие key_condition.
2012-08-10 20:04:34 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
2012-12-05 13:39:38 +00:00
if (date_condition.mayBeTrueInRange(Row(1, static_cast<UInt64>((*it)->left_date)),Row(1, static_cast<UInt64>((*it)->right_date))))
2012-12-06 11:48:41 +00:00
parts.push_back(*it);
2012-08-10 20:04:34 +00:00
}
2012-11-28 08:52:15 +00:00
2012-12-06 11:48:41 +00:00
RangesInDataParts parts_with_ranges;
2012-11-28 08:52:15 +00:00
/// Найдем, какой диапазон читать из каждого куска.
size_t sum_marks = 0;
2012-12-06 11:48:41 +00:00
size_t sum_ranges = 0;
2012-11-28 08:52:15 +00:00
for (size_t i = 0; i < parts.size(); ++i)
{
2012-12-06 11:48:41 +00:00
DataPartPtr & part = parts[i];
RangesInDataPart ranges(part);
ranges.ranges = MergeTreeBlockInputStream::markRangesFromPkRange(full_path + part->name + '/',
part->size,
2012-12-06 09:45:09 +00:00
*this,
key_condition);
2012-12-06 11:48:41 +00:00
if (!ranges.ranges.empty())
{
parts_with_ranges.push_back(ranges);
sum_ranges += ranges.ranges.size();
for (size_t j = 0; j < ranges.ranges.size(); ++j)
{
sum_marks += ranges.ranges[j].end - ranges.ranges[j].begin;
/// Если нашли достаточно строк.
if (sum_marks * index_granularity >= count_limit)
{
MarkRanges & new_ranges = parts_with_ranges.back().ranges;
/// Обрежем этот отрезок.
new_ranges[j].end -= count_limit - sum_marks * index_granularity;
/// Удалим вссе последующие отрезки.
new_ranges.erase(new_ranges.begin() + j + 1, new_ranges.end());
}
2012-12-06 11:48:41 +00:00
}
/// Если нашли достаточно строк, дальше можено не смотреть.
if (sum_marks * index_granularity >= count_limit)
break;
2012-12-06 11:48:41 +00:00
}
2012-11-28 08:52:15 +00:00
}
2012-12-06 12:24:08 +00:00
LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
2012-12-06 11:48:41 +00:00
<< sum_marks << " marks to read from " << sum_ranges << " ranges");
2012-11-28 08:52:15 +00:00
2012-12-12 16:11:27 +00:00
BlockInputStreams res = spreadMarkRangesAmongThreads(parts_with_ranges, threads, column_names_to_read, max_block_size);
if (sample_by_value)
{
/// Добавим фильтрацию: sampling_column_name <= sample_column_value_limit
ASTPtr filter_function_args = new ASTExpressionList;
2012-12-12 15:45:08 +00:00
filter_function_args->children.push_back(sampling_expression);
filter_function_args->children.push_back(new ASTLiteral(StringRange(), sample_column_value_limit));
2012-12-12 15:37:43 +00:00
Poco::SharedPtr<ASTFunction> filter_function = new ASTFunction;
filter_function->name = "lessOrEquals";
filter_function->arguments = filter_function_args;
filter_function->children.push_back(filter_function->arguments);
ExpressionPtr filter_expression = new Expression(filter_function, context);
for (size_t i = 0; i < res.size(); ++i)
{
BlockInputStreamPtr original_stream = res[i];
BlockInputStreamPtr expression_stream = new ExpressionBlockInputStream(original_stream, filter_expression);
BlockInputStreamPtr filter_stream = new FilterBlockInputStream(expression_stream, filter_function->getColumnName());
res[i] = filter_stream;
}
}
return res;
2012-12-06 09:45:09 +00:00
}
/// Примерно поровну распределить засечки между потоками.
BlockInputStreams StorageMergeTree::spreadMarkRangesAmongThreads(RangesInDataParts parts, size_t threads, const Names & column_names, size_t max_block_size)
{
/// На всякий случай перемешаем куски.
std::random_shuffle(parts.begin(), parts.end());
/// Посчитаем засечки для каждого куска.
std::vector<size_t> sum_marks_in_parts(parts.size());
size_t sum_marks = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());
sum_marks_in_parts[i] = 0;
for (size_t j = 0; j < parts[i].ranges.size(); ++j)
{
MarkRange & range = parts[i].ranges[j];
sum_marks_in_parts[i] += range.end - range.begin;
}
sum_marks += sum_marks_in_parts[i];
}
2012-11-28 08:52:15 +00:00
BlockInputStreams res;
2012-11-30 00:52:45 +00:00
2012-11-30 08:33:36 +00:00
if (sum_marks > 0)
2012-11-28 08:52:15 +00:00
{
2012-12-06 09:45:09 +00:00
size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
for (size_t i = 0; i < threads && !parts.empty(); ++i)
2012-11-28 08:52:15 +00:00
{
2012-12-06 09:45:09 +00:00
size_t need_marks = min_marks_per_thread;
2012-11-30 08:33:36 +00:00
BlockInputStreams streams;
2012-11-30 00:52:45 +00:00
2012-12-06 09:45:09 +00:00
/// Цикл по кускам.
while (need_marks > 0 && !parts.empty())
2012-11-28 08:52:15 +00:00
{
2012-12-06 09:45:09 +00:00
RangesInDataPart & part = parts.back();
size_t & marks_in_part = sum_marks_in_parts.back();
/// Не будем брать из куска слишком мало строк.
2012-12-06 13:07:29 +00:00
if (marks_in_part >= min_marks_for_concurrent_read &&
need_marks < min_marks_for_concurrent_read)
need_marks = min_marks_for_concurrent_read;
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
/// Не будем оставлять в куске слишком мало строк.
if (marks_in_part > need_marks &&
2012-12-06 13:07:29 +00:00
marks_in_part - need_marks < min_marks_for_concurrent_read)
2012-12-06 09:45:09 +00:00
need_marks = marks_in_part;
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
/// Возьмем весь кусок, если он достаточно мал.
if (marks_in_part <= need_marks)
2012-11-30 08:33:36 +00:00
{
2012-12-06 09:45:09 +00:00
/// Восстановим порядок отрезков.
std::reverse(part.ranges.begin(), part.ranges.end());
streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, part.ranges));
need_marks -= marks_in_part;
parts.pop_back();
sum_marks_in_parts.pop_back();
2012-11-30 08:33:36 +00:00
continue;
}
2012-12-06 09:45:09 +00:00
MarkRanges ranges_to_get_from_part;
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
/// Цикл по отрезкам куска.
while (need_marks > 0)
{
if (part.ranges.empty())
throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);
MarkRange & range = part.ranges.back();
size_t marks_in_range = range.end - range.begin;
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);
ranges_to_get_from_part.push_back(MarkRange(range.begin, range.begin + marks_to_get_from_range));
range.begin += marks_to_get_from_range;
marks_in_part -= marks_to_get_from_range;
need_marks -= marks_to_get_from_range;
if (range.begin == range.end)
part.ranges.pop_back();
}
2012-11-30 08:33:36 +00:00
2012-12-06 09:45:09 +00:00
streams.push_back(new MergeTreeBlockInputStream(full_path + part.data_part->name + '/',
max_block_size, column_names, *this,
part.data_part, ranges_to_get_from_part));
2012-11-28 08:52:15 +00:00
}
2012-11-30 00:52:45 +00:00
2012-11-30 08:33:36 +00:00
if (streams.size() == 1)
res.push_back(streams[0]);
2012-11-29 08:41:20 +00:00
else
2012-11-30 08:33:36 +00:00
res.push_back(new ConcatBlockInputStream(streams));
2012-11-28 08:52:15 +00:00
}
2012-12-06 09:45:09 +00:00
if (!parts.empty())
throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
2012-11-28 08:52:15 +00:00
}
2012-12-06 11:10:05 +00:00
return res;
2012-07-21 03:45:48 +00:00
}
2012-07-19 20:32:10 +00:00
String StorageMergeTree::getPartName(Yandex::DayNum_t left_date, Yandex::DayNum_t right_date, UInt64 left_id, UInt64 right_id, UInt64 level)
2012-07-17 20:04:39 +00:00
{
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
2012-07-19 20:32:10 +00:00
/// Имя директории для куска иммет вид: YYYYMMDD_YYYYMMDD_N_N_L.
2012-07-17 20:04:39 +00:00
String res;
{
2012-07-19 20:32:10 +00:00
unsigned left_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(left_date));
unsigned right_date_id = Yandex::Date2OrderedIdentifier(date_lut.fromDayNum(right_date));
2012-07-17 20:04:39 +00:00
WriteBufferFromString wb(res);
2012-07-19 20:32:10 +00:00
writeIntText(left_date_id, wb);
2012-07-17 20:04:39 +00:00
writeChar('_', wb);
2012-07-19 20:32:10 +00:00
writeIntText(right_date_id, wb);
2012-07-17 20:04:39 +00:00
writeChar('_', wb);
writeIntText(left_id, wb);
writeChar('_', wb);
writeIntText(right_id, wb);
writeChar('_', wb);
writeIntText(level, wb);
}
return res;
}
2012-07-19 20:32:10 +00:00
void StorageMergeTree::loadDataParts()
{
LOG_DEBUG(log, "Loading data parts");
2012-08-10 20:04:34 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
2012-07-19 20:32:10 +00:00
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
2012-08-10 20:04:34 +00:00
data_parts.clear();
2012-07-19 20:32:10 +00:00
static Poco::RegularExpression file_name_regexp("^(\\d{8})_(\\d{8})_(\\d+)_(\\d+)_(\\d+)");
Poco::DirectoryIterator end;
Poco::RegularExpression::MatchVec matches;
for (Poco::DirectoryIterator it(full_path); it != end; ++it)
{
std::string file_name = it.name();
if (!(file_name_regexp.match(file_name, 0, matches) && 6 == matches.size()))
continue;
2012-07-31 20:03:53 +00:00
DataPartPtr part = new DataPart(*this);
2012-07-23 06:23:29 +00:00
part->left_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[1].offset, matches[1].length)));
part->right_date = date_lut.toDayNum(Yandex::OrderedIdentifier2Date(file_name.substr(matches[2].offset, matches[2].length)));
part->left = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[3].offset, matches[3].length));
part->right = Poco::NumberParser::parseUnsigned64(file_name.substr(matches[4].offset, matches[4].length));
part->level = Poco::NumberParser::parseUnsigned(file_name.substr(matches[5].offset, matches[5].length));
part->name = file_name;
2012-07-19 20:32:10 +00:00
/// Размер - в количестве засечек.
2012-07-23 06:23:29 +00:00
part->size = Poco::File(full_path + file_name + "/" + escapeForFileName(columns->front().first) + ".mrk").getSize()
2012-07-19 20:32:10 +00:00
/ MERGE_TREE_MARK_SIZE;
2012-07-23 06:23:29 +00:00
part->modification_time = it->getLastModified().epochTime();
2012-07-19 20:32:10 +00:00
2012-08-31 20:38:05 +00:00
part->left_month = date_lut.toFirstDayNumOfMonth(part->left_date);
part->right_month = date_lut.toFirstDayNumOfMonth(part->right_date);
2012-07-19 20:32:10 +00:00
2012-08-10 20:04:34 +00:00
data_parts.insert(part);
2012-07-19 20:32:10 +00:00
}
2012-08-10 20:04:34 +00:00
all_data_parts = data_parts;
2012-07-31 20:03:53 +00:00
2012-07-31 20:13:14 +00:00
/** Удаляем из набора актуальных кусков куски, которые содержатся в другом куске (которые были склеены),
* но по каким-то причинам остались лежать в файловой системе.
* Удаление файлов будет произведено потом в методе clearOldParts.
*/
2012-08-10 20:04:34 +00:00
if (data_parts.size() >= 2)
2012-07-31 20:13:14 +00:00
{
2012-08-10 20:04:34 +00:00
DataParts::iterator prev_jt = data_parts.begin();
2012-08-07 20:37:45 +00:00
DataParts::iterator curr_jt = prev_jt;
++curr_jt;
2012-08-10 20:04:34 +00:00
while (curr_jt != data_parts.end())
2012-07-31 20:13:14 +00:00
{
2012-08-07 20:37:45 +00:00
/// Куски данных за разные месяцы рассматривать не будем
if ((*curr_jt)->left_month != (*curr_jt)->right_month
|| (*curr_jt)->right_month != (*prev_jt)->left_month
|| (*prev_jt)->left_month != (*prev_jt)->right_month)
{
++prev_jt;
++curr_jt;
continue;
}
2012-07-31 20:13:14 +00:00
2012-08-07 20:37:45 +00:00
if ((*curr_jt)->contains(**prev_jt))
{
LOG_WARNING(log, "Part " << (*curr_jt)->name << " contains " << (*prev_jt)->name);
2012-08-10 20:04:34 +00:00
data_parts.erase(prev_jt);
2012-08-07 20:37:45 +00:00
prev_jt = curr_jt;
++curr_jt;
}
else if ((*prev_jt)->contains(**curr_jt))
{
LOG_WARNING(log, "Part " << (*prev_jt)->name << " contains " << (*curr_jt)->name);
2012-08-10 20:04:34 +00:00
data_parts.erase(curr_jt++);
2012-08-07 20:37:45 +00:00
}
else
{
++prev_jt;
++curr_jt;
}
2012-07-31 20:13:14 +00:00
}
}
2012-07-31 20:03:53 +00:00
2012-08-10 20:04:34 +00:00
LOG_DEBUG(log, "Loaded data parts (" << data_parts.size() << " items)");
2012-07-19 20:32:10 +00:00
}
2012-07-23 06:23:29 +00:00
void StorageMergeTree::clearOldParts()
{
Poco::ScopedTry<Poco::FastMutex> lock;
/// Если метод уже вызван из другого потока (или если all_data_parts прямо сейчас меняют), то можно ничего не делать.
if (!lock.lock(&all_data_parts_mutex))
{
LOG_TRACE(log, "Already clearing or modifying old parts");
return;
}
LOG_TRACE(log, "Clearing old parts");
for (DataParts::iterator it = all_data_parts.begin(); it != all_data_parts.end();)
{
2012-07-31 18:25:16 +00:00
int ref_count = it->referenceCount();
LOG_TRACE(log, (*it)->name << ": ref_count = " << ref_count);
2012-08-10 20:04:34 +00:00
if (ref_count == 1) /// После этого ref_count не может увеличиться.
2012-07-23 06:23:29 +00:00
{
LOG_DEBUG(log, "Removing part " << (*it)->name);
(*it)->remove();
all_data_parts.erase(it++);
}
else
++it;
}
}
2012-09-10 19:05:06 +00:00
void StorageMergeTree::merge(size_t iterations, bool async)
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
bool while_can = false;
if (iterations == 0){
while_can = true;
iterations = settings.merging_threads;
2012-08-01 20:08:59 +00:00
}
2012-11-28 17:17:17 +00:00
for (size_t i = 0; i < iterations; ++i)
2012-11-28 08:52:15 +00:00
merge_threads->schedule(boost::bind(&StorageMergeTree::mergeThread, this, while_can));
if (!async)
joinMergeThreads();
2012-08-13 19:13:11 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree::mergeThread(bool while_can)
2012-08-13 19:13:11 +00:00
{
try
2012-07-23 06:23:29 +00:00
{
2012-11-28 08:52:15 +00:00
std::vector<DataPartPtr> parts;
while (selectPartsToMerge(parts))
2012-08-13 19:13:11 +00:00
{
2012-11-28 08:52:15 +00:00
mergeParts(parts);
2012-08-13 19:13:11 +00:00
/// Удаляем старые куски.
2012-11-28 08:52:15 +00:00
parts.clear();
2012-08-13 19:13:11 +00:00
clearOldParts();
2012-11-28 08:52:15 +00:00
if (!while_can)
break;
2012-08-13 19:13:11 +00:00
}
}
catch (const Exception & e)
{
LOG_ERROR(log, "Code: " << e.code() << ". " << e.displayText() << std::endl
<< std::endl
<< "Stack trace:" << std::endl
<< e.getStackTrace().toString());
}
catch (const Poco::Exception & e)
{
LOG_ERROR(log, "Poco::Exception: " << e.code() << ". " << e.displayText());
}
catch (const std::exception & e)
{
LOG_ERROR(log, "std::exception: " << e.what());
}
catch (...)
{
LOG_ERROR(log, "Unknown exception");
}
2012-07-23 06:23:29 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree::joinMergeThreads()
{
LOG_DEBUG(log, "Waiting for merge thread to finish.");
merge_threads->wait();
}
2012-11-29 10:50:17 +00:00
/// Выбираем отрезок из не более чем max_parts_to_merge_at_once кусков так, чтобы максимальный размер был меньше чем в max_size_ratio_to_merge_parts раз больше суммы остальных.
/// Это обеспечивает в худшем случае время O(n log n) на все слияния, независимо от выбора сливаемых кусков, порядка слияния и добавления.
2012-11-29 17:43:23 +00:00
/// При max_parts_to_merge_at_once >= log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts),
/// несложно доказать, что всегда будет что сливать, пока количество кусков больше
/// log(max_rows_to_merge_parts/index_granularity)/log(max_size_ratio_to_merge_parts)*(количество кусков размером больше max_rows_to_merge_parts).
2012-11-29 12:24:08 +00:00
/// Дальше эвристики.
/// Будем выбирать максимальный по включению подходящий отрезок.
/// Из всех таких выбираем отрезок с минимальным максимумом размера.
/// Из всех таких выбираем отрезок с минимальным минимумом размера.
/// Из всех таких выбираем отрезок с максимальной длиной.
2012-11-28 08:52:15 +00:00
bool StorageMergeTree::selectPartsToMerge(std::vector<DataPartPtr> & parts)
2012-07-23 06:23:29 +00:00
{
LOG_DEBUG(log, "Selecting parts to merge");
2012-08-10 20:04:34 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
2012-07-23 06:23:29 +00:00
2012-11-29 11:32:29 +00:00
size_t min_max = -1U;
size_t min_min = -1U;
int max_len = 0;
2012-11-29 10:50:17 +00:00
DataParts::iterator best_begin;
bool found = false;
2012-11-29 12:24:08 +00:00
/// Сколько кусков, начиная с текущего, можно включить в валидный отрезок, начинающийся левее текущего куска.
/// Нужно для определения максимальности по включению.
2012-11-29 12:26:34 +00:00
int max_count_from_left = 0;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Левый конец отрезка.
2012-11-29 10:50:17 +00:00
for (DataParts::iterator it = data_parts.begin(); it != data_parts.end(); ++it)
2012-07-23 06:23:29 +00:00
{
2012-11-29 12:24:08 +00:00
const DataPartPtr & first_part = *it;
2012-11-29 12:26:34 +00:00
max_count_from_left = std::max(0, max_count_from_left - 1);
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Кусок не занят и достаточно мал.
if (first_part->currently_merging ||
first_part->size * index_granularity > settings.max_rows_to_merge_parts)
continue;
/// Кусок в одном месяце.
if (first_part->left_month != first_part->right_month)
{
LOG_WARNING(log, "Part " << first_part->name << " spans more than one month");
2012-11-29 10:50:17 +00:00
continue;
2012-11-29 12:24:08 +00:00
}
/// Самый длинный валидный отрезок, начинающийся здесь.
2012-11-29 16:39:29 +00:00
size_t cur_longest_max = -1U;
size_t cur_longest_min = -1U;
2012-11-29 12:24:08 +00:00
int cur_longest_len = 0;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Текущий отрезок, не обязательно валидный.
size_t cur_max = first_part->size;
size_t cur_min = first_part->size;
size_t cur_sum = first_part->size;
2012-11-29 10:50:17 +00:00
int cur_len = 1;
2012-11-29 12:24:08 +00:00
Yandex::DayNum_t month = first_part->left_month;
UInt64 cur_id = first_part->right;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Правый конец отрезка.
2012-11-29 10:50:17 +00:00
DataParts::iterator jt = it;
2012-11-29 11:32:29 +00:00
for (++jt; jt != data_parts.end() && cur_len < static_cast<int>(settings.max_parts_to_merge_at_once); ++jt)
2012-07-23 06:23:29 +00:00
{
2012-11-29 12:24:08 +00:00
const DataPartPtr & last_part = *jt;
/// Кусок не занят, достаточно мал и в одном правильном месяце.
if (last_part->currently_merging ||
last_part->size * index_granularity > settings.max_rows_to_merge_parts ||
last_part->left_month != last_part->right_month ||
last_part->left_month != month)
2012-11-29 10:50:17 +00:00
break;
2012-11-29 12:24:08 +00:00
/// Кусок правее предыдущего.
2012-11-30 00:52:45 +00:00
if (last_part->left < cur_id)
2012-11-29 12:24:08 +00:00
{
LOG_WARNING(log, "Part " << last_part->name << " intersects previous part");
break;
}
cur_max = std::max(cur_max, last_part->size);
cur_min = std::min(cur_min, last_part->size);
cur_sum += last_part->size;
2012-11-29 10:50:17 +00:00
++cur_len;
2012-11-29 12:24:08 +00:00
cur_id = last_part->right;
2012-11-29 10:50:17 +00:00
2012-11-29 12:24:08 +00:00
/// Если отрезок валидный, то он самый длинный валидный, начинающийся тут.
2012-11-29 10:50:17 +00:00
if (cur_len >= 2 &&
2012-11-29 12:24:08 +00:00
static_cast<double>(cur_max) / (cur_sum - cur_max) < settings.max_size_ratio_to_merge_parts)
{
cur_longest_max = cur_max;
cur_longest_min = cur_min;
cur_longest_len = cur_len;
}
}
2012-11-29 12:26:34 +00:00
/// Это максимальный по включению валидный отрезок.
2012-11-29 12:24:08 +00:00
if (cur_longest_len > max_count_from_left)
{
max_count_from_left = cur_longest_len;
if (!found ||
std::make_pair(std::make_pair(cur_longest_max, cur_longest_min), -cur_longest_len) <
std::make_pair(std::make_pair(min_max, min_min), -max_len))
2012-11-29 10:50:17 +00:00
{
found = true;
2012-11-29 12:24:08 +00:00
min_max = cur_longest_max;
min_min = cur_longest_min;
max_len = cur_longest_len;
2012-11-29 10:50:17 +00:00
best_begin = it;
}
2012-07-23 06:23:29 +00:00
}
}
2012-11-29 10:50:17 +00:00
if (found)
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
parts.clear();
DataParts::iterator it = best_begin;
for (int i = 0; i < max_len; ++i)
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
parts.push_back(*it);
2012-11-29 11:48:27 +00:00
parts.back()->currently_merging = true;
2012-11-29 10:50:17 +00:00
++it;
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
2012-11-30 02:01:02 +00:00
LOG_DEBUG(log, "Selected " << parts.size() << " parts from " << parts.front()->name << " to " << parts.back()->name);
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
else
2012-07-23 06:23:29 +00:00
{
2012-11-29 10:50:17 +00:00
LOG_DEBUG(log, "No parts to merge");
2012-07-23 06:23:29 +00:00
}
2012-11-29 10:50:17 +00:00
return found;
2012-07-23 06:23:29 +00:00
}
2012-11-28 08:52:15 +00:00
void StorageMergeTree::mergeParts(std::vector<DataPartPtr> parts)
2012-07-23 06:23:29 +00:00
{
2012-11-30 02:01:02 +00:00
LOG_DEBUG(log, "Merging " << parts.size() << " parts: from " << parts.front()->name << " to " << parts.back()->name);
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
Names all_column_names;
for (NamesAndTypesList::const_iterator it = columns->begin(); it != columns->end(); ++it)
all_column_names.push_back(it->first);
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
Yandex::DateLUTSingleton & date_lut = Yandex::DateLUTSingleton::instance();
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
StorageMergeTree::DataPartPtr new_data_part = new DataPart(*this);
2012-12-06 12:51:15 +00:00
new_data_part->left_date = std::numeric_limits<UInt16>::max();
new_data_part->right_date = std::numeric_limits<UInt16>::min();
2012-11-30 02:01:02 +00:00
new_data_part->left = parts.front()->left;
2012-11-28 08:52:15 +00:00
new_data_part->right = parts.back()->right;
new_data_part->level = 0;
for (size_t i = 0; i < parts.size(); ++i)
{
new_data_part->level = std::max(new_data_part->level, parts[i]->level);
2012-12-06 12:51:15 +00:00
new_data_part->left_date = std::min(new_data_part->left_date, parts[i]->left_date);
new_data_part->right_date = std::max(new_data_part->right_date, parts[i]->right_date);
2012-11-28 08:52:15 +00:00
}
++new_data_part->level;
2012-08-13 19:13:11 +00:00
new_data_part->name = getPartName(
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
2012-08-31 20:38:05 +00:00
new_data_part->left_month = date_lut.toFirstDayNumOfMonth(new_data_part->left_date);
new_data_part->right_month = date_lut.toFirstDayNumOfMonth(new_data_part->right_date);
2012-08-13 19:13:11 +00:00
2012-11-29 16:39:29 +00:00
/** Читаем из всех кусков, сливаем и пишем в новый.
2012-08-13 19:13:11 +00:00
* Попутно вычисляем выражение для сортировки.
*/
BlockInputStreams src_streams;
2012-07-30 20:32:36 +00:00
2012-11-28 08:52:15 +00:00
for (size_t i = 0; i < parts.size(); ++i)
{
2012-12-06 09:45:09 +00:00
MarkRanges ranges(1, MarkRange(0, parts[i]->size));
2012-11-28 08:52:15 +00:00
src_streams.push_back(new ExpressionBlockInputStream(new MergeTreeBlockInputStream(
2012-12-06 09:45:09 +00:00
full_path + parts[i]->name + '/', DEFAULT_BLOCK_SIZE, all_column_names, *this, parts[i], ranges), primary_expr));
2012-11-28 08:52:15 +00:00
}
2012-07-30 20:32:36 +00:00
2012-11-30 00:52:45 +00:00
BlockInputStreamPtr merged_stream = sign_column.empty()
? new MergingSortedBlockInputStream(src_streams, sort_descr, DEFAULT_BLOCK_SIZE)
: new CollapsingSortedBlockInputStream(src_streams, sort_descr, sign_column, DEFAULT_BLOCK_SIZE);
2012-08-13 20:16:06 +00:00
2012-12-03 08:52:58 +00:00
MergedBlockOutputStreamPtr to = new MergedBlockOutputStream(*this,
2012-11-28 08:52:15 +00:00
new_data_part->left_date, new_data_part->right_date, new_data_part->left, new_data_part->right, new_data_part->level);
2012-07-30 20:32:36 +00:00
2012-08-13 19:13:11 +00:00
copyData(*merged_stream, *to);
2012-07-31 20:03:53 +00:00
2012-12-03 08:52:58 +00:00
new_data_part->size = to->marksCount();
2012-08-13 19:13:11 +00:00
new_data_part->modification_time = time(0);
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
{
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
/// Добавляем новый кусок в набор.
2012-11-28 08:52:15 +00:00
for (size_t i = 0; i < parts.size(); ++i)
{
if (data_parts.end() == data_parts.find(parts[i]))
throw Exception("Logical error: cannot find data part " + parts[i]->name + " in list", ErrorCodes::LOGICAL_ERROR);
}
2012-07-31 17:07:20 +00:00
2012-08-13 19:13:11 +00:00
data_parts.insert(new_data_part);
all_data_parts.insert(new_data_part);
2012-11-28 08:52:15 +00:00
for (size_t i = 0; i < parts.size(); ++i)
{
data_parts.erase(data_parts.find(parts[i]));
}
2012-07-31 17:07:20 +00:00
}
2012-08-13 19:13:11 +00:00
2012-11-30 02:01:02 +00:00
LOG_TRACE(log, "Merged " << parts.size() << " parts: from" << parts.front()->name << " to " << parts.back()->name);
2012-07-23 06:23:29 +00:00
}
2012-08-16 18:17:01 +00:00
void StorageMergeTree::drop()
{
2012-11-28 08:52:15 +00:00
joinMergeThreads();
2012-08-16 18:17:01 +00:00
Poco::ScopedLock<Poco::FastMutex> lock(data_parts_mutex);
Poco::ScopedLock<Poco::FastMutex> lock_all(all_data_parts_mutex);
data_parts.clear();
all_data_parts.clear();
Poco::File(full_path).remove(true);
}
2012-07-17 20:04:39 +00:00
}