This commit is contained in:
Alexey Milovidov 2016-04-15 22:37:19 +03:00
parent 2bb5a4749b
commit d6ba48a610
4 changed files with 171 additions and 51 deletions

View File

@ -11,6 +11,7 @@
#include <DB/IO/CompressedReadBuffer.h>
#include <DB/IO/HexWriteBuffer.h>
#include <DB/DataTypes/DataTypeDate.h>
#include <DB/DataTypes/DataTypeDateTime.h>
#include <DB/DataTypes/DataTypeFixedString.h>
#include <DB/DataTypes/DataTypeEnum.h>
#include <DB/Common/localBackup.h>
@ -155,9 +156,11 @@ void MergeTreeData::MergingParams::check(const NamesAndTypesList & columns) cons
if (!typeid_cast<const DataTypeUInt8 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt16 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt32 *>(column.type.get())
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get()))
&& !typeid_cast<const DataTypeUInt64 *>(column.type.get())
&& !typeid_cast<const DataTypeDate *>(column.type.get())
&& !typeid_cast<const DataTypeDateTime *>(column.type.get()))
throw Exception("Version column (" + version_column + ")"
" for storage ReplacingMergeTree must have type of UInt family."
" for storage ReplacingMergeTree must have type of UInt family or Date or DateTime."
" Provided column of type " + column.type->getName() + ".", ErrorCodes::BAD_TYPE_OF_FIELD);
break;
}

View File

@ -742,59 +742,28 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
BlockInputStreams to_merge;
if (settings.merge_tree_uniform_read_distribution == 1)
/// NOTE merge_tree_uniform_read_distribution не используется для FINAL
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
/// Пусть отрезки будут перечислены справа налево, чтобы можно было выбрасывать самый левый отрезок с помощью pop_back().
for (auto & part : parts)
std::reverse(std::begin(part.ranges), std::end(part.ranges));
RangesInDataPart & part = parts[part_index];
MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
parts.size(), sum_marks, min_marks_for_read_task, parts, data, prewhere_actions, prewhere_column, true,
column_names, MergeTreeReadPool::BackoffSettings{}, true);
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true);
/// Оценим общее количество строк - для прогресс-бара.
const std::size_t total_rows = data.index_granularity * sum_marks;
LOG_TRACE(log, "Reading approx. " << total_rows << " rows");
for (const auto i : ext::range(0, parts.size()))
for (const String & virt_column : virt_columns)
{
BlockInputStreamPtr source_stream{
new MergeTreeThreadBlockInputStream{
i, pool, min_marks_for_read_task, max_block_size, data, use_uncompressed_cache, prewhere_actions,
prewhere_column, settings, virt_columns
}
};
if (i == 0)
/// Выставим приблизительное количество строк только для первого источника
static_cast<IProfilingBlockInputStream &>(*source_stream).setTotalRowsApprox(total_rows);
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
if (virt_column == "_part")
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = new AddingConstColumnBlockInputStream<UInt64>(
source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
}
}
else
{
for (size_t part_index = 0; part_index < parts.size(); ++part_index)
{
RangesInDataPart & part = parts[part_index];
BlockInputStreamPtr source_stream = new MergeTreeBlockInputStream(
data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
part.data_part, part.ranges, use_uncompressed_cache,
prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true);
for (const String & virt_column : virt_columns)
{
if (virt_column == "_part")
source_stream = new AddingConstColumnBlockInputStream<String>(
source_stream, new DataTypeString, part.data_part->name, "_part");
else if (virt_column == "_part_index")
source_stream = new AddingConstColumnBlockInputStream<UInt64>(
source_stream, new DataTypeUInt64, part.part_index_in_query, "_part_index");
}
to_merge.push_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
to_merge.emplace_back(new ExpressionBlockInputStream(source_stream, data.getPrimaryExpression()));
}
BlockInputStreams res;
@ -807,7 +776,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
createPositiveSignCondition(sign_filter_expression, sign_filter_column, context);
res.push_back(new FilterBlockInputStream(to_merge[0], sign_filter_expression, sign_filter_column));
res.emplace_back(new FilterBlockInputStream(to_merge[0], sign_filter_expression, sign_filter_column));
}
else
res = to_merge;
@ -844,7 +813,7 @@ BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal
throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
}
res.push_back(merged);
res.emplace_back(merged);
}
return res;

