Add test for OPTIMIZE FINAL and more diagnostics. [#CLICKHOUSE-2886]

Author: Vitaliy Lyudvichenko, 2017-04-02 00:43:25 +03:00 (committed by alexey-milovidov)
parent 11218b9ba5
commit 74a92cb8d9
8 changed files with 134 additions and 41 deletions

ColumnGathererStream.cpp

@@ -23,11 +23,24 @@ ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams
         throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
     children.assign(source_streams.begin(), source_streams.end());
+}
-    /// Trivial case
-    if (children.size() == 1)
-        return;
+String ColumnGathererStream::getID() const
+{
+    std::stringstream res;
+    res << getName() << "(";
+    for (size_t i = 0; i < children.size(); i++)
+        res << (i == 0 ? "" : ", ") << children[i]->getID();
+    res << ")";
+    return res.str();
+}
+void ColumnGathererStream::init()
+{
     sources.reserve(children.size());
     for (size_t i = 0; i < children.size(); i++)
     {
@@ -52,24 +65,16 @@ ColumnGathererStream::ColumnGathererStream(const BlockInputStreams & source_streams
 }
-String ColumnGathererStream::getID() const
-{
-    std::stringstream res;
-    res << getName() << "(";
-    for (size_t i = 0; i < children.size(); i++)
-        res << (i == 0 ? "" : ", ") << children[i]->getID();
-    res << ")";
-    return res.str();
-}
 Block ColumnGathererStream::readImpl()
 {
-    if (children.size() == 1)
+    /// Special case: single source and there are no skipped rows
+    if (children.size() == 1 && row_source.size() == 0)
         return children[0]->read();
+    /// Initialize first source blocks
+    if (sources.empty())
+        init();
     if (pos_global_start >= row_source.size())
         return Block();
@@ -154,13 +159,10 @@ void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
 void ColumnGathererStream::readSuffixImpl()
 {
-    if (children.size() == 1)
-        return;
     const BlockStreamProfileInfo & profile_info = getProfileInfo();
     double seconds = profile_info.total_stopwatch.elapsedSeconds();
     LOG_DEBUG(log, std::fixed << std::setprecision(2)
-        << "Gathered column " << column.name << " " << column.type->getName()
+        << "Gathered column " << name
         << " (" << static_cast<double>(profile_info.bytes) / profile_info.rows << " bytes/elem.)"
         << " in " << seconds << " sec., "
         << profile_info.rows / seconds << " rows/sec., "
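Context for these ColumnGathererStream changes: during a vertical merge, the pass over the primary-key columns records one RowSourcePart per output row, and each remaining column is then copied in exactly that order — which is also why initialization can now be deferred to the first read(). Below is a minimal self-contained sketch of that gathering idea, assuming the one-byte number-plus-skip-flag layout exercised by the test further down; the real implementation works on Blocks and is more involved.

#include <cstddef>
#include <cstdint>
#include <vector>

/// Simplified stand-in for DB::RowSourcePart (see the bitwise test below):
/// low 7 bits = source part number, high bit = "row was collapsed away".
struct RowSourcePartSketch
{
    uint8_t data = 0;
    size_t getSourceNum() const { return data & 0x7F; }
    bool getSkipFlag() const { return (data & 0x80) != 0; }
};

/// Illustrative gather: copy rows from per-part columns in merge order.
/// Skipped rows still advance their part's cursor, since they exist in the
/// source part even though the merge dropped them from the result.
std::vector<int> gather(const std::vector<std::vector<int>> & part_columns,
                        const std::vector<RowSourcePartSketch> & row_source)
{
    std::vector<size_t> pos(part_columns.size(), 0);
    std::vector<int> result;
    for (const auto & src : row_source)
    {
        size_t row = pos[src.getSourceNum()]++;
        if (!src.getSkipFlag())
            result.push_back(part_columns[src.getSourceNum()][row]);
    }
    return result;
}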

ColumnGathererStream.h

@@ -97,6 +97,7 @@ private:
         }
     };
+    void init();
     void fetchNewBlock(Source & source, size_t source_num);
     std::vector<Source> sources;

MergingSortedBlockInputStream.cpp

@@ -21,6 +21,8 @@ MergingSortedBlockInputStream::MergingSortedBlockInputStream(BlockInputStreams &
     source_blocks(inputs_.size()), cursors(inputs_.size()), out_row_sources(out_row_sources_)
 {
     children.insert(children.end(), inputs_.begin(), inputs_.end());
+    if (out_row_sources)
+        out_row_sources->clear();
 }
 String MergingSortedBlockInputStream::getID() const
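The two added lines make the out_row_sources contract explicit: the merging stream appends one entry per output row as it merges, so a reused, non-empty vector from an earlier merge would silently corrupt the subsequent column gathering. A reduced sketch of that bookkeeping, merging two sorted inputs (the names and shape are illustrative, not the real cursor-based loop):

#include <cstddef>
#include <cstdint>
#include <vector>

struct RowSourcePartSketch  /// Same illustrative stand-in as above.
{
    uint8_t data = 0;
    RowSourcePartSketch(size_t num, bool skip)
        : data(static_cast<uint8_t>(num | (skip ? 0x80u : 0u))) {}
};

/// Merge two sorted inputs; record, per emitted row, which input it came from.
void mergeSorted(const std::vector<int> & a, const std::vector<int> & b,
                 std::vector<int> & out,
                 std::vector<RowSourcePartSketch> * out_row_sources)
{
    if (out_row_sources)
        out_row_sources->clear();  /// The added lines: never trust the caller's vector.

    size_t i = 0, j = 0;
    while (i < a.size() || j < b.size())
    {
        bool take_a = (j == b.size()) || (i < a.size() && a[i] <= b[j]);
        out.push_back(take_a ? a[i++] : b[j++]);
        if (out_row_sources)
            out_row_sources->emplace_back(take_a ? 0 : 1, false);
    }
}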

MergeTreeDataMerger.cpp

@@ -681,15 +681,23 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMerger::mergePartsToTemporaryPart
     ColumnGathererStream column_gathered_stream(column_part_streams, column_name, merged_rows_sources, DEFAULT_BLOCK_SIZE);
     MergedColumnOnlyOutputStream column_to(data, new_part_tmp_path, false, compression_method, offset_written);
+    size_t column_elems_written = 0;
     column_to.writePrefix();
     while ((block = column_gathered_stream.read()))
     {
+        column_elems_written += block.rows();
         column_to.write(block);
     }
     column_gathered_stream.readSuffix();
     checksums_gathered_columns.add(column_to.writeSuffixAndGetChecksums());
+    if (rows_written != column_elems_written)
+    {
+        throw Exception("Written " + toString(column_elems_written) + " elements of column " + column_name +
+            ", but " + toString(rows_written) + " rows of PK columns", ErrorCodes::LOGICAL_ERROR);
+    }
     if (typeid_cast<const DataTypeArray *>(column_type.get()))
         offset_columns_written.emplace(offset_column_name);
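This is the "more diagnostics" part of the commit: the new column_elems_written counter turns a silent desync into a hard LOGICAL_ERROR. The merge of the primary-key columns already fixed rows_written, and every gathered column must emit exactly that many elements. In reduced form, the invariant looks like this (simplified types; toString/Exception replaced with standard-library equivalents):

#include <cstddef>
#include <stdexcept>
#include <string>
#include <vector>

/// Reduced form of the added consistency check: after the PK merge pass has
/// fixed the output row count, each gathered column must match it exactly.
void checkGatheredColumn(const std::string & column_name, size_t rows_written,
                         const std::vector<size_t> & block_row_counts)
{
    size_t column_elems_written = 0;
    for (size_t rows : block_row_counts)   /// Mirrors block.rows() per read().
        column_elems_written += rows;

    if (rows_written != column_elems_written)
        throw std::logic_error(
            "Written " + std::to_string(column_elems_written) + " elements of column "
            + column_name + ", but " + std::to_string(rows_written) + " rows of PK columns");
}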

CMakeLists.txt (tests)

@@ -33,5 +33,3 @@ target_link_libraries (merge_selector dbms)
 add_executable (merge_selector2 merge_selector2.cpp)
 target_link_libraries (merge_selector2 dbms)
-add_executable (row_source_bitwise_compatibility row_source_bitwise_test.cpp)

row_source_bitwise_test.cpp

@@ -1,17 +1,13 @@
-#include <cstdlib>
+#include <gtest/gtest.h>
 #include <DataStreams/ColumnGathererStream.h>
 using DB::RowSourcePart;
 static void check(const RowSourcePart & s, size_t num, bool flag)
 {
-    if ((s.getSourceNum() != num || s.getSkipFlag() != flag) || (!flag && s.getData() != num))
-    {
-        printf("FAIL");
-        std::exit(-1);
-    }
+    EXPECT_FALSE((s.getSourceNum() != num || s.getSkipFlag() != flag) || (!flag && s.getData() != num));
 }
-int main(int, char **)
+TEST(ColumnGathererStream, RowSourcePartBitsTest)
 {
     check(RowSourcePart(0, false), 0, false);
     check(RowSourcePart(0, true), 0, true);
@@ -20,15 +16,12 @@ int main(int, char **)
     check(RowSourcePart(RowSourcePart::MAX_PARTS, false), RowSourcePart::MAX_PARTS, false);
     check(RowSourcePart(RowSourcePart::MAX_PARTS, true), RowSourcePart::MAX_PARTS, true);
-    RowSourcePart p{80, false};
-    check(p, 80, false);
-    p.setSkipFlag(true);
-    check(p, 80, true);
-    p.setSkipFlag(false);
-    check(p, 80, false);
-    p.setSourceNum(RowSourcePart::MAX_PARTS);
-    check(p, RowSourcePart::MAX_PARTS, false);
-    printf("PASSED");
-    return 0;
+    RowSourcePart p{80, false};
+    check(p, 80, false);
+    p.setSkipFlag(true);
+    check(p, 80, true);
+    p.setSkipFlag(false);
+    check(p, 80, false);
+    p.setSourceNum(RowSourcePart::MAX_PARTS);
+    check(p, RowSourcePart::MAX_PARTS, false);
 }
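What this test pins down — and what the row_source_bitwise_compatibility name in the CMake hunk alluded to — is, as far as the header defines it, a one-byte layout: the source part number in the low 7 bits (hence MAX_PARTS) and the skip flag in the high bit, so the packed byte keeps a stable meaning. A standalone re-implementation of that packing for illustration; the mask names are assumptions mirroring the apparent constants, not the header's code:

#include <cassert>
#include <cstddef>
#include <cstdint>

/// Illustrative re-implementation of the packing the test exercises:
/// low 7 bits = source number (so MAX_PARTS == 0x7F), high bit = skip flag.
struct RowSourcePartSketch
{
    static constexpr uint8_t MASK_NUMBER = 0x7F;
    static constexpr uint8_t MASK_FLAG = 0x80;

    uint8_t data = 0;

    RowSourcePartSketch(size_t num, bool skip)
        : data(static_cast<uint8_t>((num & MASK_NUMBER) | (skip ? MASK_FLAG : 0))) {}

    size_t getSourceNum() const { return data & MASK_NUMBER; }
    bool getSkipFlag() const { return (data & MASK_FLAG) != 0; }
    void setSkipFlag(bool flag)
    {
        data = static_cast<uint8_t>(flag ? (data | MASK_FLAG) : (data & MASK_NUMBER));
    }
    void setSourceNum(size_t num)
    {
        data = static_cast<uint8_t>((data & MASK_FLAG) | (num & MASK_NUMBER));
    }
};

int main()
{
    RowSourcePartSketch p{80, false};
    assert(p.getSourceNum() == 80 && !p.getSkipFlag());
    p.setSkipFlag(true);   /// Flag flips without clobbering the number...
    assert(p.getSourceNum() == 80 && p.getSkipFlag());
    p.setSourceNum(RowSourcePartSketch::MASK_NUMBER);  /// ...and vice versa.
    assert(p.getSourceNum() == 0x7F && p.getSkipFlag());
    return 0;
}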

New test reference file (expected output)

@@ -0,0 +1,3 @@
1500000 1500000 1500000 1500000 1500000 1500000
[['def']] [['','']]
0

New test script (bash)

@@ -0,0 +1,86 @@
#!/bin/bash
set -e
db="test"
table="optimize_me_finally"
name="$db.$table"
res_rows=1500000 # >= vertical_merge_algorithm_min_rows_to_activate
function get_num_parts {
clickhouse-client -q "SELECT count() FROM system.parts WHERE active AND database='$db' AND table='$table'"
}
clickhouse-client -q "DROP TABLE IF EXISTS $name"
clickhouse-client -q "CREATE TABLE $name (
date Date,
Sign Int8,
ki UInt64,
ds String,
di01 UInt64,
di02 UInt64,
di03 UInt64,
di04 UInt64,
di05 UInt64,
di06 UInt64,
di07 UInt64,
di08 UInt64,
di09 UInt64,
di10 UInt64,
n Nested(
i UInt64,
s String
)
)
ENGINE = CollapsingMergeTree(date, (date, ki), 8192, Sign)"
clickhouse-client -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
toUInt64(0) AS ki
FROM system.numbers LIMIT 9000"
clickhouse-client -q "INSERT INTO $name (date, Sign, ki) SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
number AS ki
FROM system.numbers LIMIT 9000, 9000"
clickhouse-client -q "INSERT INTO $name SELECT
toDate(0) AS date,
toInt8(1) AS Sign,
number AS ki,
hex(number) AS ds,
number AS di01,
number AS di02,
number AS di03,
number AS di04,
number AS di05,
number AS di06,
number AS di07,
number AS di08,
number AS di09,
number AS di10,
[number, number+1] AS \`n.i\`,
[hex(number), hex(number+1)] AS \`n.s\`
FROM system.numbers LIMIT $res_rows"
while [[ `get_num_parts` -ne 1 ]] ; do clickhouse-client -q "OPTIMIZE TABLE $name PARTITION 197001"; done
clickhouse-client -q "ALTER TABLE $name ADD COLUMN n.a Array(String)"
clickhouse-client -q "ALTER TABLE $name ADD COLUMN da Array(String) DEFAULT ['def']"
clickhouse-client -q "OPTIMIZE TABLE $name PARTITION 197001 FINAL"
clickhouse-client -q "ALTER TABLE $name MODIFY COLUMN n.a Array(String) DEFAULT ['zzz']"
clickhouse-client -q "ALTER TABLE $name MODIFY COLUMN da Array(String) DEFAULT ['zzz']"
clickhouse-client -q "SELECT count(), sum(Sign), sum(ki = di05), sum(hex(ki) = ds), sum(ki = n.i[1]), sum([hex(ki), hex(ki+1)] = n.s) FROM $name"
clickhouse-client -q "SELECT groupUniqArray(da), groupUniqArray(n.a) FROM $name"
hash_src=`clickhouse-client --max_threads=1 -q "SELECT cityHash64(groupArray(ki)) FROM $name"`
hash_ref=`clickhouse-client --max_threads=1 -q "SELECT cityHash64(groupArray(ki)) FROM (SELECT number as ki FROM system.numbers LIMIT $res_rows)"`
echo $(( $hash_src - $hash_ref ))
clickhouse-client -q "DROP TABLE IF EXISTS $name"