Some debug

alesapin 2018-12-04 11:05:58 +03:00
parent bf338b957f
commit 8c34386310
2 changed files with 23 additions and 5 deletions

MergeTreeRangeReader.cpp

@@ -34,6 +34,8 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Block & block, size_t num_r
if (num_rows)
{
size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, block);
std::cerr << "Rows read:" << rows_read << std::endl;
std::cerr << "Num rows:" << num_rows << std::endl;
continue_reading = true;
/// Zero rows_read may be either because reading has finished
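
Aside: DelayedStream postpones the actual read until the requested row count is final, then issues it in one call. A minimal sketch of that pattern, with illustrative names rather than the real MergeTreeReader API:

#include <cstddef>
#include <functional>
#include <utility>

/// Sketch of a delayed reader: requests are only recorded, and the underlying
/// read is issued once, when the pending row count is finalized.
class DelayedReader
{
public:
    using ReadFn = std::function<size_t (size_t mark, bool continue_reading, size_t num_rows)>;

    explicit DelayedReader(ReadFn fn) : read_fn(std::move(fn)) {}

    void schedule(size_t mark, size_t num_rows)   /// remember the request, read nothing yet
    {
        pending_mark = mark;
        pending_rows += num_rows;
    }

    size_t finalize()                             /// execute the accumulated request
    {
        size_t rows_read = 0;
        if (pending_rows)
        {
            rows_read = read_fn(pending_mark, continue_reading, pending_rows);
            continue_reading = true;              /// the next read resumes where this one stopped
            pending_rows = 0;
        }
        return rows_read;
    }

private:
    ReadFn read_fn;
    size_t pending_mark = 0;
    size_t pending_rows = 0;
    bool continue_reading = false;
};
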
@@ -147,8 +149,11 @@ size_t MergeTreeRangeReader::Stream::readRows(Block & block, size_t num_rows)
void MergeTreeRangeReader::Stream::toNextMark()
{
++current_mark;
if (current_mark >= merge_tree_reader->data_part->marks_index_granularity.size())
throw Exception("Trying to read mark №" + toString(current_mark) + " but total marks count is "
+ toString(merge_tree_reader->data_part->marks_index_granularity.size()), ErrorCodes::LOGICAL_ERROR);
current_mark_index_granularity = merge_tree_reader->data_part->marks_index_granularity[current_mark];
std::cerr << "Current mark granularity:" << current_mark_index_granularity << std::endl;
offset_after_current_mark = 0;
}
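
With adaptive granularity each mark may cover a different number of rows, which is why toNextMark reloads current_mark_index_granularity from the per-part marks_index_granularity array. A hedged sketch of how a mark number maps to its first row under this scheme (the helper itself is hypothetical):

#include <cstddef>
#include <numeric>
#include <stdexcept>
#include <string>
#include <vector>

/// Rows covered by each mark; with adaptive granularity the values can differ per mark.
using MarksIndexGranularity = std::vector<size_t>;

/// First row covered by `mark`: the sum of the granules before it.
size_t markToFirstRow(const MarksIndexGranularity & granularity, size_t mark)
{
    if (mark >= granularity.size())
        throw std::logic_error("Trying to read mark " + std::to_string(mark)
            + " but total marks count is " + std::to_string(granularity.size()));
    return std::accumulate(granularity.begin(), granularity.begin() + mark, size_t(0));
}
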
@@ -556,6 +561,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
}
auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
std::cerr << "Rows To Read:" << rows_to_read << std::endl;
bool last = rows_to_read == space_left;
result.addRows(stream.read(result.block, rows_to_read, !last));
result.addGranule(rows_to_read);
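
rows_to_read is capped so a single step never crosses a granule boundary: min(space_left, rows still pending in the current granule). A small sketch of that bookkeeping, assuming the Stream fields shown in the hunks above:

#include <algorithm>
#include <cstddef>

struct StreamPosition
{
    size_t current_mark_index_granularity = 8192; /// rows covered by the current mark
    size_t offset_after_current_mark = 0;         /// rows already consumed inside it

    size_t numPendingRowsInCurrentGranule() const
    {
        return current_mark_index_granularity - offset_after_current_mark;
    }
};

/// One step of the reading chain: never read past the current granule.
size_t nextReadSize(const StreamPosition & stream, size_t space_left)
{
    return std::min(space_left, stream.numPendingRowsInCurrentGranule());
}
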

MergedBlockOutputStream.cpp

@@ -105,23 +105,29 @@ IDataType::OutputStreamGetter IMergedBlockOutputStream::createStreamGetter(
void IMergedBlockOutputStream::fillIndexGranularity(const Block & block)
{
size_t rows = block.rows();
std::cerr << "Total Rows:" << rows << std::endl;
size_t index_granularity_for_block;
if (storage.index_granularity_bytes == 0)
index_granularity_for_block = storage.index_granularity;
else
{
std::cerr << "BlockSizeInMemory:" << block.allocatedBytes() << std::endl;
std::cerr << "Storage index granularity:" << storage.index_granularity_bytes << std::endl;
size_t block_size_in_memory = block.allocatedBytes();
if (blocks_are_granules_size)
index_granularity_for_block = rows;
else
index_granularity_for_block = std::max(block_size_in_memory / storage.index_granularity_bytes, 1);
index_granularity_for_block = std::max(block_size_in_memory / storage.index_granularity_bytes, rows);
}
size_t current_row = 0;
std::cerr << "Final granularity:" << index_granularity_for_block << std::endl;
while (current_row < rows)
{
index_granularity.push_back(index_granularity_for_block);
++current_row;
current_row += index_granularity_for_block;
}
std::cerr << "Total written granules:" << index_granularity.size() << std::endl;
}
size_t IMergedBlockOutputStream::writeSingleGranule(
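
fillIndexGranularity decides how many rows one granule covers: a fixed row count when index_granularity_bytes is 0, otherwise a value derived from the block's size in memory. A standalone sketch mirroring the computation as written in this commit; note that std::max(..., rows) pins the granularity to at least the whole block, so each block produces exactly one granule here:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

std::vector<size_t> computeBlockGranularity(
    size_t rows, size_t block_bytes,
    size_t fixed_index_granularity, size_t index_granularity_bytes,
    bool blocks_are_granules_size)
{
    size_t granularity;
    if (index_granularity_bytes == 0)
        granularity = fixed_index_granularity;   /// classic fixed-rows mode
    else if (blocks_are_granules_size)
        granularity = rows;                      /// the whole block is one granule
    else
        granularity = std::max(block_bytes / index_granularity_bytes, rows);

    std::vector<size_t> result;                  /// one entry per granule of the block
    for (size_t current_row = 0; current_row < rows; current_row += granularity)
        result.push_back(granularity);
    return result;
}

int main()
{
    /// 100000 rows, ~80 MiB block, 10 MiB target per granule:
    /// 80 MiB / 10 MiB = 8 rows, clamped up to rows = 100000 -> a single granule.
    auto granules = computeBlockGranularity(100000, 80 << 20, 8192, 10 << 20, false);
    std::cout << granules.size() << " granule(s) of " << granules[0] << " rows\n";
}
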
@@ -160,7 +166,7 @@ size_t IMergedBlockOutputStream::writeSingleGranule(
writeIntBinary(stream.plain_hashing.count(), stream.marks);
writeIntBinary(stream.compressed.offset(), stream.marks);
if (stream.marks_file_extension != FIXED_MARKS_FILE_EXTENSION)
writeIntBinary(index_granularity, stream.marks);
writeIntBinary(number_of_rows, stream.marks);
}, serialize_settings.path);
}
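
The mark written above has two file offsets, plus, when the marks file is not the fixed-granularity variant, the granule's row count as a third field. A sketch of that on-disk record (names illustrative; the real code emits each field with writeIntBinary):

#include <cstdint>
#include <ostream>

/// One mark. The first two fields are the classic format; rows_count is the
/// extra field of the adaptive (non-fixed) marks file.
struct MarkEntry
{
    uint64_t offset_in_compressed_file;     /// stream.plain_hashing.count()
    uint64_t offset_in_decompressed_block;  /// stream.compressed.offset()
    uint64_t rows_count;                    /// number_of_rows in the granule
};

void writeMark(std::ostream & out, const MarkEntry & mark, bool adaptive)
{
    /// NB: raw host-endian writes, good enough for a sketch.
    out.write(reinterpret_cast<const char *>(&mark.offset_in_compressed_file), sizeof(uint64_t));
    out.write(reinterpret_cast<const char *>(&mark.offset_in_decompressed_block), sizeof(uint64_t));
    if (adaptive)
        out.write(reinterpret_cast<const char *>(&mark.rows_count), sizeof(uint64_t));
}
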
@@ -459,6 +465,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
}
new_part->rows_count = rows_count;
std::cerr << "SETTING CURRENT MARK FOR PART:" << part_path << " to " << current_mark << std::endl;
new_part->marks_count = current_mark;
new_part->modification_time = time(nullptr);
new_part->columns = *total_column_list;
@@ -578,7 +585,9 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
/// Write index. The index contains Primary Key value for each `index_granularity` row.
for (size_t i = index_offset; i < rows; i += index_granularity[current_mark])
std::cerr << "Index Granularity size:" << index_granularity.size() << std::endl;
std::cerr << "Index Granularity first elem:" << index_granularity[0] << std::endl;
for (size_t i = index_offset; i < rows; i += index_granularity.at(current_mark))
{
if (storage.hasPrimaryKey())
{
@ -590,9 +599,12 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
}
}
std::cerr << "I:" << i << " Total rows:" << rows << std::endl;
std::cerr << "Increment current mark:" << current_mark << std::endl;
++current_mark;
}
}
std::cerr << "block written, total marks:" << current_mark << std::endl;
index_offset = new_index_offset;
}