Fix vertical merge again

This commit is contained in:
alesapin 2019-03-20 20:04:34 +03:00
parent 95a8be569d
commit b5d6813604
5 changed files with 15 additions and 6 deletions

View File

@ -139,7 +139,12 @@ size_t getAvgGranularity(const std::unordered_map<size_t, size_t> & rows_granula
{
size_t sum = 0;
for (const auto & [granularity, rows_num] : rows_granularity)
{
//std::cerr << "Granularity:" << granularity << " Rows Num:" << rows_num << std::endl;
sum += granularity * rows_num;
}
//std::cerr << "SUM:" << sum << std::endl;
//std::cerr << "ANSWER:" << sum / total_rows << std::endl;
return sum / total_rows;
}
}

View File

@ -91,15 +91,16 @@ try
Block res;
if (!isCancelled() && current_row < data_part->rows_count)
{
size_t rows_to_read = data_part->marks_index_granularity[current_mark];
bool continue_reading = (current_mark != 0);
size_t rows_readed = reader->readRows(current_mark, continue_reading, storage.index_granularity, res);
size_t rows_readed = reader->readRows(current_mark, continue_reading, rows_to_read, res);
if (res)
{
res.checkNumberOfRows();
current_row += rows_readed;
current_mark += (rows_readed / storage.index_granularity);
current_mark += (rows_to_read == rows_readed);
bool should_reorder = false, should_evaluate_missing_defaults = false;
reader->fillMissingColumns(res, should_reorder, should_evaluate_missing_defaults, res.rows());
@ -116,7 +117,6 @@ try
finish();
}
//std::cerr << "Resulting block in MergeTreeSequentialBlockInputStream:" << res.dumpStructure() << std::endl;
return res;
}
catch (...)

View File

@ -236,6 +236,7 @@ std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
size_t total_rows = column.size();
size_t current_row = 0;
//std::cerr << "FROM MARK:" << from_mark << std::endl;
size_t current_column_mark = from_mark;
while (current_row < total_rows)
{
@ -273,7 +274,9 @@ std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
if (write_marks)
current_column_mark++;
//std::cerr << "current column mark (loop):" << current_column_mark << std::endl;
}
//std::cerr << "Current column mark:" << current_column_mark << std::endl;
/// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path)
@ -286,7 +289,7 @@ std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
}
}, serialize_settings.path);
return std::make_pair(current_column_mark - from_mark - 1, current_row - total_rows);
return std::make_pair(current_column_mark, current_row - total_rows);
}

View File

@ -35,7 +35,10 @@ std::vector<size_t> readGranularity(const Poco::Path & mrk_file_path, size_t fix
DB::readBinary(offset_in_decompressed_block, mrk_in);
UInt64 index_granularity_rows = 0;
if (extension == "mrk2")
{
DB::readBinary(index_granularity_rows, mrk_in);
std::cerr << "Readed rows:" << index_granularity_rows << std::endl;
}
else
index_granularity_rows = fixed_granularity;
result.push_back(index_granularity_rows);

View File

@ -79,9 +79,7 @@ void checkByCompressedReadBuffer(const std::string & mrk_path, const std::string
DB::CompressedReadBufferFromFile bin_in(bin_path, 0, 0);
DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
std::cerr << "MRKPATH:"<<mrk_path << std::endl;
bool mrk2_format = boost::algorithm::ends_with(mrk_path, ".mrk2");
std::cerr << "endswith:" << mrk2_format << std::endl;
for (size_t mark_num = 0; !mrk_in.eof(); ++mark_num)
{