ClickHouse/dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp

228 lines
5.6 KiB
C++
Raw Normal View History

#include <DB/Core/FieldVisitors.h>
2012-08-16 17:02:31 +00:00
#include <DB/DataStreams/CollapsingSortedBlockInputStream.h>
2016-04-13 03:56:22 +00:00
#include <DB/Columns/ColumnsNumber.h>
2012-08-16 17:02:31 +00:00
2017-03-25 20:12:56 +00:00
/// Maximum number of messages about incorrect data in the log.
#define MAX_ERROR_MESSAGES 10
2012-08-16 17:02:31 +00:00
namespace DB
{
namespace ErrorCodes
{
/// Declaration only (extern); used in this file when a row's sign value is neither 1 nor -1.
extern const int INCORRECT_DATA;
}
2012-08-16 17:02:31 +00:00
2012-08-22 16:30:41 +00:00
void CollapsingSortedBlockInputStream::reportIncorrectData()
{
std::stringstream s;
s << "Incorrect data: number of rows with sign = 1 (" << count_positive
<< ") differs with number of rows with sign = -1 (" << count_negative
<< ") by more than one (for key: ";
for (size_t i = 0, size = current_key.size(); i < size; ++i)
{
if (i != 0)
s << ", ";
Squashed commit of the following: commit c567d4e1fe8d54e6363e47548f1e3927cc5ee78f Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:35:01 2017 +0300 Style [#METR-2944]. commit 26bf3e1228e03f46c29b13edb0e3770bd453e3f1 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:33:11 2017 +0300 Miscellaneous [#METR-2944]. commit eb946f4c6fd4bb0e9e5c7fb1468d36be3dfca5a5 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:30:19 2017 +0300 Miscellaneous [#METR-2944]. commit 78c867a14744b5af2db8d37caf7804fc2057ea51 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 20:11:41 2017 +0300 Miscellaneous [#METR-2944]. commit 6604c5c83cfcedc81c8da4da026711920d5963b4 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:56:15 2017 +0300 Miscellaneous [#METR-2944]. commit 23fbf05c1d4bead636458ec21b05a101b1152e33 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:47:52 2017 +0300 Miscellaneous [#METR-2944]. commit 98772faf11a7d450d473f7fa84f8a9ae24f7b59b Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:46:05 2017 +0300 Miscellaneous [#METR-2944]. commit 3dc636ab9f9359dbeac2e8d997ae563d4ca147e2 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:39:46 2017 +0300 Miscellaneous [#METR-2944]. commit 3e16aee95482f374ee3eda1a4dbe9ba5cdce02e8 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:38:03 2017 +0300 Miscellaneous [#METR-2944]. commit ae7e7e90eb1f82bd0fe0f887708d08b9e7755612 Author: Alexey Milovidov <milovidov@yandex-team.ru> Date: Fri Jan 6 19:34:15 2017 +0300 Miscellaneous [#METR-2944].
2017-01-06 17:41:19 +00:00
s << applyVisitor(FieldVisitorToString(), (*current_key.columns[i])[current_key.row_num]);
}
s << ").";
2017-03-25 20:12:56 +00:00
/** Fow now we limit ourselves to just logging such situations,
* since the data is generated by external programs.
* With inconsistent data, this is an unavoidable error that can not be easily corrected by admins. Therefore Warning.
2012-08-22 16:30:41 +00:00
*/
LOG_WARNING(log, s.rdbuf());
}
2013-10-30 08:50:58 +00:00
/** Writes the collapsed result for the rows accumulated for the current primary key.
  *
  * Uses the counters `count_positive` / `count_negative` and the remembered rows
  * `first_negative`, `last_positive`, `last_negative`:
  *  - nothing is written if no rows were accumulated;
  *  - if the counts are equal and the last row was negative, the key is collapsed completely
  *    and nothing is written, except when this is the final call (`last_in_stream`) and nothing
  *    has been output at all - then the last positive and last negative rows are written;
  *  - otherwise the first negative row is written if count_positive <= count_negative,
  *    and the last positive row is written if count_positive >= count_negative.
  *
  * merged_columns - destination columns, one per column of the result (`num_columns` of them).
  * merged_rows    - number of rows written so far into `merged_columns`; incremented per written row.
  * last_in_stream - true when called for the last key after the input is exhausted.
  *
  * If `out_row_sources` is set, the skip flag of every written row is cleared at its remembered position.
  * If the counts differ by more than one, the situation is reported (at most MAX_ERROR_MESSAGES times)
  * and `count_incorrect_data` is incremented.
  */
void CollapsingSortedBlockInputStream::insertRows(ColumnPlainPtrs & merged_columns, size_t & merged_rows, bool last_in_stream)
{
    /// No rows were accumulated for the key - nothing to write.
    if (count_positive == 0 && count_negative == 0)
        return;

    if (count_positive == count_negative && !last_is_positive)
    {
        /// If all the rows in the input streams collapsed, we still want to give at least one block in the result.
        if (last_in_stream && merged_rows == 0 && !blocks_written)
        {
            LOG_INFO(log, "All rows collapsed");

            ++merged_rows;
            for (size_t i = 0; i < num_columns; ++i)
                merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);

            ++merged_rows;
            for (size_t i = 0; i < num_columns; ++i)
                merged_columns[i]->insertFrom(*last_negative.columns[i], last_negative.row_num);

            if (out_row_sources)
            {
                /// true flag value means "skip row"
                out_row_sources->data()[last_positive_pos].setSkipFlag(false);
                out_row_sources->data()[last_negative_pos].setSkipFlag(false);
            }
        }
        return;
    }

    /// Both branches below are taken when the counts are equal (and the last row was positive).
    if (count_positive <= count_negative)
    {
        ++merged_rows;
        for (size_t i = 0; i < num_columns; ++i)
            merged_columns[i]->insertFrom(*first_negative.columns[i], first_negative.row_num);

        if (out_row_sources)
            out_row_sources->data()[first_negative_pos].setSkipFlag(false);
    }

    if (count_positive >= count_negative)
    {
        ++merged_rows;
        for (size_t i = 0; i < num_columns; ++i)
            merged_columns[i]->insertFrom(*last_positive.columns[i], last_positive.row_num);

        if (out_row_sources)
            out_row_sources->data()[last_positive_pos].setSkipFlag(false);
    }

    /// The counts are expected to be equal or to differ by exactly one; anything else is inconsistent data.
    if (!(count_positive == count_negative || count_positive + 1 == count_negative || count_positive == count_negative + 1))
    {
        /// Limit the number of log messages, but keep counting every occurrence.
        if (count_incorrect_data < MAX_ERROR_MESSAGES)
            reportIncorrectData();
        ++count_incorrect_data;
    }
}
2015-01-18 08:25:56 +00:00
2012-08-16 17:02:31 +00:00
/** Produces the next block of the result.
  *
  * Returns an empty Block when the stream is already `finished` or when `init` leaves
  * `merged_columns` empty. With exactly one child, whatever that child's read() returns is
  * returned as is, without going through the merge below.
  * Otherwise the rows of the sources are merged (see `merge`) into `merged_block`.
  */
Block CollapsingSortedBlockInputStream::readImpl()
{
    if (finished)
        return Block();

    if (children.size() == 1)
        return children[0]->read();

    Block merged_block;
    ColumnPlainPtrs merged_columns;

    init(merged_block, merged_columns);
    if (merged_columns.empty())
        return Block();

    /// Additional initialization.
    /// NOTE(review): `first_negative.empty()` appears to serve as the "not initialized yet" marker - confirm against RowRef.
    if (first_negative.empty())
    {
        first_negative.columns.resize(num_columns);
        last_negative.columns.resize(num_columns);
        last_positive.columns.resize(num_columns);

        sign_column_number = merged_block.getPositionByName(sign_column);
    }

    /// The two queues differ only in the cursor type; the merge algorithm is the same.
    if (has_collation)
        merge(merged_columns, queue_with_collation);
    else
        merge(merged_columns, queue);

    return merged_block;
}
2012-08-16 17:02:31 +00:00
2016-04-13 03:56:22 +00:00
2013-05-28 16:56:05 +00:00
/** Merges rows from the cursors in `queue`, collapsing rows with the same primary key.
  *
  * Rows are taken from the top of the queue one at a time. For each run of rows with equal key,
  * the numbers of rows with sign = 1 and sign = -1 are counted and the first negative /
  * last positive / last negative rows are remembered; when the key changes, `insertRows`
  * writes the result for the previous key into `merged_columns`.
  *
  * Returns early (incrementing `blocks_written`) when the key changes and at least
  * `max_block_size` rows have already been written in this call; the row that triggered the
  * return stays in the queue and the accumulated state for the previous key is kept for the next call.
  * When the queue is exhausted, the last key is written and `finished` is set.
  *
  * Throws Exception with code INCORRECT_DATA if a sign value is neither 1 nor -1.
  */
template<class TSortCursor>
void CollapsingSortedBlockInputStream::merge(ColumnPlainPtrs & merged_columns, std::priority_queue<TSortCursor> & queue)
{
    size_t merged_rows = 0;

    /// Take rows in the correct order and put them into `merged_columns` until at least `max_block_size` rows are written.
    for (; !queue.empty(); ++current_pos)
    {
        TSortCursor current = queue.top();

        /// Very first row: allocate the key references and make it the current key.
        if (current_key.empty())
        {
            current_key.columns.resize(description.size());
            next_key.columns.resize(description.size());

            setPrimaryKeyRef(current_key, current);
        }

        Int8 sign = static_cast<const ColumnInt8 &>(*current->all_columns[sign_column_number]).getData()[current->pos];
        setPrimaryKeyRef(next_key, current);

        bool key_differs = next_key != current_key;

        /// if there are enough rows and the last one is calculated completely
        if (key_differs && merged_rows >= max_block_size)
        {
            ++blocks_written;
            return;
        }

        queue.pop();

        /// Initially, skip all rows. On insert, unskip "corner" rows.
        if (out_row_sources)
            out_row_sources->emplace_back(current.impl->order, true);

        if (key_differs)
        {
            /// We write data for the previous primary key.
            insertRows(merged_columns, merged_rows);

            current_key.swap(next_key);

            count_negative = 0;
            count_positive = 0;
        }

        if (sign == 1)
        {
            ++count_positive;
            last_is_positive = true;

            setRowRef(last_positive, current);
            last_positive_pos = current_pos;
        }
        else if (sign == -1)
        {
            /// Remember only the first negative row of the key.
            if (!count_negative)
            {
                setRowRef(first_negative, current);
                first_negative_pos = current_pos;
            }

            /// The last negative row is tracked only while nothing has been output yet;
            /// insertRows reads it only in its "All rows collapsed" case.
            if (!blocks_written && !merged_rows)
            {
                setRowRef(last_negative, current);
                last_negative_pos = current_pos;
            }

            ++count_negative;
            last_is_positive = false;
        }
        else
            throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
                ErrorCodes::INCORRECT_DATA);

        if (!current->isLast())
        {
            current->next();
            queue.push(current);
        }
        else
        {
            /// We take next block from the corresponding source, if there is one.
            fetchNextBlock(current, queue);
        }
    }

    /// Write data for last primary key.
    insertRows(merged_columns, merged_rows, true);

    finished = true;
}
}