This commit is contained in:
Michael Kolupaev 2013-10-30 08:50:58 +00:00
parent 2fab08ae10
commit 732302c263
3 changed files with 102 additions and 6 deletions

View File

@ -14,7 +14,8 @@ namespace DB
* и не более одиной строки со значением столбца sign_column = 1 ("положительной строки").
* То есть - производит схлопывание записей из лога изменений.
*
* Если количество положительных и отрицательных строк совпадает - то пишет первую отрицательную и последнюю положительную строку.
* Если количество положительных и отрицательных строк совпадает, и последняя строка положительная - то пишет первую отрицательную и последнюю положительную строку.
* Если количество положительных и отрицательных строк совпадает, и последняя строка отрицательная - то ничего не пишет.
* Если положительных на 1 больше, чем отрицательных - то пишет только последнюю положительную строку.
* Если отрицательных на 1 больше, чем положительных - то пишет только первую отрицательную строку.
* Иначе - логическая ошибка.
@ -27,7 +28,7 @@ public:
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_),
sign_column(sign_column_), sign_column_number(0),
log(&Logger::get("CollapsingSortedBlockInputStream")),
count_positive(0), count_negative(0), count_incorrect_data(0)
count_positive(0), count_negative(0), count_incorrect_data(0), blocks_written(0)
{
}
@ -65,12 +66,16 @@ private:
Row first_negative; /// Первая отрицательная строка для текущего первичного ключа.
Row last_positive; /// Последняя положительная строка для текущего первичного ключа.
Row last_negative; /// Последняя отрицательная. Сорраняется только если ни одной строки в ответ еще не выписано.
size_t count_positive; /// Количество положительных строк для текущего первичного ключа.
size_t count_negative; /// Количество отрицательных строк для текущего первичного ключа.
bool last_is_positive; /// true, если последняя строка для текущего первичного ключа положительная.
size_t count_incorrect_data; /// Чтобы не писать в лог слишком много сообщений об ошибке.
size_t blocks_written;
/** Делаем поддержку двух разных курсоров - с Collation и без.
* Шаблоны используем вместо полиморфных SortCursor'ов и вызовов виртуальных функций.
*/
@ -78,7 +83,7 @@ private:
void merge(Block & merged_block, ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue);
/// Вставить в результат строки для текущего идентификатора "визита".
void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows);
void insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream = false);
void reportIncorrectData();
};

View File

