From c1e7d5d3c10a6e0789de3c4eb512256d719a9f1e Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 18 Dec 2020 16:49:45 +0300 Subject: [PATCH 01/32] Working code --- .../MergeTreeDataPartWriterOnDisk.cpp | 15 ++--- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 65 +++++++++++++++++-- .../MergeTree/MergeTreeDataPartWriterWide.h | 4 ++ 3 files changed, 69 insertions(+), 15 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 5190b439809..150bdad55f2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -218,6 +218,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block auto & stream = *skip_indices_streams[i]; for (const auto & granule : granules_to_write) { + if (skip_index_accumulated_marks[i] == index_helper->index.granularity) + { + skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed); + skip_index_accumulated_marks[i] = 0; + } + if (skip_indices_aggregators[i]->empty() && granule.mark_on_start) { skip_indices_aggregators[i] = index_helper->createIndexAggregator(); @@ -236,16 +242,7 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block size_t pos = granule.start_row; skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows); if (granule.isCompleted()) - { ++skip_index_accumulated_marks[i]; - - /// write index if it is filled - if (skip_index_accumulated_marks[i] == index_helper->index.granularity) - { - skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed); - skip_index_accumulated_marks[i] = 0; - } - } } } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 1127c3a7dd7..f9b42e7c320 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -167,6 +167,13 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm if (compute_granularity) { size_t index_granularity_for_block = computeIndexGranularity(block); + if (rows_written_in_last_mark > 0) + { + size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; + if (rows_left_in_last_mark > index_granularity_for_block) + adjustLastMarkAndFlushToDisk(); + } + fillIndexGranularity(index_granularity_for_block, block.rows()); } @@ -281,9 +288,6 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( IDataType::SerializeBinaryBulkSettings & serialize_settings, const Granule & granule) { - if (granule.mark_on_start) - writeSingleMark(name, type, offset_columns, granule.granularity_rows, serialize_settings.path); - type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state); /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one. 
@@ -309,6 +313,9 @@ void MergeTreeDataPartWriterWide::writeColumn( WrittenOffsetColumns & offset_columns, const Granules & granules) { + if (granules.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty granules for column {}, current mark {}", backQuoteIfNeed(name), getCurrentMark()); + auto [it, inserted] = serialization_states.emplace(name, nullptr); if (inserted) @@ -329,6 +336,13 @@ void MergeTreeDataPartWriterWide::writeColumn( if (granule.granularity_rows > 0) data_written = true; + if (granule.mark_on_start) + { + if (last_non_written_marks.count(name)) + throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add a new mark for the column, but already have a non-written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark); + last_non_written_marks[name] = getCurrentMarksForColumn(name, type, offset_columns, serialize_settings.path); + } + writeSingleGranule( name, type, @@ -338,6 +352,17 @@ void MergeTreeDataPartWriterWide::writeColumn( serialize_settings, granule ); + + if (granule.isCompleted()) + { + auto it = last_non_written_marks.find(name); + if (it == last_non_written_marks.end()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name)); + + for (const auto & mark : it->second) + flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number)); + last_non_written_marks.erase(it); + } } type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */) @@ -404,8 +429,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, if (index_granularity_rows != index_granularity.getMarkRows(mark_num)) throw Exception( - ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}", - mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows); + ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}", + mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount()); auto column = type.createColumn(); @@ -415,7 +440,9 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, { must_be_last = true; } - else if (column->size() != index_granularity_rows) + + /// Now they must be equal + if (column->size() != index_granularity_rows) { throw Exception( ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), actually in bin file {}, in mrk file {}", @@ -445,6 +472,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size; serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0; WrittenOffsetColumns offset_columns; + if (rows_written_in_last_mark > 0) + adjustLastMarkAndFlushToDisk(); bool write_final_mark = (with_final_mark && data_written); @@ -537,4 +566,28 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_ rows_in_block); } + +void MergeTreeDataPartWriterWide::adjustLastMarkAndFlushToDisk() +{ + if 
(getCurrentMark() != index_granularity.getMarksCount() - 1) + throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); + + if (last_non_written_marks.empty()) + throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); + + index_granularity.popMark(); + index_granularity.appendMark(rows_written_in_last_mark); + for (const auto & [name, marks] : last_non_written_marks) + for (const auto & mark : marks) + flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); + + last_non_written_marks.clear(); + + for (size_t i = 0; i < skip_indices.size(); ++i) + ++skip_index_accumulated_marks[i]; + + setCurrentMark(getCurrentMark() + 1); + rows_written_in_last_mark = 0; +} + } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index cffedd66a32..afaf5958338 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -99,6 +99,8 @@ private: /// in our index_granularity array. void shiftCurrentMark(const Granules & granules_written); + void adjustLastMarkAndFlushToDisk(); + IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; using SerializationState = IDataType::SerializeBinaryBulkStatePtr; @@ -108,6 +110,8 @@ private: using ColumnStreams = std::map<String, StreamPtr>; ColumnStreams column_streams; + using MarksForColumns = std::unordered_map<String, StreamsWithMarks>; + MarksForColumns last_non_written_marks; /// How many rows we have already written in the current mark. /// More than zero when incoming blocks are smaller then their granularity. 
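The core mechanism PATCH 01 introduces is deferred mark flushing: when a granule starts, the writer records the current stream positions in last_non_written_marks instead of writing the mark immediately, and only flushes them once the granule is completed, so the row count stored with the mark is final. Below is a minimal self-contained sketch of that pattern, not ClickHouse code: MarkPosition, DeferredMarkWriter and the two callbacks are hypothetical names, and the real writer keeps a vector of marks per column (StreamsWithMarks) rather than a single position.

    #include <cstddef>
    #include <map>
    #include <stdexcept>
    #include <string>

    /// Simplified stand-in for a stream mark: where the granule starts in the .bin file.
    struct MarkPosition
    {
        size_t offset_in_compressed_file = 0;
        size_t offset_in_decompressed_block = 0;
    };

    class DeferredMarkWriter
    {
    public:
        /// Called when a granule starts: remember the mark, but do not write it yet,
        /// because the number of rows in this granule is unknown until it completes.
        void onGranuleStart(const std::string & column, MarkPosition position)
        {
            if (!pending_marks.emplace(column, position).second)
                throw std::logic_error("Column already has a pending (non-written) mark");
        }

        /// Called when all rows of the granule are serialized: only now is the
        /// rows-in-granule value final, so the mark can go to the .mrk stream.
        void onGranuleComplete(const std::string & column, size_t rows_in_granule)
        {
            auto it = pending_marks.find(column);
            if (it == pending_marks.end())
                throw std::logic_error("No pending mark saved for column");
            flushMarkToFile(it->second, rows_in_granule);
            pending_marks.erase(it);
        }

    private:
        /// Stub for the sketch; the real writer serializes offsets and row count.
        void flushMarkToFile(const MarkPosition &, size_t /* rows_in_granule */) {}

        std::map<std::string, MarkPosition> pending_marks;
    };

The design point mirrored here is that a mark may only be written once its row count is known, which is what lets adjustLastMarkAndFlushToDisk() above rewrite the row count of an unfinished trailing mark (popMark/appendMark) before flushing the buffered marks.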
From cd598034cd7d83466e1b336d548db187b5be99f8 Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 18 Dec 2020 17:44:31 +0300 Subject: [PATCH 02/32] Fix add tests --- .../MergeTreeDataPartWriterCompact.cpp | 19 ++++++------- .../MergeTreeDataPartWriterOnDisk.cpp | 4 +-- .../MergeTree/MergeTreeDataPartWriterOnDisk.h | 17 ++++------- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 26 ++++++++--------- .../MergeTree/MergeTreeDataPartWriterWide.h | 7 +++++ ...aptive_granularity_block_borders.reference | 2 ++ ...605_adaptive_granularity_block_borders.sql | 28 +++++++++++++++++++ 7 files changed, 66 insertions(+), 37 deletions(-) create mode 100644 tests/queries/0_stateless/01605_adaptive_granularity_block_borders.reference create mode 100644 tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp index 389d2950f65..d5e7009efd6 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterCompact.cpp @@ -93,12 +93,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, result.emplace_back(Granule{ .start_row = current_row, - .granularity_rows = expected_rows_in_mark, - .block_rows = std::min(rows_left_in_block, expected_rows_in_mark), + .rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark), .mark_number = current_mark, - .mark_on_start = true + .mark_on_start = true, + .is_complete = (rows_left_in_block >= expected_rows_in_mark) }); - current_row += expected_rows_in_mark; + current_row += result.back().rows_to_write; current_mark++; } @@ -173,8 +173,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G { for (const auto & granule : granules) { - if (granule.granularity_rows) - data_written = true; + data_written = true; auto name_and_type = columns_list.begin(); for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type) @@ -206,13 +205,13 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G writeIntBinary(plain_hashing.count(), marks); writeIntBinary(UInt64(0), marks); - writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.granularity_rows); + writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write); /// Each type always have at least one substream prev_stream->hashing_buf.next(); //-V522 } - writeIntBinary(granule.block_rows, marks); + writeIntBinary(granule.rows_to_write, marks); } } @@ -222,11 +221,11 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart: { auto block = header.cloneWithColumns(columns_buffer.releaseColumns()); auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true); - if (!granules_to_write.back().isCompleted()) + if (!granules_to_write.back().is_complete) { /// Correct last mark as it should contain exact amount of rows. 
index_granularity.popMark(); - index_granularity.appendMark(granules_to_write.back().block_rows); + index_granularity.appendMark(granules_to_write.back().rows_to_write); } writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write); } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp index 150bdad55f2..8e6ffe9ee68 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.cpp @@ -240,8 +240,8 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block } size_t pos = granule.start_row; - skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows); - if (granule.isCompleted()) + skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.rows_to_write); + if (granule.is_complete) ++skip_index_accumulated_marks[i]; } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h index bc09e0e61e1..a7b84c95e0a 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterOnDisk.h @@ -20,25 +20,18 @@ struct Granule { /// Start row in block for granule size_t start_row; - /// Amount of rows which granule have to contain according to index - /// granularity. - /// NOTE: Sometimes it's not equal to actually written rows, for example - /// for the last granule if it's smaller than computed granularity. - size_t granularity_rows; /// Amount of rows from block which have to be written to disk from start_row - size_t block_rows; + size_t rows_to_write; /// Global mark number in the list of all marks (index_granularity) for this part size_t mark_number; /// Should writer write mark for the first of this granule to disk. /// NOTE: Sometimes we don't write mark for the start row, because /// this granule can be continuation of the previous one. bool mark_on_start; - - /// Is this granule contain amout of rows equal to the value in index granularity - bool isCompleted() const - { - return granularity_rows == block_rows; - } + /// If true: when this granule is written to disk, all rows for the corresponding mark will + /// be written. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number): + /// we may have a lot of small blocks between two marks and this may be the last one. + bool is_complete; }; /// Multiple granules to write for concrete block. 
diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index f9b42e7c320..ca0e4e014a5 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -33,12 +33,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, size_t rows_left_in_block = block_rows - current_row; result.emplace_back(Granule{ .start_row = current_row, - .granularity_rows = rows_left_in_last_mark, - .block_rows = std::min(rows_left_in_block, rows_left_in_last_mark), + .rows_to_write = std::min(rows_left_in_block, rows_left_in_last_mark), .mark_number = current_mark, .mark_on_start = false, /// Don't mark this granule because we have already marked it + .is_complete = (rows_left_in_block >= rows_left_in_last_mark), }); - current_row += rows_left_in_last_mark; + current_row += result.back().rows_to_write; current_mark++; } @@ -51,12 +51,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity, /// save incomplete granule result.emplace_back(Granule{ .start_row = current_row, - .granularity_rows = expected_rows_in_mark, - .block_rows = std::min(rows_left_in_block, expected_rows_in_mark), + .rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark), .mark_number = current_mark, .mark_on_start = true, + .is_complete = (rows_left_in_block >= expected_rows_in_mark), }); - current_row += expected_rows_in_mark; + current_row += result.back().rows_to_write; current_mark++; } @@ -136,11 +136,12 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter( }; } + void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written) { auto last_granule = granules_written.back(); /// If we didn't finished last granule than we will continue to write it from new block - if (!last_granule.isCompleted()) + if (!last_granule.is_complete) { /// Shift forward except last granule setCurrentMark(getCurrentMark() + granules_written.size() - 1); @@ -148,9 +149,9 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri /// We wrote whole block in the same granule, but didn't finished it. /// So add written rows to rows written in last_mark if (still_in_the_same_granule) - rows_written_in_last_mark += last_granule.block_rows; + rows_written_in_last_mark += last_granule.rows_to_write; else - rows_written_in_last_mark = last_granule.block_rows; + rows_written_in_last_mark = last_granule.rows_to_write; } else { @@ -288,7 +289,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule( IDataType::SerializeBinaryBulkSettings & serialize_settings, const Granule & granule) { - type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state); + type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state); /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one. 
type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */) @@ -333,8 +334,7 @@ void MergeTreeDataPartWriterWide::writeColumn( for (const auto & granule : granules) { - if (granule.granularity_rows > 0) - data_written = true; + data_written = true; if (granule.mark_on_start) { @@ -353,7 +353,7 @@ void MergeTreeDataPartWriterWide::writeColumn( granule ); - if (granule.isCompleted()) + if (granule.is_complete) { auto it = last_non_written_marks.find(name); if (it == last_non_written_marks.end()) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index afaf5958338..3fc3e3bfa3d 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -99,6 +99,11 @@ private: /// in our index_granularity array. void shiftCurrentMark(const Granules & granules_written); + /// Change rows in the last mark in index_granularity to rows_written_in_last_mark. + /// Flush all marks from last_non_written_marks to disk and increment current mark. + /// + /// This function is used when blocks change granularity drastically and we have an unfinished mark. + /// Also useful to have exact amount of rows in last (non-final) mark. void adjustLastMarkAndFlushToDisk(); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; @@ -110,6 +115,8 @@ private: using ColumnStreams = std::map<String, StreamPtr>; ColumnStreams column_streams; + /// Marks that are not written to disk yet (for each column). They are kept + /// until all rows for these marks are written to disk. using MarksForColumns = std::unordered_map<String, StreamsWithMarks>; MarksForColumns last_non_written_marks; diff --git a/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.reference b/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.reference new file mode 100644 index 00000000000..81c7e6e4df0 --- /dev/null +++ b/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.reference @@ -0,0 +1,2 @@ +849 +102400 diff --git a/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql b/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql new file mode 100644 index 00000000000..a73045f5a6f --- /dev/null +++ b/tests/queries/0_stateless/01605_adaptive_granularity_block_borders.sql @@ -0,0 +1,28 @@ +DROP TABLE IF EXISTS adaptive_table; + +--- If granularity of consecutive blocks differs a lot, then adaptive +--- granularity will adjust the amount of marks correctly. Data for the test was empirically +--- derived, it's quite hard to get good parameters. + +CREATE TABLE adaptive_table( + key UInt64, + value String +) ENGINE MergeTree() +ORDER BY key +SETTINGS index_granularity_bytes=1048576, min_bytes_for_wide_part = 0, enable_vertical_merge_algorithm = 0; + +SET max_block_size=900; + +-- There are about 900 marks for our settings. +INSERT INTO adaptive_table SELECT number, if(number > 700, randomPrintableASCII(102400), randomPrintableASCII(1)) FROM numbers(10000); + +OPTIMIZE TABLE adaptive_table FINAL; + +SELECT marks FROM system.parts WHERE table = 'adaptive_table' and database=currentDatabase() and active; + +-- If we have computed granularity incorrectly, then we will exceed this limit. 
+SET max_memory_usage='30M'; + +SELECT max(length(value)) FROM adaptive_table; + +DROP TABLE IF EXISTS adaptive_table; From c8c99f47326eeafbb3fdb7527b7bdafa7d0cb1da Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 18 Dec 2020 18:51:37 +0300 Subject: [PATCH 03/32] Fix tests --- .../00933_test_fix_extra_seek_on_compressed_cache.reference | 2 +- .../00961_checksums_in_system_parts_columns_table.reference | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference b/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference index 797f208c02b..7a08495654c 100644 --- a/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference +++ b/tests/queries/0_stateless/00933_test_fix_extra_seek_on_compressed_cache.reference @@ -1 +1 @@ -0 36 14 +0 36 13 diff --git a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference index 099fe566817..186f2feab79 100644 --- a/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference +++ b/tests/queries/0_stateless/00961_checksums_in_system_parts_columns_table.reference @@ -1 +1 @@ -20000101_1_1_0 test_00961 5f2e2d4bbc14336f44037e3ac667f247 ed226557cd4e18ecf3ae06c6d5e6725c da96ff1e527a8a1f908ddf2b1d0af239 +20000101_1_1_0 test_00961 b5fce9c4ef1ca42ce4ed027389c208d2 fc3b062b646cd23d4c23d7f5920f89ae da96ff1e527a8a1f908ddf2b1d0af239 From b2d53d802d99ada0f73017a9631d18f98bc266ef Mon Sep 17 00:00:00 2001 From: alesapin Date: Fri, 18 Dec 2020 23:32:52 +0300 Subject: [PATCH 04/32] Fix for non-adaptive granularity --- .../MergeTree/IMergeTreeDataPartWriter.cpp | 4 ++- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 28 +++++++++++++------ 2 files changed, 23 insertions(+), 9 deletions(-) diff --git a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp index df905215df1..83fd9692e49 100644 --- a/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp +++ b/src/Storages/MergeTree/IMergeTreeDataPartWriter.cpp @@ -49,7 +49,9 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter( , columns_list(columns_list_) , settings(settings_) , index_granularity(index_granularity_) - , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity){} + , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity) +{ +} Columns IMergeTreeDataPartWriter::releaseIndexColumns() { diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index ca0e4e014a5..3769fd6daf2 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -429,8 +429,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, if (index_granularity_rows != index_granularity.getMarkRows(mark_num)) throw Exception( - ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}", - mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount()); + ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, 
total marks {}", + data_part->getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount()); auto column = type.createColumn(); @@ -444,6 +444,9 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, /// Now they must be equal if (column->size() != index_granularity_rows) { + if (must_be_last && !settings.can_use_adaptive_granularity) + break; + throw Exception( ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), actually in bin file {}, in mrk file {}", mark_num, offset_in_compressed_file, offset_in_decompressed_block, column->size(), index_granularity.getMarkRows(mark_num)); @@ -575,19 +578,28 @@ void MergeTreeDataPartWriterWide::adjustLastMarkAndFlushToDisk() if (last_non_written_marks.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); - index_granularity.popMark(); - index_granularity.appendMark(rows_written_in_last_mark); + if (settings.can_use_adaptive_granularity) + { + index_granularity.popMark(); + index_granularity.appendMark(rows_written_in_last_mark); + } + for (const auto & [name, marks] : last_non_written_marks) + { for (const auto & mark : marks) flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); + } last_non_written_marks.clear(); - for (size_t i = 0; i < skip_indices.size(); ++i) - ++skip_index_accumulated_marks[i]; + if (settings.can_use_adaptive_granularity) + { + for (size_t i = 0; i < skip_indices.size(); ++i) + ++skip_index_accumulated_marks[i]; - setCurrentMark(getCurrentMark() + 1); - rows_written_in_last_mark = 0; + setCurrentMark(getCurrentMark() + 1); + rows_written_in_last_mark = 0; + } } } From ca6708e5945e983625469f1e878659a2670d7e70 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sat, 19 Dec 2020 21:59:20 +0300 Subject: [PATCH 05/32] Fix check and granule adjust for non adaptive parts --- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 38 ++++++++++++------- .../MergeTree/MergeTreeDataPartWriterWide.h | 2 +- 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 3769fd6daf2..84deccbf23e 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -172,7 +172,7 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm { size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; if (rows_left_in_last_mark > index_granularity_for_block) - adjustLastMarkAndFlushToDisk(); + adjustLastMarkIfNeedAndFlushToDisk(); } fillIndexGranularity(index_granularity_for_block, block.rows()); @@ -390,7 +390,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, bool must_be_last = false; UInt64 offset_in_compressed_file = 0; UInt64 offset_in_decompressed_block = 0; - UInt64 index_granularity_rows = 0; + UInt64 index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity; size_t mark_num; @@ -404,7 +404,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name, if (settings.can_use_adaptive_granularity) DB::readBinary(index_granularity_rows, mrk_in); else - 
index_granularity_rows = storage.getSettings()->index_granularity; + index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity; if (must_be_last) { @@ -476,7 +486,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0; WrittenOffsetColumns offset_columns; if (rows_written_in_last_mark > 0) - adjustLastMarkAndFlushToDisk(); + adjustLastMarkIfNeedAndFlushToDisk(); bool write_final_mark = (with_final_mark && data_written); @@ -506,6 +506,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch serialization_states.clear(); #ifndef NDEBUG + /// Heavy weight validation of written data. Checks that we are able to read + /// data according to marks. Otherwise throws LOGICAL_ERROR (equal to abort in debug mode) for (const auto & column : columns_list) { if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes()) @@ -570,20 +572,26 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_ } -void MergeTreeDataPartWriterWide::adjustLastMarkAndFlushToDisk() +void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() { - if (getCurrentMark() != index_granularity.getMarksCount() - 1) - throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); - - if (last_non_written_marks.empty()) - throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); - - if (settings.can_use_adaptive_granularity) + /// We can adjust marks only if we computed granularity for blocks. 
+ /// Otherwise we cannot change granularity because it will differ from + /// other columns if (compute_granularity && settings.can_use_adaptive_granularity) { if (getCurrentMark() != index_granularity.getMarksCount() - 1) throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} (with {} rows) having rows offset {}, total marks {}", getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount()); index_granularity.popMark(); index_granularity.appendMark(rows_written_in_last_mark); } + /// Last mark should be filled, otherwise it's a bug if (last_non_written_marks.empty()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); for (const auto & [name, marks] : last_non_written_marks) { for (const auto & mark : marks) flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); } last_non_written_marks.clear(); - if (settings.can_use_adaptive_granularity) + if (compute_granularity && settings.can_use_adaptive_granularity) { + /// Also we add mark to each skip index because all of them + /// already accumulated all rows from current adjusting mark for (size_t i = 0; i < skip_indices.size(); ++i) ++skip_index_accumulated_marks[i]; + /// This mark completed, go further setCurrentMark(getCurrentMark() + 1); + /// Without offset rows_written_in_last_mark = 0; } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index 3fc3e3bfa3d..b350e590d07 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -104,7 +104,7 @@ private: /// /// This function is used when blocks change granularity drastically and we have an unfinished mark. /// Also useful to have exact amount of rows in last (non-final) mark. 
- void adjustLastMarkAndFlushToDisk(); + void adjustLastMarkIfNeedAndFlushToDisk(); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; From 0041c293abdc00c6b1f20bcb8efad84ff4e1d1c2 Mon Sep 17 00:00:00 2001 From: alesapin Date: Sun, 20 Dec 2020 11:01:39 +0300 Subject: [PATCH 06/32] Fix gcc-10 build --- src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index 84deccbf23e..beb3646df17 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -355,13 +355,13 @@ void MergeTreeDataPartWriterWide::writeColumn( if (granule.is_complete) { - auto it = last_non_written_marks.find(name); - if (it == last_non_written_marks.end()) + auto marks_it = last_non_written_marks.find(name); + if (marks_it == last_non_written_marks.end()) throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name)); - for (const auto & mark : it->second) + for (const auto & mark : marks_it->second) flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number)); - last_non_written_marks.erase(it); + last_non_written_marks.erase(marks_it); } From 07b5c03c542e835ad8c1b0b905d9532b3dab4b20 Mon Sep 17 00:00:00 2001 From: alesapin Date: Mon, 21 Dec 2020 11:24:52 +0300 Subject: [PATCH 07/32] Better adjustment of the last mark --- .../MergeTree/MergeTreeDataPartWriterWide.cpp | 51 ++++++++++++------- .../MergeTree/MergeTreeDataPartWriterWide.h | 7 +-- .../0_stateless/00955_test_final_mark.sql | 16 +++--- 3 files changed, 44 insertions(+), 30 deletions(-) diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp index beb3646df17..be735104e99 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.cpp @@ -171,8 +171,18 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm if (rows_written_in_last_mark > 0) { size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark; + /// Previous granularity was much bigger than our new block's + /// granularity, so let's adjust it: we want to add new + /// heavy-weight blocks into the small old granule. if (rows_left_in_last_mark > index_granularity_for_block) { + /// We have already written more rows than the granularity of our block: + /// adjust last mark rows and flush to disk. if (rows_written_in_last_mark >= index_granularity_for_block) adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark); else /// We still can write some rows from new block into previous granule. 
+ adjustLastMarkIfNeedAndFlushToDisk(index_granularity_for_block - rows_written_in_last_mark); + } } fillIndexGranularity(index_granularity_for_block, block.rows()); @@ -476,7 +486,7 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0; WrittenOffsetColumns offset_columns; if (rows_written_in_last_mark > 0) - adjustLastMarkIfNeedAndFlushToDisk(); + adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark); bool write_final_mark = (with_final_mark && data_written); @@ -572,7 +582,7 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_ } -void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() +void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark) { /// We can adjust marks only if we computed granularity for blocks. /// Otherwise we cannot change granularity because it will differ from @@ -584,7 +594,7 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount()); index_granularity.popMark(); - index_granularity.appendMark(rows_written_in_last_mark); + index_granularity.appendMark(new_rows_in_last_mark); } /// Last mark should be filled, otherwise it's a bug @@ -592,25 +602,28 @@ void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk() throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}", getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount()); - for (const auto & [name, marks] : last_non_written_marks) + if (rows_written_in_last_mark == new_rows_in_last_mark) { - for (const auto & mark : marks) - flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); - } + for (const auto & [name, marks] : last_non_written_marks) + { + for (const auto & mark : marks) + flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark())); + } - last_non_written_marks.clear(); + last_non_written_marks.clear(); - if (compute_granularity && settings.can_use_adaptive_granularity) - { - /// Also we add mark to each skip index because all of them - /// already accumulated all rows from current adjusting mark - for (size_t i = 0; i < skip_indices.size(); ++i) - ++skip_index_accumulated_marks[i]; + if (compute_granularity && settings.can_use_adaptive_granularity) + { + /// Also we add mark to each skip index because all of them + /// already accumulated all rows from current adjusting mark + for (size_t i = 0; i < skip_indices.size(); ++i) + ++skip_index_accumulated_marks[i]; - /// This mark completed, go further - setCurrentMark(getCurrentMark() + 1); - /// Without offset - rows_written_in_last_mark = 0; + /// This mark completed, go further + setCurrentMark(getCurrentMark() + 1); + /// Without offset + rows_written_in_last_mark = 0; + } } } diff --git a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h index b350e590d07..8c76c10abef 100644 --- a/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h +++ b/src/Storages/MergeTree/MergeTreeDataPartWriterWide.h @@ -99,12 +99,13 @@ private: /// in our index_granularity array. 
void shiftCurrentMark(const Granules & granules_written); - /// Change rows in the last mark in index_granularity to rows_written_in_last_mark. - /// Flush all marks from last_non_written_marks to disk and increment current mark. + /// Change rows in the last mark in index_granularity to new_rows_in_last_mark. + /// Flush all marks from last_non_written_marks to disk and increment current mark if the already written rows + /// (rows_written_in_last_mark) are equal to new_rows_in_last_mark. /// /// This function is used when blocks change granularity drastically and we have an unfinished mark. /// Also useful to have exact amount of rows in last (non-final) mark. - void adjustLastMarkIfNeedAndFlushToDisk(); + void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark); IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const; diff --git a/tests/queries/0_stateless/00955_test_final_mark.sql b/tests/queries/0_stateless/00955_test_final_mark.sql index 50ca3d008f9..e020f10b71a 100644 --- a/tests/queries/0_stateless/00955_test_final_mark.sql +++ b/tests/queries/0_stateless/00955_test_final_mark.sql @@ -18,7 +18,7 @@ INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-0 SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test merge==='; INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -27,7 +27,7 @@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test alter==='; ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String); @@ -38,7 +38,7 @@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); SELECT '===test mutation==='; ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1 SETTINGS mutations_sync = 2; @@ -58,7 +58,7 @@ OPTIMIZE TABLE mt_with_pk FINAL; SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000; -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_with_pk; @@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1 SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM 
system.parts WHERE table = 'mt_without_pk' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase(); INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL; SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_without_pk; @@ -149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase(); INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']); @@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL; SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57'); -SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1; +SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase(); DROP TABLE IF EXISTS mt_with_small_granularity; From 57a16420353be05b84742c15c7ceef3ed91d4247 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 22 Dec 2020 14:30:29 +0300 Subject: [PATCH 08/32] fix race in aggregation with combinator distinct --- src/Common/ThreadPool.cpp | 7 +++++ src/Common/ThreadPool.h | 1 + src/Interpreters/Aggregator.cpp | 31 ++++++++++++------- src/Interpreters/Aggregator.h | 1 + .../01605_dictinct_two_level.reference | 20 ++++++++++++ .../0_stateless/01605_dictinct_two_level.sql | 25 +++++++++++++++ 6 files changed, 74 insertions(+), 11 deletions(-) create mode 100644 tests/queries/0_stateless/01605_dictinct_two_level.reference create mode 100644 tests/queries/0_stateless/01605_dictinct_two_level.sql diff --git a/src/Common/ThreadPool.cpp b/src/Common/ThreadPool.cpp index 7b2c2108629..7fc0d65aa5b 100644 --- a/src/Common/ThreadPool.cpp +++ b/src/Common/ThreadPool.cpp @@ -55,6 +55,13 @@ void ThreadPoolImpl<Thread>::setMaxThreads(size_t value) max_threads = value; } +template <typename Thread> +size_t ThreadPoolImpl<Thread>::getMaxThreads() const +{ + std::lock_guard lock(mutex); + return max_threads; +} + template <typename Thread> void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value) { diff --git a/src/Common/ThreadPool.h b/src/Common/ThreadPool.h index 8dd6cbbe02c..0ae023e4ebd 100644 --- a/src/Common/ThreadPool.h +++ b/src/Common/ThreadPool.h @@ -71,6 +71,7 @@ public: void setMaxThreads(size_t value); void setMaxFreeThreads(size_t value); 
void setQueueSize(size_t value); + size_t getMaxThreads() const; private: mutable std::mutex mutex; diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index 87abca4d7cd..f6f1f7c8d53 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -913,15 +913,15 @@ template <typename Method> Block Aggregator::convertOneBucketToBlock( AggregatedDataVariants & data_variants, Method & method, + Arena * arena, bool final, size_t bucket) const { Block block = prepareBlockAndFill(data_variants, final, method.data.impls[bucket].size(), - [bucket, &method, this] ( + [bucket, &method, arena, this] ( MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, - Arena * arena, bool final_) { convertToBlockImpl(method, method.data.impls[bucket], @@ -950,7 +950,7 @@ Block Aggregator::mergeAndConvertOneBucketToBlock( mergeBucketImpl(variants, bucket, arena); \ if (is_cancelled && is_cancelled->load(std::memory_order_seq_cst)) \ return {}; \ - block = convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket); \ + block = convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket); \ } APPLY_FOR_VARIANTS_TWO_LEVEL(M) @@ -982,7 +982,7 @@ void Aggregator::writeToTemporaryFileImpl( for (size_t bucket = 0; bucket < Method::Data::NUM_BUCKETS; ++bucket) { - Block block = convertOneBucketToBlock(data_variants, method, false, bucket); + Block block = convertOneBucketToBlock(data_variants, method, data_variants.aggregates_pool, false, bucket); out.write(block); update_max_sizes(block); } @@ -1285,7 +1285,7 @@ Block Aggregator::prepareBlockAndFill( } } - filler(key_columns, aggregate_columns_data, final_aggregate_columns, data_variants.aggregates_pool, final); + filler(key_columns, aggregate_columns_data, final_aggregate_columns, final); Block res = header.cloneEmpty(); @@ -1352,7 +1352,6 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, - Arena * arena, bool final_) { if (data_variants.type == AggregatedDataVariants::Type::without_key || params.overflow_row) @@ -1367,7 +1366,8 @@ Block Aggregator::prepareBlockAndFillWithoutKey(AggregatedDataVariants & data_va } else { - insertAggregatesIntoColumns(data, final_aggregate_columns, arena); + /// Always single-thread. It's safe to pass current arena from 'aggregates_pool'. 
+ insertAggregatesIntoColumns(data, final_aggregate_columns, data_variants.aggregates_pool); } if (params.overflow_row) @@ -1395,13 +1395,12 @@ Block Aggregator::prepareBlockAndFillSingleLevel(AggregatedDataVariants & data_v MutableColumns & key_columns, AggregateColumnsData & aggregate_columns, MutableColumns & final_aggregate_columns, - Arena * arena, bool final_) { #define M(NAME) \ else if (data_variants.type == AggregatedDataVariants::Type::NAME) \ convertToBlockImpl(*data_variants.NAME, data_variants.NAME->data, \ - key_columns, aggregate_columns, final_aggregate_columns, arena, final_); + key_columns, aggregate_columns, final_aggregate_columns, data_variants.aggregates_pool, final_); if (false) {} // NOLINT APPLY_FOR_VARIANTS_SINGLE_LEVEL(M) @@ -1435,11 +1434,21 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( bool final, ThreadPool * thread_pool) const { + size_t max_threads = thread_pool->getMaxThreads(); + if (max_threads > data_variants.aggregates_pools.size()) + for (size_t i = data_variants.aggregates_pools.size(); i < max_threads; ++i) + data_variants.aggregates_pools.push_back(std::make_shared<Arena>()); + auto converter = [&](size_t bucket, ThreadGroupStatusPtr thread_group) { if (thread_group) CurrentThread::attachToIfDetached(thread_group); - return convertOneBucketToBlock(data_variants, method, final, bucket); + + /// Select Arena to avoid race conditions + size_t thread_number = static_cast<size_t>(bucket) % max_threads; + Arena * arena = data_variants.aggregates_pools.at(thread_number).get(); + + return convertOneBucketToBlock(data_variants, method, arena, final, bucket); }; /// packaged_task is used to ensure that exceptions are automatically thrown into the main stream. @@ -1949,7 +1958,7 @@ private: else if (method == AggregatedDataVariants::Type::NAME) \ { \ aggregator.mergeBucketImpl(data, bucket_num, arena); \ - block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, final, bucket_num); \ + block = aggregator.convertOneBucketToBlock(merged_data, *merged_data.NAME, arena, final, bucket_num); \ } APPLY_FOR_VARIANTS_TWO_LEVEL(M) diff --git a/src/Interpreters/Aggregator.h b/src/Interpreters/Aggregator.h index c688da9d32d..86806b7fbad 100644 --- a/src/Interpreters/Aggregator.h +++ b/src/Interpreters/Aggregator.h @@ -1212,6 +1212,7 @@ protected: Block convertOneBucketToBlock( AggregatedDataVariants & data_variants, Method & method, + Arena * arena, bool final, size_t bucket) const; diff --git a/tests/queries/0_stateless/01605_dictinct_two_level.reference b/tests/queries/0_stateless/01605_dictinct_two_level.reference new file mode 100644 index 00000000000..50d1615e1aa --- /dev/null +++ b/tests/queries/0_stateless/01605_dictinct_two_level.reference @@ -0,0 +1,20 @@ +['0'] +['1'] +['2'] +['3'] +['4'] +['5'] +['6'] +['7'] +['8'] +['9'] +test.com ['foo3223','foo6455','foo382','foo5566','foo1037'] +test.com0 ['foo0'] +test.com0.0001 ['foo1'] +test.com0.0002 ['foo2'] +test.com0.0003 ['foo3'] +test.com0.0004 ['foo4'] +test.com0.0005 ['foo5'] +test.com0.0006 ['foo6'] +test.com0.0007 ['foo7'] +test.com0.0008 ['foo8'] diff --git a/tests/queries/0_stateless/01605_dictinct_two_level.sql b/tests/queries/0_stateless/01605_dictinct_two_level.sql new file mode 100644 index 00000000000..5f20ae590c5 --- /dev/null +++ b/tests/queries/0_stateless/01605_dictinct_two_level.sql @@ -0,0 +1,25 @@ +SET group_by_two_level_threshold_bytes = 1; +SET group_by_two_level_threshold = 1; + +SELECT groupArray(DISTINCT toString(number % 10)) FROM numbers_mt(50000) + GROUP BY number ORDER BY 
number LIMIT 10 + SETTINGS max_threads = 2, max_block_size = 2000; + +DROP TABLE IF EXISTS dictinct_two_level; + +CREATE TABLE dictinct_two_level ( + time DateTime64(3), + domain String, + subdomain String +) ENGINE = MergeTree ORDER BY time; + +INSERT INTO dictinct_two_level SELECT 1546300800000, 'test.com', concat('foo', toString(number % 10000)) from numbers(10000); +INSERT INTO dictinct_two_level SELECT 1546300800000, concat('test.com', toString(number / 10000)) , concat('foo', toString(number % 10000)) from numbers(10000); + +SELECT + domain, groupArraySample(5, 11111)(DISTINCT subdomain) AS example_subdomains +FROM dictinct_two_level +GROUP BY domain ORDER BY domain, example_subdomains +LIMIT 10; + +DROP TABLE IF EXISTS dictinct_two_level; From eecd51bb75fa603d802a258f93c1718935bd3975 Mon Sep 17 00:00:00 2001 From: Anton Popov Date: Tue, 22 Dec 2020 21:29:47 +0300 Subject: [PATCH 09/32] fix for uninitialized thread pool --- src/Interpreters/Aggregator.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Interpreters/Aggregator.cpp b/src/Interpreters/Aggregator.cpp index f6f1f7c8d53..de2c86d7c12 100644 --- a/src/Interpreters/Aggregator.cpp +++ b/src/Interpreters/Aggregator.cpp @@ -1434,7 +1434,7 @@ BlocksList Aggregator::prepareBlocksAndFillTwoLevelImpl( bool final, ThreadPool * thread_pool) const { - size_t max_threads = thread_pool->getMaxThreads(); + size_t max_threads = thread_pool ? thread_pool->getMaxThreads() : 1; if (max_threads > data_variants.aggregates_pools.size()) for (size_t i = data_variants.aggregates_pools.size(); i < max_threads; ++i) data_variants.aggregates_pools.push_back(std::make_shared<Arena>()); From e49021630e6b948769f9e3850c566659af29e8d1 Mon Sep 17 00:00:00 2001 From: Azat Khuzhin Date: Tue, 22 Dec 2020 22:21:19 +0300 Subject: [PATCH 10/32] Do not override RULE_LAUNCH_COMPILE/RULE_LAUNCH_LINK in rocksdb - it is already done in find/ccache.cmake - it does not respect disabling ccache - it does not allow to override ccache using custom location (not in PATH) --- contrib/rocksdb-cmake/CMakeLists.txt | 6 ------ 1 file changed, 6 deletions(-) diff --git a/contrib/rocksdb-cmake/CMakeLists.txt b/contrib/rocksdb-cmake/CMakeLists.txt index f99401ce75d..77a30776a4a 100644 --- a/contrib/rocksdb-cmake/CMakeLists.txt +++ b/contrib/rocksdb-cmake/CMakeLists.txt @@ -2,12 +2,6 @@ set(ROCKSDB_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/rocksdb") list(APPEND CMAKE_MODULE_PATH "${ROCKSDB_SOURCE_DIR}/cmake/modules/") -find_program(CCACHE_FOUND ccache) -if(CCACHE_FOUND) - set_property(GLOBAL PROPERTY RULE_LAUNCH_COMPILE ccache) - set_property(GLOBAL PROPERTY RULE_LAUNCH_LINK ccache) -endif(CCACHE_FOUND) - if (SANITIZE STREQUAL "undefined") set(WITH_UBSAN ON) elseif (SANITIZE STREQUAL "address") From 9e31800ebb0481270739595e302d8f298839fc95 Mon Sep 17 00:00:00 2001 From: Vitaly Baranov Date: Tue, 22 Dec 2020 23:16:25 +0300 Subject: [PATCH 11/32] Fix filling table system.settings_profile_elements. 
--- .../StorageSystemSettingsProfileElements.cpp | 168 ++++++++---------- ..._settings_profile_while_assigned.reference | 2 + ...5_drop_settings_profile_while_assigned.sql | 8 + 3 files changed, 82 insertions(+), 96 deletions(-) create mode 100644 tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.reference create mode 100644 tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.sql diff --git a/src/Storages/System/StorageSystemSettingsProfileElements.cpp b/src/Storages/System/StorageSystemSettingsProfileElements.cpp index 6d6df4fe114..cf47416e188 100644 --- a/src/Storages/System/StorageSystemSettingsProfileElements.cpp +++ b/src/Storages/System/StorageSystemSettingsProfileElements.cpp @@ -68,121 +68,97 @@ void StorageSystemSettingsProfileElements::fillData(MutableColumns & res_columns auto add_rows_for_single_element = [&](const String & owner_name, EntityType owner_type, const SettingsProfileElement & element, size_t & index) { - switch (owner_type) + size_t old_num_rows = column_profile_name.size(); + size_t new_num_rows = old_num_rows + 1; + size_t current_index = index++; + + bool inserted_value = false; + if (!element.value.isNull() && !element.setting_name.empty()) { - case EntityType::SETTINGS_PROFILE: - { - column_user_name.insertDefault(); - column_user_name_null_map.push_back(true); - column_role_name.insertDefault(); - column_role_name_null_map.push_back(true); - column_profile_name.insertData(owner_name.data(), owner_name.length()); - column_profile_name_null_map.push_back(false); - break; - } - case EntityType::USER: - { - column_user_name.insertData(owner_name.data(), owner_name.length()); - column_user_name_null_map.push_back(false); - column_profile_name.insertDefault(); - column_profile_name_null_map.push_back(true); - column_role_name.insertDefault(); - column_role_name_null_map.push_back(true); - break; - } - case EntityType::ROLE: - { - column_user_name.insertDefault(); - column_user_name_null_map.push_back(true); - column_role_name.insertData(owner_name.data(), owner_name.length()); - column_role_name_null_map.push_back(false); - column_profile_name.insertDefault(); - column_profile_name_null_map.push_back(true); - break; - } - default: - assert(false); + String str = Settings::valueToStringUtil(element.setting_name, element.value); + column_value.insertData(str.data(), str.length()); + column_value_null_map.push_back(false); + inserted_value = true; } + bool inserted_min = false; + if (!element.min_value.isNull() && !element.setting_name.empty()) + { + String str = Settings::valueToStringUtil(element.setting_name, element.min_value); + column_min.insertData(str.data(), str.length()); + column_min_null_map.push_back(false); + inserted_min = true; + } + + bool inserted_max = false; + if (!element.max_value.isNull() && !element.setting_name.empty()) + { + String str = Settings::valueToStringUtil(element.setting_name, element.max_value); + column_max.insertData(str.data(), str.length()); + column_max_null_map.push_back(false); + inserted_max = true; + } + + bool inserted_readonly = false; + if (element.readonly && !element.setting_name.empty()) + { + column_readonly.push_back(*element.readonly); + column_readonly_null_map.push_back(false); + inserted_readonly = true; + } + + bool inserted_setting_name = false; + if (inserted_value || inserted_min || inserted_max || inserted_readonly) + { + const auto & setting_name = element.setting_name; + column_setting_name.insertData(setting_name.data(), setting_name.size()); + 
column_setting_name_null_map.push_back(false); + inserted_setting_name = true; + } + + bool inserted_inherit_profile = false; if (element.parent_profile) { auto parent_profile = access_control.tryReadName(*element.parent_profile); if (parent_profile) { - column_index.push_back(index++); - column_setting_name.insertDefault(); - column_setting_name_null_map.push_back(true); - column_value.insertDefault(); - column_value_null_map.push_back(true); - column_min.insertDefault(); - column_min_null_map.push_back(true); - column_max.insertDefault(); - column_max_null_map.push_back(true); - column_readonly.push_back(0); - column_readonly_null_map.push_back(true); const String & parent_profile_str = *parent_profile; column_inherit_profile.insertData(parent_profile_str.data(), parent_profile_str.length()); column_inherit_profile_null_map.push_back(false); + inserted_inherit_profile = true; } } - if (!element.setting_name.empty() - && (!element.value.isNull() || !element.min_value.isNull() || !element.max_value.isNull() || element.readonly)) + if (inserted_setting_name || inserted_inherit_profile) { - const auto & setting_name = element.setting_name; - column_index.push_back(index++); - column_setting_name.insertData(setting_name.data(), setting_name.size()); - column_setting_name_null_map.push_back(false); - - if (element.value.isNull()) + switch (owner_type) { - column_value.insertDefault(); - column_value_null_map.push_back(true); - } - else - { - String str = Settings::valueToStringUtil(setting_name, element.value); - column_value.insertData(str.data(), str.length()); - column_value_null_map.push_back(false); + case EntityType::SETTINGS_PROFILE: + { + column_profile_name.insertData(owner_name.data(), owner_name.length()); + column_profile_name_null_map.push_back(false); + break; + } + case EntityType::USER: + { + column_user_name.insertData(owner_name.data(), owner_name.length()); + column_user_name_null_map.push_back(false); + break; + } + case EntityType::ROLE: + { + column_role_name.insertData(owner_name.data(), owner_name.length()); + column_role_name_null_map.push_back(false); + break; + } + default: + assert(false); } - if (element.min_value.isNull()) - { - column_min.insertDefault(); - column_min_null_map.push_back(true); - } - else - { - String str = Settings::valueToStringUtil(setting_name, element.min_value); - column_min.insertData(str.data(), str.length()); - column_min_null_map.push_back(false); - } + column_index.push_back(current_index); - if (element.max_value.isNull()) - { - column_max.insertDefault(); - column_max_null_map.push_back(true); - } - else - { - String str = Settings::valueToStringUtil(setting_name, element.max_value); - column_max.insertData(str.data(), str.length()); - column_max_null_map.push_back(false); - } - - if (element.readonly) - { - column_readonly.push_back(*element.readonly); - column_readonly_null_map.push_back(false); - } - else - { - column_readonly.push_back(0); - column_readonly_null_map.push_back(true); - } - - column_inherit_profile.insertDefault(); - column_inherit_profile_null_map.push_back(true); + for (auto & res_column : res_columns) + res_column->insertManyDefaults(new_num_rows - res_column->size()); } }; diff --git a/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.reference b/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.reference new file mode 100644 index 00000000000..47942812a11 --- /dev/null +++ b/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.reference @@ -0,0 +1,2 @@ +\N 
test_01605 \N 0 \N \N \N \N \N test_01605 +PROFILE DROPPED diff --git a/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.sql b/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.sql new file mode 100644 index 00000000000..c9205d7fd89 --- /dev/null +++ b/tests/queries/0_stateless/01605_drop_settings_profile_while_assigned.sql @@ -0,0 +1,8 @@ +CREATE USER OR REPLACE 'test_01605'; +CREATE SETTINGS PROFILE OR REPLACE 'test_01605'; +ALTER USER 'test_01605' SETTINGS PROFILE 'test_01605'; +SELECT * FROM system.settings_profile_elements WHERE user_name='test_01605' OR profile_name='test_01605'; +DROP SETTINGS PROFILE 'test_01605'; +SELECT 'PROFILE DROPPED'; +SELECT * FROM system.settings_profile_elements WHERE user_name='test_01605' OR profile_name='test_01605'; +DROP USER 'test_01605'; From 27f647f93de5dc1400d748b1278066219f57e363 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Mon, 30 Nov 2020 19:42:41 +0300 Subject: [PATCH 12/32] done --- .../ParallelParsingBlockInputStream.cpp | 26 +++++++++++++------ .../ParallelParsingBlockInputStream.h | 8 +++++- src/Formats/FormatFactory.cpp | 2 +- src/Formats/FormatFactory.h | 2 +- src/Formats/JSONEachRowUtils.cpp | 8 ++++-- src/Formats/JSONEachRowUtils.h | 2 +- .../Formats/Impl/CSVRowInputFormat.cpp | 9 +++++-- .../Formats/Impl/RegexpRowInputFormat.cpp | 7 ++--- .../Impl/TabSeparatedRowInputFormat.cpp | 8 ++++-- ...el_parsing_exception_with_offset.reference | 1 + ..._parallel_parsing_exception_with_offset.sh | 14 ++++++++++ 11 files changed, 66 insertions(+), 21 deletions(-) create mode 100644 tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.reference create mode 100755 tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index 19b04d36fc1..759a98de0a3 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -126,9 +126,13 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatu // Segmentating the original input. unit.segment.resize(0); - const bool have_more_data = file_segmentation_engine(original_buffer, - unit.segment, min_chunk_bytes); + auto [have_more_data, currently_read_rows] = file_segmentation_engine( + original_buffer, unit.segment, min_chunk_bytes); + unit.offset = successfully_read_rows_count; + successfully_read_rows_count += currently_read_rows; + + unit.is_last = !have_more_data; unit.status = READY_TO_PARSE; scheduleParserThreadForUnitWithNumber(segmentator_ticket_number); @@ -142,7 +146,7 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatu } catch (...) { - onBackgroundException(); + onBackgroundException(successfully_read_rows_count); } } @@ -157,11 +161,11 @@ void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr setThreadName("ChunkParser"); + const auto current_unit_number = current_ticket_number % processing_units.size(); + auto & unit = processing_units[current_unit_number]; + try { - const auto current_unit_number = current_ticket_number % processing_units.size(); - auto & unit = processing_units[current_unit_number]; - /* * This is kind of suspicious -- the input_process_creator contract with * respect to multithreaded use is not clear, but we hope that it is @@ -195,11 +199,11 @@ void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr } catch (...) 
     {
-        onBackgroundException();
+        onBackgroundException(unit.offset);
     }
 }
 
-void ParallelParsingBlockInputStream::onBackgroundException()
+void ParallelParsingBlockInputStream::onBackgroundException(size_t offset)
 {
     tryLogCurrentException(__PRETTY_FUNCTION__);
 
@@ -207,6 +211,12 @@ void ParallelParsingBlockInputStream::onBackgroundException()
     if (!background_exception)
     {
         background_exception = std::current_exception();
+
+        if (Exception * e = exception_cast<Exception *>(background_exception))
+            e->addMessage(fmt::format(
+                "Offset: {}. P.S. You have to sum up offset and the row number from "
+                "the previous message to calculate the true row number where the "
+                "error occurred due to parallel parsing algorithm specificity", offset));
     }
     finished = true;
     reader_condvar.notify_all();
diff --git a/src/DataStreams/ParallelParsingBlockInputStream.h b/src/DataStreams/ParallelParsingBlockInputStream.h
index c882acd9ddd..749de83b583 100644
--- a/src/DataStreams/ParallelParsingBlockInputStream.h
+++ b/src/DataStreams/ParallelParsingBlockInputStream.h
@@ -149,6 +149,8 @@ private:
         BlockExt block_ext;
         Memory<> segment;
         std::atomic<ProcessingUnitStatus> status;
+        /// Needed for better exception message.
+        size_t offset = 0;
         bool is_last{false};
     };
 
@@ -159,6 +161,10 @@ private:
 
     std::deque<ProcessingUnit> processing_units;
 
+    /// Compute it to have a more understandable error message.
+    size_t successfully_read_rows_count{0};
+
+
     void scheduleParserThreadForUnitWithNumber(size_t ticket_number);
 
     void finishAndWait();
@@ -169,7 +175,7 @@ private:
     // threads. This function is used by segmentator and parsed threads.
     // readImpl() is called from the main thread, so the exception handling
     // is different.
-    void onBackgroundException();
+    void onBackgroundException(size_t offset);
 };
 
 }
diff --git a/src/Formats/FormatFactory.cpp b/src/Formats/FormatFactory.cpp
index 926b790a4bd..877f62dfb34 100644
--- a/src/Formats/FormatFactory.cpp
+++ b/src/Formats/FormatFactory.cpp
@@ -163,7 +163,7 @@ BlockInputStreamPtr FormatFactory::getInput(
     // (segmentator + two parsers + reader).
     bool parallel_parsing = settings.input_format_parallel_parsing && file_segmentation_engine && settings.max_threads >= 4;
 
-    if (settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
+    if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * settings.max_threads * 2 > settings.max_memory_usage)
        parallel_parsing = false;
 
     if (parallel_parsing && name == "JSONEachRow")
diff --git a/src/Formats/FormatFactory.h b/src/Formats/FormatFactory.h
index 0fe6f19f0b7..d78cc2e2740 100644
--- a/src/Formats/FormatFactory.h
+++ b/src/Formats/FormatFactory.h
@@ -54,7 +54,7 @@ public:
      * Reads at least min_chunk_bytes and some more until the end of the chunk, depends on the format.
      * Used in ParallelParsingBlockInputStream.
     */
-    using FileSegmentationEngine = std::function<bool(
+    using FileSegmentationEngine = std::function<std::pair<bool, size_t>(
         ReadBuffer & buf,
         DB::Memory<> & memory,
         size_t min_chunk_bytes)>;
 
diff --git a/src/Formats/JSONEachRowUtils.cpp b/src/Formats/JSONEachRowUtils.cpp
index a1d9b4a5fff..6017f3983c6 100644
--- a/src/Formats/JSONEachRowUtils.cpp
+++ b/src/Formats/JSONEachRowUtils.cpp
@@ -4,13 +4,14 @@
 namespace DB
 {
 
-bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
+std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
 {
     skipWhitespaceIfAny(in);
 
     char * pos = in.position();
     size_t balance = 0;
     bool quotes = false;
+    size_t number_of_rows = 0;
 
     while (loadAtPosition(in, memory, pos) && (balance || memory.size() + static_cast<size_t>(pos - in.position()) < min_chunk_size))
     {
@@ -57,11 +58,14 @@ bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memor
                 quotes = true;
                 ++pos;
             }
+
+            if (balance == 0)
+                ++number_of_rows;
         }
     }
 
     saveUpToPosition(in, memory, pos);
-    return loadAtPosition(in, memory, pos);
+    return {loadAtPosition(in, memory, pos), number_of_rows};
 }
 
 }
diff --git a/src/Formats/JSONEachRowUtils.h b/src/Formats/JSONEachRowUtils.h
index 92679fe3589..adf85f37a22 100644
--- a/src/Formats/JSONEachRowUtils.h
+++ b/src/Formats/JSONEachRowUtils.h
@@ -3,6 +3,6 @@
 namespace DB
 {
 
-bool fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
+std::pair<bool, size_t> fileSegmentationEngineJSONEachRowImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size);
 
 }
diff --git a/src/Processors/Formats/Impl/CSVRowInputFormat.cpp b/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
index 7bca5f2c5d9..89f0e0e5d2c 100644
--- a/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/CSVRowInputFormat.cpp
@@ -424,11 +424,12 @@ void registerInputFormatProcessorCSV(FormatFactory & factory)
     }
 }
 
-static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
+static std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
 {
     char * pos = in.position();
     bool quotes = false;
     bool need_more_data = true;
+    size_t number_of_rows = 0;
 
     while (loadAtPosition(in, memory, pos) && need_more_data)
     {
@@ -458,6 +459,7 @@ static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory
             }
             else if (*pos == '\n')
             {
+                ++number_of_rows;
                 if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                     need_more_data = false;
                 ++pos;
@@ -470,13 +472,16 @@ static bool fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory
                     need_more_data = false;
                 ++pos;
                 if (loadAtPosition(in, memory, pos) && *pos == '\n')
+                {
                     ++pos;
+                    ++number_of_rows;
+                }
             }
         }
     }
 
     saveUpToPosition(in, memory, pos);
-    return loadAtPosition(in, memory, pos);
+    return {loadAtPosition(in, memory, pos), number_of_rows};
 }
 
 void registerFileSegmentationEngineCSV(FormatFactory & factory)
diff --git a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
index c3f9d07b893..6e14a1dc3c8 100644
--- a/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/RegexpRowInputFormat.cpp
@@ -173,10 +173,11 @@ void registerInputFormatProcessorRegexp(FormatFactory & factory)
     });
 }
 
-static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
+static std::pair<bool, size_t> fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> &
memory, size_t min_chunk_size)
 {
     char * pos = in.position();
     bool need_more_data = true;
+    size_t number_of_rows = 0;
 
     while (loadAtPosition(in, memory, pos) && need_more_data)
     {
@@ -196,12 +197,12 @@ static bool fileSegmentationEngineRegexpImpl(ReadBuffer & in, DB::Memory<> & mem
 
         need_more_data = false;
         ++pos;
-
+        ++number_of_rows;
     }
 
     saveUpToPosition(in, memory, pos);
 
-    return loadAtPosition(in, memory, pos);
+    return {loadAtPosition(in, memory, pos), number_of_rows};
 }
 
 void registerFileSegmentationEngineRegexp(FormatFactory & factory)
diff --git a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
index 529b70e4e09..69a5e61caf2 100644
--- a/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
+++ b/src/Processors/Formats/Impl/TabSeparatedRowInputFormat.cpp
@@ -423,10 +423,11 @@ void registerInputFormatProcessorTabSeparated(FormatFactory & factory)
     }
 }
 
-static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
+static std::pair<bool, size_t> fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_chunk_size)
 {
     bool need_more_data = true;
     char * pos = in.position();
+    size_t number_of_rows = 0;
 
     while (loadAtPosition(in, memory, pos) && need_more_data)
     {
@@ -443,6 +444,9 @@ static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<>
         }
         else if (*pos == '\n' || *pos == '\r')
         {
+            if (*pos == '\n')
+                ++number_of_rows;
+
             if (memory.size() + static_cast<size_t>(pos - in.position()) >= min_chunk_size)
                 need_more_data = false;
             ++pos;
@@ -451,7 +455,7 @@ static bool fileSegmentationEngineTabSeparatedImpl(ReadBuffer & in, DB::Memory<>
 
     saveUpToPosition(in, memory, pos);
 
-    return loadAtPosition(in, memory, pos);
+    return {loadAtPosition(in, memory, pos), number_of_rows};
 }
 
 void registerFileSegmentationEngineTabSeparated(FormatFactory & factory)
diff --git a/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.reference b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.reference
new file mode 100644
index 00000000000..d86bac9de59
--- /dev/null
+++ b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.reference
@@ -0,0 +1 @@
+OK
diff --git a/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh
new file mode 100755
index 00000000000..805f4267818
--- /dev/null
+++ b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh
@@ -0,0 +1,14 @@
+#!/usr/bin/env bash
+
+CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
+. 
"$CURDIR"/../shell_config.sh + +CLICKHOUSE_CLIENT=$(echo ${CLICKHOUSE_CLIENT} | sed 's/'"--send_logs_level=${CLICKHOUSE_CLIENT_SERVER_LOGS_LEVEL}"'/--send_logs_level=none/g') + +$CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS check;" + +$CLICKHOUSE_CLIENT --query="CREATE TABLE check (x UInt64) ENGINE = Memory;" + +(seq 1 2000000; echo 'hello'; seq 1 20000000) | $CLICKHOUSE_CLIENT --multiquery --query="SET input_format_parallel_parsing=1; SET min_chunk_bytes_for_parallel_parsing=100000; INSERT INTO check(x) FORMAT TSV " 2>&1 | grep -q "Offset: 1988984" && echo 'OK' || echo 'FAIL' ||: + +$CLICKHOUSE_CLIENT --query="DROP TABLE check;" From c60c161168771a8904c4205a1b0f16f85e60095c Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Thu, 10 Dec 2020 20:26:36 +0300 Subject: [PATCH 13/32] add ParsingException --- src/Common/CounterInFile.h | 2 +- src/Common/Exception.h | 32 +++++++++++++++++++ src/DataStreams/NativeBlockInputStream.cpp | 4 +-- .../ParallelParsingBlockInputStream.cpp | 13 ++++---- src/DataTypes/DataTypeArray.cpp | 4 +-- src/DataTypes/DataTypeFixedString.cpp | 2 +- src/DataTypes/DataTypeNullable.cpp | 8 ++--- src/IO/ReadBuffer.h | 2 +- src/IO/ReadHelpers.cpp | 12 +++---- src/IO/ReadHelpers.h | 12 +++---- src/IO/parseDateTimeBestEffort.cpp | 2 +- src/IO/readDecimalText.h | 4 +-- src/IO/readFloatText.h | 14 ++++---- src/Processors/Formats/IRowInputFormat.cpp | 20 ++++++++++++ .../Formats/Impl/ArrowBlockInputFormat.cpp | 4 +-- .../Formats/Impl/AvroRowInputFormat.cpp | 2 +- .../Impl/JSONCompactEachRowRowInputFormat.cpp | 2 +- .../Impl/JSONEachRowRowInputFormat.cpp | 2 +- .../Formats/Impl/ORCBlockInputFormat.cpp | 2 +- .../Formats/Impl/ParquetBlockInputFormat.cpp | 2 +- .../Formats/Impl/TSKVRowInputFormat.cpp | 4 +-- .../Formats/Impl/TemplateRowInputFormat.cpp | 2 +- ..._parallel_parsing_exception_with_offset.sh | 2 +- 23 files changed, 102 insertions(+), 51 deletions(-) diff --git a/src/Common/CounterInFile.h b/src/Common/CounterInFile.h index 48414bd09cc..8cd4534d413 100644 --- a/src/Common/CounterInFile.h +++ b/src/Common/CounterInFile.h @@ -87,7 +87,7 @@ public: { /// A more understandable error message. if (e.code() == DB::ErrorCodes::CANNOT_READ_ALL_DATA || e.code() == DB::ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF) - throw DB::Exception("File " + path + " is empty. You must fill it manually with appropriate value.", e.code()); + throw DB::ParsingException("File " + path + " is empty. You must fill it manually with appropriate value.", e.code()); else throw; } diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 3da2e2fb0d0..bff39fe454e 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -96,6 +96,38 @@ private: }; +/// Special class of exceptions, used mostly in ParallelParsingInputFormat for +/// more convinient calculation of problem line number. +class ParsingException : public Exception +{ +public: + using Exception::Exception; + + void formatInternalMessage() { + try + { + message(fmt::format(message(), line_number_)); + } + catch (...) 
{}
+    }
+
+    int getLineNumber() {
+        return line_number_;
+    }
+
+    void setLineNumber(int line_number) {
+        line_number_ = line_number;
+    }
+
+private:
+    int line_number_;
+    mutable std::string formatted_message_;
+
+    const char * name() const throw() override { return "DB::ParsingException"; }
+    const char * className() const throw() override { return "DB::ParsingException"; }
+};
+
+
 using Exceptions = std::vector<std::exception_ptr>;
 
diff --git a/src/DataStreams/NativeBlockInputStream.cpp b/src/DataStreams/NativeBlockInputStream.cpp
index ae2d6886fa8..e9967e88638 100644
--- a/src/DataStreams/NativeBlockInputStream.cpp
+++ b/src/DataStreams/NativeBlockInputStream.cpp
@@ -83,7 +83,7 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
     type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
 
     if (column.size() != rows)
-        throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
+        throw ParsingException("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
             ErrorCodes::CANNOT_READ_ALL_DATA);
 }
 
@@ -106,7 +106,7 @@ Block NativeBlockInputStream::readImpl()
     if (istr.eof())
     {
         if (use_index)
-            throw Exception("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);
+            throw ParsingException("Input doesn't contain all data for index.", ErrorCodes::CANNOT_READ_ALL_DATA);
 
         return res;
     }
diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp
index 759a98de0a3..ec0652ae766 100644
--- a/src/DataStreams/ParallelParsingBlockInputStream.cpp
+++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp
@@ -205,19 +205,18 @@ void ParallelParsingBlockInputStream::parserThreadFunction(ThreadGroupStatusPtr
 
 void ParallelParsingBlockInputStream::onBackgroundException(size_t offset)
 {
-    tryLogCurrentException(__PRETTY_FUNCTION__);
-
     std::unique_lock lock(mutex);
     if (!background_exception)
    {
         background_exception = std::current_exception();
 
-        if (Exception * e = exception_cast<Exception *>(background_exception))
-            e->addMessage(fmt::format(
-                "Offset: {}. P.S. You have to sum up offset and the row number from "
-                "the previous message to calculate the true row number where the "
-                "error occurred due to parallel parsing algorithm specificity", offset));
+        if (ParsingException * e = exception_cast<ParsingException *>(background_exception))
+        {
+            e->setLineNumber(e->getLineNumber() + offset);
+            e->formatInternalMessage();
+        }
     }
+    tryLogCurrentException(__PRETTY_FUNCTION__);
     finished = true;
     reader_condvar.notify_all();
     segmentator_condvar.notify_all();
diff --git a/src/DataTypes/DataTypeArray.cpp b/src/DataTypes/DataTypeArray.cpp
index 1a8130fb36d..9cd56d0e2b5 100644
--- a/src/DataTypes/DataTypeArray.cpp
+++ b/src/DataTypes/DataTypeArray.cpp
@@ -272,7 +272,7 @@ void DataTypeArray::deserializeBinaryBulkWithMultipleStreams(
 
     /// Check consistency between offsets and elements subcolumns.
     /// But if elements column is empty - it's ok for columns of Nested types that was added by ALTER.
if (!nested_column.empty() && nested_column.size() != last_offset) - throw Exception("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset), + throw ParsingException("Cannot read all array values: read just " + toString(nested_column.size()) + " of " + toString(last_offset), ErrorCodes::CANNOT_READ_ALL_DATA); } @@ -325,7 +325,7 @@ static void deserializeTextImpl(IColumn & column, ReadBuffer & istr, Reader && r if (*istr.position() == ',') ++istr.position(); else - throw Exception(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, + throw ParsingException(ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT, "Cannot read array from text, expected comma or end of array, found '{}'", *istr.position()); } diff --git a/src/DataTypes/DataTypeFixedString.cpp b/src/DataTypes/DataTypeFixedString.cpp index 585c5709be7..c0b684cfcd9 100644 --- a/src/DataTypes/DataTypeFixedString.cpp +++ b/src/DataTypes/DataTypeFixedString.cpp @@ -103,7 +103,7 @@ void DataTypeFixedString::deserializeBinaryBulk(IColumn & column, ReadBuffer & i size_t read_bytes = istr.readBig(reinterpret_cast(&data[initial_size]), max_bytes); if (read_bytes % n != 0) - throw Exception("Cannot read all data of type FixedString. Bytes read:" + toString(read_bytes) + ". String size:" + toString(n) + ".", + throw ParsingException("Cannot read all data of type FixedString. Bytes read:" + toString(read_bytes) + ". String size:" + toString(n) + ".", ErrorCodes::CANNOT_READ_ALL_DATA); data.resize(initial_size + read_bytes); diff --git a/src/DataTypes/DataTypeNullable.cpp b/src/DataTypes/DataTypeNullable.cpp index ed501939901..a0fc8baaf7e 100644 --- a/src/DataTypes/DataTypeNullable.cpp +++ b/src/DataTypes/DataTypeNullable.cpp @@ -235,7 +235,7 @@ ReturnType DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer /// Little tricky, because we cannot discriminate null from first character. if (istr.eof()) - throw Exception("Unexpected end of stream, while parsing value of Nullable type", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream, while parsing value of Nullable type", ErrorCodes::CANNOT_READ_ALL_DATA); /// This is not null, surely. if (*istr.position() != '\\') @@ -250,7 +250,7 @@ ReturnType DataTypeNullable::deserializeTextEscaped(IColumn & column, ReadBuffer ++istr.position(); if (istr.eof()) - throw Exception("Unexpected end of stream, while parsing value of Nullable type, after backslash", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream, while parsing value of Nullable type, after backslash", ErrorCodes::CANNOT_READ_ALL_DATA); return safeDeserialize(column, *nested_data_type, [&istr] @@ -405,11 +405,11 @@ ReturnType DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & i /// or if someone uses 'U' or 'L' as delimiter in CSV. /// In the first case we cannot continue reading anyway. The second case seems to be unlikely. 
if (settings.csv.delimiter == 'U' || settings.csv.delimiter == 'L') - throw DB::Exception("Enabled setting input_format_csv_unquoted_null_literal_as_null may not work correctly " + throw DB::ParsingException("Enabled setting input_format_csv_unquoted_null_literal_as_null may not work correctly " "with format_csv_delimiter = 'U' or 'L' for large input.", ErrorCodes::CANNOT_READ_ALL_DATA); WriteBufferFromOwnString parsed_value; nested_data_type->serializeAsTextCSV(nested, nested.size() - 1, parsed_value, settings); - throw DB::Exception("Error while parsing \"" + std::string(null_literal, null_prefix_len) + throw DB::ParsingException("Error while parsing \"" + std::string(null_literal, null_prefix_len) + std::string(istr.position(), std::min(size_t{10}, istr.available())) + "\" as Nullable(" + nested_data_type->getName() + ") at position " + std::to_string(istr.count()) + ": expected \"NULL\" or " + nested_data_type->getName() + ", got \"" + std::string(null_literal, buf.count()) + "\", which was deserialized as \"" diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h index 3d6eb6970ce..08f1bc2a3bb 100644 --- a/src/IO/ReadBuffer.h +++ b/src/IO/ReadBuffer.h @@ -165,7 +165,7 @@ public: { auto read_bytes = read(to, n); if (n != read_bytes) - throw Exception("Cannot read all data. Bytes read: " + std::to_string(read_bytes) + ". Bytes expected: " + std::to_string(n) + ".", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Cannot read all data. Bytes read: " + std::to_string(read_bytes) + ". Bytes expected: " + std::to_string(n) + ".", ErrorCodes::CANNOT_READ_ALL_DATA); } /** A method that can be more efficiently implemented in derived classes, in the case of reading large enough blocks. diff --git a/src/IO/ReadHelpers.cpp b/src/IO/ReadHelpers.cpp index e290da39535..97a8d937d39 100644 --- a/src/IO/ReadHelpers.cpp +++ b/src/IO/ReadHelpers.cpp @@ -96,7 +96,7 @@ void NO_INLINE throwAtAssertionFailed(const char * s, ReadBuffer & buf) else out << " before: " << quote << String(buf.position(), std::min(SHOW_CHARS_ON_SYNTAX_ERROR, buf.buffer().end() - buf.position())); - throw Exception(out.str(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); + throw ParsingException(out.str(), ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED); } @@ -503,7 +503,7 @@ static void readAnyQuotedStringInto(Vector & s, ReadBuffer & buf) { if (buf.eof() || *buf.position() != quote) { - throw Exception(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, + throw ParsingException(ErrorCodes::CANNOT_PARSE_QUOTED_STRING, "Cannot parse quoted string: expected opening quote '{}', got '{}'", std::string{quote}, buf.eof() ? 
"EOF" : std::string{*buf.position()}); } @@ -538,7 +538,7 @@ static void readAnyQuotedStringInto(Vector & s, ReadBuffer & buf) parseComplexEscapeSequence(s, buf); } - throw Exception("Cannot parse quoted string: expected closing quote", + throw ParsingException("Cannot parse quoted string: expected closing quote", ErrorCodes::CANNOT_PARSE_QUOTED_STRING); } @@ -716,7 +716,7 @@ ReturnType readJSONStringInto(Vector & s, ReadBuffer & buf) auto error = [](const char * message [[maybe_unused]], int code [[maybe_unused]]) { if constexpr (throw_exception) - throw Exception(message, code); + throw ParsingException(message, code); return ReturnType(false); }; @@ -861,7 +861,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D s_pos[size] = 0; if constexpr (throw_exception) - throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME); + throw ParsingException(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME); else return false; } @@ -899,7 +899,7 @@ ReturnType readDateTimeTextFallback(time_t & datetime, ReadBuffer & buf, const D else { if constexpr (throw_exception) - throw Exception("Cannot parse datetime", ErrorCodes::CANNOT_PARSE_DATETIME); + throw ParsingException("Cannot parse datetime", ErrorCodes::CANNOT_PARSE_DATETIME); else return false; } diff --git a/src/IO/ReadHelpers.h b/src/IO/ReadHelpers.h index dbbaae0816f..fa6b1fc2d8a 100644 --- a/src/IO/ReadHelpers.h +++ b/src/IO/ReadHelpers.h @@ -300,7 +300,7 @@ ReturnType readIntTextImpl(T & x, ReadBuffer & buf) else { if constexpr (throw_exception) - throw Exception("Unsigned type must not contain '-' symbol", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Unsigned type must not contain '-' symbol", ErrorCodes::CANNOT_PARSE_NUMBER); else return ReturnType(false); } @@ -648,7 +648,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf) if constexpr (throw_exception) { - throw Exception(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID); + throw ParsingException(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID); } else { @@ -669,7 +669,7 @@ inline ReturnType readUUIDTextImpl(UUID & uuid, ReadBuffer & buf) if constexpr (throw_exception) { - throw Exception(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID); + throw ParsingException(std::string("Cannot parse uuid ") + s, ErrorCodes::CANNOT_PARSE_UUID); } else { @@ -824,7 +824,7 @@ inline void readDateTimeText(LocalDateTime & datetime, ReadBuffer & buf) if (19 != size) { s[size] = 0; - throw Exception(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME); + throw ParsingException(std::string("Cannot parse datetime ") + s, ErrorCodes::CANNOT_PARSE_DATETIME); } datetime.year((s[0] - '0') * 1000 + (s[1] - '0') * 100 + (s[2] - '0') * 10 + (s[3] - '0')); @@ -1016,7 +1016,7 @@ void readQuoted(std::vector & x, ReadBuffer & buf) if (*buf.position() == ',') ++buf.position(); else - throw Exception("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT); + throw ParsingException("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT); } first = false; @@ -1039,7 +1039,7 @@ void readDoubleQuoted(std::vector & x, ReadBuffer & buf) if (*buf.position() == ',') ++buf.position(); else - throw Exception("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT); + throw ParsingException("Cannot read array from text", ErrorCodes::CANNOT_READ_ARRAY_FROM_TEXT); } first = 
false; diff --git a/src/IO/parseDateTimeBestEffort.cpp b/src/IO/parseDateTimeBestEffort.cpp index 8a188d22236..063955cdd1e 100644 --- a/src/IO/parseDateTimeBestEffort.cpp +++ b/src/IO/parseDateTimeBestEffort.cpp @@ -99,7 +99,7 @@ ReturnType parseDateTimeBestEffortImpl( auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]]) { if constexpr (std::is_same_v) - throw Exception(message, code); + throw ParsingException(message, code); else return false; }; diff --git a/src/IO/readDecimalText.h b/src/IO/readDecimalText.h index 727dd67c389..203d8e3963b 100644 --- a/src/IO/readDecimalText.h +++ b/src/IO/readDecimalText.h @@ -120,7 +120,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp if (!tryReadIntText(addition_exp, buf)) { if constexpr (_throw_on_error) - throw Exception("Cannot parse exponent while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot parse exponent while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } @@ -133,7 +133,7 @@ inline bool readDigits(ReadBuffer & buf, T & x, uint32_t & digits, int32_t & exp if (digits_only) { if constexpr (_throw_on_error) - throw Exception("Unexpected symbol while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Unexpected symbol while reading decimal", ErrorCodes::CANNOT_PARSE_NUMBER); return false; } stop = true; diff --git a/src/IO/readFloatText.h b/src/IO/readFloatText.h index 5647d574c62..eac6183e332 100644 --- a/src/IO/readFloatText.h +++ b/src/IO/readFloatText.h @@ -160,7 +160,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf) if (unlikely(res.ec != std::errc())) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); else return ReturnType(false); } @@ -243,7 +243,7 @@ ReturnType readFloatTextPreciseImpl(T & x, ReadBuffer & buf) if (unlikely(res.ec != std::errc())) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); else return ReturnType(false); } @@ -331,7 +331,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } @@ -387,7 +387,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value: nothing after exponent", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value: nothing after exponent", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } @@ -425,7 +425,7 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value: no digits read", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value: no digits read", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } @@ -436,14 +436,14 @@ ReturnType readFloatTextFastImpl(T & x, ReadBuffer & in) if (in.eof()) { if constexpr (throw_exception) - throw Exception("Cannot read floating 
point value: nothing after plus sign", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value: nothing after plus sign", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } else if (negative) { if constexpr (throw_exception) - throw Exception("Cannot read floating point value: plus after minus sign", ErrorCodes::CANNOT_PARSE_NUMBER); + throw ParsingException("Cannot read floating point value: plus after minus sign", ErrorCodes::CANNOT_PARSE_NUMBER); else return false; } diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index a6c66f1cacc..f65a590ab42 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -132,6 +132,26 @@ Chunk IRowInputFormat::generate() } } } + catch (ParsingException & e) + { + String verbose_diagnostic; + try + { + verbose_diagnostic = getDiagnosticInfo(); + } + catch (const Exception & exception) + { + verbose_diagnostic = "Cannot get verbose diagnostic: " + exception.message(); + } + catch (...) + { + /// Error while trying to obtain verbose diagnostic. Ok to ignore. + } + + e.setLineNumber(total_rows); + e.addMessage("(at row {})\n" + verbose_diagnostic); + throw; + } catch (Exception & e) { if (!isParseError(e.code())) diff --git a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp index 64a9ce68ef7..4edef1f1365 100644 --- a/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ArrowBlockInputFormat.cpp @@ -48,12 +48,12 @@ Chunk ArrowBlockInputFormat::generate() } if (!batch_result.ok()) - throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", batch_result.status().ToString()); auto table_result = arrow::Table::FromRecordBatches({*batch_result}); if (!table_result.ok()) - throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, + throw ParsingException(ErrorCodes::CANNOT_READ_ALL_DATA, "Error while reading batch of Arrow data: {}", table_result.status().ToString()); ++record_batch_current; diff --git a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp index ccfe65bbba3..a8d71790f41 100644 --- a/src/Processors/Formats/Impl/AvroRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/AvroRowInputFormat.cpp @@ -190,7 +190,7 @@ AvroDeserializer::DeserializeFn AvroDeserializer::createDeserializeFn(avro::Node { decoder.decodeString(tmp); if (tmp.length() != 36) - throw Exception(std::string("Cannot parse uuid ") + tmp, ErrorCodes::CANNOT_PARSE_UUID); + throw ParsingException(std::string("Cannot parse uuid ") + tmp, ErrorCodes::CANNOT_PARSE_UUID); UUID uuid; parseUUID(reinterpret_cast(tmp.data()), std::reverse_iterator(reinterpret_cast(&uuid) + 16)); diff --git a/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp index f20e764dcfd..1fc5041b1f3 100644 --- a/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONCompactEachRowRowInputFormat.cpp @@ -171,7 +171,7 @@ bool JSONCompactEachRowRowInputFormat::readRow(DB::MutableColumns &columns, DB:: skipWhitespaceIfAny(in); if (in.eof()) - throw Exception("Unexpected end of stream while parsing JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream while parsing 
JSONCompactEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); if (file_column + 1 != column_indexes_for_input_fields.size()) { assertChar(',', in); diff --git a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp index 234839b41f5..8a707ae6554 100644 --- a/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/JSONEachRowRowInputFormat.cpp @@ -173,7 +173,7 @@ inline bool JSONEachRowRowInputFormat::advanceToNextKey(size_t key_index) skipWhitespaceIfAny(in); if (in.eof()) - throw Exception("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream while parsing JSONEachRow format", ErrorCodes::CANNOT_READ_ALL_DATA); else if (*in.position() == '}') { ++in.position(); diff --git a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp index a12ca09eec0..8bbf0fc089b 100644 --- a/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ORCBlockInputFormat.cpp @@ -38,7 +38,7 @@ Chunk ORCBlockInputFormat::generate() std::shared_ptr table; arrow::Status read_status = file_reader->Read(&table); if (!read_status.ok()) - throw Exception{"Error while reading ORC data: " + read_status.ToString(), + throw ParsingException{"Error while reading ORC data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA}; ArrowColumnToCHColumn::arrowTableToCHChunk(res, table, header, "ORC"); diff --git a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp index 031974dc357..bb55c71b7ca 100644 --- a/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp +++ b/src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp @@ -47,7 +47,7 @@ Chunk ParquetBlockInputFormat::generate() std::shared_ptr table; arrow::Status read_status = file_reader->ReadRowGroup(row_group_current, column_indices, &table); if (!read_status.ok()) - throw Exception{"Error while reading Parquet data: " + read_status.ToString(), + throw ParsingException{"Error while reading Parquet data: " + read_status.ToString(), ErrorCodes::CANNOT_READ_ALL_DATA}; ++row_group_current; diff --git a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp index abb468741c5..8d769cab346 100644 --- a/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/TSKVRowInputFormat.cpp @@ -89,7 +89,7 @@ static bool readName(ReadBuffer & buf, StringRef & ref, String & tmp) } } - throw Exception("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream while reading key name from TSKV format", ErrorCodes::CANNOT_READ_ALL_DATA); } @@ -157,7 +157,7 @@ bool TSKVRowInputFormat::readRow(MutableColumns & columns, RowReadExtension & ex if (in.eof()) { - throw Exception("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA); + throw ParsingException("Unexpected end of stream after field in TSKV format: " + name_ref.toString(), ErrorCodes::CANNOT_READ_ALL_DATA); } else if (*in.position() == '\t') { diff --git a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp index ecb529a99af..6023b38e4de 100644 --- a/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp +++ 
b/src/Processors/Formats/Impl/TemplateRowInputFormat.cpp @@ -489,7 +489,7 @@ void TemplateRowInputFormat::skipToNextDelimiterOrEof(const String & delimiter) void TemplateRowInputFormat::throwUnexpectedEof() { - throw Exception("Unexpected EOF while parsing row " + std::to_string(row_num) + ". " + throw ParsingException("Unexpected EOF while parsing row " + std::to_string(row_num) + ". " "Maybe last row has wrong format or input doesn't contain specified suffix before EOF.", ErrorCodes::CANNOT_READ_ALL_DATA); } diff --git a/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh index 805f4267818..caa180b1e0f 100755 --- a/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh +++ b/tests/queries/0_stateless/01583_parallel_parsing_exception_with_offset.sh @@ -9,6 +9,6 @@ $CLICKHOUSE_CLIENT --query="DROP TABLE IF EXISTS check;" $CLICKHOUSE_CLIENT --query="CREATE TABLE check (x UInt64) ENGINE = Memory;" -(seq 1 2000000; echo 'hello'; seq 1 20000000) | $CLICKHOUSE_CLIENT --multiquery --query="SET input_format_parallel_parsing=1; SET min_chunk_bytes_for_parallel_parsing=100000; INSERT INTO check(x) FORMAT TSV " 2>&1 | grep -q "Offset: 1988984" && echo 'OK' || echo 'FAIL' ||: +(seq 1 2000000; echo 'hello'; seq 1 20000000) | $CLICKHOUSE_CLIENT --input_format_parallel_parsing=1 --min_chunk_bytes_for_parallel_parsing=1000 --query="INSERT INTO check(x) FORMAT TSV " 2>&1 | grep -q "(at row 2000001)" && echo 'OK' || echo 'FAIL' ||: $CLICKHOUSE_CLIENT --query="DROP TABLE check;" From a285cb83df5caea3592e13e9f2eede7a3f8dbc8c Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Thu, 10 Dec 2020 21:02:40 +0300 Subject: [PATCH 14/32] style --- src/Common/Exception.h | 13 +++++-------- src/DataStreams/ParallelParsingBlockInputStream.cpp | 5 ++--- src/Processors/Formats/IRowInputFormat.cpp | 2 +- src/Processors/Formats/Impl/CSVRowInputFormat.cpp | 2 +- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index bff39fe454e..0610af6bcc7 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -98,12 +98,13 @@ private: /// Special class of exceptions, used mostly in ParallelParsingInputFormat for /// more convinient calculation of problem line number. -class ParsingException : public Exception +class ParsingException : public Exception { public: using Exception::Exception; - void formatInternalMessage() { + void formatInternalMessage() + { try { message(fmt::format(message(), line_number_)); @@ -111,13 +112,9 @@ public: catch (...) 
{} } - int getLineNumber() { - return line_number_; - } + int getLineNumber() { return line_number_; } - void setLineNumber(int line_number) { - line_number_ = line_number; - } + void setLineNumber(int line_number) { line_number_ = line_number;} private: int line_number_; diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index ec0652ae766..c3daed21434 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -132,7 +132,6 @@ void ParallelParsingBlockInputStream::segmentatorThreadFunction(ThreadGroupStatu unit.offset = successfully_read_rows_count; successfully_read_rows_count += currently_read_rows; - unit.is_last = !have_more_data; unit.status = READY_TO_PARSE; scheduleParserThreadForUnitWithNumber(segmentator_ticket_number); @@ -209,8 +208,8 @@ void ParallelParsingBlockInputStream::onBackgroundException(size_t offset) if (!background_exception) { background_exception = std::current_exception(); - - if (ParsingException * e = exception_cast(background_exception)) + + if (ParsingException * e = exception_cast(background_exception)) { e->setLineNumber(e->getLineNumber() + offset); e->formatInternalMessage(); diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index f65a590ab42..85a7750b0b1 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -132,7 +132,7 @@ Chunk IRowInputFormat::generate() } } } - catch (ParsingException & e) + catch (ParsingException & e) { String verbose_diagnostic; try diff --git a/src/Processors/Formats/Impl/CSVRowInputFormat.cpp b/src/Processors/Formats/Impl/CSVRowInputFormat.cpp index 89f0e0e5d2c..8422f09e364 100644 --- a/src/Processors/Formats/Impl/CSVRowInputFormat.cpp +++ b/src/Processors/Formats/Impl/CSVRowInputFormat.cpp @@ -475,7 +475,7 @@ static std::pair fileSegmentationEngineCSVImpl(ReadBuffer & in, DB { ++pos; ++number_of_rows; - } + } } } } From 8d4465ddeb6c9a24b81aabb2531ce3ec6f4dcbb4 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 11 Dec 2020 00:12:54 +0300 Subject: [PATCH 15/32] better --- src/Common/Exception.h | 25 ++++++++++++++++--- .../ParallelParsingBlockInputStream.cpp | 1 - 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 0610af6bcc7..d03c0a19b6c 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -55,6 +55,11 @@ public: extendedMessage(fmt::format(format, std::forward(args)...)); } + /// Note that base class has exactly the same, but not virtual function. + virtual std::string displayText() const { + return Poco::Exception::displayText(); + } + void addMessage(const std::string& message) { extendedMessage(message); @@ -103,17 +108,29 @@ class ParsingException : public Exception public: using Exception::Exception; - void formatInternalMessage() - { + /// In a good way we have to mark this function virtual in the base class Poco::Exception. + /// For now it is virtual only since Exception (above class) in the hierarchy. + std::string displayText() const override { try { - message(fmt::format(message(), line_number_)); + formatted_message_ = fmt::format(message(), line_number_); } catch (...) 
{} + + if (!formatted_message_.empty()) + { + std::string result = name(); + result.append(": "); + result.append(formatted_message_); + return result; + } + else + { + return Exception::displayText(); + } } int getLineNumber() { return line_number_; } - void setLineNumber(int line_number) { line_number_ = line_number;} private: diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index c3daed21434..49b7929d7d7 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -212,7 +212,6 @@ void ParallelParsingBlockInputStream::onBackgroundException(size_t offset) if (ParsingException * e = exception_cast(background_exception)) { e->setLineNumber(e->getLineNumber() + offset); - e->formatInternalMessage(); } } tryLogCurrentException(__PRETTY_FUNCTION__); From 4bb52c062a010cc3728dcac7b648c62b0d4a8472 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 11 Dec 2020 17:22:37 +0300 Subject: [PATCH 16/32] fix tests --- tests/queries/0_stateless/00900_parquet_load.reference | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/queries/0_stateless/00900_parquet_load.reference b/tests/queries/0_stateless/00900_parquet_load.reference index f93be897da8..52c5fbedcc6 100644 --- a/tests/queries/0_stateless/00900_parquet_load.reference +++ b/tests/queries/0_stateless/00900_parquet_load.reference @@ -52,7 +52,7 @@ 23.00 24.00 === Try load data from datapage_v2.snappy.parquet -Code: 33. DB::Ex---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: data for INSERT was parsed from stdin +Code: 33. DB::ParsingEx---tion: Error while reading Parquet data: IOError: Not yet implemented: Unsupported encoding.: data for INSERT was parsed from stdin === Try load data from dict-page-offset-zero.parquet 1552 @@ -257,7 +257,7 @@ Code: 70. DB::Ex---tion: The type "list" of an input column "int64_list" is not Code: 70. DB::Ex---tion: The type "list" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin === Try load data from nested_maps.snappy.parquet -Code: 70. DB::Ex---tion: The type "map" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin +Code: 70. DB::ParsingEx---tion: The type "map" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin === Try load data from non_hadoop_lz4_compressed.parquet 1593604800 abc 42 From 0dafcc38a5d2155a538c6fdcc828950c25d93cc6 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 11 Dec 2020 18:15:18 +0300 Subject: [PATCH 17/32] better --- contrib/poco | 2 +- src/Common/Exception.h | 8 +------- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/contrib/poco b/contrib/poco index 08974cc024b..2c32e17c7df 160000 --- a/contrib/poco +++ b/contrib/poco @@ -1 +1 @@ -Subproject commit 08974cc024b2e748f5b1d45415396706b3521d0f +Subproject commit 2c32e17c7dfee1f8bf24227b697cdef5fddf0823 diff --git a/src/Common/Exception.h b/src/Common/Exception.h index d03c0a19b6c..0129045abd3 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -55,11 +55,6 @@ public: extendedMessage(fmt::format(format, std::forward(args)...)); } - /// Note that base class has exactly the same, but not virtual function. 
- virtual std::string displayText() const { - return Poco::Exception::displayText(); - } - void addMessage(const std::string& message) { extendedMessage(message); @@ -108,8 +103,7 @@ class ParsingException : public Exception public: using Exception::Exception; - /// In a good way we have to mark this function virtual in the base class Poco::Exception. - /// For now it is virtual only since Exception (above class) in the hierarchy. + /// We use additional field formatted_message_ to make this method const. std::string displayText() const override { try { From 35dbd2bd77ff93c81fdf1cc821ecfcb9ce167948 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 11 Dec 2020 23:57:16 +0300 Subject: [PATCH 18/32] style --- src/Common/Exception.h | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 0129045abd3..91270d9116b 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -96,7 +96,7 @@ private: }; -/// Special class of exceptions, used mostly in ParallelParsingInputFormat for +/// Special class of exceptions, used mostly in ParallelParsingInputFormat for /// more convinient calculation of problem line number. class ParsingException : public Exception { @@ -104,13 +104,15 @@ public: using Exception::Exception; /// We use additional field formatted_message_ to make this method const. - std::string displayText() const override { + std::string displayText() const override + { try { formatted_message_ = fmt::format(message(), line_number_); } - catch (...) {} - + catch (...) + {} + if (!formatted_message_.empty()) { std::string result = name(); @@ -118,7 +120,7 @@ public: result.append(formatted_message_); return result; } - else + else { return Exception::displayText(); } From a9bd177b2e0cdcba18371bd5dcf7e2adb932fad5 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 18 Dec 2020 03:17:26 +0300 Subject: [PATCH 19/32] fix --- src/Common/Exception.h | 24 ++++++++++++++++++++-- src/Processors/Formats/IRowInputFormat.cpp | 2 +- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 91270d9116b..e523aa515d7 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -102,13 +102,33 @@ class ParsingException : public Exception { public: using Exception::Exception; + + ParsingException() + { + Exception::addMessage("{}"); + } + + ParsingException(const std::string & msg, int code) + : Exception(msg, code) + { + Exception::addMessage("{}"); + } + + ParsingException(int code, const std::string & message) + : Exception(message, code) + { + Exception::addMessage("{}"); + } /// We use additional field formatted_message_ to make this method const. std::string displayText() const override { try { - formatted_message_ = fmt::format(message(), line_number_); + if (line_number_ == -1) + formatted_message_ = fmt::format(message(), ""); + else + formatted_message_ = fmt::format(message(), fmt::format("(at row {})\n", line_number_)); } catch (...) 
{} @@ -130,7 +150,7 @@ public: void setLineNumber(int line_number) { line_number_ = line_number;} private: - int line_number_; + ssize_t line_number_{-1}; mutable std::string formatted_message_; const char * name() const throw() override { return "DB::ParsingException"; } diff --git a/src/Processors/Formats/IRowInputFormat.cpp b/src/Processors/Formats/IRowInputFormat.cpp index 85a7750b0b1..79090ae2b89 100644 --- a/src/Processors/Formats/IRowInputFormat.cpp +++ b/src/Processors/Formats/IRowInputFormat.cpp @@ -149,7 +149,7 @@ Chunk IRowInputFormat::generate() } e.setLineNumber(total_rows); - e.addMessage("(at row {})\n" + verbose_diagnostic); + e.addMessage(verbose_diagnostic); throw; } catch (Exception & e) From 6f30fae34eb09416f4d7b9e2b037133dfd077d68 Mon Sep 17 00:00:00 2001 From: nikitamikhaylov Date: Fri, 18 Dec 2020 04:47:24 +0300 Subject: [PATCH 20/32] final fix --- src/Common/Exception.cpp | 44 ++++++++++++++++++++++++++++++++++++ src/Common/Exception.h | 48 ++++++++-------------------------------- 2 files changed, 53 insertions(+), 39 deletions(-) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index d9bbb170dcc..50a78cc1c83 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -450,5 +450,49 @@ ExecutionStatus ExecutionStatus::fromCurrentException(const std::string & start_ return ExecutionStatus(getCurrentExceptionCode(), msg); } +ParsingException::ParsingException() +{ + Exception::message(Exception::message() + "{}"); +} + +ParsingException::ParsingException(const std::string & msg, int code) + : Exception(msg, code) +{ + Exception::message(Exception::message() + "{}"); +} + +ParsingException::ParsingException(int code, const std::string & message) + : Exception(message, code) +{ + Exception::message(Exception::message() + "{}"); +} + + +/// We use additional field formatted_message_ to make this method const. +std::string ParsingException::displayText() const +{ + try + { + if (line_number_ == -1) + formatted_message_ = fmt::format(message(), ""); + else + formatted_message_ = fmt::format(message(), fmt::format(": (at row {})\n", line_number_)); + } + catch (...) + {} + + if (!formatted_message_.empty()) + { + std::string result = name(); + result.append(": "); + result.append(formatted_message_); + return result; + } + else + { + return Exception::displayText(); + } +} + } diff --git a/src/Common/Exception.h b/src/Common/Exception.h index e523aa515d7..4e5d8c8ad47 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -101,50 +101,20 @@ private: class ParsingException : public Exception { public: - using Exception::Exception; + ParsingException(); + ParsingException(const std::string & msg, int code); + ParsingException(int code, const std::string & message); - ParsingException() + // Format message with fmt::format, like the logging functions. + template + ParsingException(int code, const std::string & fmt, Args&&... args) + : Exception(fmt::format(fmt, std::forward(args)...), code) { - Exception::addMessage("{}"); - } - - ParsingException(const std::string & msg, int code) - : Exception(msg, code) - { - Exception::addMessage("{}"); + Exception::message(Exception::message() + "{}"); } - ParsingException(int code, const std::string & message) - : Exception(message, code) - { - Exception::addMessage("{}"); - } - /// We use additional field formatted_message_ to make this method const. 
-    std::string displayText() const override
-    {
-        try
-        {
-            if (line_number_ == -1)
-                formatted_message_ = fmt::format(message(), "");
-            else
-                formatted_message_ = fmt::format(message(), fmt::format("(at row {})\n", line_number_));
-        }
-        catch (...)
-        {}
-
-        if (!formatted_message_.empty())
-        {
-            std::string result = name();
-            result.append(": ");
-            result.append(formatted_message_);
-            return result;
-        }
-        else
-        {
-            return Exception::displayText();
-        }
-    }
+    std::string displayText() const override;
 
     int getLineNumber() { return line_number_; }
     void setLineNumber(int line_number) { line_number_ = line_number;}

From 138b74aa3bbd95bfa5bb31c9e309e10c0587c8a5 Mon Sep 17 00:00:00 2001
From: Nikita Mikhailov
Date: Wed, 23 Dec 2020 01:04:02 +0300
Subject: [PATCH 21/32] better

---
 src/DataStreams/NativeBlockInputStream.cpp | 2 +-
 src/DataTypes/DataTypeFixedString.cpp      | 2 +-
 src/IO/ReadBuffer.h                        | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/src/DataStreams/NativeBlockInputStream.cpp b/src/DataStreams/NativeBlockInputStream.cpp
index e9967e88638..b182d5e0588 100644
--- a/src/DataStreams/NativeBlockInputStream.cpp
+++ b/src/DataStreams/NativeBlockInputStream.cpp
@@ -83,7 +83,7 @@ void NativeBlockInputStream::readData(const IDataType & type, IColumn & column,
     type.deserializeBinaryBulkWithMultipleStreams(column, rows, settings, state);
 
     if (column.size() != rows)
-        throw ParsingException("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
+        throw Exception("Cannot read all data in NativeBlockInputStream. Rows read: " + toString(column.size()) + ". Rows expected: " + toString(rows) + ".",
             ErrorCodes::CANNOT_READ_ALL_DATA);
 }
 
diff --git a/src/DataTypes/DataTypeFixedString.cpp b/src/DataTypes/DataTypeFixedString.cpp
index c0b684cfcd9..585c5709be7 100644
--- a/src/DataTypes/DataTypeFixedString.cpp
+++ b/src/DataTypes/DataTypeFixedString.cpp
@@ -103,7 +103,7 @@ void DataTypeFixedString::deserializeBinaryBulk(IColumn & column, ReadBuffer & i
     size_t read_bytes = istr.readBig(reinterpret_cast<char *>(&data[initial_size]), max_bytes);
 
     if (read_bytes % n != 0)
-        throw ParsingException("Cannot read all data of type FixedString. Bytes read:" + toString(read_bytes) + ". String size:" + toString(n) + ".",
+        throw Exception("Cannot read all data of type FixedString. Bytes read:" + toString(read_bytes) + ". String size:" + toString(n) + ".",
             ErrorCodes::CANNOT_READ_ALL_DATA);
 
     data.resize(initial_size + read_bytes);
diff --git a/src/IO/ReadBuffer.h b/src/IO/ReadBuffer.h
index 08f1bc2a3bb..3d6eb6970ce 100644
--- a/src/IO/ReadBuffer.h
+++ b/src/IO/ReadBuffer.h
@@ -165,7 +165,7 @@ public:
     {
         auto read_bytes = read(to, n);
         if (n != read_bytes)
-            throw ParsingException("Cannot read all data. Bytes read: " + std::to_string(read_bytes) + ". Bytes expected: " + std::to_string(n) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
+            throw Exception("Cannot read all data. Bytes read: " + std::to_string(read_bytes) + ". Bytes expected: " + std::to_string(n) + ".", ErrorCodes::CANNOT_READ_ALL_DATA);
     }
 
     /** A method that can be more efficiently implemented in derived classes, in the case of reading large enough blocks.
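
Taken together, patches 19 and 20 implement one small pattern: the stored message keeps a trailing "{}" placeholder, and displayText() expands it on demand with the line number, caching the result in a mutable member so the method can stay const. Deferring the formatting matters here because the row number is only known later, when the parallel-parsing code attributes the failure to a chunk offset. A minimal standalone sketch of that pattern (illustrative names only — this is not the ClickHouse class, and it deliberately avoids the fmt dependency):

#include <iostream>
#include <string>

class ParsingError
{
public:
    explicit ParsingError(std::string msg) : message_(std::move(msg) + "{}") {}

    void setLineNumber(long line) { line_number_ = line; }

    std::string displayText() const
    {
        const std::string suffix = line_number_ == -1
            ? ""
            : ": (at row " + std::to_string(line_number_) + ")";

        formatted_ = message_;
        const auto pos = formatted_.rfind("{}");
        if (pos != std::string::npos)
            formatted_.replace(pos, 2, suffix);   // expand the trailing placeholder

        return formatted_;
    }

private:
    std::string message_;
    long line_number_ = -1;
    mutable std::string formatted_;   // cached inside the const method, hence mutable
};

int main()
{
    ParsingError e("Cannot parse input");
    std::cout << e.displayText() << '\n';   // "Cannot parse input"
    e.setLineNumber(42);
    std::cout << e.displayText() << '\n';   // "Cannot parse input: (at row 42)"
}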
From 84d0084c1a7314cada3c8f23613167451eca6c3c Mon Sep 17 00:00:00 2001 From: Nikita Mikhailov Date: Wed, 23 Dec 2020 01:33:07 +0300 Subject: [PATCH 22/32] style --- src/Common/Exception.cpp | 2 +- src/Common/Exception.h | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/Common/Exception.cpp b/src/Common/Exception.cpp index 50a78cc1c83..b782471a4e8 100644 --- a/src/Common/Exception.cpp +++ b/src/Common/Exception.cpp @@ -456,7 +456,7 @@ ParsingException::ParsingException() } ParsingException::ParsingException(const std::string & msg, int code) - : Exception(msg, code) + : Exception(msg, code) { Exception::message(Exception::message() + "{}"); } diff --git a/src/Common/Exception.h b/src/Common/Exception.h index 4e5d8c8ad47..4b04de5d8a2 100644 --- a/src/Common/Exception.h +++ b/src/Common/Exception.h @@ -104,7 +104,7 @@ public: ParsingException(); ParsingException(const std::string & msg, int code); ParsingException(int code, const std::string & message); - + // Format message with fmt::format, like the logging functions. template ParsingException(int code, const std::string & fmt, Args&&... args) From 92b2f5ae98319722516d8088dfc37cda0d04f92a Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 23 Dec 2020 03:44:23 +0300 Subject: [PATCH 23/32] Update 00900_parquet_load.reference --- tests/queries/0_stateless/00900_parquet_load.reference | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/queries/0_stateless/00900_parquet_load.reference b/tests/queries/0_stateless/00900_parquet_load.reference index 52c5fbedcc6..1c890119486 100644 --- a/tests/queries/0_stateless/00900_parquet_load.reference +++ b/tests/queries/0_stateless/00900_parquet_load.reference @@ -257,7 +257,7 @@ Code: 70. DB::Ex---tion: The type "list" of an input column "int64_list" is not Code: 70. DB::Ex---tion: The type "list" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin === Try load data from nested_maps.snappy.parquet -Code: 70. DB::ParsingEx---tion: The type "map" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin +Code: 70. 
DB::Ex---tion: The type "map" of an input column "a" is not supported for conversion from a Parquet data format: data for INSERT was parsed from stdin === Try load data from non_hadoop_lz4_compressed.parquet 1593604800 abc 42 From 54d2fe847ff8a6070dc12eb1b0723590e7a2c3e9 Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 23 Dec 2020 05:02:54 +0300 Subject: [PATCH 24/32] Update src/DataStreams/ParallelParsingBlockInputStream.cpp Co-authored-by: Alexander Kuzmenkov <36882414+akuzm@users.noreply.github.com> --- src/DataStreams/ParallelParsingBlockInputStream.cpp | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index 49b7929d7d7..2cd6fe8a8bf 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -209,7 +209,8 @@ void ParallelParsingBlockInputStream::onBackgroundException(size_t offset) { background_exception = std::current_exception(); - if (ParsingException * e = exception_cast(background_exception)) + if (ParsingException * e = exception_cast(background_exception) + && e->getLineNumber() != -1) { e->setLineNumber(e->getLineNumber() + offset); } From e1fc9122cc0270b20f4567d91d81022d204e2b2a Mon Sep 17 00:00:00 2001 From: Nikita Mikhaylov Date: Wed, 23 Dec 2020 05:28:53 +0300 Subject: [PATCH 25/32] Update ParallelParsingBlockInputStream.cpp --- src/DataStreams/ParallelParsingBlockInputStream.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/DataStreams/ParallelParsingBlockInputStream.cpp b/src/DataStreams/ParallelParsingBlockInputStream.cpp index 2cd6fe8a8bf..b7a0c3cab99 100644 --- a/src/DataStreams/ParallelParsingBlockInputStream.cpp +++ b/src/DataStreams/ParallelParsingBlockInputStream.cpp @@ -209,11 +209,9 @@ void ParallelParsingBlockInputStream::onBackgroundException(size_t offset) { background_exception = std::current_exception(); - if (ParsingException * e = exception_cast(background_exception) - && e->getLineNumber() != -1) - { - e->setLineNumber(e->getLineNumber() + offset); - } + if (ParsingException * e = exception_cast(background_exception)) + if (e->getLineNumber() != -1) + e->setLineNumber(e->getLineNumber() + offset); } tryLogCurrentException(__PRETTY_FUNCTION__); finished = true; From 7a9282ee7bc6884bad7ce19f575f1cdc2b30b15e Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 23 Dec 2020 15:31:16 +0300 Subject: [PATCH 26/32] Try fix ya.make --- src/DataStreams/ya.make | 1 + src/DataStreams/ya.make.in | 1 + 2 files changed, 2 insertions(+) diff --git a/src/DataStreams/ya.make b/src/DataStreams/ya.make index 776578af131..858bf7081e7 100644 --- a/src/DataStreams/ya.make +++ b/src/DataStreams/ya.make @@ -6,6 +6,7 @@ LIBRARY() PEERDIR( clickhouse/src/Common contrib/libs/poco/MongoDB + contrib/restricted/boost/libs ) NO_COMPILER_WARNINGS() diff --git a/src/DataStreams/ya.make.in b/src/DataStreams/ya.make.in index d6a683daa66..1624ddb799f 100644 --- a/src/DataStreams/ya.make.in +++ b/src/DataStreams/ya.make.in @@ -5,6 +5,7 @@ LIBRARY() PEERDIR( clickhouse/src/Common contrib/libs/poco/MongoDB + contrib/restricted/boost/libs ) NO_COMPILER_WARNINGS() From 34b6b8407ac14bc7abc338cbb34e79013979347e Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 23 Dec 2020 15:55:51 +0300 Subject: [PATCH 27/32] Fix website --- docs/tools/release.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tools/release.sh b/docs/tools/release.sh 
index 9e529b25d0f..5a838474324 100755 --- a/docs/tools/release.sh +++ b/docs/tools/release.sh @@ -39,7 +39,7 @@ then then sleep 1m # https://api.cloudflare.com/#zone-purge-files-by-cache-tags,-host-or-prefix - POST_DATA='{"hosts":"clickhouse.tech"}' + POST_DATA='{"hosts":["clickhouse.tech"]}' curl -X POST "https://api.cloudflare.com/client/v4/zones/4fc6fb1d46e87851605aa7fa69ca6fe0/purge_cache" -H "Authorization: Bearer ${CLOUDFLARE_TOKEN}" -H "Content-Type:application/json" --data "${POST_DATA}" fi fi From e169b39fa78f4edca0983faf705118c977e35203 Mon Sep 17 00:00:00 2001 From: Alexey Milovidov Date: Wed, 23 Dec 2020 17:19:51 +0300 Subject: [PATCH 28/32] Fix website --- docs/tools/release.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/tools/release.sh b/docs/tools/release.sh index 5a838474324..389b63ace7f 100755 --- a/docs/tools/release.sh +++ b/docs/tools/release.sh @@ -39,7 +39,7 @@ then then sleep 1m # https://api.cloudflare.com/#zone-purge-files-by-cache-tags,-host-or-prefix - POST_DATA='{"hosts":["clickhouse.tech"]}' + POST_DATA='{"hosts":["content.clickhouse.tech"]}' curl -X POST "https://api.cloudflare.com/client/v4/zones/4fc6fb1d46e87851605aa7fa69ca6fe0/purge_cache" -H "Authorization: Bearer ${CLOUDFLARE_TOKEN}" -H "Content-Type:application/json" --data "${POST_DATA}" fi fi From e56a1c442c7116dba86a94556b0e81dc35a7d0d0 Mon Sep 17 00:00:00 2001 From: Nikolai Kochetov Date: Wed, 23 Dec 2020 18:20:52 +0300 Subject: [PATCH 29/32] Use ucontext fir arcadia build. --- base/common/defines.h | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/base/common/defines.h b/base/common/defines.h index 1029becb971..c9cc6feaae8 100644 --- a/base/common/defines.h +++ b/base/common/defines.h @@ -71,6 +71,10 @@ # define BOOST_USE_UCONTEXT 1 #endif +#if defined(ARCADIA_BUILD) +# define BOOST_USE_UCONTEXT 1 +#endif + /// TODO: Strange enough, there is no way to detect UB sanitizer. /// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute. 
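
A side note on patches 27 and 28: both amend the same one-line Cloudflare purge payload — first turning `hosts` from a bare string into a JSON array, then pointing it at the right hostname. A local pre-flight check of the payload shape (a sketch assuming `jq` is installed; this is not part of the actual release script) could catch the first mistake before the POST:

#!/usr/bin/env bash
# Sanity-check the purge payload locally before POSTing it to the API.
POST_DATA='{"hosts":["content.clickhouse.tech"]}'
echo "${POST_DATA}" | jq -e '.hosts | type == "array"' > /dev/null \
    || { echo "hosts must be a JSON array"; exit 1; }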
From 51481c9c5fa5235d2afcfdc98470dfc3fdb86cae Mon Sep 17 00:00:00 2001
From: Ivan <5627721+abyss7@users.noreply.github.com>
Date: Wed, 23 Dec 2020 18:25:05 +0300
Subject: [PATCH 30/32] Add ANTLR test config to another json

---
 tests/ci/ci_config.json | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/ci/ci_config.json b/tests/ci/ci_config.json
index 738c8e8482b..1efcf39601e 100644
--- a/tests/ci/ci_config.json
+++ b/tests/ci/ci_config.json
@@ -225,6 +225,18 @@
             "with_coverage": false
         }
     },
+    "Functional stateless tests (ANTLR debug)": {
+        "required_build_properties": {
+            "compiler": "clang-11",
+            "package_type": "deb",
+            "build_type": "debug",
+            "sanitizer": "none",
+            "bundled": "bundled",
+            "splitted": "unsplitted",
+            "clang-tidy": "disable",
+            "with_coverage": false
+        }
+    },
     "Functional stateful tests (release)": {
         "required_build_properties": {
             "compiler": "clang-11",

From 6fc225e676d3eaa4983ab03d9bbc2b789d1612f7 Mon Sep 17 00:00:00 2001
From: Amos Bird
Date: Thu, 24 Dec 2020 00:04:05 +0800
Subject: [PATCH 31/32] Distributed insertion to one random shard (#18294)

* Distributed insertion to one random shard

* add some tests

* add some documentation

* Respect shards' weights

* fine locking

Co-authored-by: Ivan Lezhankin
---
 docs/en/operations/settings/settings.md       | 12 +++++
 src/Core/Settings.h                           |  4 +-
 .../DistributedBlockOutputStream.cpp          | 44 +++++++++++++------
 .../DistributedBlockOutputStream.h            |  4 +-
 src/Storages/StorageDistributed.cpp           | 31 ++++++++++++-
 src/Storages/StorageDistributed.h             | 10 +++--
 ...01615_random_one_shard_insertion.reference |  8 ++++
 .../01615_random_one_shard_insertion.sql      | 22 ++++++++++
 8 files changed, 115 insertions(+), 20 deletions(-)
 create mode 100644 tests/queries/0_stateless/01615_random_one_shard_insertion.reference
 create mode 100644 tests/queries/0_stateless/01615_random_one_shard_insertion.sql

diff --git a/docs/en/operations/settings/settings.md b/docs/en/operations/settings/settings.md
index 86e16fe4819..7f0a61ccd42 100644
--- a/docs/en/operations/settings/settings.md
+++ b/docs/en/operations/settings/settings.md
@@ -1855,6 +1855,18 @@ Default value: `0`.
 - [Distributed Table Engine](../../engines/table-engines/special/distributed.md#distributed)
 - [Managing Distributed Tables](../../sql-reference/statements/system.md#query-language-system-distributed)
 
+## insert_distributed_one_random_shard {#insert_distributed_one_random_shard}
+
+Enables or disables random shard insertion into a [Distributed](../../engines/table-engines/special/distributed.md#distributed) table when there is no distributed key.
+
+By default, when inserting data into a `Distributed` table with more than one shard, the ClickHouse server will reject any insertion request if there is no distributed key. When `insert_distributed_one_random_shard = 1`, insertions are allowed and data is forwarded randomly among all shards.
+
+Possible values:
+
+- 0 — Insertion is rejected if there are multiple shards and no distributed key is given.
+- 1 — Insertion is done randomly among all available shards when no distributed key is given.
+
+Default value: `0`.
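+
+A usage sketch (the table name is hypothetical; see the test added below for a complete script):
+
+``` sql
+SET insert_distributed_one_random_shard = 1;
+
+-- with the setting enabled, each INSERT lands on one randomly chosen shard
+INSERT INTO distributed_table VALUES (1), (2);
+```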
## use_compact_format_in_distributed_parts_names {#use_compact_format_in_distributed_parts_names} diff --git a/src/Core/Settings.h b/src/Core/Settings.h index b1353c3009d..b09e960da36 100644 --- a/src/Core/Settings.h +++ b/src/Core/Settings.h @@ -490,7 +490,9 @@ class IColumn; \ M(Bool, output_format_enable_streaming, false, "Enable streaming in output formats that support it.", 0) \ M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \ - M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) + M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \ + M(Bool, insert_distributed_one_random_shard, false, "If setting is enabled, inserting into distributed table will choose a random shard to write when there is no sharding key", 0) \ + // End of FORMAT_FACTORY_SETTINGS // Please add settings non-related to formats into the COMMON_SETTINGS above. diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.cpp b/src/Storages/Distributed/DistributedBlockOutputStream.cpp index d24967256a0..040f33ea02e 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.cpp +++ b/src/Storages/Distributed/DistributedBlockOutputStream.cpp @@ -138,11 +138,22 @@ void DistributedBlockOutputStream::write(const Block & block) void DistributedBlockOutputStream::writeAsync(const Block & block) { - if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)) - return writeSplitAsync(block); + const Settings & settings = context.getSettingsRef(); + bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; - writeAsyncImpl(block); - ++inserted_blocks; + if (random_shard_insert) + { + writeAsyncImpl(block, storage.getRandomShardIndex(cluster->getShardsInfo())); + } + else + { + + if (storage.getShardingKeyExpr() && (cluster->getShardsInfo().size() > 1)) + return writeSplitAsync(block); + + writeAsyncImpl(block); + ++inserted_blocks; + } } @@ -175,18 +186,18 @@ std::string DistributedBlockOutputStream::getCurrentStateDescription() } -void DistributedBlockOutputStream::initWritingJobs(const Block & first_block) +void DistributedBlockOutputStream::initWritingJobs(const Block & first_block, size_t start, size_t end) { const Settings & settings = context.getSettingsRef(); const auto & addresses_with_failovers = cluster->getShardsAddresses(); const auto & shards_info = cluster->getShardsInfo(); - size_t num_shards = shards_info.size(); + size_t num_shards = end - start; remote_jobs_count = 0; local_jobs_count = 0; per_shard_jobs.resize(shards_info.size()); - for (size_t shard_index : ext::range(0, shards_info.size())) + for (size_t shard_index : ext::range(start, end)) { const auto & shard_info = shards_info[shard_index]; auto & shard_jobs = per_shard_jobs[shard_index]; @@ -242,10 +253,11 @@ void DistributedBlockOutputStream::waitForJobs() } -ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block) +ThreadPool::Job +DistributedBlockOutputStream::runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards) { auto thread_group = CurrentThread::getGroup(); - return [this, thread_group, &job, ¤t_block]() + return [this, thread_group, &job, ¤t_block, num_shards]() { if (thread_group) CurrentThread::attachToIfDetached(thread_group); @@ 
-262,7 +274,6 @@ ThreadPool::Job DistributedBlockOutputStream::runWritingJob(DistributedBlockOutp }); const auto & shard_info = cluster->getShardsInfo()[job.shard_index]; - size_t num_shards = cluster->getShardsInfo().size(); auto & shard_job = per_shard_jobs[job.shard_index]; const auto & addresses = cluster->getShardsAddresses(); @@ -356,12 +367,19 @@ void DistributedBlockOutputStream::writeSync(const Block & block) { const Settings & settings = context.getSettingsRef(); const auto & shards_info = cluster->getShardsInfo(); - size_t num_shards = shards_info.size(); + bool random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key; + size_t start = 0, end = shards_info.size(); + if (random_shard_insert) + { + start = storage.getRandomShardIndex(shards_info); + end = start + 1; + } + size_t num_shards = end - start; if (!pool) { /// Deferred initialization. Only for sync insertion. - initWritingJobs(block); + initWritingJobs(block, start, end); pool.emplace(remote_jobs_count + local_jobs_count); @@ -394,7 +412,7 @@ void DistributedBlockOutputStream::writeSync(const Block & block) finished_jobs_count = 0; for (size_t shard_index : ext::range(0, shards_info.size())) for (JobReplica & job : per_shard_jobs[shard_index].replicas_jobs) - pool->scheduleOrThrowOnError(runWritingJob(job, block)); + pool->scheduleOrThrowOnError(runWritingJob(job, block, num_shards)); } catch (...) { diff --git a/src/Storages/Distributed/DistributedBlockOutputStream.h b/src/Storages/Distributed/DistributedBlockOutputStream.h index 872776f0867..ef37776893a 100644 --- a/src/Storages/Distributed/DistributedBlockOutputStream.h +++ b/src/Storages/Distributed/DistributedBlockOutputStream.h @@ -73,10 +73,10 @@ private: /// Performs synchronous insertion to remote nodes. If timeout_exceeded flag was set, throws. 
    void writeSync(const Block & block);
 
-    void initWritingJobs(const Block & first_block);
+    void initWritingJobs(const Block & first_block, size_t start, size_t end);
 
     struct JobReplica;
-    ThreadPool::Job runWritingJob(JobReplica & job, const Block & current_block);
+    ThreadPool::Job runWritingJob(DistributedBlockOutputStream::JobReplica & job, const Block & current_block, size_t num_shards);
 
     void waitForJobs();
 
diff --git a/src/Storages/StorageDistributed.cpp b/src/Storages/StorageDistributed.cpp
index a991103d33b..4ce7efb60b4 100644
--- a/src/Storages/StorageDistributed.cpp
+++ b/src/Storages/StorageDistributed.cpp
@@ -17,6 +17,7 @@
 #include 
 #include 
 #include 
+#include 
 #include 
 #include 
@@ -373,6 +374,7 @@ StorageDistributed::StorageDistributed(
     , cluster_name(global_context.getMacros()->expand(cluster_name_))
     , has_sharding_key(sharding_key_)
     , relative_data_path(relative_data_path_)
+    , rng(randomSeed())
 {
     StorageInMemoryMetadata storage_metadata;
     storage_metadata.setColumns(columns_);
@@ -543,7 +545,8 @@ BlockOutputStreamPtr StorageDistributed::write(const ASTPtr &, const StorageMeta
     }
 
     /// If sharding key is not specified, then you can only write to a shard containing only one shard
-    if (!has_sharding_key && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
+    if (!settings.insert_distributed_one_random_shard && !has_sharding_key
+        && ((cluster->getLocalShardCount() + cluster->getRemoteShardCount()) >= 2))
     {
         throw Exception("Method write is not supported by storage " + getName() + " with more than one shard and no sharding key provided",
                         ErrorCodes::STORAGE_REQUIRES_PARAMETER);
@@ -890,6 +893,32 @@ void StorageDistributed::rename(const String & new_path_to_table_data, const Sto
 }
 
 
+size_t StorageDistributed::getRandomShardIndex(const Cluster::ShardsInfo & shards)
+{
+
+    UInt32 total_weight = 0;
+    for (const auto & shard : shards)
+        total_weight += shard.weight;
+
+    assert(total_weight > 0);
+
+    size_t res;
+    {
+        std::lock_guard lock(rng_mutex);
+        res = std::uniform_int_distribution<size_t>(0, total_weight - 1)(rng);
+    }
+
+    for (auto i = 0ul, s = shards.size(); i < s; ++i)
+    {
+        if (shards[i].weight > res)
+            return i;
+        res -= shards[i].weight;
+    }
+
+    __builtin_unreachable();
+}
+
+
 void StorageDistributed::renameOnDisk(const String & new_path_to_table_data)
 {
     for (const DiskPtr & disk : data_volume->getDisks())
diff --git a/src/Storages/StorageDistributed.h b/src/Storages/StorageDistributed.h
index 58ade73b4cf..ce7e48c85a9 100644
--- a/src/Storages/StorageDistributed.h
+++ b/src/Storages/StorageDistributed.h
@@ -10,7 +10,9 @@
 #include 
 #include 
 #include 
+#include 
+#include 
 
 namespace DB
 {
@@ -24,9 +26,6 @@
 using VolumePtr = std::shared_ptr<IVolume>;
 
 class ExpressionActions;
 using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
 
-class Cluster;
-using ClusterPtr = std::shared_ptr<Cluster>;
-
 /** A distributed table that resides on multiple servers.
  * Uses data from the specified database and tables on each server.
 *
@@ -126,6 +125,8 @@ public:
     NamesAndTypesList getVirtuals() const override;
 
+    size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
+
     String remote_database;
     String remote_table;
     ASTPtr remote_table_function_ptr;
@@ -198,6 +199,9 @@ protected:
     std::unordered_map<std::string, ClusterNodeData> cluster_nodes_data;
     mutable std::mutex cluster_nodes_mutex;
 
+    // For random shard index generation
+    mutable std::mutex rng_mutex;
+    pcg64 rng;
 };
 
 }
diff --git a/tests/queries/0_stateless/01615_random_one_shard_insertion.reference b/tests/queries/0_stateless/01615_random_one_shard_insertion.reference
new file mode 100644
index 00000000000..448a73c4789
--- /dev/null
+++ b/tests/queries/0_stateless/01615_random_one_shard_insertion.reference
@@ -0,0 +1,8 @@
+0
+0
+1
+1
+2
+2
+3
+3
diff --git a/tests/queries/0_stateless/01615_random_one_shard_insertion.sql b/tests/queries/0_stateless/01615_random_one_shard_insertion.sql
new file mode 100644
index 00000000000..7d07629feda
--- /dev/null
+++ b/tests/queries/0_stateless/01615_random_one_shard_insertion.sql
@@ -0,0 +1,22 @@
+drop table if exists shard;
+drop table if exists distr;
+
+create table shard (id Int32) engine = MergeTree order by cityHash64(id);
+create table distr as shard engine Distributed (test_cluster_two_shards_localhost, currentDatabase(), shard);
+
+insert into distr (id) values (0), (1); -- { serverError 55; }
+
+set insert_distributed_sync = 1;
+
+insert into distr (id) values (0), (1); -- { serverError 55; }
+
+set insert_distributed_sync = 0;
+set insert_distributed_one_random_shard = 1;
+
+insert into distr (id) values (0), (1);
+insert into distr (id) values (2), (3);
+
+select * from distr order by id;
+
+drop table if exists shard;
+drop table if exists distr;

From 8ffadac2a329449be45ef0828ec9275422deda93 Mon Sep 17 00:00:00 2001
From: Nikolai Kochetov
Date: Wed, 23 Dec 2020 19:49:36 +0300
Subject: [PATCH 32/32] Try disable ucontect for arcadia

---
 base/common/defines.h | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/base/common/defines.h b/base/common/defines.h
index c9cc6feaae8..39df4698b88 100644
--- a/base/common/defines.h
+++ b/base/common/defines.h
@@ -71,8 +71,8 @@
 # define BOOST_USE_UCONTEXT 1
 #endif
 
-#if defined(ARCADIA_BUILD)
-# define BOOST_USE_UCONTEXT 1
+#if defined(ARCADIA_BUILD) && defined(BOOST_USE_UCONTEXT)
+# undef BOOST_USE_UCONTEXT
 #endif
 
 /// TODO: Strange enough, there is no way to detect UB sanitizer.
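
A closing note on patch 31: getRandomShardIndex() samples `res` uniformly from [0, total_weight) and walks the shard list subtracting weights, so a shard of weight `w` owns exactly `w` of the possible values and is picked with probability w / total_weight. A self-contained sketch of the same selection (illustrative only, not the ClickHouse implementation) with a quick frequency check:

#include <cassert>
#include <cstddef>
#include <iostream>
#include <random>
#include <vector>

// Pick an index with probability proportional to its weight.
size_t pickShard(const std::vector<unsigned> & weights, std::mt19937_64 & rng)
{
    unsigned total_weight = 0;
    for (unsigned w : weights)
        total_weight += w;
    assert(total_weight > 0);

    unsigned r = std::uniform_int_distribution<unsigned>(0, total_weight - 1)(rng);

    for (size_t i = 0; i < weights.size(); ++i)
    {
        if (weights[i] > r)
            return i;          // r fell inside this shard's weight band
        r -= weights[i];
    }
    return weights.size() - 1; // unreachable while total_weight > 0
}

int main()
{
    std::mt19937_64 rng(42);
    std::vector<unsigned> weights{1, 3, 2}; // shard 1 should win ~half the time
    std::vector<size_t> hits(weights.size());

    for (int i = 0; i < 60000; ++i)
        ++hits[pickShard(weights, rng)];

    for (size_t i = 0; i < hits.size(); ++i)
        std::cout << "shard " << i << ": " << hits[i] << '\n';
}

Running it shows hit counts close to 10000/30000/20000, matching the 1:3:2 weights — the same behavior the "Respect shards' weights" item in the patch's changelog refers to.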