2019-10-16 18:27:53 +00:00
|
|
|
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
|
|
|
|
|
|
|
|
namespace DB
|
|
|
|
{
|
|
|
|
|
2019-10-22 10:50:17 +00:00
|
|
|
namespace
{
    /// Base name shared by the data file and the marks file of a compact part;
    /// it is appended to the part path and also used as the stream name.
    constexpr const char * DATA_FILE_NAME = "data";

    /// Extension passed for the data file.
    constexpr const char * DATA_FILE_EXTENSION = ".bin";
}
|
|
|
|
|
|
|
|
|
|
|
|
/// Forwards all arguments to the base writer and opens the single stream
/// that receives the data of every column of the part.
/// The trailing `true` is a flag whose meaning is defined by the
/// IMergeTreeDataPartWriter constructor (not visible from this file).
MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
    const String & part_path_,
    const MergeTreeData & storage_,
    const NamesAndTypesList & columns_list_,
    const std::vector<MergeTreeIndexPtr> & indices_to_recalc_,
    const String & marks_file_extension_,
    const CompressionCodecPtr & default_codec_,
    const WriterSettings & settings_,
    const MergeTreeIndexGranularity & index_granularity_)
    : IMergeTreeDataPartWriter(
        part_path_, storage_, columns_list_, indices_to_recalc_,
        marks_file_extension_, default_codec_, settings_, index_granularity_, true)
{
    /// Data and marks share the same path prefix and differ only in extension.
    const String data_file_prefix = part_path + DATA_FILE_NAME;

    stream = std::make_unique<ColumnStream>(
        DATA_FILE_NAME,
        data_file_prefix, DATA_FILE_EXTENSION,
        data_file_prefix, marks_file_extension,
        default_codec,
        settings.max_compress_block_size,
        settings.estimated_size,
        settings.aio_threshold);
}
|
|
|
|
|
2019-11-07 11:11:38 +00:00
|
|
|
void MergeTreeDataPartWriterCompact::write(
|
2019-10-21 17:23:06 +00:00
|
|
|
const Block & block, const IColumn::Permutation * permutation,
|
2019-11-07 11:11:38 +00:00
|
|
|
const Block & primary_key_block, const Block & skip_indexes_block)
|
2019-10-16 18:27:53 +00:00
|
|
|
{
|
|
|
|
size_t total_rows = block.rows();
|
2019-11-07 11:11:38 +00:00
|
|
|
size_t from_mark = current_mark;
|
2019-10-16 18:27:53 +00:00
|
|
|
size_t current_row = 0;
|
|
|
|
|
2019-11-07 11:11:38 +00:00
|
|
|
/// Fill index granularity for this block
|
|
|
|
/// if it's unknown (in case of insert data or horizontal merge,
|
|
|
|
/// but not in case of vertical merge)
|
|
|
|
if (compute_granularity)
|
|
|
|
fillIndexGranularity(block);
|
|
|
|
|
2019-10-16 18:27:53 +00:00
|
|
|
ColumnsWithTypeAndName columns_to_write(columns_list.size());
|
|
|
|
auto it = columns_list.begin();
|
|
|
|
for (size_t i = 0; i < columns_list.size(); ++i, ++it)
|
|
|
|
{
|
|
|
|
if (permutation)
|
|
|
|
{
|
|
|
|
if (primary_key_block.has(it->name))
|
|
|
|
columns_to_write[i] = primary_key_block.getByName(it->name);
|
|
|
|
else if (skip_indexes_block.has(it->name))
|
|
|
|
columns_to_write[i] = skip_indexes_block.getByName(it->name);
|
|
|
|
else
|
|
|
|
{
|
|
|
|
columns_to_write[i] = block.getByName(it->name);
|
|
|
|
columns_to_write[i].column = columns_to_write[i].column->permute(*permutation, 0);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
else
|
|
|
|
columns_to_write[i] = block.getByName(it->name);
|
|
|
|
}
|
|
|
|
|
2019-10-22 10:50:17 +00:00
|
|
|
std::cerr << "(MergeTreeDataPartWriterCompact::write) total_rows: " << total_rows << "\n";
|
|
|
|
|
2019-10-16 18:27:53 +00:00
|
|
|
while (current_row < total_rows)
|
|
|
|
{
|
2019-10-22 10:50:17 +00:00
|
|
|
std::cerr << "(MergeTreeDataPartWriterCompact::write) current_row: " << current_row << "\n";
|
|
|
|
|
2019-10-16 18:27:53 +00:00
|
|
|
bool write_marks = true;
|
2019-10-31 14:44:17 +00:00
|
|
|
// size_t rows_to_write = std::min(total_rows, index_granularity.getMarkRows(current_mark));
|
|
|
|
size_t rows_to_write = total_rows;
|
2019-11-25 11:06:59 +00:00
|
|
|
// if (compute_granularity)
|
|
|
|
// index_granularity.appendMark(total_rows);
|
2019-11-05 11:53:22 +00:00
|
|
|
|
2019-11-07 11:11:38 +00:00
|
|
|
if (rows_to_write)
|
|
|
|
data_written = true;
|
|
|
|
|
2019-10-22 17:42:59 +00:00
|
|
|
// if (current_row == 0 && index_offset != 0)
|
|
|
|
// {
|
|
|
|
// rows_to_write = index_offset;
|
|
|
|
// write_marks = false;
|
|
|
|
// }
|
|
|
|
// else
|
|
|
|
// {
|
|
|
|
// rows_to_write = index_granularity.getMarkRows(current_mark);
|
|
|
|
// }
|
2019-10-16 18:27:53 +00:00
|
|
|
|
2019-10-31 14:44:17 +00:00
|
|
|
// std::cerr << "(MergeTreeDataPartWriterCompact::write) rows_to_write: " << rows_to_write << "\n";
|
2019-10-22 10:50:17 +00:00
|
|
|
|
2019-10-22 17:42:59 +00:00
|
|
|
/// There could already be enough data to compress into the new block.
|
2019-10-31 14:44:17 +00:00
|
|
|
if (stream->compressed.offset() >= settings.min_compress_block_size)
|
2019-10-22 17:42:59 +00:00
|
|
|
stream->compressed.next();
|
2019-10-31 14:44:17 +00:00
|
|
|
|
|
|
|
size_t next_row = 0;
|
2019-10-22 17:42:59 +00:00
|
|
|
|
2019-10-16 18:27:53 +00:00
|
|
|
if (write_marks)
|
|
|
|
{
|
2019-10-21 00:28:29 +00:00
|
|
|
writeIntBinary(rows_to_write, stream->marks);
|
|
|
|
for (size_t i = 0; i < columns_to_write.size(); ++i)
|
|
|
|
{
|
|
|
|
writeIntBinary(stream->plain_hashing.count(), stream->marks);
|
|
|
|
writeIntBinary(stream->compressed.offset(), stream->marks);
|
2019-10-31 14:44:17 +00:00
|
|
|
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
|
2019-10-21 00:28:29 +00:00
|
|
|
}
|
2019-11-07 11:11:38 +00:00
|
|
|
++from_mark;
|
2019-10-16 18:27:53 +00:00
|
|
|
}
|
2019-10-21 00:28:29 +00:00
|
|
|
else
|
|
|
|
{
|
|
|
|
for (size_t i = 0; i < columns_to_write.size(); ++i)
|
2019-10-31 14:44:17 +00:00
|
|
|
next_row = writeColumnSingleGranule(columns_to_write[i], current_row, rows_to_write);
|
2019-10-21 00:28:29 +00:00
|
|
|
}
|
2019-11-05 11:53:22 +00:00
|
|
|
|
2019-10-31 14:44:17 +00:00
|
|
|
current_row = next_row;
|
2019-10-16 18:27:53 +00:00
|
|
|
}
|
|
|
|
|
2019-11-07 11:11:38 +00:00
|
|
|
next_mark = from_mark;
|
|
|
|
next_index_offset = total_rows - current_row;
|
2019-10-16 18:27:53 +00:00
|
|
|
}
|
|
|
|
|
2019-10-21 00:28:29 +00:00
|
|
|
size_t MergeTreeDataPartWriterCompact::writeColumnSingleGranule(const ColumnWithTypeAndName & column, size_t from_row, size_t number_of_rows)
|
2019-10-16 18:27:53 +00:00
|
|
|
{
|
2019-10-31 14:44:17 +00:00
|
|
|
std::cerr << "(writeColumnSingleGranule) writing column: " << column.name << "\n";
|
|
|
|
std::cerr << "(writeColumnSingleGranule) from_row: " << from_row << "\n";
|
|
|
|
std::cerr << "(writeColumnSingleGranule) number_of_rows: " << number_of_rows << "\n";
|
2019-11-05 11:53:22 +00:00
|
|
|
|
2019-10-21 00:28:29 +00:00
|
|
|
IDataType::SerializeBinaryBulkStatePtr state;
|
|
|
|
IDataType::SerializeBinaryBulkSettings serialize_settings;
|
2019-10-16 18:27:53 +00:00
|
|
|
|
2019-10-21 15:33:59 +00:00
|
|
|
serialize_settings.getter = [this](IDataType::SubstreamPath) -> WriteBuffer * { return &stream->compressed; };
|
2019-10-21 00:28:29 +00:00
|
|
|
serialize_settings.position_independent_encoding = false;
|
|
|
|
serialize_settings.low_cardinality_max_dictionary_size = 0;
|
2019-10-16 18:27:53 +00:00
|
|
|
|
2019-10-21 00:28:29 +00:00
|
|
|
column.type->serializeBinaryBulkStatePrefix(serialize_settings, state);
|
|
|
|
column.type->serializeBinaryBulkWithMultipleStreams(*column.column, from_row, number_of_rows, serialize_settings, state);
|
|
|
|
column.type->serializeBinaryBulkStateSuffix(serialize_settings, state);
|
2019-10-16 18:27:53 +00:00
|
|
|
|
2019-10-21 00:28:29 +00:00
|
|
|
return from_row + number_of_rows;
|
2019-10-16 18:27:53 +00:00
|
|
|
}
|
|
|
|
|
2019-11-18 15:18:50 +00:00
|
|
|
/// Completes writing of the data stream: optionally appends a final mark,
/// finalizes the stream, syncs it when requested, records its checksums
/// and releases it.
///
/// @param checksums  receives the checksums of the stream.
/// @param sync       when true, the stream is synced after being finalized.
void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart::Checksums & checksums, bool sync)
{
    const bool need_final_mark = with_final_mark && data_written;

    if (need_final_mark)
    {
        /// The final mark has zero rows and, for every column,
        /// the current position counters of the data stream.
        writeIntBinary(0ULL, stream->marks);

        for (size_t columns_left = columns_list.size(); columns_left > 0; --columns_left)
        {
            writeIntBinary(stream->plain_hashing.count(), stream->marks);
            writeIntBinary(stream->compressed.offset(), stream->marks);
        }
    }

    stream->finalize();

    if (sync)
        stream->sync();

    stream->addToChecksums(checksums);

    /// The stream is not needed after this point.
    stream.reset();
}
|
|
|
|
|
2019-10-21 15:33:59 +00:00
|
|
|
}
|