@ -32,10 +32,25 @@ void CollapsingSortedBlockInputStream::reportIncorrectData()
}
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows)
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream)
{
if (count_positive != 0 || count_negative != 0)
{
if (count_positive == count_negative && !last_is_positive)
{
/// Если все строки во входных потоках схлопнулись, мы все равно хотим выдать хоть один блок в результат.
if (last_in_stream && merged_rows == 0 && !blocks_written)
{
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(last_positive[i]);
++merged_rows;
for (size_t i = 0; i < num_columns; ++i)
merged_columns[i]->insert(last_negative[i]);
}
return;
}
if (count_positive <= count_negative)
{
++merged_rows;
@ -79,6 +94,7 @@ Block CollapsingSortedBlockInputStream::readImpl()
if (first_negative.empty())
{
first_negative.resize(num_columns);
last_negative.resize(num_columns);
last_positive.resize(num_columns);
current_key.resize(description.size());
next_key.resize(description.size());
@ -123,6 +139,7 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
if (sign == 1)
{
++count_positive;
last_is_positive = true;
setRow(last_positive, current);
}
@ -130,8 +147,11 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
{
if (!count_negative)
setRow(first_negative, current);
if (!blocks_written && !merged_rows)
setRow(last_negative, current);
++count_negative;
last_is_positive = false;
}
else
throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
@ -149,11 +169,14 @@ void CollapsingSortedBlockInputStream::merge(Block & merged_block, ColumnPlainPt
}
if (merged_rows >= max_block_size)
return;;
{
++blocks_written;
return;
}
}
/// Запишем данные для последнего визита.
insertRows(merged_columns, merged_rows);
insertRows(merged_columns, merged_rows, true);
children.clear();
}

View File

@ -0,0 +1,68 @@
#!/bin/bash
# METR-9072
echo "DROP DATABASE IF EXISTS collapsing_test" | clickhouse-client || exit 1
echo "CREATE DATABASE collapsing_test" | clickhouse-client || exit 2
echo "CREATE TABLE collapsing_test.p0 ( d Date, k String, s Int8, v String) ENGINE = CollapsingMergeTree(d, tuple(k), 8192, s)" | clickhouse-client || exit 3
echo "CREATE TABLE collapsing_test.p1 AS collapsing_test.p0" | clickhouse-client || exit 4
echo "CREATE TABLE collapsing_test.p2 AS collapsing_test.p0" | clickhouse-client || exit 5
echo "CREATE TABLE collapsing_test.m0 AS collapsing_test.p0" | clickhouse-client || exit 9
echo "CREATE TABLE collapsing_test.m1 AS collapsing_test.p0" | clickhouse-client || exit 11
echo "('2014-01-01', 'key1', 1, 'val1')" | clickhouse-client --query="INSERT INTO collapsing_test.p0 VALUES" || exit 6
echo "('2014-01-01', 'key1', -1, 'val1'),('2014-01-01', 'key1', 1, 'val2')" | clickhouse-client --query="INSERT INTO collapsing_test.p1 VALUES" || exit 7
echo "('2014-01-01', 'key1', -1, 'val2')" | clickhouse-client --query="INSERT INTO collapsing_test.p2 VALUES" || exit 8
sudo /etc/init.d/clickhouse-server-metrika-yandex stop || exit 10
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p0/20140101_20140101_1_1_0,m0/} || exit 12
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p1/20140101_20140101_1_1_0,m0/20140101_20140101_2_2_0} || exit 13
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p1/20140101_20140101_1_1_0,m1/20140101_20140101_2_2_0} || exit 14
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p2/20140101_20140101_1_1_0,m1/20140101_20140101_3_3_0} || exit 15
rm /opt/clickhouse/data/collapsing_test/m{0,1}/increment.txt || exit 29
sudo /etc/init.d/clickhouse-server-metrika-yandex start || exit 16
sleep 10s
echo "OPTIMIZE TABLE collapsing_test.m0" | clickhouse-client || exit 17
echo "OPTIMIZE TABLE collapsing_test.m1" | clickhouse-client || exit 18
sudo /etc/init.d/clickhouse-server-metrika-yandex stop || exit 19
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p0/20140101_20140101_1_1_0,m1/} || exit 20
sudo -u metrika cp -r /opt/clickhouse/data/collapsing_test/{p2/20140101_20140101_1_1_0,m0/20140101_20140101_3_3_0} || exit 21
rm /opt/clickhouse/data/collapsing_test/m{0,1}/increment.txt || exit 29
sudo /etc/init.d/clickhouse-server-metrika-yandex start || exit 22
sleep 10s
echo "OPTIMIZE TABLE collapsing_test.m0" | clickhouse-client || exit 23
echo "OPTIMIZE TABLE collapsing_test.m1" | clickhouse-client || exit 23
ls /opt/clickhouse/data/collapsing_test/m{0,1}
echo "SELECT * FROM collapsing_test.m0" | clickhouse-client || exit 24
echo
echo "SELECT * FROM collapsing_test.m1" | clickhouse-client || exit 25
echo
echo "('2014-01-01', 'key2', 1, 'val')" | clickhouse-client --query="INSERT INTO collapsing_test.m0 VALUES" || exit 33
echo "('2014-01-01', 'key2', 1, 'val')" | clickhouse-client --query="INSERT INTO collapsing_test.m1 VALUES" || exit 32
echo "OPTIMIZE TABLE collapsing_test.m0" | clickhouse-client || exit 30
echo "OPTIMIZE TABLE collapsing_test.m1" | clickhouse-client || exit 31
ls /opt/clickhouse/data/collapsing_test/m{0,1}
echo "SELECT * FROM collapsing_test.m0" | clickhouse-client | tee /tmp/t1 || exit 24
echo
echo "SELECT * FROM collapsing_test.m1" | clickhouse-client | tee /tmp/t2 || exit 25
diff -q /tmp/t{1,2}
if [ $? -ne 0 ]
then
echo 'Failed'
exit 27
else
echo 'Passed'
fi