mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-12-15 10:52:30 +00:00
264cff6415
TODO (suggested by Nikolai): 1. Build the query plan for the current query (inside storage::read) up to WithMergableState. 2. Check that the plan is simple enough: Aggregating - Expression - Filter - ReadFromStorage (or simpler). 3. Check that the filter is the same as the filter in the projection, and that the expression calculates the same aggregation keys as in the projection. 4. Return WithMergableState if the projection applies. Step 3 will be easier to do with ActionsDAG, because it sees all functions and the dependencies are direct (but it is also possible with ExpressionActions). We also need to figure out how prewhere and row_filter_policies work for projections. wip
50 lines
1.6 KiB
C++
50 lines
1.6 KiB
C++
#include <Storages/MergeTree/MergeTreeBlockOutputStream.h>
|
|
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
|
|
#include <Storages/StorageMergeTree.h>
|
|
#include <Interpreters/PartLog.h>
|
|
|
|
|
|
namespace DB
|
|
{
|
|
|
|
/// Returns the sample block obtained from the metadata snapshot held by this stream.
Block MergeTreeBlockOutputStream::getHeader() const
{
    const auto & snapshot = *metadata_snapshot;
    return snapshot.getSampleBlock();
}
|
|
|
|
|
|
void MergeTreeBlockOutputStream::writePrefix()
|
|
{
|
|
/// Only check "too many parts" before write,
|
|
/// because interrupting long-running INSERT query in the middle is not convenient for users.
|
|
storage.delayInsertOrThrowIfNeeded();
|
|
}
|
|
|
|
|
|
void MergeTreeBlockOutputStream::write(const Block & block)
|
|
{
|
|
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot);
|
|
for (auto & current_block : part_blocks)
|
|
{
|
|
Stopwatch watch;
|
|
|
|
MergeTreeData::MutableDataPartPtr part = storage.writer.writeTempPart(current_block, metadata_snapshot, context);
|
|
|
|
/// If optimize_on_insert setting is true, current_block could become empty after merge
|
|
/// and we didn't create part.
|
|
if (!part)
|
|
continue;
|
|
|
|
/// Part can be deduplicated, so increment counters and add to part log only if it's really added
|
|
if (storage.renameTempPartAndAdd(part, &storage.increment, nullptr, storage.getDeduplicationLog()))
|
|
{
|
|
PartLog::addNewPart(storage.getContext(), part, watch.elapsed());
|
|
|
|
/// Initiate async merge - it will be done if it's good time for merge and if there are space in 'background_pool'.
|
|
storage.background_executor.triggerTask();
|
|
}
|
|
}
|
|
}
|
|
|
|
}
|