Merge branch 'master' of github.com:ClickHouse/ClickHouse into copier-segfault
commit bcb595e34c
@@ -71,6 +71,10 @@
# define BOOST_USE_UCONTEXT 1
#endif

#if defined(ARCADIA_BUILD) && defined(BOOST_USE_UCONTEXT)
# undef BOOST_USE_UCONTEXT
#endif

/// TODO: Strangely enough, there is no way to detect UB sanitizer.

/// Explicitly allow undefined behaviour for certain functions. Use it as a function attribute.

@@ -6,6 +6,7 @@ LIBRARY()
PEERDIR(
    clickhouse/src/Common
    contrib/libs/poco/MongoDB
    contrib/restricted/boost/libs
)

NO_COMPILER_WARNINGS()

@@ -5,6 +5,7 @@ LIBRARY()
PEERDIR(
    clickhouse/src/Common
    contrib/libs/poco/MongoDB
    contrib/restricted/boost/libs
)

NO_COMPILER_WARNINGS()

@@ -49,7 +49,9 @@ IMergeTreeDataPartWriter::IMergeTreeDataPartWriter(
    , columns_list(columns_list_)
    , settings(settings_)
    , index_granularity(index_granularity_)
    , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity){}
    , with_final_mark(storage.getSettings()->write_final_mark && settings.can_use_adaptive_granularity)
{
}

Columns IMergeTreeDataPartWriter::releaseIndexColumns()
{

@@ -93,12 +93,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,

        result.emplace_back(Granule{
            .start_row = current_row,
            .granularity_rows = expected_rows_in_mark,
            .block_rows = std::min(rows_left_in_block, expected_rows_in_mark),
            .rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark),
            .mark_number = current_mark,
            .mark_on_start = true
            .mark_on_start = true,
            .is_complete = (rows_left_in_block >= expected_rows_in_mark)
        });
        current_row += expected_rows_in_mark;
        current_row += result.back().rows_to_write;
        current_mark++;
    }

@@ -173,8 +173,7 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
{
    for (const auto & granule : granules)
    {
        if (granule.granularity_rows)
            data_written = true;
        data_written = true;

        auto name_and_type = columns_list.begin();
        for (size_t i = 0; i < columns_list.size(); ++i, ++name_and_type)
@@ -206,13 +205,13 @@ void MergeTreeDataPartWriterCompact::writeDataBlock(const Block & block, const G
            writeIntBinary(plain_hashing.count(), marks);
            writeIntBinary(UInt64(0), marks);

            writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.granularity_rows);
            writeColumnSingleGranule(block.getByName(name_and_type->name), stream_getter, granule.start_row, granule.rows_to_write);

            /// Each type always has at least one substream
            prev_stream->hashing_buf.next(); //-V522
        }

        writeIntBinary(granule.block_rows, marks);
        writeIntBinary(granule.rows_to_write, marks);
    }
}

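As a rough illustration of the loop above, here is a hedged, standalone sketch (writeCompactMarkSketch and CompactGranuleSketch are made-up names, not the real writer API): for every granule the compact writer appends, per column, the current offset in the compressed file plus a zero in-block offset, serializes rows_to_write rows of that column, and finally appends the granule's row count to the marks stream.

#include <cstddef>
#include <cstdint>
#include <vector>

struct CompactGranuleSketch
{
    size_t rows_to_write = 0;   /// rows of the block written for this granule
};

/// Appends one mark per column (offset in compressed file, offset in decompressed block)
/// followed by the granule's row count. `marks` stands in for the marks output stream.
inline void writeCompactMarkSketch(
    std::vector<uint64_t> & marks,
    const CompactGranuleSketch & granule,
    size_t num_columns,
    uint64_t current_offset_in_compressed_file)
{
    for (size_t column = 0; column < num_columns; ++column)
    {
        marks.push_back(current_offset_in_compressed_file); /// writeIntBinary(plain_hashing.count(), marks)
        marks.push_back(0);                                 /// writeIntBinary(UInt64(0), marks)
        /// ... here the column data for granule.rows_to_write rows would be serialized,
        /// advancing current_offset_in_compressed_file in the real writer.
    }
    marks.push_back(granule.rows_to_write);                 /// writeIntBinary(granule.rows_to_write, marks)
}
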
@@ -222,11 +221,11 @@ void MergeTreeDataPartWriterCompact::finishDataSerialization(IMergeTreeDataPart:
    {
        auto block = header.cloneWithColumns(columns_buffer.releaseColumns());
        auto granules_to_write = getGranulesToWrite(index_granularity, block.rows(), getCurrentMark(), /* last_block = */ true);
        if (!granules_to_write.back().isCompleted())
        if (!granules_to_write.back().is_complete)
        {
            /// Correct the last mark, as it should contain the exact number of rows.
            index_granularity.popMark();
            index_granularity.appendMark(granules_to_write.back().block_rows);
            index_granularity.appendMark(granules_to_write.back().rows_to_write);
        }
        writeDataBlockPrimaryIndexAndSkipIndices(block, granules_to_write);
    }

@@ -218,6 +218,12 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
        auto & stream = *skip_indices_streams[i];
        for (const auto & granule : granules_to_write)
        {
            if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
            {
                skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                skip_index_accumulated_marks[i] = 0;
            }

            if (skip_indices_aggregators[i]->empty() && granule.mark_on_start)
            {
                skip_indices_aggregators[i] = index_helper->createIndexAggregator();
@@ -234,18 +240,9 @@ void MergeTreeDataPartWriterOnDisk::calculateAndSerializeSkipIndices(const Block
            }

            size_t pos = granule.start_row;
            skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.granularity_rows);
            if (granule.isCompleted())
            {
            skip_indices_aggregators[i]->update(skip_indexes_block, &pos, granule.rows_to_write);
            if (granule.is_complete)
                ++skip_index_accumulated_marks[i];

                /// write index if it is filled
                if (skip_index_accumulated_marks[i] == index_helper->index.granularity)
                {
                    skip_indices_aggregators[i]->getGranuleAndReset()->serializeBinary(stream.compressed);
                    skip_index_accumulated_marks[i] = 0;
                }
            }
        }
    }
}

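The accumulation rule simplified by the two hunks above can be summarized in a standalone sketch (SkipIndexAccumulatorSketch and GranuleInfoSketch are illustrative names, not the real IMergeTreeIndex interface): every granule's rows are fed to the aggregator, only complete granules advance the mark counter, and the aggregated index granule is serialized once enough marks have accumulated.

#include <cstddef>

struct GranuleInfoSketch
{
    size_t rows_to_write;   /// rows fed to the aggregator for this granule
    bool is_complete;       /// whether the corresponding data mark is now fully written
};

struct SkipIndexAccumulatorSketch
{
    size_t granularity_in_marks;     /// GRANULARITY N from the index definition
    size_t accumulated_marks = 0;
    size_t accumulated_rows = 0;
    size_t serialized_granules = 0;  /// how many index granules were flushed

    void onDataGranule(const GranuleInfoSketch & granule)
    {
        accumulated_rows += granule.rows_to_write;   /// aggregator->update(block, &pos, rows_to_write)
        if (granule.is_complete)                     /// incomplete granules do not advance the mark counter
            ++accumulated_marks;

        if (accumulated_marks == granularity_in_marks)
        {
            ++serialized_granules;                   /// getGranuleAndReset()->serializeBinary(stream)
            accumulated_rows = 0;
            accumulated_marks = 0;
        }
    }
};
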
@@ -20,25 +20,18 @@ struct Granule
{
    /// Start row in block for granule
    size_t start_row;
    /// Amount of rows which the granule has to contain according to index
    /// granularity.
    /// NOTE: Sometimes it's not equal to the actually written rows, for example
    /// for the last granule if it's smaller than the computed granularity.
    size_t granularity_rows;
    /// Amount of rows from the block which have to be written to disk from start_row
    size_t block_rows;
    size_t rows_to_write;
    /// Global mark number in the list of all marks (index_granularity) for this part
    size_t mark_number;
    /// Should the writer write a mark for the first row of this granule to disk.
    /// NOTE: Sometimes we don't write a mark for the start row, because
    /// this granule can be a continuation of the previous one.
    bool mark_on_start;

    /// Does this granule contain an amount of rows equal to the value in index granularity?
    bool isCompleted() const
    {
        return granularity_rows == block_rows;
    }
    /// If true: when this granule is written to disk, all rows for the corresponding mark will
    /// be written. It doesn't mean that rows_to_write == index_granularity.getMarkRows(mark_number);
    /// we may have a lot of small blocks between two marks and this may be the last one.
    bool is_complete;
};

/// Multiple granules to write for a concrete block.
@@ -33,12 +33,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
        size_t rows_left_in_block = block_rows - current_row;
        result.emplace_back(Granule{
            .start_row = current_row,
            .granularity_rows = rows_left_in_last_mark,
            .block_rows = std::min(rows_left_in_block, rows_left_in_last_mark),
            .rows_to_write = std::min(rows_left_in_block, rows_left_in_last_mark),
            .mark_number = current_mark,
            .mark_on_start = false, /// Don't mark this granule because we have already marked it
            .is_complete = (rows_left_in_block >= rows_left_in_last_mark),
        });
        current_row += rows_left_in_last_mark;
        current_row += result.back().rows_to_write;
        current_mark++;
    }

@@ -51,12 +51,12 @@ Granules getGranulesToWrite(const MergeTreeIndexGranularity & index_granularity,
        /// save incomplete granule
        result.emplace_back(Granule{
            .start_row = current_row,
            .granularity_rows = expected_rows_in_mark,
            .block_rows = std::min(rows_left_in_block, expected_rows_in_mark),
            .rows_to_write = std::min(rows_left_in_block, expected_rows_in_mark),
            .mark_number = current_mark,
            .mark_on_start = true,
            .is_complete = (rows_left_in_block >= expected_rows_in_mark),
        });
        current_row += expected_rows_in_mark;
        current_row += result.back().rows_to_write;
        current_mark++;
    }

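The two hunks above are easier to read side by side as a single, hedged, standalone sketch. Everything below (GranuleSketch, splitBlockIntoGranules, mark_sizes) is an illustrative name, not the ClickHouse API; it only mirrors the splitting logic visible in the diff: a block first tops up the unfinished last mark without opening a new mark, then is cut into granules of the expected mark size, always advancing by the rows actually written.

#include <algorithm>
#include <cstddef>
#include <vector>

struct GranuleSketch
{
    size_t start_row;      /// first row of the block covered by this granule
    size_t rows_to_write;  /// rows actually taken from the block, starting at start_row
    size_t mark_number;    /// global mark this granule belongs to
    bool mark_on_start;    /// whether a new mark has to be opened for the first row
    bool is_complete;      /// true when this granule finishes its mark
};

/// mark_sizes[i] is the expected number of rows in mark i and is assumed to contain
/// enough marks for the whole block (fillIndexGranularity guarantees this in the real
/// writer). rows_written_in_last_mark < mark_sizes[first_mark] is also assumed.
std::vector<GranuleSketch> splitBlockIntoGranules(
    size_t block_rows, size_t first_mark, size_t rows_written_in_last_mark,
    const std::vector<size_t> & mark_sizes)
{
    std::vector<GranuleSketch> result;
    size_t current_row = 0;
    size_t current_mark = first_mark;

    /// First top up the unfinished mark; no new mark is opened for it.
    if (rows_written_in_last_mark > 0)
    {
        size_t rows_left_in_last_mark = mark_sizes[current_mark] - rows_written_in_last_mark;
        size_t rows_left_in_block = block_rows - current_row;
        result.push_back({current_row, std::min(rows_left_in_block, rows_left_in_last_mark),
                          current_mark, false, rows_left_in_block >= rows_left_in_last_mark});
        current_row += result.back().rows_to_write;
        ++current_mark;
    }

    /// Then cut the rest of the block by the expected mark sizes.
    while (current_row < block_rows)
    {
        size_t expected_rows_in_mark = mark_sizes[current_mark];
        size_t rows_left_in_block = block_rows - current_row;
        result.push_back({current_row, std::min(rows_left_in_block, expected_rows_in_mark),
                          current_mark, true, rows_left_in_block >= expected_rows_in_mark});
        current_row += result.back().rows_to_write;
        ++current_mark;
    }
    return result;
}

For example, with marks of 10 rows, 4 rows already written in the last mark, and a 25-row block, the sketch produces granules of 6, 10 and 9 rows; only the last one has is_complete == false.
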
@@ -136,11 +136,12 @@ IDataType::OutputStreamGetter MergeTreeDataPartWriterWide::createStreamGetter(
    };
}


void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_written)
{
    auto last_granule = granules_written.back();
    /// If we didn't finish the last granule then we will continue to write it from the new block
    if (!last_granule.isCompleted())
    if (!last_granule.is_complete)
    {
        /// Shift forward except last granule
        setCurrentMark(getCurrentMark() + granules_written.size() - 1);
@@ -148,9 +149,9 @@ void MergeTreeDataPartWriterWide::shiftCurrentMark(const Granules & granules_wri
        /// We wrote the whole block in the same granule, but didn't finish it.
        /// So add written rows to rows written in last_mark
        if (still_in_the_same_granule)
            rows_written_in_last_mark += last_granule.block_rows;
            rows_written_in_last_mark += last_granule.rows_to_write;
        else
            rows_written_in_last_mark = last_granule.block_rows;
            rows_written_in_last_mark = last_granule.rows_to_write;
    }
    else
    {
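A compact way to read this branch is the following standalone sketch with illustrative names (MarkCursorSketch is not the writer's real interface). The else branch is written out under the assumption, consistent with the surrounding code, that a fully completed last granule simply advances the cursor and resets the in-mark offset.

#include <cstddef>
#include <vector>

struct WrittenGranuleSketch
{
    size_t rows_to_write;
    bool is_complete;
};

struct MarkCursorSketch
{
    size_t current_mark = 0;
    size_t rows_written_in_last_mark = 0;

    void shiftCurrentMark(const std::vector<WrittenGranuleSketch> & granules_written)
    {
        const auto & last_granule = granules_written.back();
        if (!last_granule.is_complete)
        {
            /// All fully written granules move the cursor forward; the unfinished one stays current.
            current_mark += granules_written.size() - 1;
            if (granules_written.size() == 1)
                rows_written_in_last_mark += last_granule.rows_to_write;  /// still the same granule
            else
                rows_written_in_last_mark = last_granule.rows_to_write;   /// a freshly started granule
        }
        else
        {
            current_mark += granules_written.size();
            rows_written_in_last_mark = 0;
        }
    }
};
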
@@ -167,6 +168,23 @@ void MergeTreeDataPartWriterWide::write(const Block & block, const IColumn::Perm
    if (compute_granularity)
    {
        size_t index_granularity_for_block = computeIndexGranularity(block);
        if (rows_written_in_last_mark > 0)
        {
            size_t rows_left_in_last_mark = index_granularity.getMarkRows(getCurrentMark()) - rows_written_in_last_mark;
            /// The previous granularity was much bigger than our new block's
            /// granularity; let's adjust it, because we want to add new
            /// heavy-weight blocks into a small old granule.
            if (rows_left_in_last_mark > index_granularity_for_block)
            {
                /// We have already written more rows than the granularity of our block.
                /// Adjust the last mark rows and flush to disk.
                if (rows_written_in_last_mark >= index_granularity_for_block)
                    adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);
                else /// We can still write some rows from the new block into the previous granule.
                    adjustLastMarkIfNeedAndFlushToDisk(index_granularity_for_block - rows_written_in_last_mark);
            }
        }

        fillIndexGranularity(index_granularity_for_block, block.rows());
    }

@@ -281,10 +299,7 @@ void MergeTreeDataPartWriterWide::writeSingleGranule(
    IDataType::SerializeBinaryBulkSettings & serialize_settings,
    const Granule & granule)
{
    if (granule.mark_on_start)
        writeSingleMark(name, type, offset_columns, granule.granularity_rows, serialize_settings.path);

    type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.granularity_rows, serialize_settings, serialization_state);
    type.serializeBinaryBulkWithMultipleStreams(column, granule.start_row, granule.rows_to_write, serialize_settings, serialization_state);

    /// So that instead of the marks pointing to the end of the compressed block, there were marks pointing to the beginning of the next one.
    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@@ -309,6 +324,9 @@ void MergeTreeDataPartWriterWide::writeColumn(
    WrittenOffsetColumns & offset_columns,
    const Granules & granules)
{
    if (granules.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "Empty granules for column {}, current mark {}", backQuoteIfNeed(name), getCurrentMark());

    auto [it, inserted] = serialization_states.emplace(name, nullptr);

    if (inserted)
@@ -326,8 +344,14 @@ void MergeTreeDataPartWriterWide::writeColumn(

    for (const auto & granule : granules)
    {
        if (granule.granularity_rows > 0)
            data_written = true;
        data_written = true;

        if (granule.mark_on_start)
        {
            if (last_non_written_marks.count(name))
                throw Exception(ErrorCodes::LOGICAL_ERROR, "We have to add new mark for column, but already have non written mark. Current mark {}, total marks {}, offset {}", getCurrentMark(), index_granularity.getMarksCount(), rows_written_in_last_mark);
            last_non_written_marks[name] = getCurrentMarksForColumn(name, type, offset_columns, serialize_settings.path);
        }

        writeSingleGranule(
            name,
@@ -338,6 +362,17 @@ void MergeTreeDataPartWriterWide::writeColumn(
            serialize_settings,
            granule
        );

        if (granule.is_complete)
        {
            auto marks_it = last_non_written_marks.find(name);
            if (marks_it == last_non_written_marks.end())
                throw Exception(ErrorCodes::LOGICAL_ERROR, "No mark was saved for incomplete granule for column {}", backQuoteIfNeed(name));

            for (const auto & mark : marks_it->second)
                flushMarkToFile(mark, index_granularity.getMarkRows(granule.mark_number));
            last_non_written_marks.erase(marks_it);
        }
    }

    type.enumerateStreams([&] (const IDataType::SubstreamPath & substream_path, const IDataType & /* substream_type */)
@@ -365,7 +400,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
    bool must_be_last = false;
    UInt64 offset_in_compressed_file = 0;
    UInt64 offset_in_decompressed_block = 0;
    UInt64 index_granularity_rows = 0;
    UInt64 index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;

    size_t mark_num;

@@ -379,7 +414,7 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
        if (settings.can_use_adaptive_granularity)
            DB::readBinary(index_granularity_rows, mrk_in);
        else
            index_granularity_rows = storage.getSettings()->index_granularity;
            index_granularity_rows = data_part->index_granularity_info.fixed_index_granularity;

        if (must_be_last)
        {
@@ -404,8 +439,8 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,

        if (index_granularity_rows != index_granularity.getMarkRows(mark_num))
            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}",
                mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows);
                ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for part {} for mark #{} (compressed offset {}, decompressed offset {}), in-memory {}, on disk {}, total marks {}",
                data_part->getFullPath(), mark_num, offset_in_compressed_file, offset_in_decompressed_block, index_granularity.getMarkRows(mark_num), index_granularity_rows, index_granularity.getMarksCount());

        auto column = type.createColumn();

@@ -415,8 +450,13 @@ void MergeTreeDataPartWriterWide::validateColumnOfFixedSize(const String & name,
        {
            must_be_last = true;
        }
        else if (column->size() != index_granularity_rows)

        /// Now they must be equal
        if (column->size() != index_granularity_rows)
        {
            if (must_be_last && !settings.can_use_adaptive_granularity)
                break;

            throw Exception(
                ErrorCodes::LOGICAL_ERROR, "Incorrect mark rows for mark #{} (compressed offset {}, decompressed offset {}), actually in bin file {}, in mrk file {}",
                mark_num, offset_in_compressed_file, offset_in_decompressed_block, column->size(), index_granularity.getMarkRows(mark_num));
@@ -445,6 +485,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
    serialize_settings.low_cardinality_max_dictionary_size = global_settings.low_cardinality_max_dictionary_size;
    serialize_settings.low_cardinality_use_single_dictionary_for_part = global_settings.low_cardinality_use_single_dictionary_for_part != 0;
    WrittenOffsetColumns offset_columns;
    if (rows_written_in_last_mark > 0)
        adjustLastMarkIfNeedAndFlushToDisk(rows_written_in_last_mark);

    bool write_final_mark = (with_final_mark && data_written);

@@ -474,6 +516,8 @@ void MergeTreeDataPartWriterWide::finishDataSerialization(IMergeTreeDataPart::Ch
    serialization_states.clear();

#ifndef NDEBUG
    /// Heavy weight validation of written data. Checks that we are able to read
    /// data according to marks. Otherwise throws LOGICAL_ERROR (equal to abort in debug mode)
    for (const auto & column : columns_list)
    {
        if (column.type->isValueRepresentedByNumber() && !column.type->haveSubtypes())
@@ -537,4 +581,50 @@ void MergeTreeDataPartWriterWide::fillIndexGranularity(size_t index_granularity_
        rows_in_block);
}


void MergeTreeDataPartWriterWide::adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark)
{
    /// We can adjust marks only if we computed granularity for blocks.
    /// Otherwise we cannot change the granularity, because it will differ from
    /// other columns.
    if (compute_granularity && settings.can_use_adaptive_granularity)
    {
        if (getCurrentMark() != index_granularity.getMarksCount() - 1)
            throw Exception(ErrorCodes::LOGICAL_ERROR, "Non last mark {} (with {} rows) having rows offset {}, total marks {}",
                            getCurrentMark(), index_granularity.getMarkRows(getCurrentMark()), rows_written_in_last_mark, index_granularity.getMarksCount());

        index_granularity.popMark();
        index_granularity.appendMark(new_rows_in_last_mark);
    }

    /// Last mark should be filled, otherwise it's a bug
    if (last_non_written_marks.empty())
        throw Exception(ErrorCodes::LOGICAL_ERROR, "No saved marks for last mark {} having rows offset {}, total marks {}",
                        getCurrentMark(), rows_written_in_last_mark, index_granularity.getMarksCount());

    if (rows_written_in_last_mark == new_rows_in_last_mark)
    {
        for (const auto & [name, marks] : last_non_written_marks)
        {
            for (const auto & mark : marks)
                flushMarkToFile(mark, index_granularity.getMarkRows(getCurrentMark()));
        }

        last_non_written_marks.clear();

        if (compute_granularity && settings.can_use_adaptive_granularity)
        {
            /// Also we add a mark to each skip index, because all of them
            /// have already accumulated all rows from the mark we are adjusting.
            for (size_t i = 0; i < skip_indices.size(); ++i)
                ++skip_index_accumulated_marks[i];

            /// This mark is completed, go further.
            setCurrentMark(getCurrentMark() + 1);
            /// Without offset
            rows_written_in_last_mark = 0;
        }
    }
}

}

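The function above combines two ideas: the last mark is shrunk to the number of rows actually written, and the marks remembered per column (last_non_written_marks) are flushed only once that mark is really finished. A minimal standalone sketch of this bookkeeping, with illustrative names rather than the real writer API, looks roughly like this:

#include <cstddef>
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

struct MarkEntrySketch                  /// one remembered mark of one stream of a column
{
    uint64_t offset_in_compressed_file = 0;
    uint64_t offset_in_decompressed_block = 0;
    size_t rows_in_mark = 0;            /// filled in only when the mark is flushed
};

struct DeferredMarksSketch
{
    std::vector<size_t> mark_rows;                                         /// index granularity, rows per mark
    std::unordered_map<std::string, std::vector<MarkEntrySketch>> pending; /// analogue of last_non_written_marks
    std::vector<MarkEntrySketch> flushed;                                  /// stands in for the .mrk files
    size_t current_mark = 0;                                               /// assumed to index the last mark
    size_t rows_written_in_last_mark = 0;

    /// Remember a mark when a granule opens it instead of writing it immediately.
    void rememberMark(const std::string & column, MarkEntrySketch entry) { pending[column].push_back(entry); }

    /// Analogue of adjustLastMarkIfNeedAndFlushToDisk: shrink the last mark to the rows
    /// actually written and, if that mark is now finished, flush the remembered marks.
    void adjustLastMarkAndFlush(size_t new_rows_in_last_mark)
    {
        mark_rows.back() = new_rows_in_last_mark;           /// popMark() + appendMark(new_rows_in_last_mark)

        if (rows_written_in_last_mark == new_rows_in_last_mark)
        {
            for (auto & [column, marks] : pending)
                for (auto mark : marks)
                {
                    mark.rows_in_mark = mark_rows[current_mark];
                    flushed.push_back(mark);                /// flushMarkToFile(mark, rows in the adjusted mark)
                }
            pending.clear();
            ++current_mark;                                 /// the adjusted mark is finished, move on
            rows_written_in_last_mark = 0;
        }
    }
};

Deferring the marks this way is what lets the wide writer change the size of the current mark even after some of its rows have already been serialized.
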
@@ -99,6 +99,14 @@ private:
    /// in our index_granularity array.
    void shiftCurrentMark(const Granules & granules_written);

    /// Change rows in the last mark in index_granularity to new_rows_in_last_mark.
    /// Flush all marks from last_non_written_marks to disk and increment the current mark if the already written rows
    /// (rows_written_in_last_granule) equal new_rows_in_last_mark.
    ///
    /// This function is used when blocks change granularity drastically and we have an unfinished mark.
    /// Also useful to have the exact amount of rows in the last (non-final) mark.
    void adjustLastMarkIfNeedAndFlushToDisk(size_t new_rows_in_last_mark);

    IDataType::OutputStreamGetter createStreamGetter(const String & name, WrittenOffsetColumns & offset_columns) const;

    using SerializationState = IDataType::SerializeBinaryBulkStatePtr;
@@ -108,6 +116,10 @@ private:

    using ColumnStreams = std::map<String, StreamPtr>;
    ColumnStreams column_streams;
    /// Marks that are not written to disk yet (for each column). We are waiting until all rows for
    /// these marks are written to disk.
    using MarksForColumns = std::unordered_map<String, StreamsWithMarks>;
    MarksForColumns last_non_written_marks;

    /// How many rows we have already written in the current mark.
    /// More than zero when incoming blocks are smaller than their granularity.

@@ -1 +1 @@
0 36 14
0 36 13

@@ -18,7 +18,7 @@ INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-0

SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();

SELECT '===test merge===';
INSERT INTO mt_with_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);

@@ -27,7 +27,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;

SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();

SELECT '===test alter===';
ALTER TABLE mt_with_pk MODIFY COLUMN y Array(String);

@@ -38,7 +38,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;

SELECT COUNT(*) FROM mt_with_pk WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();

SELECT '===test mutation===';
ALTER TABLE mt_with_pk UPDATE w = 0 WHERE 1 SETTINGS mutations_sync = 2;

@@ -58,7 +58,7 @@ OPTIMIZE TABLE mt_with_pk FINAL;

SELECT COUNT(*) FROM mt_with_pk WHERE z + w > 5000;

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_pk' AND database = currentDatabase() AND active=1 AND database = currentDatabase();

DROP TABLE IF EXISTS mt_with_pk;

@@ -119,7 +119,7 @@ INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-1

SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();

INSERT INTO mt_without_pk (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);

@@ -127,7 +127,7 @@ OPTIMIZE TABLE mt_without_pk FINAL;

SELECT COUNT(*) FROM mt_without_pk WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_without_pk' AND active=1 AND database = currentDatabase();

DROP TABLE IF EXISTS mt_without_pk;

@@ -149,7 +149,7 @@ INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (to

SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();

INSERT INTO mt_with_small_granularity (d, x, y, z, `n.Age`, `n.Name`) VALUES (toDate('2018-10-01'), toDateTime('2018-10-01 07:57:57'), [4, 4, 4], 14, [111, 222], ['Lui', 'Dave']), (toDate('2018-10-01'), toDateTime('2018-10-01 08:57:57'), [5, 5, 5], 15, [333, 444], ['John', 'Mike']), (toDate('2018-10-01'), toDateTime('2018-10-01 09:57:57'), [6, 6, 6], 16, [555, 666, 777], ['Alex', 'Jim', 'Tom']);

@@ -157,6 +157,6 @@ OPTIMIZE TABLE mt_with_small_granularity FINAL;

SELECT COUNT(*) FROM mt_with_small_granularity WHERE x > toDateTime('2018-10-01 23:57:57');

SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1;
SELECT sum(marks) FROM system.parts WHERE table = 'mt_with_small_granularity' AND active=1 AND database = currentDatabase();

DROP TABLE IF EXISTS mt_with_small_granularity;

@@ -1 +1 @@
20000101_1_1_0 test_00961 5f2e2d4bbc14336f44037e3ac667f247 ed226557cd4e18ecf3ae06c6d5e6725c da96ff1e527a8a1f908ddf2b1d0af239
20000101_1_1_0 test_00961 b5fce9c4ef1ca42ce4ed027389c208d2 fc3b062b646cd23d4c23d7f5920f89ae da96ff1e527a8a1f908ddf2b1d0af239

@@ -0,0 +1,2 @@
849
102400

@@ -0,0 +1,28 @@
DROP TABLE IF EXISTS adaptive_table;

--- If the granularity of consecutive blocks differs a lot, then adaptive
--- granularity will adjust the amount of marks correctly. Data for the test was
--- derived empirically; it's quite hard to get good parameters.

CREATE TABLE adaptive_table(
    key UInt64,
    value String
) ENGINE MergeTree()
ORDER BY key
SETTINGS index_granularity_bytes=1048576, min_bytes_for_wide_part = 0, enable_vertical_merge_algorithm = 0;

SET max_block_size=900;

-- There are about 900 marks for our settings.
INSERT INTO adaptive_table SELECT number, if(number > 700, randomPrintableASCII(102400), randomPrintableASCII(1)) FROM numbers(10000);

OPTIMIZE TABLE adaptive_table FINAL;

SELECT marks FROM system.parts WHERE table = 'adaptive_table' and database=currentDatabase() and active;

-- If we have computed the granularity incorrectly, then we will exceed this limit.
SET max_memory_usage='30M';

SELECT max(length(value)) FROM adaptive_table;

DROP TABLE IF EXISTS adaptive_table;