mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-30 03:22:14 +00:00
dbms: development [#CONV-2944].
This commit is contained in:
parent
8700cff3f3
commit
89fde21e34
@ -31,7 +31,7 @@ namespace DB
|
|||||||
* Структура файлов:
|
* Структура файлов:
|
||||||
* / increment.txt - файл, содержащий одно число, увеличивающееся на 1 - для генерации идентификаторов кусков.
|
* / increment.txt - файл, содержащий одно число, увеличивающееся на 1 - для генерации идентификаторов кусков.
|
||||||
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
|
* / min-date _ max-date _ min-id _ max-id _ level / - директория с куском.
|
||||||
* / min-date _ max-date _ min-id _ max-id _ level . idx - индексный файл.
|
* / min-date _ max-date _ min-id _ max-id _ level / primary.idx - индексный файл.
|
||||||
* Внутри директории с куском:
|
* Внутри директории с куском:
|
||||||
* Column.bin - данные столбца
|
* Column.bin - данные столбца
|
||||||
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
|
* Column.mrk - засечки, указывающие, откуда начинать чтение, чтобы пропустить n * k строк.
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
#pragma once
|
#include <boost/bind.hpp>
|
||||||
|
|
||||||
#include <DB/Common/escapeForFileName.h>
|
#include <DB/Common/escapeForFileName.h>
|
||||||
|
|
||||||
@ -33,7 +33,8 @@ public:
|
|||||||
size_t columns = block.columns();
|
size_t columns = block.columns();
|
||||||
|
|
||||||
/// Достаём столбец с датой.
|
/// Достаём столбец с датой.
|
||||||
const ColumnUInt16::Container_t & dates = dynamic_cast<const ColumnUInt16 &>(block.getByName(storage.date_column_name)).getData();
|
const ColumnUInt16::Container_t & dates =
|
||||||
|
dynamic_cast<const ColumnUInt16 &>(*block.getByName(storage.date_column_name).column).getData();
|
||||||
|
|
||||||
/// Минимальная и максимальная дата.
|
/// Минимальная и максимальная дата.
|
||||||
UInt16 min_date = std::numeric_limits<UInt16>::max();
|
UInt16 min_date = std::numeric_limits<UInt16>::max();
|
||||||
@ -76,7 +77,7 @@ public:
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Для каждого месяца.
|
/// Для каждого месяца.
|
||||||
for (BlocksByMonth::const_iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it)
|
for (BlocksByMonth::iterator it = blocks_by_month.begin(); it != blocks_by_month.end(); ++it)
|
||||||
writePart(it->second.block, it->second.min_date, it->second.max_date);
|
writePart(it->second.block, it->second.min_date, it->second.max_date);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,33 +93,26 @@ private:
|
|||||||
UInt16 max_date;
|
UInt16 max_date;
|
||||||
|
|
||||||
BlockWithDateInterval() : min_date(std::numeric_limits<UInt16>::max()), max_date(0) {}
|
BlockWithDateInterval() : min_date(std::numeric_limits<UInt16>::max()), max_date(0) {}
|
||||||
BlockWithDateInterval(Block & block, UInt16 min_date_, UInt16 max_date_) : min_date(min_date_), max_date(max_date_) {}
|
BlockWithDateInterval(const Block & block_, UInt16 min_date_, UInt16 max_date_)
|
||||||
|
: block(block_), min_date(min_date_), max_date(max_date_) {}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct Stream
|
|
||||||
{
|
|
||||||
Stream(const std::string & data_path, const std::string & marks_path) :
|
|
||||||
plain(data_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY),
|
|
||||||
compressed(plain),
|
|
||||||
marks(marks_path, DBMS_DEFAULT_BUFFER_SIZE, O_APPEND | O_CREAT | O_WRONLY) {}
|
|
||||||
|
|
||||||
WriteBufferFromFile plain;
|
|
||||||
CompressedWriteBuffer compressed;
|
|
||||||
WriteBufferFromFile marks;
|
|
||||||
};
|
|
||||||
|
|
||||||
typedef std::map<std::string, SharedPtr<Stream> > FileStreams;
|
|
||||||
FileStreams streams;
|
|
||||||
|
|
||||||
void writePart(Block & block, UInt16 min_date, UInt16 max_date)
|
void writePart(Block & block, UInt16 min_date, UInt16 max_date)
|
||||||
{
|
{
|
||||||
|
size_t rows = block.rows();
|
||||||
size_t columns = block.columns();
|
size_t columns = block.columns();
|
||||||
UInt64 part_id = storage.increment.get(true);
|
UInt64 part_id = storage.increment.get(true);
|
||||||
|
|
||||||
String part_tmp_path = storage.full_path
|
String part_tmp_path = storage.full_path
|
||||||
+ "tmp_"
|
+ "tmp_"
|
||||||
+ storage.getPartName(
|
+ storage.getPartName(
|
||||||
min_date, max_date,
|
Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
|
||||||
|
part_id, part_id, 0)
|
||||||
|
+ "/";
|
||||||
|
|
||||||
|
String part_res_path = storage.full_path
|
||||||
|
+ storage.getPartName(
|
||||||
|
Yandex::DayNum_t(min_date), Yandex::DayNum_t(max_date),
|
||||||
part_id, part_id, 0)
|
part_id, part_id, 0)
|
||||||
+ "/";
|
+ "/";
|
||||||
|
|
||||||
@ -130,27 +124,59 @@ private:
|
|||||||
/// Сортируем.
|
/// Сортируем.
|
||||||
sortBlock(block, storage.sort_descr);
|
sortBlock(block, storage.sort_descr);
|
||||||
|
|
||||||
/// Теперь удаляем лишние (вычисленные только для сортировки) столбцы.
|
|
||||||
while (block.columns() != columns)
|
|
||||||
block.erase(columns);
|
|
||||||
|
|
||||||
/// Наконец-то можно писать данные на диск.
|
/// Наконец-то можно писать данные на диск.
|
||||||
|
int flags = O_EXCL | O_CREAT | O_WRONLY;
|
||||||
|
|
||||||
|
/// Сначала пишем индекс. Индекс содержит значение PK для каждой index_granularity строки.
|
||||||
|
{
|
||||||
|
WriteBufferFromFile index(part_tmp_path + "primary.idx", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||||
|
|
||||||
|
typedef std::vector<const ColumnWithNameAndType *> PrimaryColumns;
|
||||||
|
PrimaryColumns primary_columns;
|
||||||
|
|
||||||
|
for (size_t i = 0, size = storage.sort_descr.size(); i < size; ++i)
|
||||||
|
primary_columns.push_back(
|
||||||
|
!storage.sort_descr[i].column_name.empty()
|
||||||
|
? &block.getByName(storage.sort_descr[i].column_name)
|
||||||
|
: &block.getByPosition(storage.sort_descr[i].column_number));
|
||||||
|
|
||||||
|
for (size_t i = 0; i < rows; i += storage.index_granularity)
|
||||||
|
for (PrimaryColumns::const_iterator it = primary_columns.begin(); it != primary_columns.end(); ++it)
|
||||||
|
(*it)->type->serializeBinary((*(*it)->column)[i], index);
|
||||||
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < columns; ++i)
|
for (size_t i = 0; i < columns; ++i)
|
||||||
{
|
{
|
||||||
const ColumnWithNameAndType & column = block.getByPosition(i);
|
const ColumnWithNameAndType & column = block.getByPosition(i);
|
||||||
|
String escaped_column_name = escapeForFileName(column.name);
|
||||||
|
|
||||||
Mark mark;
|
WriteBufferFromFile plain(part_tmp_path + escaped_column_name + ".bin", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||||
mark.rows = (storage.files[column.name].marks.empty() ? 0 : storage.files[column.name].marks.back().rows) + column.column->size();
|
WriteBufferFromFile marks(part_tmp_path + escaped_column_name + ".mrk", DBMS_DEFAULT_BUFFER_SIZE, flags);
|
||||||
mark.offset = streams[column.name]->plain.count();
|
CompressedWriteBuffer compressed(plain);
|
||||||
|
|
||||||
writeIntBinary(mark.rows, streams[column.name]->marks);
|
size_t prev_mark = 0;
|
||||||
writeIntBinary(mark.offset, streams[column.name]->marks);
|
column.type->serializeBinary(*column.column, compressed,
|
||||||
|
boost::bind(&MergeTreeBlockOutputStream::writeCallback, this,
|
||||||
storage.files[column.name].marks.push_back(mark);
|
boost::ref(prev_mark), boost::ref(plain), boost::ref(compressed), boost::ref(marks)));
|
||||||
|
|
||||||
column.type->serializeBinary(*column.column, streams[column.name]->compressed);
|
|
||||||
streams[column.name]->compressed.next();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Переименовываем кусок.
|
||||||
|
Poco::File(part_tmp_path).renameTo(part_res_path);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Вызывается каждые index_granularity строк и пишет в файл с засечками (.mrk).
|
||||||
|
size_t writeCallback(size_t & prev_mark,
|
||||||
|
WriteBufferFromFile & plain,
|
||||||
|
CompressedWriteBuffer & compressed,
|
||||||
|
WriteBufferFromFile & marks)
|
||||||
|
{
|
||||||
|
/// Каждая засечка - это: (смещение в файле до начала сжатого блока, смещение внутри блока)
|
||||||
|
|
||||||
|
writeIntBinary(plain.count(), marks);
|
||||||
|
writeIntBinary(compressed.offset(), marks);
|
||||||
|
|
||||||
|
prev_mark += storage.index_granularity;
|
||||||
|
return prev_mark;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user