dbms: adding support for nulls in the Log engine [#METR-19266]

This commit is contained in:
Alexey Arno 2016-07-12 21:08:16 +03:00
parent 3e479c375c
commit 6d753f05e6
2 changed files with 228 additions and 26 deletions

View File

@ -126,10 +126,13 @@ protected:
/// Can be called regardless of the current state of rwlock.
size_t marksCount();
/// Returns the number of marks held by the first entry of null_files.
size_t nullMarksCount();
BlockInputStreams read(
size_t from_mark,
size_t to_mark,
size_t from_null_mark,
size_t to_null_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
@ -140,15 +143,21 @@ protected:
private:
Files_t files; /// name -> data
Files_t null_files; /// name -> data of the column's null-map file (filled by addNullFile)
Names column_names; /// column_index -> name
Poco::File marks_file;
Poco::File null_marks_file; /// Marks file loaded into null_files by loadMarks() when has_nullable_columns is set
/// Loads the marks stored in marks_file_handle for the files described by files_descs.
void loadMarksImpl(Files_t & files_descs, Poco::File & marks_file_handle);
/// The order in which files are added must not change: it corresponds to the order of the columns in the marks file.
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
/// Registers the null-map file (".null" extension) of the nullable column `column_name` in null_files.
void addNullFile(const String & column_name);
bool loaded_marks; /// Set by loadMarks() once the marks have been loaded; later calls return early
bool has_nullable_columns = false; /// Set by addFile() when a nullable column type is encountered
size_t max_compress_block_size;
@ -164,6 +173,8 @@ private:
* Вернуть первую попавшуюся группу засечек, в которых указано количество строчек, а не внутренностей массивов.
*/
const Marks & getMarksWithRealRowCount() const;
/// Same lookup as getMarksWithRealRowCount(), but performed over null_files.
const Marks & getNullMarksWithRealRowCount() const;
/// Shared implementation: finds the marks of the first column inside files_descs; throws if no matching file is found.
const Marks & getMarksWithRealRowCountImpl(const Files_t & files_descs) const;
};
}

View File