View File

@ -0,0 +1,48 @@
0 2000-01-01 1 Hello 0
0 2000-01-01 1 Hello 0
0 2000-01-01 1 Hello 0
1 2000-01-01 1 World 0
0 2000-01-01 1 Hello 0
0 2000-01-01 1 World 0
0 2000-01-01 1 World 0
0 2000-01-01 1 World 0
1 2000-01-01 1 Hello 10
1 2000-01-01 1 Hello 10
2000-01-01 1 Hello 10
2000-01-01 2 abc 1
0 2000-01-01 1 Hello 10
0 2000-01-01 2 def 1
0 2000-01-01 1 Hello 10
0 2000-01-01 2 def 1
0 2000-01-01 1 Hello 0
0 2000-01-01 1 Hello 0
0 2000-01-01 1 Hello 0
1 2000-01-01 1 World 0
0 2000-01-01 1 Hello 0
0 2000-01-01 1 World 0
0 2000-01-01 1 World 0
0 2000-01-01 1 World 0
1 2000-01-01 1 Hello 10
1 2000-01-01 1 Hello 10
2000-01-01 1 Hello 10
2000-01-01 2 abc 1
0 2000-01-01 1 Hello 10
0 2000-01-01 2 def 1
0 2000-01-01 1 Hello 10
0 2000-01-01 2 def 1
0 2000-01-01 1 Hello
0 2000-01-01 1 Hello
0 2000-01-01 1 Hello
1 2000-01-01 1 World
0 2000-01-01 1 Hello
0 2000-01-01 1 World
0 2000-01-01 1 World
0 2000-01-01 1 World
1 2000-01-01 1 Hello
0 2000-01-01 1 World
2000-01-01 1 Hello!
2000-01-01 2 ghi
0 2000-01-01 1 Hello!
0 2000-01-01 2 ghi
0 2000-01-01 1 Hello!
0 2000-01-01 2 ghi

View File

@ -0,0 +1,100 @@
DROP TABLE IF EXISTS test.replacing;
CREATE TABLE test.replacing (d Date, k UInt64, s String, v UInt16) ENGINE = ReplacingMergeTree(d, k, 8192, v);
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello', 0);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'World', 0);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello', 10);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello!', 9);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'abc', 1);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'def', 1);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'ghi', 0);
SELECT * FROM test.replacing FINAL ORDER BY k, v, _part_index;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
DROP TABLE test.replacing;
DROP TABLE IF EXISTS test.replacing;
CREATE TABLE test.replacing (d Date, k UInt64, s String, v UInt16) ENGINE = ReplacingMergeTree(d, k, 1, v);
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello', 0);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'World', 0);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello', 10);
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello!', 9);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'abc', 1);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'def', 1);
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'ghi', 0);
SELECT * FROM test.replacing FINAL ORDER BY k, v, _part_index;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, v, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, v, _part_index;
DROP TABLE test.replacing;
DROP TABLE IF EXISTS test.replacing;
CREATE TABLE test.replacing (d Date, k UInt64, s String) ENGINE = ReplacingMergeTree(d, k, 2);
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello');
SELECT _part_index, * FROM test.replacing ORDER BY k, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'World');
SELECT _part_index, * FROM test.replacing ORDER BY k, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, _part_index;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello');
SELECT _part_index, * FROM test.replacing ORDER BY k, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, _part_index;
INSERT INTO test.replacing VALUES ('2000-01-01', 1, 'Hello!');
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'abc');
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'def');
INSERT INTO test.replacing VALUES ('2000-01-01', 2, 'ghi');
SELECT * FROM test.replacing FINAL ORDER BY k, _part_index;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
OPTIMIZE TABLE test.replacing;
SELECT _part_index, * FROM test.replacing ORDER BY k, _part_index;
SELECT _part_index, * FROM test.replacing FINAL ORDER BY k, _part_index;
DROP TABLE test.replacing;