removed changes accidentally committed in previous revision. [#METR-9091]

This commit is contained in:
Michael Kolupaev 2013-12-02 09:48:57 +00:00
parent 4d85e7b1a4
commit 71d5ea9ec1

View File

@ -20,28 +20,15 @@ public:
size_t block_size_, const Names & column_names_,
StorageMergeTree & storage_, const StorageMergeTree::DataPartPtr & owned_data_part_,
const MarkRanges & mark_ranges_, StoragePtr owned_storage, bool use_uncompressed_cache_,
ExpressionActionsPtr prewhere_actions_, String prewhere_column_)
ExpressionActionsPtr prewhere_actions, String prewhere_column)
: IProfilingBlockInputStream(owned_storage),
path(path_), block_size(block_size_), column_names(column_names_),
storage(storage_), owned_data_part(owned_data_part_),
all_mark_ranges(mark_ranges_), remaining_mark_ranges(mark_ranges_),
use_uncompressed_cache(use_uncompressed_cache_),
prewhere_actions(prewhere_actions_), prewhere_column(prewhere_column_)
use_uncompressed_cache(use_uncompressed_cache_)
{
std::reverse(remaining_mark_ranges.begin(), remaining_mark_ranges.end());
if (prewhere_actions){
pre_column_names = prewhere_actions->getRequiredColumns();
NameSet pre_name_set(pre_column_names.begin(), pre_column_names.end());
Names post_column_names;
for (size_t i = 0; i < column_names.size(); ++i)
{
if (!pre_name_set.count(column_names[i]))
post_column_names.push_back(column_names[i]);
}
column_names = post_column_names;
}
LOG_TRACE(storage.log, "Reading " << all_mark_ranges.size() << " ranges from part " << owned_data_part->name
<< ", up to " << (all_mark_ranges.back().end - all_mark_ranges.front().begin) * storage.index_granularity
<< " rows starting from " << all_mark_ranges.front().begin * storage.index_granularity);
@ -163,133 +150,23 @@ protected:
{
UncompressedCache * uncompressed_cache = use_uncompressed_cache ? storage.context.getUncompressedCache() : NULL;
reader = new MergeTreeReader(path, column_names, uncompressed_cache, storage);
if (prewhere_actions)
pre_reader = new MergeTreeReader(path, pre_column_names, uncompressed_cache, storage);
}
if (prewhere_actions)
size_t space_left = std::max(1LU, block_size / storage.index_granularity);
while (!remaining_mark_ranges.empty() && space_left)
{
do
{
size_t space_left = std::max(1LU, block_size / storage.index_granularity);
MarkRanges ranges_to_read;
while (!remaining_mark_ranges.empty() && space_left)
{
MarkRange & range = remaining_mark_ranges.back();
MarkRange & range = remaining_mark_ranges.back();
size_t marks_to_read = std::min(range.end - range.begin, space_left);
pre_reader->readRange(range.begin, range.begin + marks_to_read, res);
size_t marks_to_read = std::min(range.end - range.begin, space_left);
reader->readRange(range.begin, range.begin + marks_to_read, res);
ranges_to_read.push_back(MarkRange(range.begin, range.begin + marks_to_read));
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
remaining_mark_ranges.pop_back();
}
pre_reader->fillMissingColumns(res);
prewhere_actions->execute(res);
ColumnPtr column = res.getByName(prewhere_column).column;
/** Если фильтр - константа (например, написано PREWHERE 1),
* то либо вернём пустой блок, либо вернём блок без изменений.
*/
if (ColumnConstUInt8 * column_const = dynamic_cast<ColumnConstUInt8 *>(&*column))
{
if (!column_const->getData())
{
res.clear();
return res;
}
for (size_t i = 0; i < ranges_to_read.size(); ++i){
const MarkRange & range = ranges_to_read[i];
reader->readRange(range.begin, range.end, res);
}
}
else if (ColumnUInt8 * column_vec = dynamic_cast<ColumnUInt8 *>(&*column))
{
size_t index_granularity = storage.index_granularity;
const IColumn::Filter & pre_filter = column_vec->getData();
IColumn::Filter post_filter(pre_filter.size());
size_t pre_filter_pos = 0;
size_t post_filter_pos = 0;
for (size_t i = 0; i < ranges_to_read.size(); ++i){
const MarkRange & range = ranges_to_read[i];
size_t begin = range.begin;
size_t pre_filter_begin_pos = pre_filter_pos;
for (size_t mark = range.begin; mark <= range.end; ++mark)
{
UInt8 nonzero = 0;
if (mark != range.end)
{
for (size_t row = 0; row < index_granularity; ++row)
nonzero |= pre_filter[pre_filter_pos + row];
}
pre_filter_pos += index_granularity;
if (!nonzero)
{
if (mark > begin)
{
memcpy(
&post_filter[post_filter_pos],
&pre_filter[pre_filter_begin_pos],
pre_filter_pos - pre_filter_begin_pos);
post_filter_pos += pre_filter_pos - pre_filter_begin_pos;
reader->readRange(begin, mark, res);
}
begin = mark + 1;
pre_filter_begin_pos = pre_filter_pos;
}
}
}
for (size_t i = 0; i < res.columns(); ++i)
{
... filter with post_filter if column from columns
... replace prewhere_column with constant
... filter with pre_filter otherwise
if (i != filter_column_position)
{
ColumnWithNameAndType & current_column = res.getByPosition(i);
current_column.column = current_column.column->filter(filter);
if (current_column.column->empty())
break;
}
}
}
else
throw Exception("Illegal type " + column->getName() + " of column for filter. Must be ColumnUInt8 or ColumnConstUInt8.", ErrorCodes::ILLEGAL_TYPE_OF_COLUMN_FOR_FILTER);
reader->fillMissingColumns(res);
}
while (!remaining_mark_ranges.empty() && !res)
if (res)
reader->fillMissingColumns(res);
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
remaining_mark_ranges.pop_back();
}
else
{
size_t space_left = std::max(1LU, block_size / storage.index_granularity);
while (!remaining_mark_ranges.empty() && space_left)
{
MarkRange & range = remaining_mark_ranges.back();
size_t marks_to_read = std::min(range.end - range.begin, space_left);
reader->readRange(range.begin, range.begin + marks_to_read, res);
space_left -= marks_to_read;
range.begin += marks_to_read;
if (range.begin == range.end)
remaining_mark_ranges.pop_back();
}
reader->fillMissingColumns(res);
}
reader->fillMissingColumns(res);
if (remaining_mark_ranges.empty())
{
@ -307,7 +184,6 @@ private:
const String path;
size_t block_size;
Names column_names;
Names pre_column_names;
StorageMergeTree & storage;
const StorageMergeTree::DataPartPtr owned_data_part; /// Кусок не будет удалён, пока им владеет этот объект.
MarkRanges all_mark_ranges; /// В каких диапазонах засечек читать. В порядке возрастания номеров.
@ -315,7 +191,6 @@ private:
/// В порядке убывания номеров, чтобы можно было выбрасывать из конца.
bool use_uncompressed_cache;
Poco::SharedPtr<MergeTreeReader> reader;
Poco::SharedPtr<MergeTreeReader> pre_reader;
ExpressionActionsPtr prewhere_actions;
String prewhere_column;
};