Merging [#METR-19266].

This commit is contained in:
Alexey Milovidov 2016-12-10 09:10:29 +03:00
parent f4815364f6
commit c937d75a78
6 changed files with 32 additions and 26 deletions

View File

@ -58,6 +58,7 @@ public:
private:
class Stream
{
public:
Stream(
const String & path_prefix_, const String & extension_,
UncompressedCache * uncompressed_cache,
@ -71,6 +72,8 @@ private:
void seekToMark(size_t index);
bool isEmpty() const { return is_empty; }
ReadBuffer * data_buffer;
private:

View File

@ -57,17 +57,17 @@ protected:
void sync();
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums, String name = "");
void addToChecksums(MergeTreeData::DataPart::Checksums & checksums);
};
using ColumnStreams = std::map<String, std::unique_ptr<ColumnStream>>;
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size = 0,
size_t level = 0, const String & filename = "", bool skip_offsets = false);
void addStream(const String & path, const String & name, const IDataType & type, size_t estimated_size,
size_t level, const String & filename, bool skip_offsets);
/// Записать данные одного столбца.
void writeData(const String & name, const IDataType & type, const IColumn & column, OffsetColumns & offset_columns,
size_t level = 0, bool skip_offsets = false);
size_t level, bool skip_offsets);
MergeTreeData & storage;
@ -158,7 +158,7 @@ class MergedColumnOnlyOutputStream : public IMergedBlockOutputStream
{
public:
MergedColumnOnlyOutputStream(
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_ = false);
MergeTreeData & storage_, String part_path_, bool sync_, CompressionMethod compression_method, bool skip_offsets_);
void write(const Block & block) override;
void writeSuffix() override;

View File

@ -893,7 +893,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
false, nullptr, "", false, 0, DBMS_DEFAULT_BUFFER_SIZE, false);
ExpressionBlockInputStream in(part_in, expression);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4);
MergedColumnOnlyOutputStream out(*this, full_path + part->name + '/', true, CompressionMethod::LZ4, false);
in.readPrefix();
out.writePrefix();

View File

@ -17,6 +17,8 @@
#include <DB/DataStreams/MaterializingBlockInputStream.h>
#include <DB/DataStreams/ConcatBlockInputStream.h>
#include <DB/DataStreams/ColumnGathererStream.h>
#include <DB/DataTypes/DataTypeNested.h>
#include <DB/DataTypes/DataTypeArray.h>
#include <DB/Storages/MergeTree/BackgroundProcessingPool.h>
#include <DB/Common/Increment.h>
#include <DB/Common/interpolate.h>

View File

