Slightly clearer debug and fix alter

alesapin 2019-03-19 12:57:29 +03:00
parent 8cf304c83d
commit d8aa5c7c49
8 changed files with 93 additions and 45 deletions
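In short, as read from the hunks below: most of the ad-hoc std::cerr debugging from the parent commit is commented out or deleted, a few targeted debug lines are added around the ALTER path, and createConvertExpression now builds its file rename map only when a concrete part is passed, so part->marks_file_extension is no longer dereferenced when part is null. A regression test for ALTER on adaptive-granularity tables is also added.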

View File

@@ -109,7 +109,7 @@ Block MergingSortedBlockInputStream::readImpl()
    else
        merge(merged_columns, queue_without_collation);
-   std::cerr << "MERGED COLUMNS SIZE:" << merged_columns[0]->size() << std::endl;
+   //std::cerr << "MERGED COLUMNS SIZE:" << merged_columns[0]->size() << std::endl;
    return header.cloneWithColumns(std::move(merged_columns));
}
@@ -201,7 +201,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
        /// If there are already data in the current block, we first return it. We'll get here again the next time we call the merge function.
        if (merged_rows != 0)
        {
-           std::cerr << "merged rows is non-zero\n";
+           //std::cerr << "merged rows is non-zero\n";
            queue.push(current);
            return;
        }
@@ -242,7 +242,7 @@ void MergingSortedBlockInputStream::merge(MutableColumns & merged_columns, std::
            out_row_sources_buf->write(row_source.data);
        }
-       std::cerr << "fetching next block\n";
+       //std::cerr << "fetching next block\n";
        total_merged_rows += merged_rows;
        fetchNextBlock(current, queue);

View File

@@ -40,6 +40,7 @@
#include <Common/Stopwatch.h>
#include <Common/typeid_cast.h>
#include <Common/localBackup.h>
+#include <Common/StackTrace.h>
#include <Interpreters/PartLog.h>
#include <Poco/DirectoryIterator.h>
@@ -1162,6 +1163,7 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
    const IndicesASTs & old_indices, const IndicesASTs & new_indices, ExpressionActionsPtr & out_expression,
    NameToNameMap & out_rename_map, bool & out_force_update_metadata) const
{
+   std::cerr << "Stack:" << StackTrace().toString() << std::endl;
    out_expression = nullptr;
    out_rename_map = {};
    out_force_update_metadata = false;
@@ -1278,20 +1280,24 @@ void MergeTreeData::createConvertExpression(const DataPartPtr & part, const Name
        /// After conversion, we need to rename temporary files into original.
-       new_types[source_and_expression.first]->enumerateStreams(
-           [&](const IDataType::SubstreamPath & substream_path)
-           {
-               /// Skip array sizes, because they cannot be modified in ALTER.
-               if (!substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
-                   return;
-               String original_file_name = IDataType::getFileNameForStream(original_column_name, substream_path);
-               String temporary_file_name = IDataType::getFileNameForStream(temporary_column_name, substream_path);
-               std::cerr << "PART IS NULL:" << (part == nullptr) << std::endl;
-               std::cerr << "PART MARKS FILE_EXTENSION:" << part->marks_file_extension << std::endl;
-               out_rename_map[temporary_file_name + ".bin"] = original_file_name + ".bin";
-               out_rename_map[temporary_file_name + part->marks_file_extension] = original_file_name + part->marks_file_extension;
-           }, {});
+       if (part)
+       {
+           new_types[source_and_expression.first]->enumerateStreams(
+               [&](const IDataType::SubstreamPath & substream_path)
+               {
+                   /// Skip array sizes, because they cannot be modified in ALTER.
+                   if (!substream_path.empty() && substream_path.back().type == IDataType::Substream::ArraySizes)
+                       return;
+                   String original_file_name = IDataType::getFileNameForStream(original_column_name, substream_path);
+                   String temporary_file_name = IDataType::getFileNameForStream(temporary_column_name, substream_path);
+                   std::cerr << "PART MARKS FILE_EXTENSION:" << part->marks_file_extension << std::endl;
+                   out_rename_map[temporary_file_name + ".bin"] = original_file_name + ".bin";
+                   out_rename_map[temporary_file_name + part->marks_file_extension] = original_file_name + part->marks_file_extension;
+               }, {});
+       }
    }
out_expression->add(ExpressionAction::project(projection));
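Note on the hunk above, which is the actual "fix alter" part of this commit: the old code printed "PART IS NULL:" and then unconditionally dereferenced part->marks_file_extension, which crashes precisely when part is null; wrapping the rename-map construction in if (part) avoids that. A minimal sketch of the guarded pattern, with simplified hypothetical names (Part, fillRenameMap) standing in for the real ClickHouse types:

    #include <map>
    #include <string>

    struct Part { std::string marks_file_extension; };        /// stand-in for MergeTreeDataPart
    using NameToNameMap = std::map<std::string, std::string>;

    /// Rename temporary column files back to their original names.
    /// Must be a no-op without a concrete part: only a part knows
    /// its marks file extension (".mrk" or ".mrk2").
    void fillRenameMap(const Part * part, const std::string & temporary_name,
                       const std::string & original_name, NameToNameMap & out_rename_map)
    {
        if (!part)   /// the old code skipped this check and dereferenced a null pointer
            return;
        out_rename_map[temporary_name + ".bin"] = original_name + ".bin";
        out_rename_map[temporary_name + part->marks_file_extension] = original_name + part->marks_file_extension;
    }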
@@ -1323,6 +1329,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
    const IndicesASTs & new_indices,
    bool skip_sanity_checks)
{
+   std::cerr << "LETS DEBUG ALTER\n";
    ExpressionActionsPtr expression;
    AlterDataPartTransactionPtr transaction(new AlterDataPartTransaction(part)); /// Blocks changes to the part.
    bool force_update_metadata;
@@ -1454,6 +1461,7 @@ MergeTreeData::AlterDataPartTransactionPtr MergeTreeData::alterDataPart(
        transaction->new_columns.writeText(columns_file);
        transaction->rename_map["columns.txt.tmp"] = "columns.txt";
    }
+   std::cerr << "ALTER FINISHED\n";
    return transaction;
}

View File

@@ -207,12 +207,12 @@ String MergeTreeDataPart::getColumnNameWithMinumumCompressedSize() const
    for (const auto & column : storage_columns)
    {
-       std::cerr << "Searching for column:" << column.name << std::endl;
+       //std::cerr << "Searching for column:" << column.name << std::endl;
        if (!hasColumnFiles(column.name))
            continue;
        const auto size = getColumnSize(column.name, *column.type).data_compressed;
-       std::cerr << "Column size:" <<size<<std::endl;
+       //std::cerr << "Column size:" <<size<<std::endl;
        if (size < minimum_size)
        {
            minimum_size = size;
@@ -485,21 +485,24 @@ void MergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksu
void MergeTreeDataPart::loadMarksIndexGranularity()
{
-   std::cerr << "LOADING MARKS FOR PART:" << getFullPath() << std::endl;
+   //std::cerr << "LOADING MARKS FOR PART:" << getFullPath() << std::endl;
    if (columns.empty())
        throw Exception("No columns in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
    /// We can use any column, it doesn't matter
    std::string marks_file_path = getFullPath() + escapeForFileName(columns.front().name);
-   std::cerr << "MARKSFILEPATH:" << marks_file_path << std::endl;
-   if (Poco::File(marks_file_path + ".mrk").exists()) {
+   //std::cerr << "MARKSFILEPATH:" << marks_file_path << std::endl;
+   if (Poco::File(marks_file_path + ".mrk").exists())
+   {
        marks_file_extension = ".mrk";
-       std::cerr << "EXISTS .mrk " << getFullPath() << std::endl;
+       //std::cerr << "EXISTS .mrk " << getFullPath() << std::endl;
    }
-   else if (Poco::File(marks_file_path + ".mrk2").exists()) {
+   else if (Poco::File(marks_file_path + ".mrk2").exists())
+   {
        marks_file_extension = ".mrk2";
-       std::cerr << "EXISTS .mrk2:" << getFullPath() << std::endl;
-   } else
+       //std::cerr << "EXISTS .mrk2:" << getFullPath() << std::endl;
+   }
+   else
        throw Exception("Marks file '" + marks_file_path + "' doesn't exist with extensions .mrk or mrk2", ErrorCodes::NO_FILE_IN_DATA_PART);
    marks_file_path += marks_file_extension;
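For context on the .mrk/.mrk2 probing above: .mrk is the marks format of fixed-granularity parts, while .mrk2 is written by adaptive-granularity parts and additionally records how many rows each granule holds. A rough sketch of the two entry layouts (field names are illustrative, not the exact ClickHouse structs):

    #include <cstddef>

    struct MarkEntry                       /// one entry of a .mrk file
    {
        size_t offset_in_compressed_file;      /// where the granule's compressed block starts
        size_t offset_in_decompressed_block;   /// offset of the granule inside that block
    };

    struct AdaptiveMarkEntry : MarkEntry   /// one entry of a .mrk2 file
    {
        size_t rows_in_granule;   /// adaptive granularity: row count may differ per granule
    };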
@@ -833,12 +836,12 @@ bool MergeTreeDataPart::hasColumnFiles(const String & column) const
    /// That's Ok under assumption that files exist either for all or for no streams.
    String prefix = getFullPath();
-   std::cerr << "ColumnPrefix:" << prefix << std::endl;
+   //std::cerr << "ColumnPrefix:" << prefix << std::endl;
    String escaped_column = escapeForFileName(column);
-   std::cerr << "Escaped name:" << escaped_column << std::endl;
-   std::cerr << "Marks file extension:" << marks_file_extension << std::endl;
+   //std::cerr << "Escaped name:" << escaped_column << std::endl;
+   //std::cerr << "Marks file extension:" << marks_file_extension << std::endl;
    return Poco::File(prefix + escaped_column + ".bin").exists()
        && Poco::File(prefix + escaped_column + marks_file_extension).exists();
}

View File

@@ -34,8 +34,8 @@ size_t MergeTreeRangeReader::DelayedStream::readRows(Block & block, size_t num_r
    if (num_rows)
    {
        size_t rows_read = merge_tree_reader->readRows(current_mark, continue_reading, num_rows, block);
-       std::cerr << "Rows read:" << rows_read << std::endl;
-       std::cerr << "Num rows:" << num_rows << std::endl;
+       //std::cerr << "Rows read:" << rows_read << std::endl;
+       //std::cerr << "Num rows:" << num_rows << std::endl;
        continue_reading = true;
        /// Zero rows_read maybe either because reading has finished
@@ -178,8 +178,8 @@ size_t MergeTreeRangeReader::Stream::read(Block & block, size_t num_rows, bool s
    offset_after_current_mark += num_rows;
    /// Start new granule; skipped_rows_after_offset is already zero.
-   std::cerr << "Offset after current mark:" << offset_after_current_mark << std::endl;
-   std::cerr << "Current Index granularity:" << current_mark_index_granularity << std::endl;
+   //std::cerr << "Offset after current mark:" << offset_after_current_mark << std::endl;
+   //std::cerr << "Current Index granularity:" << current_mark_index_granularity << std::endl;
    if (offset_after_current_mark == current_mark_index_granularity || skip_remaining_rows_in_current_granule)
        toNextMark();
@@ -572,7 +572,7 @@ MergeTreeRangeReader::ReadResult MergeTreeRangeReader::startReadingChain(size_t
        }
        auto rows_to_read = std::min(space_left, stream.numPendingRowsInCurrentGranule());
-       std::cerr << "Rows To Read:" << rows_to_read << std::endl;
+       //std::cerr << "Rows To Read:" << rows_to_read << std::endl;
        bool last = rows_to_read == space_left;
        result.addRows(stream.read(result.block, rows_to_read, !last));
        result.addGranule(rows_to_read);

View File

@@ -116,7 +116,7 @@ try
        finish();
    }
-   std::cerr << "Resulting block in MergeTreeSequentialBlockInputStream:" << res.dumpStructure() << std::endl;
+   //std::cerr << "Resulting block in MergeTreeSequentialBlockInputStream:" << res.dumpStructure() << std::endl;
    return res;
}
catch (...)
catch (...)

View File

@@ -128,9 +128,9 @@ void fillIndexGranularityImpl(
        index_granularity_for_block = rows_in_block;
    else if (block_size_in_memory >= index_granularity_bytes)
    {
-       std::cerr << "BLOCK SIZE In MEMORY:" << block_size_in_memory << std::endl;
+       //std::cerr << "BLOCK SIZE In MEMORY:" << block_size_in_memory << std::endl;
        size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
-       std::cerr << "GRANULES IN BLOCK:" << granules_in_block << std::endl;
+       //std::cerr << "GRANULES IN BLOCK:" << granules_in_block << std::endl;
        index_granularity_for_block = rows_in_block / granules_in_block;
    }
    else
@@ -141,7 +141,7 @@ void fillIndexGranularityImpl(
    }
    if (index_granularity_for_block == 0) /// very rare case when index granularity bytes less then single row
        index_granularity_for_block = 1;
-   std::cerr << "GRANULARITY SIZE IN ROWS:"<< index_granularity_for_block << std::endl;
+   //std::cerr << "GRANULARITY SIZE IN ROWS:"<< index_granularity_for_block << std::endl;
    for (size_t current_row = index_offset; current_row < rows_in_block; current_row += index_granularity_for_block)
        index_granularity.push_back(index_granularity_for_block);
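To make the adaptive-granularity arithmetic in fillIndexGranularityImpl concrete, a small self-contained example (the numbers are illustrative, not from this commit):

    #include <cstddef>
    #include <iostream>

    int main()
    {
        size_t rows_in_block = 800;
        size_t block_size_in_memory = 8192;    /// bytes
        size_t index_granularity_bytes = 1024; /// table setting, as in the hunk above

        /// 8192 / 1024 = 8 granules for this block ...
        size_t granules_in_block = block_size_in_memory / index_granularity_bytes;
        /// ... so 800 / 8 = 100 rows per mark for this block.
        size_t index_granularity_for_block = rows_in_block / granules_in_block;

        std::cout << index_granularity_for_block << '\n'; /// prints 100
    }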
@@ -273,7 +273,6 @@ std::pair<size_t, size_t> IMergedBlockOutputStream::writeColumn(
        if (write_marks)
            current_column_mark++;
-       std::cerr << "CURRENT ROW:" << current_row << std::endl;
    }
/// Memoize offsets for Nested types, that are already written. They will not be written again for next columns of Nested structure.
@@ -520,7 +519,7 @@ void MergedBlockOutputStream::writeSuffixAndFinalizePart(
    }
    new_part->rows_count = rows_count;
-   std::cerr << "SETTING CURRENT MARK FOR PART:" << part_path << " to " << current_mark << std::endl;
+   //std::cerr << "SETTING CURRENT MARK FOR PART:" << part_path << " to " << current_mark << std::endl;
    new_part->marks_count = current_mark;
    new_part->modification_time = time(nullptr);
    new_part->columns = *total_column_list;
@@ -734,11 +733,10 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
    auto temporarily_disable_memory_tracker = getCurrentMemoryTrackerActionLock();
    /// Write index. The index contains Primary Key value for each `index_granularity` row.
-   std::cerr << "Index Granularity size:" << index_granularity.size() << std::endl;
-   std::cerr << "Index Granularity first elem:" << index_granularity[0] << std::endl;
+   //std::cerr << "Index Granularity size:" << index_granularity.size() << std::endl;
+   //std::cerr << "Index Granularity first elem:" << index_granularity[0] << std::endl;
    for (size_t i = index_offset; i < rows;)
    {
-       std::cerr << "IN LOOP\n";
        if (storage.hasPrimaryKey())
        {
            for (size_t j = 0, size = primary_key_columns.size(); j < size; ++j)
@@ -749,8 +747,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
            }
        }
-       std::cerr << "I:" << i << " Total rows:" << rows << std::endl;
-       std::cerr << "Increment current mark:" << current_mark << std::endl;
+       //std::cerr << "I:" << i << " Total rows:" << rows << std::endl;
+       //std::cerr << "Increment current mark:" << current_mark << std::endl;
        ++current_mark;
        if (current_mark < index_granularity.size())
            i += index_granularity[current_mark];
@@ -758,8 +756,8 @@ void MergedBlockOutputStream::writeImpl(const Block & block, const IColumn::Perm
            break;
        }
    }
-   std::cerr << "Index granularity size:" << index_granularity.size() << std::endl;
-   std::cerr << "block written, total marks:" << current_mark << std::endl;
+   //std::cerr << "Index granularity size:" << index_granularity.size() << std::endl;
+   //std::cerr << "block written, total marks:" << current_mark << std::endl;
    index_offset = new_index_offset;
}

View File

@@ -14,3 +14,9 @@
1
8
1
+4
+1
+4
+1
+8
+1
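The six added reference lines correspond, in order, to the six SELECTs of the new alter test below: COUNT(*) returns 4, 4, and 8 (four rows inserted, unchanged by the ALTER, then four more rows inserted), and each following distinct(marks) query returns 1.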

View File

@@ -105,3 +105,36 @@ SELECT COUNT(*) FROM test.huge_granularity_small_blocks;
SELECT distinct(marks) from system.parts WHERE table = 'huge_granularity_small_blocks' and database='test';
DROP TABLE IF EXISTS test.huge_granularity_small_blocks;
+----- Some alter tests ----
+DROP TABLE IF EXISTS test.adaptive_granularity_alter;
+CREATE TABLE test.adaptive_granularity_alter (
+  p Date,
+  k UInt64,
+  v1 UInt64,
+  v2 Int64
+) ENGINE MergeTree() PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS index_granularity_bytes = 110;
+INSERT INTO test.adaptive_granularity_alter (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
+SELECT COUNT(*) FROM test.adaptive_granularity_alter;
+SELECT distinct(marks) from system.parts WHERE table = 'adaptive_granularity_alter' and database='test';
+OPTIMIZE TABLE test.adaptive_granularity_alter FINAL;
+ALTER TABLE test.adaptive_granularity_alter MODIFY COLUMN v1 Int16;
+SELECT COUNT(*) FROM test.adaptive_granularity_alter;
+SELECT distinct(marks) from system.parts WHERE table = 'adaptive_granularity_alter' and database='test';
+INSERT INTO test.adaptive_granularity_alter (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 5, 3000, 4000), ('2018-05-17', 6, 5000, 6000), ('2018-05-19', 7, 7000, 8000);
+SELECT COUNT(*) FROM test.adaptive_granularity_alter;
+SELECT distinct(marks) from system.parts WHERE table = 'adaptive_granularity_alter' and database='test';
+DROP TABLE IF EXISTS test.adaptive_granularity_alter;