dbms: Server: Feature development. [#METR-19266]

This commit is contained in:
Alexey Arno 2016-07-06 18:15:15 +03:00
parent f4e9c16fc0
commit 93ac3ee3a8
4 changed files with 111 additions and 44 deletions

View File

@ -75,6 +75,7 @@ private:
size_t max_compress_block_size;
Files_t files;
Files_t null_files;
FileChecker file_checker;
@ -91,6 +92,7 @@ private:
size_t max_compress_block_size_);
void addFile(const String & column_name, const IDataType & type, size_t level = 0);
void addNullFile(const String & column_name);
};
}

View File

@ -9,6 +9,7 @@
#include <DB/Columns/ColumnsNumber.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataTypes/DataTypeFactory.h>
#include <DB/DataStreams/NativeBlockInputStream.h>
@ -24,21 +25,6 @@ namespace ErrorCodes
extern const int CANNOT_READ_ALL_DATA;
}
namespace
{
void deserializeNullValuesByteMap(ColumnNullable & nullable_col, ReadBuffer & istr, size_t limit)
{
ColumnUInt8 & null_map = static_cast<ColumnUInt8 &>(*(nullable_col.getNullValuesByteMap().get()));
auto & x = null_map.getData();
x.resize(limit);
size_t read_count = istr.readBig(reinterpret_cast<char *>(&x[0]), limit);
x.resize(read_count);
}
}
NativeBlockInputStream::NativeBlockInputStream(
ReadBuffer & istr_, UInt64 server_revision_,
bool use_index_,
@ -68,7 +54,9 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
IColumn & nested_col = *(nullable_col.getNestedColumn().get());
deserializeNullValuesByteMap(nullable_col, istr, rows);
ColumnUInt8 & null_map = static_cast<ColumnUInt8 &>(*(nullable_col.getNullValuesByteMap().get()));
DataTypeUInt8{}.deserializeBinary(null_map, istr, rows, 0);
readData(nested_type, nested_col, istr, rows);
return;

View File

@ -11,6 +11,7 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/MarkInCompressedFile.h>
#include <DB/DataStreams/NativeBlockOutputStream.h>
@ -19,24 +20,6 @@
namespace DB
{
namespace
{
void serializeNullValuesByteMap(const ColumnNullable & nullable_col, WriteBuffer & ostr, size_t offset, size_t limit)
{
const IColumn & nested_col = *(nullable_col.getNestedColumn().get());
const ColumnUInt8 & content = static_cast<const ColumnUInt8 &>(*(nullable_col.getNullValuesByteMap().get()));
const auto & x = content.getData();
size_t size = nested_col.size();
if ((limit == 0) || (offset + limit) > size)
limit = size - offset;
ostr.write(reinterpret_cast<const char *>(&x[offset]), limit);
}
}
NativeBlockOutputStream::NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
@ -72,7 +55,9 @@ void NativeBlockOutputStream::writeData(const IDataType & type, const ColumnPtr
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(*full_column.get());
const ColumnPtr & nested_col = nullable_col.getNestedColumn();
serializeNullValuesByteMap(nullable_col, ostr, offset, limit);
const ColumnUInt8 & content = static_cast<const ColumnUInt8 &>(*(nullable_col.getNullValuesByteMap().get()));
DataTypeUInt8{}.serializeBinary(content, ostr, offset, limit);
writeData(nested_type, nested_col, ostr, offset, limit);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))

View File

@ -16,16 +16,20 @@
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeNullable.h>
#include <DB/DataTypes/DataTypesNumberFixed.h>
#include <DB/DataStreams/IProfilingBlockInputStream.h>
#include <DB/DataStreams/IBlockOutputStream.h>
#include <DB/Columns/ColumnArray.h>
#include <DB/Columns/ColumnNullable.h>
#include <DB/Storages/StorageTinyLog.h>
#include <Poco/DirectoryIterator.h>
#define DBMS_STORAGE_LOG_DATA_FILE_EXTENSION ".bin"
#define DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION ".null"
namespace DB
@ -73,8 +77,10 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream> >;
FileStreams streams;
FileStreams null_streams;
void addStream(const String & name, const IDataType & type, size_t level = 0);
void addNullStream(const String & name);
void readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level = 0, bool read_offsets = true);
};
@ -128,10 +134,12 @@ private:
using FileStreams = std::map<std::string, std::unique_ptr<Stream> >;
FileStreams streams;
FileStreams null_streams;
using OffsetColumns = std::set<std::string>;
void addStream(const String & name, const IDataType & type, size_t level = 0);
void addNullStream(const String & name);
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns, size_t level = 0);
};
@ -161,6 +169,7 @@ Block TinyLogBlockInputStream::readImpl()
*/
finished = true;
streams.clear();
null_streams.clear();
return res;
}
@ -220,6 +229,7 @@ Block TinyLogBlockInputStream::readImpl()
{
finished = true;
streams.clear();
null_streams.clear();
}
return res;
@ -228,9 +238,16 @@ Block TinyLogBlockInputStream::readImpl()
void TinyLogBlockInputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
addNullStream(name);
addStream(name, nested_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов используются отдельные потоки для размеров.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), max_read_buffer_size)));
@ -242,11 +259,31 @@ void TinyLogBlockInputStream::addStream(const String & name, const IDataType & t
}
void TinyLogBlockInputStream::addNullStream(const String & name)
{
null_streams[name].reset(new Stream{storage.null_files[name].data_file.path(), max_read_buffer_size});
}
void TinyLogBlockInputStream::readData(const String & name, const IDataType & type, IColumn & column, size_t limit, size_t level, bool read_offsets)
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
if (!column.isNullable())
throw Exception{"Column not nullable!!!", ErrorCodes::LOGICAL_ERROR};
ColumnNullable & nullable_col = static_cast<ColumnNullable &>(column);
IColumn & nested_col = *(nullable_col.getNestedColumn().get());
DataTypeUInt8{}.deserializeBinary(*(nullable_col.getNullValuesByteMap().get()), null_streams[name]->compressed, limit, 0);
readData(name, nested_type, nested_col, limit, level, read_offsets);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов требуется сначала десериализовать размеры, а потом значения.
if (read_offsets)
{
type_arr->deserializeOffsets(
@ -272,9 +309,16 @@ void TinyLogBlockInputStream::readData(const String & name, const IDataType & ty
void TinyLogBlockOutputStream::addStream(const String & name, const IDataType & type, size_t level)
{
/// Для массивов используются отдельные потоки для размеров.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
addNullStream(name);
addStream(name, nested_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов используются отдельные потоки для размеров.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (!streams.count(size_name))
streams.emplace(size_name, std::unique_ptr<Stream>(new Stream(storage.files[size_name].data_file.path(), storage.max_compress_block_size)));
@ -286,12 +330,29 @@ void TinyLogBlockOutputStream::addStream(const String & name, const IDataType &
}
void TinyLogBlockOutputStream::addNullStream(const String & name)
{
null_streams[name].reset(new Stream{storage.null_files[name].data_file.path(), storage.max_compress_block_size});
}
void TinyLogBlockOutputStream::writeData(const String & name, const IDataType & type, const IColumn & column,
OffsetColumns & offset_columns, size_t level)
{
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & nested_type = *(nullable_type.getNestedType().get());
const ColumnNullable & nullable_col = static_cast<const ColumnNullable &>(column);
const IColumn & nested_col = *(nullable_col.getNestedColumn().get());
DataTypeUInt8{}.serializeBinary(*(nullable_col.getNullValuesByteMap().get()), null_streams[name]->compressed);
writeData(name, nested_type, nested_col, offset_columns, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
/// Для массивов требуется сначала сериализовать размеры, а потом значения.
String size_name = DataTypeNested::extractNestedTableName(name) + ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
if (offset_columns.count(size_name) == 0)
@ -319,6 +380,9 @@ void TinyLogBlockOutputStream::writeSuffix()
for (FileStreams::iterator it = streams.begin(); it != streams.end(); ++it)
it->second->finalize();
for (auto & it : null_streams)
it.second->finalize();
std::vector<Poco::File> column_files;
for (auto & pair : streams)
column_files.push_back(storage.files[pair.first].data_file);
@ -326,6 +390,7 @@ void TinyLogBlockOutputStream::writeSuffix()
storage.file_checker.update(column_files.begin(), column_files.end());
streams.clear();
null_streams.clear();
}
@ -398,7 +463,15 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type,
throw Exception("Duplicate column with name " + column_name + " in constructor of StorageTinyLog.",
ErrorCodes::DUPLICATE_COLUMN);
if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
if (type.isNullable())
{
const DataTypeNullable & nullable_type = static_cast<const DataTypeNullable &>(type);
const IDataType & actual_type = *(nullable_type.getNestedType().get());
addNullFile(column_name);
addFile(column_name, actual_type, level);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
String size_column_suffix = ARRAY_SIZES_COLUMN_NAME_SUFFIX + toString(level);
String size_name = DataTypeNested::extractNestedTableName(column_name) + size_column_suffix;
@ -423,6 +496,14 @@ void StorageTinyLog::addFile(const String & column_name, const IDataType & type,
}
void StorageTinyLog::addNullFile(const String & column_name)
{
ColumnData & column_data = null_files.emplace(column_name, ColumnData{}).first->second;
column_data.data_file = Poco::File{
path + escapeForFileName(name) + '/' + escapeForFileName(column_name) + DBMS_STORAGE_LOG_DATA_BINARY_NULL_MAP_EXTENSION};
}
void StorageTinyLog::rename(const String & new_path_to_db, const String & new_database_name, const String & new_table_name)
{
/// Переименовываем директорию с данными.
@ -434,6 +515,9 @@ void StorageTinyLog::rename(const String & new_path_to_db, const String & new_da
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName());
for (Files_t::iterator it = null_files.begin(); it != null_files.end(); ++it)
it->second.data_file = Poco::File(path + escapeForFileName(name) + '/' + Poco::Path(it->second.data_file.path()).getFileName());
}
@ -462,8 +546,16 @@ BlockOutputStreamPtr StorageTinyLog::write(
void StorageTinyLog::drop()
{
for (Files_t::iterator it = files.begin(); it != files.end(); ++it)
{
if (it->second.data_file.exists())
it->second.data_file.remove();
}
for (Files_t::iterator it = null_files.begin(); it != null_files.end(); ++it)
{
if (it->second.data_file.exists())
it->second.data_file.remove();
}
}
bool StorageTinyLog::checkData() const