@ -177,7 +177,8 @@ void MergeTreeReader::fillMissingColumnsAndReorder(Block & res, const Names & or
MergeTreeReader::Stream::Stream(
const String & path_prefix_, UncompressedCache * uncompressed_cache,
const String & path_prefix_, const String & extension_,
UncompressedCache * uncompressed_cache,
MarkCache * mark_cache, bool save_marks_in_cache,
const MarkRanges & all_mark_ranges, size_t aio_threshold, size_t max_read_buffer_size,
const ReadBufferFromFileBase::ProfileCallback & profile_callback, clockid_t clock_type)
@ -265,7 +266,7 @@ MergeTreeReader::Stream::Stream(
std::unique_ptr<MergeTreeReader::Stream> MergeTreeReader::Stream::createEmptyPtr()
{
std::unique_ptr<Stream> res = std::make_unique<Stream>();
std::unique_ptr<Stream> res(new Stream);
res->is_empty = true;
return res;
}
@ -477,7 +478,7 @@ void MergeTreeReader::readData(const String & name, const IDataType & type, ICol
Stream & stream = *streams[name];
/// It means that data column of array column will be empty, and it will be replaced by const data column
if (stream.is_empty)
if (stream.isEmpty())
return;
double & avg_value_size_hint = avg_value_size_hints[name];

View File

@ -71,7 +71,7 @@ void IMergedBlockOutputStream::addStream(
aio_threshold);
/// Then create the stream that handles the data of the given column.
addStream(path, name, nested_type, estimated_size, level, filename);
addStream(path, name, nested_type, estimated_size, level, filename, false);
}
else if (const DataTypeArray * type_arr = typeid_cast<const DataTypeArray *>(&type))
{
@ -93,7 +93,7 @@ void IMergedBlockOutputStream::addStream(
aio_threshold);
}
addStream(path, name, *type_arr->getNestedType(), estimated_size, level + 1);
addStream(path, name, *type_arr->getNestedType(), estimated_size, level + 1, "", false);
}
else
{
@ -114,9 +114,10 @@ void IMergedBlockOutputStream::writeData(
const IDataType & type,
const IColumn & column,
OffsetColumns & offset_columns,
size_t level)
size_t level,
bool skip_offsets)
{
writeDataImpl(name, type, column, offset_columns, level, false);
writeDataImpl(name, type, column, offset_columns, level, false, skip_offsets);
}
@ -178,7 +179,7 @@ void IMergedBlockOutputStream::writeDataImpl(
}
/// Then write data.
writeDataImpl(name, nested_type, nested_col, offset_columns, level, write_array_data);
writeDataImpl(name, nested_type, nested_col, offset_columns, level, write_array_data, false);
}
else if (!write_array_data && ((type_arr = typeid_cast<const DataTypeArray *>(&type)) != nullptr))
{
@ -224,9 +225,9 @@ void IMergedBlockOutputStream::writeDataImpl(
if (type_arr->getNestedType()->isNullable())
writeDataImpl(name, *type_arr->getNestedType(),
typeid_cast<const ColumnArray &>(column).getData(), offset_columns,
level + 1, true);
level + 1, true, false);
else
writeDataImpl(name, type, column, offset_columns, level + 1, true);
writeDataImpl(name, type, column, offset_columns, level + 1, true, false);
}
else
{
@ -297,10 +298,9 @@ void IMergedBlockOutputStream::ColumnStream::sync()
marks_file.sync();
}
void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums, String name)
void IMergedBlockOutputStream::ColumnStream::addToChecksums(MergeTreeData::DataPart::Checksums & checksums)
{
if (name == "")
name = escaped_column_name;
String name = escaped_column_name;
checksums.files[name + data_file_extension].is_compressed = true;
checksums.files[name + data_file_extension].uncompressed_size = compressed.count();
@ -328,7 +328,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
{
init();
for (const auto & it : columns_list)
addStream(part_path, it.name, *it.type);
addStream(part_path, it.name, *it.type, 0, 0, "", false);
}
MergedBlockOutputStream::MergedBlockOutputStream(
@ -354,7 +354,7 @@ MergedBlockOutputStream::MergedBlockOutputStream(
if (it2 != merged_column_to_size_.end())
estimated_size = it2->second;
}
addStream(part_path, it.name, *it.type, estimated_size);
addStream(part_path, it.name, *it.type, estimated_size, 0, "", false);
}
}
@ -509,18 +509,18 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto primary_column_it = primary_columns_name_to_position.find(it.name);
if (primary_columns_name_to_position.end() != primary_column_it)
{
writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns);
writeData(column.name, *column.type, *primary_columns[primary_column_it->second].column, offset_columns, 0, false);
}
else
{
/// Столбцы, не входящие в первичный ключ, переупорядочиваем здесь; затем результат освобождается - для экономии оперативки.
ColumnPtr permutted_column = column.column->permute(*permutation, 0);
writeData(column.name, *column.type, *permutted_column, offset_columns);
writeData(column.name, *column.type, *permutted_column, offset_columns, 0, false);
}
}
else
{
writeData(column.name, *column.type, *column.column, offset_columns);
writeData(column.name, *column.type, *column.column, offset_columns, 0, false);
}
}
@ -575,7 +575,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
addStream(part_path, block.getByPosition(i).name,
*block.getByPosition(i).type, 0, 0, block.getByPosition(i).name);
*block.getByPosition(i).type, 0, 0, block.getByPosition(i).name, false);
}
initialized = true;
}
@ -586,7 +586,7 @@ void MergedColumnOnlyOutputStream::write(const Block & block)
for (size_t i = 0; i < block.columns(); ++i)
{
const ColumnWithTypeAndName & column = block.getByPosition(i);
writeData(column.name, *column.type, *column.column, offset_columns);
writeData(column.name, *column.type, *column.column, offset_columns, 0, false);
}
size_t written_for_last_mark = (storage.index_granularity - index_offset + rows) % storage.index_granularity;