2016-03-07 06:49:07 +00:00
|
|
|
|
/// Compatibility with clang, where std::numeric_limits (from gcc's libstdc++)
/// is for some reason not specialized for __uint128_t.
#if __clang__

#include <limits>

namespace std
{

/// Specialization for the 128-bit unsigned extension type.
/// Provides the members generic numeric code most commonly relies on;
/// previously only is_specialized / is_signed were present, so calling
/// e.g. max() on this specialization failed to compile.
template <>
struct numeric_limits<__uint128_t>
{
    static constexpr bool is_specialized = true;
    static constexpr bool is_signed = false;
    static constexpr bool is_integer = true;
    static constexpr bool is_exact = true;
    static constexpr int radix = 2;
    static constexpr int digits = 128;       /// number of value bits
    static constexpr int digits10 = 38;      /// floor(128 * log10(2))

    static constexpr __uint128_t min() noexcept { return 0; }
    static constexpr __uint128_t lowest() noexcept { return 0; }
    static constexpr __uint128_t max() noexcept { return ~__uint128_t(0); }
};

}

#endif
|
|
|
|
|
|
2015-11-19 21:34:53 +00:00
|
|
|
|
#include <boost/rational.hpp> /// Для вычислений, связанных с коэффициентами сэмплирования.
|
|
|
|
|
|
2015-10-12 07:05:54 +00:00
|
|
|
|
#include <DB/Core/FieldVisitors.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeDataSelectExecutor.h>
|
2015-04-12 04:39:20 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeBlockInputStream.h>
|
2015-06-24 11:03:53 +00:00
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeReadPool.h>
|
|
|
|
|
#include <DB/Storages/MergeTree/MergeTreeThreadBlockInputStream.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/Parsers/ASTIdentifier.h>
|
2015-11-19 21:34:53 +00:00
|
|
|
|
#include <DB/Parsers/ASTSampleRatio.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/DataStreams/ExpressionBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/FilterBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/CollapsingFinalBlockInputStream.h>
|
2014-12-30 18:04:53 +00:00
|
|
|
|
#include <DB/DataStreams/AddingConstColumnBlockInputStream.h>
|
2015-02-15 04:16:11 +00:00
|
|
|
|
#include <DB/DataStreams/CreatingSetsBlockInputStream.h>
|
|
|
|
|
#include <DB/DataStreams/NullBlockInputStream.h>
|
2015-07-08 04:38:46 +00:00
|
|
|
|
#include <DB/DataStreams/SummingSortedBlockInputStream.h>
|
2016-04-15 19:09:42 +00:00
|
|
|
|
#include <DB/DataStreams/ReplacingSortedBlockInputStream.h>
|
2015-07-08 04:38:46 +00:00
|
|
|
|
#include <DB/DataStreams/AggregatingSortedBlockInputStream.h>
|
2014-03-13 12:48:07 +00:00
|
|
|
|
#include <DB/DataTypes/DataTypesNumberFixed.h>
|
2015-11-29 08:06:29 +00:00
|
|
|
|
#include <DB/DataTypes/DataTypeDate.h>
|
2014-07-28 10:36:11 +00:00
|
|
|
|
#include <DB/Common/VirtualColumnUtils.h>
|
2014-09-17 09:59:21 +00:00
|
|
|
|
|
2015-02-03 14:37:35 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
namespace DB
|
|
|
|
|
{
|
|
|
|
|
|
2016-01-11 21:46:36 +00:00
|
|
|
|
namespace ErrorCodes
{
    /// Thrown when 'force_primary_key' / 'force_index_by_date' is set but the query does not use the index.
    extern const int INDEX_NOT_USED;
    /// Thrown when SAMPLE is requested on a table created without a sampling expression.
    extern const int SAMPLING_NOT_SUPPORTED;
}
|
|
|
|
|
|
|
|
|
|
|
2015-02-15 02:31:48 +00:00
|
|
|
|
/// Binds the executor to a concrete MergeTreeData storage and creates a
/// dedicated logger channel named "<table log name> (SelectExecutor)".
MergeTreeDataSelectExecutor::MergeTreeDataSelectExecutor(MergeTreeData & data_)
    : data(data_), log(&Logger::get(data.getLogName() + " (SelectExecutor)"))
{
}
|
|
|
|
|
|
2015-11-18 21:37:28 +00:00
|
|
|
|
|
2014-07-28 10:36:11 +00:00
|
|
|
|
/// Построить блок состоящий только из возможных значений виртуальных столбцов
|
2016-03-05 03:17:11 +00:00
|
|
|
|
static Block getBlockWithPartColumn(const MergeTreeData::DataPartsVector & parts)
|
2014-07-28 10:36:11 +00:00
|
|
|
|
{
|
|
|
|
|
Block res;
|
2016-05-28 07:48:40 +00:00
|
|
|
|
ColumnWithTypeAndName _part(std::make_shared<ColumnString>(), std::make_shared<DataTypeString>(), "_part");
|
2014-07-28 10:36:11 +00:00
|
|
|
|
|
|
|
|
|
for (const auto & part : parts)
|
|
|
|
|
_part.column->insert(part->name);
|
|
|
|
|
|
|
|
|
|
res.insert(_part);
|
|
|
|
|
return res;
|
|
|
|
|
}
|
|
|
|
|
|
2015-11-18 21:37:28 +00:00
|
|
|
|
|
|
|
|
|
size_t MergeTreeDataSelectExecutor::getApproximateTotalRowsToRead(
    const MergeTreeData::DataPartsVector & parts, const PKCondition & key_condition, const Settings & settings) const
{
    /// Estimate how many rows we would read without sampling.
    LOG_DEBUG(log, "Preliminary index scan with condition: " << key_condition.toString());

    size_t sum_full_marks = 0;

    for (const auto & part : parts)
    {
        MarkRanges ranges = markRangesFromPKRange(part->index, key_condition, settings);

        /** To obtain a lower-bound estimate of the number of rows matching the PK condition,
          * count only marks that are guaranteed to be full.
          * That is, do not count the first and the last mark of each range, which may be partial.
          */
        for (const auto & range : ranges)
            if (range.end - range.begin > 2)
                sum_full_marks += range.end - range.begin - 2;
    }

    return sum_full_marks * data.index_granularity;
}
|
|
|
|
|
|
|
|
|
|
|
2015-11-19 21:34:53 +00:00
|
|
|
|
using RelativeSize = boost::rational<ASTSampleRatio::BigNum>;
|
|
|
|
|
|
|
|
|
|
static std::ostream & operator<<(std::ostream & ostr, const RelativeSize & x)
|
|
|
|
|
{
|
|
|
|
|
ostr << ASTSampleRatio::toString(x.numerator()) << "/" << ASTSampleRatio::toString(x.denominator());
|
|
|
|
|
return ostr;
|
|
|
|
|
}
|
2015-11-18 21:37:28 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/// Переводит размер сэмпла в приблизительном количестве строк (вида SAMPLE 1000000) в относительную величину (вида SAMPLE 0.1).
|
|
|
|
|
static RelativeSize convertAbsoluteSampleSizeToRelative(const ASTPtr & node, size_t approx_total_rows)
|
|
|
|
|
{
|
|
|
|
|
if (approx_total_rows == 0)
|
|
|
|
|
return 1;
|
|
|
|
|
|
2015-11-19 21:34:53 +00:00
|
|
|
|
const ASTSampleRatio & node_sample = typeid_cast<const ASTSampleRatio &>(*node);
|
|
|
|
|
|
|
|
|
|
auto absolute_sample_size = node_sample.ratio.numerator / node_sample.ratio.denominator;
|
|
|
|
|
return std::min(RelativeSize(1), RelativeSize(absolute_sample_size) / approx_total_rows);
|
2015-11-18 21:37:28 +00:00
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/** Select the data parts and mark ranges to read for the query and build the input streams.
  *
  * Pipeline: classify requested columns (real vs. virtual) -> prune parts by the `_part`
  * virtual-column condition, the date index and max_block_number_to_read -> apply SAMPLE
  * (exact rational arithmetic, optionally combined with parallel replicas) -> evaluate
  * PREWHERE subqueries -> compute mark ranges per part from the primary key -> spread the
  * ranges among `threads` streams (FINAL or normal variant) -> wrap streams with the
  * sampling filter and the `_sample_factor` constant column when requested.
  *
  * @param inout_part_index  shared counter of virtual `_part_index` values; may be nullptr,
  *                          in which case a local counter is used.
  * @param max_block_number_to_read  if non-zero, parts with a greater right block number are skipped.
  * @return one stream per thread (possibly fewer); empty if sampling/pruning leaves no data.
  */
BlockInputStreams MergeTreeDataSelectExecutor::read(
    const Names & column_names_to_return,
    ASTPtr query,
    const Context & context,
    const Settings & settings,
    QueryProcessingStage::Enum & processed_stage,
    const size_t max_block_size,
    const unsigned threads,
    size_t * inout_part_index,
    Int64 max_block_number_to_read) const
{
    size_t part_index_var = 0;
    if (!inout_part_index)
        inout_part_index = &part_index_var;

    MergeTreeData::DataPartsVector parts = data.getDataPartsVector();

    /// If the query constrains the virtual column _part or _part_index, select only the matching parts.
    /// The query may also request the virtual column _sample_factor — 1 / the sampling coefficient used.
    Names virt_column_names;
    Names real_column_names;

    bool part_column_queried = false;

    bool sample_factor_column_queried = false;
    Float64 used_sample_factor = 1;

    for (const String & name : column_names_to_return)
    {
        if (name == "_part")
        {
            part_column_queried = true;
            virt_column_names.push_back(name);
        }
        else if (name == "_part_index")
        {
            virt_column_names.push_back(name);
        }
        else if (name == "_sample_factor")
        {
            sample_factor_column_queried = true;
            virt_column_names.push_back(name);
        }
        else
        {
            real_column_names.push_back(name);
        }
    }

    NamesAndTypesList available_real_columns = data.getColumnsList();

    NamesAndTypesList available_real_and_virtual_columns = available_real_columns;
    for (const auto & name : virt_column_names)
        available_real_and_virtual_columns.emplace_back(data.getColumn(name));

    /// If the query has only virtual columns, we must request at least one real one.
    if (real_column_names.empty())
        real_column_names.push_back(ExpressionActions::getSmallestColumn(available_real_columns));

    /// If the virtual column _part is requested, try to use it as an index.
    Block virtual_columns_block = getBlockWithPartColumn(parts);
    if (part_column_queried)
        VirtualColumnUtils::filterBlockWithQuery(query, virtual_columns_block, context);

    std::multiset<String> part_values = VirtualColumnUtils::extractSingleValueFromBlock<String>(virtual_columns_block, "_part");

    data.check(real_column_names);
    processed_stage = QueryProcessingStage::FetchColumns;

    SortDescription sort_descr = data.getSortDescription();

    PKCondition key_condition(query, context, available_real_and_virtual_columns, sort_descr);
    PKCondition date_condition(query, context, available_real_and_virtual_columns,
        SortDescription(1, SortColumnDescription(data.date_column_name, 1)));

    if (settings.force_primary_key && key_condition.alwaysUnknownOrTrue())
    {
        std::stringstream exception_message;
        exception_message << "Primary key (";
        for (size_t i = 0, size = sort_descr.size(); i < size; ++i)
            exception_message << (i == 0 ? "" : ", ") << sort_descr[i].column_name;
        exception_message << ") is not used and setting 'force_primary_key' is set.";

        throw Exception(exception_message.str(), ErrorCodes::INDEX_NOT_USED);
    }

    if (settings.force_index_by_date && date_condition.alwaysUnknownOrTrue())
        throw Exception("Index by date (" + data.date_column_name + ") is not used and setting 'force_index_by_date' is set.",
            ErrorCodes::INDEX_NOT_USED);

    /// Select the parts that may contain data satisfying date_condition and that match
    /// the condition on _part, as well as max_block_number_to_read.
    {
        const DataTypes data_types_date { std::make_shared<DataTypeDate>() };

        auto prev_parts = parts;
        parts.clear();

        for (const auto & part : prev_parts)
        {
            if (part_values.find(part->name) == part_values.end())
                continue;

            Field left = static_cast<UInt64>(part->left_date);
            Field right = static_cast<UInt64>(part->right_date);

            if (!date_condition.mayBeTrueInRange(1, &left, &right, data_types_date))
                continue;

            if (max_block_number_to_read && part->right > max_block_number_to_read)
                continue;

            parts.push_back(part);
        }
    }

    /// Sampling.
    Names column_names_to_read = real_column_names;
    using ASTFunctionPtr = Poco::SharedPtr<ASTFunction>;
    ASTFunctionPtr filter_function;
    ExpressionActionsPtr filter_expression;

    RelativeSize relative_sample_size = 0;
    RelativeSize relative_sample_offset = 0;

    ASTSelectQuery & select = *typeid_cast<ASTSelectQuery*>(&*query);

    if (select.sample_size)
    {
        relative_sample_size.assign(
            typeid_cast<const ASTSampleRatio &>(*select.sample_size).ratio.numerator,
            typeid_cast<const ASTSampleRatio &>(*select.sample_size).ratio.denominator);

        if (relative_sample_size < 0)
            throw Exception("Negative sample size", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        relative_sample_offset = 0;
        if (select.sample_offset)
            relative_sample_offset.assign(
                typeid_cast<const ASTSampleRatio &>(*select.sample_offset).ratio.numerator,
                typeid_cast<const ASTSampleRatio &>(*select.sample_offset).ratio.denominator);

        if (relative_sample_offset < 0)
            throw Exception("Negative sample offset", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        /// Convert an absolute sample size (SAMPLE 1000000 — how many rows to read)
        /// into a relative one (what fraction of the data to read).
        size_t approx_total_rows = 0;
        if (relative_sample_size > 1 || relative_sample_offset > 1)
            approx_total_rows = getApproximateTotalRowsToRead(parts, key_condition, settings);

        if (relative_sample_size > 1)
        {
            relative_sample_size = convertAbsoluteSampleSizeToRelative(select.sample_size, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample size: " << relative_sample_size);
        }

        /// SAMPLE 1 is the same as the absence of SAMPLE.
        if (relative_sample_size == 1)
            relative_sample_size = 0;

        if (relative_sample_offset > 0 && 0 == relative_sample_size)
            throw Exception("Sampling offset is incorrect because no sampling", ErrorCodes::ARGUMENT_OUT_OF_BOUND);

        if (relative_sample_offset > 1)
        {
            relative_sample_offset = convertAbsoluteSampleSizeToRelative(select.sample_offset, approx_total_rows);
            LOG_DEBUG(log, "Selected relative sample offset: " << relative_sample_offset);
        }
    }

    /** Which range of sampling key values do we need to read?
      * First, over the whole range (the "universum") we select an interval
      * of relative size relative_sample_size, shifted from the beginning by relative_sample_offset.
      *
      * Example: SAMPLE 0.4 OFFSET 0.3:
      *
      * [------********------]
      *        ^ - offset
      *        <------> - size
      *
      * If the interval goes past the end of the universum, cut off its right part.
      *
      * Example: SAMPLE 0.4 OFFSET 0.8:
      *
      * [----------------****]
      *                  ^ - offset
      *                  <------> - size
      *
      * Next, if the settings parallel_replicas_count, parallel_replica_offset are set,
      * the resulting interval must be split further into parallel_replicas_count pieces,
      * and the piece with number parallel_replica_offset (zero-based) must be selected.
      *
      * Example: SAMPLE 0.4 OFFSET 0.3, parallel_replicas_count = 2, parallel_replica_offset = 1:
      *
      * [----------****------]
      *        ^ - offset
      *        <------> - size
      *        <--><--> - pieces for the different parallel_replica_offset; we select the second one.
      *
      * It is very important that the intervals for different parallel_replica_offset
      * cover the whole range without gaps and overlaps.
      * It is also important that the whole universum can be covered using
      * SAMPLE 0.1 OFFSET 0, ... OFFSET 0.9 and similar decimal fractions.
      */

    bool use_sampling = relative_sample_size > 0 || settings.parallel_replicas_count > 1;
    bool no_data = false;   /// Nothing remains after sampling.

    if (use_sampling)
    {
        if (!data.sampling_expression)
            throw Exception("Illegal SAMPLE: table doesn't support sampling", ErrorCodes::SAMPLING_NOT_SUPPORTED);

        if (sample_factor_column_queried && relative_sample_size != 0)
            used_sample_factor = 1.0 / boost::rational_cast<Float64>(relative_sample_size);

        RelativeSize size_of_universum = 0;
        DataTypePtr type = data.getPrimaryExpression()->getSampleBlock().getByName(data.sampling_expression->getColumnName()).type;

        if (typeid_cast<const DataTypeUInt64 *>(type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt64>::max()) + 1;
        else if (typeid_cast<const DataTypeUInt32 *>(type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt32>::max()) + 1;
        else if (typeid_cast<const DataTypeUInt16 *>(type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt16>::max()) + 1;
        else if (typeid_cast<const DataTypeUInt8 *>(type.get()))
            size_of_universum = RelativeSize(std::numeric_limits<UInt8>::max()) + 1;
        else
            throw Exception("Invalid sampling column type in storage parameters: " + type->getName() + ". Must be unsigned integer type.",
                ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);

        if (settings.parallel_replicas_count > 1)
        {
            /// With parallel replicas but no SAMPLE clause, each replica reads its share of the full range.
            if (relative_sample_size == 0)
                relative_sample_size = 1;

            relative_sample_size /= settings.parallel_replicas_count;
            relative_sample_offset += relative_sample_size * settings.parallel_replica_offset;
        }

        if (relative_sample_offset >= 1)
            no_data = true;

        /// Calculate the half-open interval [lower, upper) of sampling column values.
        bool has_lower_limit = false;
        bool has_upper_limit = false;

        RelativeSize lower_limit_rational = relative_sample_offset * size_of_universum;
        RelativeSize upper_limit_rational = (relative_sample_offset + relative_sample_size) * size_of_universum;

        UInt64 lower = boost::rational_cast<ASTSampleRatio::BigNum>(lower_limit_rational);
        UInt64 upper = boost::rational_cast<ASTSampleRatio::BigNum>(upper_limit_rational);

        if (lower > 0)
            has_lower_limit = true;

        if (upper_limit_rational < size_of_universum)
            has_upper_limit = true;

        /*std::cerr << std::fixed << std::setprecision(100)
            << "relative_sample_size: " << relative_sample_size << "\n"
            << "relative_sample_offset: " << relative_sample_offset << "\n"
            << "lower_limit_float: " << lower_limit_rational << "\n"
            << "upper_limit_float: " << upper_limit_rational << "\n"
            << "lower: " << lower << "\n"
            << "upper: " << upper << "\n";*/

        if ((has_upper_limit && upper == 0)
            || (has_lower_limit && has_upper_limit && lower == upper))
            no_data = true;

        if (no_data || (!has_lower_limit && !has_upper_limit))
        {
            use_sampling = false;
        }
        else
        {
            /// Add conditions to cut off something else when the index is looked at again and when the query is processed.

            ASTFunctionPtr lower_function;
            ASTFunctionPtr upper_function;

            if (has_lower_limit)
            {
                if (!key_condition.addCondition(data.sampling_expression->getColumnName(), Range::createLeftBounded(lower, true)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                ASTPtr args = new ASTExpressionList;
                args->children.push_back(data.sampling_expression);
                args->children.push_back(new ASTLiteral(StringRange(), lower));

                lower_function = new ASTFunction;
                lower_function->name = "greaterOrEquals";
                lower_function->arguments = args;
                lower_function->children.push_back(lower_function->arguments);

                filter_function = lower_function;
            }

            if (has_upper_limit)
            {
                if (!key_condition.addCondition(data.sampling_expression->getColumnName(), Range::createRightBounded(upper, false)))
                    throw Exception("Sampling column not in primary key", ErrorCodes::ILLEGAL_COLUMN);

                ASTPtr args = new ASTExpressionList;
                args->children.push_back(data.sampling_expression);
                args->children.push_back(new ASTLiteral(StringRange(), upper));

                upper_function = new ASTFunction;
                upper_function->name = "less";
                upper_function->arguments = args;
                upper_function->children.push_back(upper_function->arguments);

                filter_function = upper_function;
            }

            if (has_lower_limit && has_upper_limit)
            {
                ASTPtr args = new ASTExpressionList;
                args->children.push_back(lower_function);
                args->children.push_back(upper_function);

                filter_function = new ASTFunction;
                filter_function->name = "and";
                filter_function->arguments = args;
                filter_function->children.push_back(filter_function->arguments);
            }

            filter_expression = ExpressionAnalyzer(filter_function, context, nullptr, available_real_columns).getActions(false);

            /// Add the columns needed for sampling_expression.
            std::vector<String> add_columns = filter_expression->getRequiredColumns();
            column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());
            std::sort(column_names_to_read.begin(), column_names_to_read.end());
            column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());
        }
    }

    if (no_data)
    {
        LOG_DEBUG(log, "Sampling yields no data.");
        return {};
    }

    LOG_DEBUG(log, "Key condition: " << key_condition.toString());
    LOG_DEBUG(log, "Date condition: " << date_condition.toString());

    /// PREWHERE
    ExpressionActionsPtr prewhere_actions;
    String prewhere_column;
    if (select.prewhere_expression)
    {
        ExpressionAnalyzer analyzer(select.prewhere_expression, context, nullptr, available_real_columns);
        prewhere_actions = analyzer.getActions(false);
        prewhere_column = select.prewhere_expression->getColumnName();
        SubqueriesForSets prewhere_subqueries = analyzer.getSubqueriesForSets();

        /** Compute the subqueries right now.
          * NOTE The drawback is that these computations do not fit into the query execution pipeline.
          * They are done before the pipeline starts; they cannot be interrupted; progress packets are not sent during the computations.
          */
        if (!prewhere_subqueries.empty())
            CreatingSetsBlockInputStream(std::make_shared<NullBlockInputStream>(), prewhere_subqueries, settings.limits).read();
    }

    RangesInDataParts parts_with_ranges;

    /// Find which range to read from each part.
    size_t sum_marks = 0;
    size_t sum_ranges = 0;
    for (auto & part : parts)
    {
        RangesInDataPart ranges(part, (*inout_part_index)++);

        if (data.merging_params.mode != MergeTreeData::MergingParams::Unsorted)
            ranges.ranges = markRangesFromPKRange(part->index, key_condition, settings);
        else
            ranges.ranges = MarkRanges{MarkRange{0, part->size}};

        if (!ranges.ranges.empty())
        {
            parts_with_ranges.push_back(ranges);

            sum_ranges += ranges.ranges.size();
            for (const auto & range : ranges.ranges)
                sum_marks += range.end - range.begin;
        }
    }

    LOG_DEBUG(log, "Selected " << parts.size() << " parts by date, " << parts_with_ranges.size() << " parts by key, "
        << sum_marks << " marks to read from " << sum_ranges << " ranges");

    if (parts_with_ranges.empty())
        return {};

    ProfileEvents::increment(ProfileEvents::SelectedParts, parts_with_ranges.size());
    ProfileEvents::increment(ProfileEvents::SelectedRanges, sum_ranges);
    ProfileEvents::increment(ProfileEvents::SelectedMarks, sum_marks);

    BlockInputStreams res;

    if (select.final)
    {
        /// Add the columns needed to calculate the primary key and the sign.
        std::vector<String> add_columns = data.getPrimaryExpression()->getRequiredColumns();
        column_names_to_read.insert(column_names_to_read.end(), add_columns.begin(), add_columns.end());

        if (!data.merging_params.sign_column.empty())
            column_names_to_read.push_back(data.merging_params.sign_column);
        if (!data.merging_params.version_column.empty())
            column_names_to_read.push_back(data.merging_params.version_column);

        std::sort(column_names_to_read.begin(), column_names_to_read.end());
        column_names_to_read.erase(std::unique(column_names_to_read.begin(), column_names_to_read.end()), column_names_to_read.end());

        res = spreadMarkRangesAmongThreadsFinal(
            parts_with_ranges,
            threads,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            prewhere_actions,
            prewhere_column,
            virt_column_names,
            settings,
            context);
    }
    else
    {
        res = spreadMarkRangesAmongThreads(
            parts_with_ranges,
            threads,
            column_names_to_read,
            max_block_size,
            settings.use_uncompressed_cache,
            prewhere_actions,
            prewhere_column,
            virt_column_names,
            settings);
    }

    if (use_sampling)
        for (auto & stream : res)
            stream = std::make_shared<FilterBlockInputStream>(stream, filter_expression, filter_function->getColumnName());

    /// By the way, if a distributed query or a query to a Merge table is being performed,
    /// the _sample_factor column can contain different values.
    if (sample_factor_column_queried)
        for (auto & stream : res)
            stream = std::make_shared<AddingConstColumnBlockInputStream<Float64>>(
                stream, std::make_shared<DataTypeFloat64>(), used_sample_factor, "_sample_factor");

    return res;
}
|
|
|
|
|
|
2015-02-15 04:16:11 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/** Distributes the given mark ranges among up to `threads` input streams.
  * Two strategies, chosen by settings.merge_tree_uniform_read_distribution:
  *   1 — a shared MergeTreeReadPool hands out work dynamically to MergeTreeThreadBlockInputStream's;
  *   0 — ranges are statically pre-partitioned into one MergeTreeBlockInputStream per thread.
  * `parts` is taken by value: both strategies consume/modify it.
  * Returns the resulting streams; the interface of this method is unchanged.
  */
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreads(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column,
	const Names & virt_columns,
	const Settings & settings) const
{
	/// Convert row-based thresholds from settings into mark counts (rounding up).
	const std::size_t min_marks_for_concurrent_read =
		(settings.merge_tree_min_rows_for_concurrent_read + data.index_granularity - 1) / data.index_granularity;
	const std::size_t max_marks_to_use_cache =
		(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;

	/// Count the marks in each part.
	std::vector<size_t> sum_marks_in_parts(parts.size());
	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
	{
		/// List the ranges right-to-left so the leftmost range can be removed with pop_back().
		std::reverse(parts[i].ranges.begin(), parts[i].ranges.end());

		for (const auto & range : parts[i].ranges)
			sum_marks_in_parts[i] += range.end - range.begin;

		sum_marks += sum_marks_in_parts[i];
	}

	/// Reading too much data through the uncompressed cache would just evict it; bypass it.
	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	BlockInputStreams res;

	if (sum_marks > 0 && settings.merge_tree_uniform_read_distribution == 1)
	{
		/// Reduce the number of threads if there is little data.
		if (sum_marks < threads * min_marks_for_concurrent_read && parts.size() < threads)
			threads = std::max((sum_marks + min_marks_for_concurrent_read - 1) / min_marks_for_concurrent_read, parts.size());

		MergeTreeReadPoolPtr pool = std::make_shared<MergeTreeReadPool>(
			threads, sum_marks, min_marks_for_concurrent_read, parts, data, prewhere_actions, prewhere_column, true,
			column_names, MergeTreeReadPool::BackoffSettings(settings));

		/// Estimate the total number of rows — for the progress bar.
		const std::size_t total_rows = data.index_granularity * sum_marks;
		LOG_TRACE(log, "Reading approx. " << total_rows << " rows");

		for (std::size_t i = 0; i < threads; ++i)
		{
			res.emplace_back(std::make_shared<MergeTreeThreadBlockInputStream>(
				i, pool, min_marks_for_concurrent_read, max_block_size, data, use_uncompressed_cache,
				prewhere_actions,
				prewhere_column, settings, virt_columns));

			if (i == 0)
			{
				/// Set the approximate row count only on the first source.
				static_cast<IProfilingBlockInputStream &>(*res.front()).setTotalRowsApprox(total_rows);
			}
		}
	}
	else if (sum_marks > 0)
	{
		const size_t min_marks_per_thread = (sum_marks - 1) / threads + 1;

		for (size_t i = 0; i < threads && !parts.empty(); ++i)
		{
			size_t need_marks = min_marks_per_thread;

			/// Loop over parts.
			while (need_marks > 0 && !parts.empty())
			{
				RangesInDataPart & part = parts.back();
				size_t & marks_in_part = sum_marks_in_parts.back();

				/// BUGFIX: `part` is a reference into `parts`; if the whole part is consumed below,
				/// parts.pop_back() destroys the referenced element. Copy everything that is needed
				/// after that point now, so we never read through the dangling reference.
				const auto part_data_part = part.data_part;
				const auto part_index_in_query = part.part_index_in_query;

				/// Do not take too few rows from a part.
				if (marks_in_part >= min_marks_for_concurrent_read &&
					need_marks < min_marks_for_concurrent_read)
					need_marks = min_marks_for_concurrent_read;

				/// Do not leave too few rows in a part.
				if (marks_in_part > need_marks &&
					marks_in_part - need_marks < min_marks_for_concurrent_read)
					need_marks = marks_in_part;

				MarkRanges ranges_to_get_from_part;

				/// Take the whole part if it is small enough.
				if (marks_in_part <= need_marks)
				{
					/// Restore the original order of the ranges.
					std::reverse(part.ranges.begin(), part.ranges.end());

					ranges_to_get_from_part = part.ranges;

					need_marks -= marks_in_part;
					parts.pop_back();
					sum_marks_in_parts.pop_back();
				}
				else
				{
					/// Loop over the ranges of the part.
					while (need_marks > 0)
					{
						if (part.ranges.empty())
							throw Exception("Unexpected end of ranges while spreading marks among threads", ErrorCodes::LOGICAL_ERROR);

						MarkRange & range = part.ranges.back();

						const size_t marks_in_range = range.end - range.begin;
						const size_t marks_to_get_from_range = std::min(marks_in_range, need_marks);

						ranges_to_get_from_part.emplace_back(range.begin, range.begin + marks_to_get_from_range);
						range.begin += marks_to_get_from_range;
						marks_in_part -= marks_to_get_from_range;
						need_marks -= marks_to_get_from_range;
						if (range.begin == range.end)
							part.ranges.pop_back();
					}
				}

				/// Use the copies made above — `part` may be dangling here if the whole part was taken.
				BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
					data.getFullPath() + part_data_part->name + '/', max_block_size, column_names, data,
					part_data_part, ranges_to_get_from_part, use_uncompressed_cache,
					prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true);

				res.push_back(source_stream);

				/// Wrap the stream to materialize the requested virtual columns as constants.
				for (const String & virt_column : virt_columns)
				{
					if (virt_column == "_part")
						res.back() = std::make_shared<AddingConstColumnBlockInputStream<String>>(
							res.back(), std::make_shared<DataTypeString>(), part_data_part->name, "_part");
					else if (virt_column == "_part_index")
						res.back() = std::make_shared<AddingConstColumnBlockInputStream<UInt64>>(
							res.back(), std::make_shared<DataTypeUInt64>(), part_index_in_query, "_part_index");
				}
			}
		}

		if (!parts.empty())
			throw Exception("Couldn't spread marks among threads", ErrorCodes::LOGICAL_ERROR);
	}

	return res;
}
|
|
|
|
|
|
|
|
|
|
/** Reads the given ranges and merges them on the fly into (at most) one stream,
  * applying the table's FINAL semantics according to data.merging_params.mode.
  * Each part is read as a single stream, the primary-key expression is applied,
  * and the streams are combined with the mode-specific merging stream.
  * Returns an empty list when `parts` is empty.
  */
BlockInputStreams MergeTreeDataSelectExecutor::spreadMarkRangesAmongThreadsFinal(
	RangesInDataParts parts,
	size_t threads,
	const Names & column_names,
	size_t max_block_size,
	bool use_uncompressed_cache,
	ExpressionActionsPtr prewhere_actions,
	const String & prewhere_column,
	const Names & virt_columns,
	const Settings & settings,
	const Context & context) const
{
	/// Convert the row-based cache threshold into a mark count (rounding up).
	const size_t max_marks_to_use_cache =
		(settings.merge_tree_max_rows_to_use_cache + data.index_granularity - 1) / data.index_granularity;

	size_t sum_marks = 0;
	for (size_t i = 0; i < parts.size(); ++i)
		for (size_t j = 0; j < parts[i].ranges.size(); ++j)
			sum_marks += parts[i].ranges[j].end - parts[i].ranges[j].begin;

	/// Reading too much data through the uncompressed cache would just evict it; bypass it.
	if (sum_marks > max_marks_to_use_cache)
		use_uncompressed_cache = false;

	BlockInputStreams to_merge;

	/// NOTE merge_tree_uniform_read_distribution is not used for FINAL

	/// One source stream per part; merging requires streams sorted by the primary key.
	for (size_t part_index = 0; part_index < parts.size(); ++part_index)
	{
		RangesInDataPart & part = parts[part_index];

		BlockInputStreamPtr source_stream = std::make_shared<MergeTreeBlockInputStream>(
			data.getFullPath() + part.data_part->name + '/', max_block_size, column_names, data,
			part.data_part, part.ranges, use_uncompressed_cache,
			prewhere_actions, prewhere_column, true, settings.min_bytes_to_use_direct_io, settings.max_read_buffer_size, true);

		/// Materialize the requested virtual columns as constants on top of the source.
		for (const String & virt_column : virt_columns)
		{
			if (virt_column == "_part")
				source_stream = std::make_shared<AddingConstColumnBlockInputStream<String>>(
					source_stream, std::make_shared<DataTypeString>(), part.data_part->name, "_part");
			else if (virt_column == "_part_index")
				source_stream = std::make_shared<AddingConstColumnBlockInputStream<UInt64>>(
					source_stream, std::make_shared<DataTypeUInt64>(), part.part_index_in_query, "_part_index");
		}

		/// Compute the primary-key expression so the merging streams can compare rows by it.
		to_merge.emplace_back(std::make_shared<ExpressionBlockInputStream>(source_stream, data.getPrimaryExpression()));
	}

	BlockInputStreams res;
	if (to_merge.size() == 1)
	{
		/// Single stream: no merge needed. For Collapsing tables it is enough
		/// to filter out rows with negative sign (data within a part is already collapsed).
		if (!data.merging_params.sign_column.empty())
		{
			ExpressionActionsPtr sign_filter_expression;
			String sign_filter_column;

			createPositiveSignCondition(sign_filter_expression, sign_filter_column, context);

			res.emplace_back(std::make_shared<FilterBlockInputStream>(to_merge[0], sign_filter_expression, sign_filter_column));
		}
		else
			res = to_merge;
	}
	else if (to_merge.size() > 1)
	{
		BlockInputStreamPtr merged;

		/// Pick the merging stream that implements this table engine's FINAL semantics.
		switch (data.merging_params.mode)
		{
			case MergeTreeData::MergingParams::Ordinary:
				merged = std::make_shared<MergingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
				break;

			case MergeTreeData::MergingParams::Collapsing:
				merged = std::make_shared<CollapsingFinalBlockInputStream>(to_merge, data.getSortDescription(), data.merging_params.sign_column);
				break;

			case MergeTreeData::MergingParams::Summing:
				merged = std::make_shared<SummingSortedBlockInputStream>(to_merge,
					data.getSortDescription(), data.merging_params.columns_to_sum, max_block_size);
				break;

			case MergeTreeData::MergingParams::Aggregating:
				merged = std::make_shared<AggregatingSortedBlockInputStream>(to_merge, data.getSortDescription(), max_block_size);
				break;

			case MergeTreeData::MergingParams::Replacing: /// TODO Implement a ReplacingFinalBlockInputStream
				merged = std::make_shared<ReplacingSortedBlockInputStream>(to_merge,
					data.getSortDescription(), data.merging_params.version_column, max_block_size);
				break;

			case MergeTreeData::MergingParams::Unsorted:
				throw Exception("UnsortedMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);

			case MergeTreeData::MergingParams::Graphite:
				throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
		}

		res.emplace_back(merged);
	}

	return res;
}
|
|
|
|
|
|
2016-03-27 11:37:25 +00:00
|
|
|
|
|
2016-03-05 03:17:11 +00:00
|
|
|
|
/** Builds the filter condition `equals(<sign_column>, 1)` as an AST, compiles it,
  * and returns the resulting expression actions and the name of the filter column.
  * Used when reading a single stream with FINAL from a table with a sign column
  * (see spreadMarkRangesAmongThreadsFinal): only "positive" rows are kept.
  * @param out_expression receives the compiled expression actions.
  * @param out_column receives the name of the computed boolean column.
  */
void MergeTreeDataSelectExecutor::createPositiveSignCondition(
	ExpressionActionsPtr & out_expression, String & out_column, const Context & context) const
{
	/// Each raw `new` is immediately wrapped into an ASTPtr, which then owns the node
	/// (ASTPtr here accepts a raw pointer implicitly — matches the AST conventions of this codebase).
	ASTFunction * function = new ASTFunction;
	ASTPtr function_ptr = function;

	ASTExpressionList * arguments = new ASTExpressionList;
	ASTPtr arguments_ptr = arguments;

	ASTIdentifier * sign = new ASTIdentifier;
	ASTPtr sign_ptr = sign;

	ASTLiteral * one = new ASTLiteral;
	ASTPtr one_ptr = one;

	/// Assemble the AST for `equals(sign, 1)`.
	function->name = "equals";
	function->arguments = arguments_ptr;
	function->children.push_back(arguments_ptr);

	arguments->children.push_back(sign_ptr);
	arguments->children.push_back(one_ptr);

	sign->name = data.merging_params.sign_column;
	sign->kind = ASTIdentifier::Column;

	one->value = Field(static_cast<Int64>(1));

	/// Compile the AST against the table's column list into executable actions.
	out_expression = ExpressionAnalyzer(function_ptr, context, {}, data.getColumnsList()).getActions(false);
	out_column = function->getColumnName();
}
|
|
|
|
|
|
2016-03-27 11:37:25 +00:00
|
|
|
|
|
2014-03-13 12:48:07 +00:00
|
|
|
|
/// Получает набор диапазонов засечек, вне которых не могут находиться ключи из заданного диапазона.
|
2015-11-29 11:58:44 +00:00
|
|
|
|
/** Computes the set of mark ranges that may contain keys matching `key_condition`,
  * by a coarse-to-fine descent over the part's primary-key index.
  * Ranges outside the result are guaranteed not to contain matching keys.
  * Adjacent useful ranges closer than `min_marks_for_seek` are glued together
  * to avoid extra disk seeks.
  */
MarkRanges MergeTreeDataSelectExecutor::markRangesFromPKRange(
	const MergeTreeData::DataPart::Index & index, const PKCondition & key_condition, const Settings & settings) const
{
	/// Convert the row-based seek threshold into marks (rounding up).
	size_t min_marks_for_seek = (settings.merge_tree_min_rows_for_seek + data.index_granularity - 1) / data.index_granularity;

	MarkRanges res;

	/// Only the key-prefix columns actually used by the condition need to be loaded from the index.
	size_t used_key_size = key_condition.getMaxKeyColumn() + 1;
	size_t marks_count = index.at(0).get()->size();

	/// If the index is not used at all, the whole part may match.
	if (key_condition.alwaysUnknownOrTrue())
	{
		res.push_back(MarkRange(0, marks_count));
	}
	else
	{
		/** The stack always holds disjoint candidate ranges, the leftmost on top (back).
		  * At each step, take the leftmost range and check whether the condition can hold in it.
		  * If it can, split it into smaller pieces and push them onto the stack. If not, discard it.
		  * If a range is already a single mark long, add it to the result and discard it.
		  */
		std::vector<MarkRange> ranges_stack{ {0, marks_count} };

		/// NOTE Superfluous copies of Field objects made to pass values into PKCondition.
		Row index_left(used_key_size);
		Row index_right(used_key_size);

		while (!ranges_stack.empty())
		{
			MarkRange range = ranges_stack.back();
			ranges_stack.pop_back();

			bool may_be_true;
			if (range.end == marks_count)
			{
				/// The last range has no right boundary stored in the index,
				/// so check whether the condition may hold anywhere after the left boundary.
				for (size_t i = 0; i < used_key_size; ++i)
				{
					index_left[i] = (*index[i].get())[range.begin];
				}

				may_be_true = key_condition.mayBeTrueAfter(
					used_key_size, &index_left[0], data.primary_key_data_types);
			}
			else
			{
				/// Check the condition against the [left, right] key boundaries of the range.
				for (size_t i = 0; i < used_key_size; ++i)
				{
					index_left[i] = (*index[i].get())[range.begin];
					index_right[i] = (*index[i].get())[range.end];
				}

				may_be_true = key_condition.mayBeTrueInRange(
					used_key_size, &index_left[0], &index_right[0], data.primary_key_data_types);
			}

			if (!may_be_true)
				continue;

			if (range.end == range.begin + 1)
			{
				/// Found a useful single-mark gap: either start a new result range,
				/// or extend the previous one if they are close enough to read in one pass.
				if (res.empty() || range.begin - res.back().end > min_marks_for_seek)
					res.push_back(range);
				else
					res.back().end = range.end;
			}
			else
			{
				/// Split the range into merge_tree_coarse_index_granularity pieces
				/// and push them onto the stack right-to-left, so the leftmost is processed first.
				size_t step = (range.end - range.begin - 1) / settings.merge_tree_coarse_index_granularity + 1;
				size_t end;

				for (end = range.end; end > range.begin + step; end -= step)
					ranges_stack.push_back(MarkRange(end - step, end));

				ranges_stack.push_back(MarkRange(range.begin, end));
			}
		}
	}

	return res;
}
|
|
|
|
|
|
|
|
|
|
}
|