@ -13,11 +13,14 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNullable.h>
#include <DB/Storages/StorageLog.h>
@ -27,6 +30,8 @@
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_MARKS_FILE_EXTENSION ".mrk"
#define DBMS_STORAGE_LOG_MARKS_FILE_NAME "__marks.mrk"
/// NOTE(review): presumably the name of the file behind null_marks_file; its use is not visible here — confirm.
#define DBMS_STORAGE_LOG_NULL_MARKS_FILE_NAME "__null_marks.mrk"
/// Extension appended to the escaped column name to build the path of its null-map file.
#define DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION ".null"
namespace DB
@ -48,7 +53,21 @@ public:
size_t block_size_, const Names & column_names_, StorageLog & storage_,
size_t mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_), column_names(column_names_), storage(storage_),
mark_number(mark_number_), rows_limit(rows_limit_), current_mark(mark_number_), max_read_buffer_size(max_read_buffer_size_) {}
mark_number(mark_number_), rows_limit(rows_limit_),
current_mark(mark_number_), max_read_buffer_size(max_read_buffer_size_),
has_null_marks(false)
{
}
/// Constructs a stream that additionally tracks a position in the null marks.
/// Both the data position (mark_number_) and the null-map position (null_mark_number_)
/// are stored as the starting and the current mark, and has_null_marks is set to true.
LogBlockInputStream(
size_t block_size_, const Names & column_names_, StorageLog & storage_,
size_t mark_number_, size_t null_mark_number_, size_t rows_limit_, size_t max_read_buffer_size_)
: block_size(block_size_), column_names(column_names_), storage(storage_),
mark_number(mark_number_), null_mark_number(null_mark_number_), rows_limit(rows_limit_),
current_mark(mark_number_), current_null_mark(null_mark_number_),
max_read_buffer_size(max_read_buffer_size_), has_null_marks(true)
{
}
String getName() const { return "Log"; }
@ -71,10 +90,13 @@ private:
Names column_names;
StorageLog & storage;
size_t mark_number; /// Which mark to start reading data from
size_t null_mark_number; /// Which null mark to start reading from; only initialized by the constructor taking null_mark_number_
size_t rows_limit; /// Maximum number of rows that may be read
size_t rows_read = 0;
size_t current_mark;
size_t current_null_mark; /// Only initialized by the constructor taking null_mark_number_
size_t max_read_buffer_size;
bool has_null_marks; /// True when the stream was constructed with a null mark number
struct Stream
{
@ -92,8 +114,10 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream> >;
FileStreams streams;
/// Read streams over null-map files, keyed by column name (filled by addNullStream).
FileStreams null_streams;
void addStream(const String & name, const IDataType & type, size_t level = 0);
/// Opens the null-map stream of column `name` and stores it in null_streams.
void addNullStream(const String & name);
void readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read, size_t level = 0, bool read_offsets = true);
};
@ -103,7 +127,9 @@ class LogBlockOutputStream : public IBlockOutputStream
public:
LogBlockOutputStream(StorageLog & storage_)
: storage(storage_),
lock(storage.rwlock), marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
lock(storage.rwlock),
marks_stream(storage.marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY),
null_marks_stream(storage.null_marks_file.path(), 4096, O_APPEND | O_CREAT | O_WRONLY)
{
for (const auto & column : storage.getColumnsList())
addStream(column.name, *column.type);
@ -154,14 +180,19 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream> >;
FileStreams streams;
/// Write streams over null-map files, keyed by column name (filled by addNullStream).
FileStreams null_streams;
using OffsetColumns = std::set<std::string>;
WriteBufferFromFile marks_stream; /// Declared below lock so that the file is opened while rwlock is held.
WriteBufferFromFile null_marks_stream; /// Opened on storage.null_marks_file with the same flags as marks_stream.
void addStream(const String & name, const IDataType & type, size_t level = 0);
/// Opens the null-map write stream of column `name` and stores it in null_streams.
void addNullStream(const String & name);
void writeData(const String & name, const IDataType & type, const IColumn & column, MarksForColumns & out_marks, OffsetColumns & offset_columns, size_t level = 0);
/// Forwards to writeMarksImpl with storage.files.
void writeMarks(MarksForColumns marks);
/// Forwards to writeMarksImpl with storage.null_files.
void writeNullMarks(MarksForColumns marks);
void writeMarksImpl(MarksForColumns marks, StorageLog::Files_t & files_descs);
};
@ -190,6 +221,9 @@ Block LogBlockInputStream::readImpl()
/// Сколько строк читать для следующего блока.
size_t max_rows_to_read = std::min(block_size, rows_limit - rows_read);
const Marks & marks = storage.getMarksWithRealRowCount();
const Marks * null_marks = nullptr;
if (has_null_marks)
null_marks = &storage.getNullMarksWithRealRowCount();
std::pair<String, size_t> current_table;
/// Отдельно обрабатываем виртуальный столбец
@ -201,6 +235,12 @@ Block LogBlockInputStream::readImpl()
while (current_mark < marks.size() && marks[current_mark].rows <= current_row)
++current_mark;
if (has_null_marks)
{
while (current_null_mark < null_marks->size() && (*null_marks)[current_null_mark].rows <= current_row)
++current_null_mark;
}
current_table = storage.getTableFromMark(current_mark);
current_table.second = std::min(current_table.second, marks.size() - 1);
max_rows_to_read = std::min(max_rows_to_read, marks[current_table.second].rows - current_row);
@ -283,9 +323,16 @@ Block LogBlockInputStream::readImpl()
void LogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
addNullStream(name);
addStream(name, nested_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов используются отдельные потоки для размеров.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
@ -307,12 +354,34 @@ void LogBlockInputStream::addStream(const String & name, const IDataType & type,
}
/// Opens a read stream over the null-map file of column `name` and stores it in null_streams.
/// When null_mark_number is zero the stream starts at offset 0; otherwise it starts at the
/// offset recorded in the mark with that number.
void LogBlockInputStream::addNullStream(const String & name)
{
    auto & null_file = storage.null_files[name];

    const auto start_offset = null_mark_number
        ? null_file.marks[null_mark_number].offset
        : 0;

    null_streams[name].reset(new Stream{
        null_file.data_file.path(),
        start_offset,
        max_read_buffer_size});
}
void LogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t max_rows_to_read,
size_t level, bool read_offsets)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
IColumn & nested_col = *(nullable_col.getNestedColumn().get());
DataTypeUInt8{}.deserializeBinary(*(nullable_col.getNullValuesByteMap().get()), null_streams[name]->compressed, max_rows_to_read, 0);
readData(name, nested_type, nested_col, max_rows_to_read, level, read_offsets);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (read_offsets)
{
type_arr->deserializeOffsets(
@ -377,9 +446,16 @@ void LogBlockOutputStream::writeSuffix()
void LogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
addNullStream(name);
addStream(name, nested_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов используются отдельные потоки для размеров.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(
@ -392,6 +468,12 @@ void LogBlockOutputStream::addStream(const String & name, const IDataType & type
}
/// Opens a write stream over the null-map file of column `name` and stores it in null_streams.
/// The stream is created with the storage's max_compress_block_size.
void LogBlockOutputStream::addNullStream(const String & name)
{
    auto & null_map_file = storage.null_files[name].data_file;

    null_streams[name].reset(new Stream{
        null_map_file.path(),
        storage.max_compress_block_size});
}
void LogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column, MarksForColumns & out_marks,
OffsetColumns & offset_columns, size_t level)
{
@ -435,6 +517,18 @@ static bool ColumnIndexLess(const std::pair<size_t, Mark> & a, const std::pair<s
}
/// Writes the marks of the regular column files: delegates to writeMarksImpl with storage.files.
void LogBlockOutputStream::writeMarks(MarksForColumns marks)
{
    StorageLog::Files_t & data_files = storage.files;
    writeMarksImpl(marks, data_files);
}
/// Writes the marks of the null-map files: delegates to writeMarksImpl with storage.null_files.
/// NOTE(review): the visible part of writeMarksImpl compares against storage.files.size() and
/// writes into marks_stream rather than null_marks_stream — confirm this is intended for null marks.
void LogBlockOutputStream::writeNullMarks(MarksForColumns marks)
{
    StorageLog::Files_t & null_map_files = storage.null_files;
    writeMarksImpl(marks, null_map_files);
}
void LogBlockOutputStream::writeMarksImpl(MarksForColumns marks, StorageLog::Files_t & files_descs)
{
if (marks.size() != storage.files.size())
throw Exception("Wrong number of marks generated from block. Makes no sense.", ErrorCodes::LOGICAL_ERROR);
@ -451,7 +545,7 @@ void LogBlockOutputStream::writeMarks(MarksForColumns marks)
writeIntBinary(mark.rows, marks_stream);
writeIntBinary(mark.offset, marks_stream);
storage.files[storage.column_names[i]].marks.push_back(mark);
files_descs[storage.column_names[i]].marks.push_back(mark);
}
}
@ -517,7 +611,17 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageLog.",
ErrorCodes::DUPLICATE_COLUMN);
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
has_nullable_columns = true;
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & actual_type = *(nullable_type.getNestedType().get());
addNullFile(column_name);
addFile(column_name, actual_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix;
@ -545,6 +649,14 @@ void StorageLog::addFile(const String & column_name, const IDataType & type, siz
}
}
void StorageLog::addNullFile(const String & column_name)
{
ColumnData & column_data = null_files.emplace(column_name, ColumnData{}).first->second;
column_data.column_index = column_names.size();
column_data.data_file = Poco::File{
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION};
}
void StorageLog::loadMarks()
{
@ -553,27 +665,36 @@ void StorageLog::loadMarks()
if (loaded_marks)
return;
loadMarksImpl(files, marks_file);
if (has_nullable_columns)
loadMarksImpl(null_files, null_marks_file);
loaded_marks = true;
}
void StorageLog::loadMarksImpl(Files_t & files_descs, Poco::File & marks_file_handle)
{
using FilesByIndex = std::vector<Files_t::iterator>;
FilesByIndex files_by_index(files.size());
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
FilesByIndex files_by_index(files_descs.size());
for (Files_t::iterator it = files_descs.begin(); it != files_descs.end(); ++it)
{
files_by_index[it->second.column_index] = it;
}
if (marks_file.exists())
if (marks_file_handle.exists())
{
size_t file_size = marks_file.getSize();
if (file_size % (files.size() * sizeof(Mark)) != 0)
size_t file_size = marks_file_handle.getSize();
if (file_size % (files_descs.size() * sizeof(Mark)) != 0)
throw Exception("Size of marks file is inconsistent", ErrorCodes::SIZES_OF_MARKS_FILES_ARE_INCONSISTENT);
int marks_count = file_size / (files.size() * sizeof(Mark));
int marks_count = file_size / (files_descs.size() * sizeof(Mark));
for (size_t i = 0; i < files_by_index.size(); ++i)
{
files_by_index[i]->second.marks.reserve(marks_count);
}
ReadBufferFromFile marks_rb(marks_file.path(), 32768);
ReadBufferFromFile marks_rb(marks_file_handle.path(), 32768);
while (!marks_rb.eof())
{
for (size_t i = 0; i < files_by_index.size(); ++i)
@ -585,16 +706,17 @@ void StorageLog::loadMarks()
}
}
}
loaded_marks = true;
}
/// Returns the number of marks stored for the first entry of `files`.
size_t StorageLog::marksCount()
{
    const auto & first_file = *files.begin();
    return first_file.second.marks.size();
}
/// Returns the number of marks stored for the first entry of `null_files`,
/// or 0 when the table has no null-map files at all.
size_t StorageLog::nullMarksCount()
{
    /// null_files is only filled for nullable columns, so it may be empty;
    /// dereferencing begin() of an empty map would be undefined behaviour.
    if (null_files.empty())
        return 0;

    return null_files.begin()->second.marks.size();
}
void StorageLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
@ -618,8 +740,29 @@ void StorageLog::rename(const String & new_path_to_db, const String & new_databa
/// Returns the marks found by getMarksWithRealRowCountImpl among the regular column files.
const Marks & StorageLog::getMarksWithRealRowCount() const
{
    const Files_t & data_files = files;
    return getMarksWithRealRowCountImpl(data_files);
}
/// Returns the marks found by getMarksWithRealRowCountImpl among the null-map files.
/// NOTE(review): the visible part of that lookup is keyed on the first column's name; if the first
/// column is not nullable it presumably has no entry in null_files and the lookup would throw — confirm.
const Marks & StorageLog::getNullMarksWithRealRowCount() const
{
    const Files_t & null_map_files = null_files;
    return getMarksWithRealRowCountImpl(null_map_files);
}
const Marks & StorageLog::getMarksWithRealRowCountImpl(const Files_t & files_descs) const
{
auto init_column_type = [&]()
{
const IDataType * type = columns->front().type.get();
if (type->isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(*type);
type = nullable_type.getNestedType().get();
}
return type;
};
const String & column_name = columns->front().name;
const IDataType & column_type = *columns->front().type;
const IDataType & column_type = *init_column_type();
String file_name;
/** Засечки достаём из первого столбца.
@ -635,8 +778,8 @@ const Marks & StorageLog::getMarksWithRealRowCount() const
file_name = column_name;
}
Files_t::const_iterator it = files.find(file_name);
if (files.end() == it)
Files_t::const_iterator it = files_descs.find(file_name);
if (files_descs.end() == it)
throw Exception("Cannot find file " + file_name, ErrorCodes::LOGICAL_ERROR);
return it->second.marks;
@ -646,6 +789,8 @@ const Marks & StorageLog::getMarksWithRealRowCount() const
BlockInputStreams StorageLog::read(
size_t from_mark,
size_t to_mark,
size_t from_null_mark,
size_t to_null_mark,
const Names & column_names,
ASTPtr query,
const Context & context,
@ -688,6 +833,49 @@ BlockInputStreams StorageLog::read(
0, std::numeric_limits<size_t>::max(),
settings.max_read_buffer_size));
}
else if (has_nullable_columns)
{
const Marks & marks = getMarksWithRealRowCount();
size_t marks_size = marks.size();
const Marks & null_marks = getNullMarksWithRealRowCount();
size_t null_marks_size = null_marks.size();
if ((to_mark == std::numeric_limits<size_t>::max()) && (to_null_mark == std::numeric_limits<size_t>::max()))
{
to_mark = marks_size;
to_null_mark = null_marks_size;
}
if (to_mark > marks_size || to_mark < from_mark)
throw Exception("Marks out of range in StorageLog::read", ErrorCodes::LOGICAL_ERROR);
if (threads > to_mark - from_mark)
threads = to_mark - from_mark;
for (size_t thread = 0; thread < threads; ++thread)
{
size_t mark_number = from_mark + thread * (to_mark - from_mark) / threads;
size_t correction = ((thread == 0 && from_mark == 0)
? 0
: marks[from_mark + thread * (to_mark - from_mark) / threads - 1].rows);
size_t rows_limit = marks[from_mark + (thread + 1) * (to_mark - from_mark) / threads - 1].rows - correction;
/// We must have the same number of marks and of null marks.
size_t null_mark_number = from_null_mark + (mark_number - from_mark);
res.push_back(std::make_shared<LogBlockInputStream>(
max_block_size,
column_names,
*this,
mark_number,
null_mark_number,
rows_limit,
settings.max_read_buffer_size));
}
}
else
{
const Marks & marks = getMarksWithRealRowCount();
@ -730,9 +918,12 @@ BlockInputStreams StorageLog::read(
const size_t max_block_size,
const unsigned threads)
{
return read(0, std::numeric_limits<size_t>::max(), column_names,
query, context, settings, processed_stage,
max_block_size, threads);
return read(
0, std::numeric_limits<size_t>::max(),
0, std::numeric_limits<size_t>::max(),
column_names,
query, context, settings, processed_stage,
max_block_size, threads);
}