This commit is contained in:
Andrey Mironov 2015-06-24 20:05:40 +03:00
parent aa7fe63962
commit 54d5517757
3 changed files with 29 additions and 20 deletions

View File

@ -20,14 +20,16 @@ struct MergeTreeReadTask
const NamesAndTypesList & columns;
const NamesAndTypesList & pre_columns;
const bool remove_prewhere_column;
const MarkRanges & all_ranges;
MergeTreeReadTask(const MergeTreeData::DataPartPtr & data_part, const MarkRanges & ranges,
const std::size_t part_index_in_query, const Names & ordered_names,
const NameSet & column_name_set, const NamesAndTypesList & columns,
const NamesAndTypesList & pre_columns, const bool remove_prewhere_column)
const NamesAndTypesList & pre_columns, const bool remove_prewhere_column,
const MarkRanges & all_ranges)
: data_part{data_part}, mark_ranges{ranges}, part_index_in_query{part_index_in_query},
ordered_names{ordered_names}, column_name_set{column_name_set}, columns{columns}, pre_columns{pre_columns},
remove_prewhere_column{remove_prewhere_column}
remove_prewhere_column{remove_prewhere_column}, all_ranges{all_ranges}
{}
};
@ -115,7 +117,7 @@ public:
return std::make_unique<MergeTreeReadTask>(
part.data_part, ranges_to_get_from_part, part.part_index_in_query, ordered_names, column_name_set, columns,
pre_columns, remove_prewhere_column);
pre_columns, remove_prewhere_column, per_part_all_ranges[part_id]);
}
public:
@ -124,6 +126,8 @@ public:
{
for (const auto & part : parts)
{
per_part_all_ranges.push_back(part.ranges);
per_part_columns_lock.push_back(std::make_unique<Poco::ScopedReadRWLock>(
part.data_part->columns_lock));
@ -238,6 +242,7 @@ public:
std::vector<std::unique_ptr<Poco::ScopedReadRWLock>> per_part_columns_lock;
RangesInDataParts parts;
std::vector<MarkRanges> per_part_all_ranges;
std::vector<std::size_t> per_part_sum_marks;
std::size_t sum_marks;
MergeTreeData & data;

View File

@ -203,13 +203,15 @@ public:
fillMissingColumnsImpl(res, ordered_names, true);
}
const MergeTreeData::DataPartPtr & getDataPart() const { return data_part; }
private:
struct Stream
{
MarkCache::MappedPtr marks;
ReadBuffer * data_buffer;
Poco::SharedPtr<CachedCompressedReadBuffer> cached_buffer;
Poco::SharedPtr<CompressedReadBufferFromFile> non_cached_buffer;
std::unique_ptr<CachedCompressedReadBuffer> cached_buffer;
std::unique_ptr<CompressedReadBufferFromFile> non_cached_buffer;
std::string path_prefix;
size_t max_mark_range;
@ -269,15 +271,15 @@ private:
if (uncompressed_cache)
{
cached_buffer = new CachedCompressedReadBuffer(path_prefix + ".bin", uncompressed_cache,
estimated_size, aio_threshold, buffer_size);
data_buffer = &*cached_buffer;
cached_buffer = std::make_unique<CachedCompressedReadBuffer>(
path_prefix + ".bin", uncompressed_cache, estimated_size, aio_threshold, buffer_size);
data_buffer = cached_buffer.get();
}
else
{
non_cached_buffer = new CompressedReadBufferFromFile(path_prefix + ".bin", estimated_size,
aio_threshold, buffer_size);
data_buffer = &*non_cached_buffer;
non_cached_buffer = std::make_unique<CompressedReadBufferFromFile>(
path_prefix + ".bin", estimated_size, aio_threshold, buffer_size);
data_buffer = non_cached_buffer.get();
}
}

View File

@ -77,16 +77,18 @@ protected:
const auto path = storage.getFullPath() + task->data_part->name + '/';
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage,
task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (!reader || reader->getDataPart() != task->data_part)
{
reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->columns, owned_uncompressed_cache.get(), owned_mark_cache.get(),
storage, task->all_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(),
storage, task->mark_ranges, min_bytes_to_use_direct_io, max_read_buffer_size);
if (prewhere_actions)
pre_reader = std::make_unique<MergeTreeReader>(
path, task->data_part, task->pre_columns, owned_uncompressed_cache.get(),
owned_mark_cache.get(), storage, task->all_ranges, min_bytes_to_use_direct_io,
max_read_buffer_size);
}
}
res = readFromPart();