Add parallel final.

This commit is contained in:
Nikolai Kochetov 2020-04-22 16:52:07 +03:00
parent 15e8f37839
commit 19dadb8c2d
38 changed files with 659 additions and 164 deletions

View File

@ -17,6 +17,8 @@ public:
explicit WeakHash32(size_t size) : data(size, ~UInt32(0)) {} explicit WeakHash32(size_t size) : data(size, ~UInt32(0)) {}
WeakHash32(const WeakHash32 & other) { data.assign(other.data); } WeakHash32(const WeakHash32 & other) { data.assign(other.data); }
void reset(size_t size) { data.assign(size, ~UInt32(0)); }
const Container & getData() const { return data; } const Container & getData() const { return data; }
Container & getData() { return data; } Container & getData() { return data; }

View File

@ -54,6 +54,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \ M(SettingUInt64, min_insert_block_size_bytes, (DEFAULT_INSERT_BLOCK_SIZE * 256), "Squash blocks passed to INSERT query to specified size in bytes, if blocks are not big enough.", 0) \
M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \ M(SettingUInt64, max_joined_block_size_rows, DEFAULT_BLOCK_SIZE, "Maximum block size for JOIN result (if join algorithm supports it). 0 means unlimited.", 0) \
M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \ M(SettingUInt64, max_insert_threads, 0, "The maximum number of threads to execute the INSERT SELECT query. Values 0 or 1 means that INSERT SELECT is not run in parallel. Higher values will lead to higher memory usage. Parallel INSERT SELECT has effect only if the SELECT part is run on parallel, see 'max_threads' setting.", 0) \
M(SettingUInt64, max_final_threads, 16, "The maximum number of threads to read from table with FINAL.", 0) \
M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \ M(SettingMaxThreads, max_threads, 0, "The maximum number of threads to execute the request. By default, it is determined automatically.", 0) \
M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \ M(SettingMaxThreads, max_alter_threads, 0, "The maximum number of threads to execute the ALTER requests. By default, it is determined automatically.", 0) \
M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \ M(SettingUInt64, max_read_buffer_size, DBMS_DEFAULT_BUFFER_SIZE, "The maximum size of the buffer to read from the filesystem.", 0) \
@ -429,7 +430,6 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \ M(SettingBool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \ M(SettingUInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS) DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)
/** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings). /** Set multiple settings from "profile" (in server configuration file (users.xml), profiles contain groups of multiple settings).

View File

@ -46,7 +46,7 @@ Chunk::Chunk(MutableColumns columns_, UInt64 num_rows_, ChunkInfoPtr chunk_info_
Chunk Chunk::clone() const Chunk Chunk::clone() const
{ {
return Chunk(getColumns(), getNumRows()); return Chunk(getColumns(), getNumRows(), chunk_info);
} }
void Chunk::setColumns(Columns columns_, UInt64 num_rows_) void Chunk::setColumns(Columns columns_, UInt64 num_rows_)

View File

@ -29,10 +29,10 @@ ISimpleTransform::Status ISimpleTransform::prepare()
} }
/// Output if has data. /// Output if has data.
if (transformed) if (has_output)
{ {
output.pushData(std::move(current_data)); output.pushData(std::move(output_data));
transformed = false; has_output = false;
if (!no_more_data_needed) if (!no_more_data_needed)
return Status::PortFull; return Status::PortFull;
@ -56,58 +56,54 @@ ISimpleTransform::Status ISimpleTransform::prepare()
return Status::Finished; return Status::Finished;
} }
if (!input.hasData())
{
input.setNeeded(); input.setNeeded();
return Status::NeedData;
}
current_data = input.pullData(true); if (!input.hasData())
return Status::NeedData;
input_data = input.pullData(set_input_not_needed_after_read);
has_input = true; has_input = true;
if (current_data.exception) if (input_data.exception)
{
/// Skip transform in case of exception.
has_input = false;
transformed = true;
/// No more data needed. Exception will be thrown (or swallowed) later. /// No more data needed. Exception will be thrown (or swallowed) later.
input.setNotNeeded(); input.setNotNeeded();
} }
if (set_input_not_needed_after_read)
input.setNotNeeded();
}
/// Now transform. /// Now transform.
return Status::Ready; return Status::Ready;
} }
void ISimpleTransform::work() void ISimpleTransform::work()
{ {
if (current_data.exception) if (input_data.exception)
{
/// Skip transform in case of exception.
output_data = std::move(input_data);
has_input = false;
has_output = true;
return; return;
}
try try
{ {
transform(current_data.chunk); transform(input_data.chunk, output_data.chunk);
} }
catch (DB::Exception &) catch (DB::Exception &)
{ {
current_data.exception = std::current_exception(); output_data.exception = std::current_exception();
transformed = true; has_output = true;
has_input = false; has_input = false;
return; return;
} }
has_input = !needInputData(); has_input = !needInputData();
if (!skip_empty_chunks || current_data.chunk) if (!skip_empty_chunks || output_data.chunk)
transformed = true; has_output = true;
if (transformed && !current_data.chunk) if (has_output && !output_data.chunk && getOutputPort().getHeader())
/// Support invariant that chunks must have the same number of columns as header. /// Support invariant that chunks must have the same number of columns as header.
current_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0); output_data.chunk = Chunk(getOutputPort().getHeader().cloneEmpty().getColumns(), 0);
} }
} }

View File

@ -6,6 +6,11 @@
namespace DB namespace DB
{ {
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
/** Has one input and one output. /** Has one input and one output.
* Simply pull a block from input, transform it, and push it to output. * Simply pull a block from input, transform it, and push it to output.
*/ */
@ -15,18 +20,29 @@ protected:
InputPort & input; InputPort & input;
OutputPort & output; OutputPort & output;
Port::Data current_data; Port::Data input_data;
Port::Data output_data;
bool has_input = false; bool has_input = false;
bool transformed = false; bool has_output = false;
bool no_more_data_needed = false; bool no_more_data_needed = false;
const bool skip_empty_chunks; const bool skip_empty_chunks;
/// Set input port NotNeeded after chunk was pulled. /// Set input port NotNeeded after chunk was pulled.
/// Input port will become needed again only after data was transformed. /// Input port will become needed again only after data was transformed.
/// This allows to escape caching chunks in input port, which can lead to uneven data distribution. /// This allows to escape caching chunks in input port, which can lead to uneven data distribution.
bool set_input_not_needed_after_read = false; bool set_input_not_needed_after_read = true;
virtual void transform(Chunk &)
{
throw Exception("Method transform is not implemented for " + getName(), ErrorCodes::NOT_IMPLEMENTED);
}
virtual void transform(Chunk & input_chunk, Chunk & output_chunk)
{
transform(input_chunk);
output_chunk.swap(input_chunk);
}
virtual void transform(Chunk & chunk) = 0;
virtual bool needInputData() const { return true; } virtual bool needInputData() const { return true; }
void stopReading() { no_more_data_needed = true; } void stopReading() { no_more_data_needed = true; }

View File

@ -289,10 +289,10 @@ void AggregatingSortedAlgorithm::initialize(Chunks chunks)
initializeQueue(std::move(chunks)); initializeQueue(std::move(chunks));
} }
void AggregatingSortedAlgorithm::consume(Chunk chunk, size_t source_num) void AggregatingSortedAlgorithm::consume(Chunk & chunk, size_t source_num)
{ {
preprocessChunk(chunk, columns_definition); preprocessChunk(chunk, columns_definition);
updateCursor(std::move(chunk), source_num); updateCursor(chunk, source_num);
} }
IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge() IMergingAlgorithm::Status AggregatingSortedAlgorithm::merge()

View File

@ -20,7 +20,7 @@ public:
SortDescription description_, size_t max_block_size); SortDescription description_, size_t max_block_size);
void initialize(Chunks chunks) override; void initialize(Chunks chunks) override;
void consume(Chunk chunk, size_t source_num) override; void consume(Chunk & chunk, size_t source_num) override;
Status merge() override; Status merge() override;
struct SimpleAggregateDescription; struct SimpleAggregateDescription;

View File

@ -23,6 +23,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
size_t num_inputs, size_t num_inputs,
SortDescription description_, SortDescription description_,
const String & sign_column, const String & sign_column,
bool only_positive_sign_,
size_t max_block_size, size_t max_block_size,
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes, bool use_average_block_sizes,
@ -30,6 +31,7 @@ CollapsingSortedAlgorithm::CollapsingSortedAlgorithm(
: IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs) : IMergingAlgorithmWithSharedChunks(num_inputs, std::move(description_), out_row_sources_buf_, max_row_refs)
, merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size) , merged_data(header.cloneEmptyColumns(), use_average_block_sizes, max_block_size)
, sign_column_number(header.getPositionByName(sign_column)) , sign_column_number(header.getPositionByName(sign_column))
, only_positive_sign(only_positive_sign_)
, log(log_) , log(log_)
{ {
} }
@ -76,7 +78,7 @@ void CollapsingSortedAlgorithm::insertRows()
if (last_is_positive || count_positive != count_negative) if (last_is_positive || count_positive != count_negative)
{ {
if (count_positive <= count_negative) if (count_positive <= count_negative && !only_positive_sign)
{ {
insertRow(first_negative_row); insertRow(first_negative_row);

View File

@ -31,6 +31,7 @@ public:
size_t num_inputs, size_t num_inputs,
SortDescription description_, SortDescription description_,
const String & sign_column, const String & sign_column,
bool only_positive_sign_, /// For select final. Skip rows with sum(sign) < 0.
size_t max_block_size, size_t max_block_size,
WriteBuffer * out_row_sources_buf_, WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes, bool use_average_block_sizes,
@ -42,6 +43,7 @@ private:
MergedData merged_data; MergedData merged_data;
const size_t sign_column_number; const size_t sign_column_number;
const bool only_positive_sign;
static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current. static constexpr size_t max_row_refs = 4; /// first_negative, last_positive, last, current.
RowRef first_negative_row; RowRef first_negative_row;

View File

@ -21,7 +21,7 @@ public:
}; };
virtual void initialize(Chunks chunks) = 0; virtual void initialize(Chunks chunks) = 0;
virtual void consume(Chunk chunk, size_t source_num) = 0; virtual void consume(Chunk & chunk, size_t source_num) = 0;
virtual Status merge() = 0; virtual Status merge() = 0;
IMergingAlgorithm() = default; IMergingAlgorithm() = default;

View File

@ -28,15 +28,15 @@ void IMergingAlgorithmWithDelayedChunk::initializeQueue(Chunks chunks)
queue = SortingHeap<SortCursor>(cursors); queue = SortingHeap<SortCursor>(cursors);
} }
void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk chunk, size_t source_num) void IMergingAlgorithmWithDelayedChunk::updateCursor(Chunk & chunk, size_t source_num)
{ {
auto & source_chunk = source_chunks[source_num]; auto & source_chunk = source_chunks[source_num];
/// Extend lifetime of last chunk. /// Extend lifetime of last chunk.
last_chunk = std::move(source_chunk); last_chunk.swap(source_chunk);
last_chunk_sort_columns = std::move(cursors[source_num].sort_columns); last_chunk_sort_columns = std::move(cursors[source_num].sort_columns);
source_chunk = std::move(chunk); source_chunk.swap(chunk);
cursors[source_num].reset(source_chunk.getColumns(), {}); cursors[source_num].reset(source_chunk.getColumns(), {});
queue.push(cursors[source_num]); queue.push(cursors[source_num]);

View File

@ -24,7 +24,7 @@ protected:
ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid. ColumnRawPtrs last_chunk_sort_columns; /// Point to last_chunk if valid.
void initializeQueue(Chunks chunks); void initializeQueue(Chunks chunks);
void updateCursor(Chunk chunk, size_t source_num); void updateCursor(Chunk & chunk, size_t source_num);
private: private:
/// Chunks currently being merged. /// Chunks currently being merged.

View File

@ -39,7 +39,7 @@ void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks)
auto & source_chunk = source_chunks[source_num]; auto & source_chunk = source_chunks[source_num];
source_chunk = chunk_allocator.alloc(std::move(chunks[source_num])); source_chunk = chunk_allocator.alloc(chunks[source_num]);
cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num); cursors[source_num] = SortCursorImpl(source_chunk->getColumns(), description, source_num);
source_chunk->all_columns = cursors[source_num].all_columns; source_chunk->all_columns = cursors[source_num].all_columns;
@ -49,12 +49,12 @@ void IMergingAlgorithmWithSharedChunks::initialize(Chunks chunks)
queue = SortingHeap<SortCursor>(cursors); queue = SortingHeap<SortCursor>(cursors);
} }
void IMergingAlgorithmWithSharedChunks::consume(Chunk chunk, size_t source_num) void IMergingAlgorithmWithSharedChunks::consume(Chunk & chunk, size_t source_num)
{ {
prepareChunk(chunk); prepareChunk(chunk);
auto & source_chunk = source_chunks[source_num]; auto & source_chunk = source_chunks[source_num];
source_chunk = chunk_allocator.alloc(std::move(chunk)); source_chunk = chunk_allocator.alloc(chunk);
cursors[source_num].reset(source_chunk->getColumns(), {}); cursors[source_num].reset(source_chunk->getColumns(), {});
source_chunk->all_columns = cursors[source_num].all_columns; source_chunk->all_columns = cursors[source_num].all_columns;

View File

@ -16,7 +16,7 @@ public:
size_t max_row_refs); size_t max_row_refs);
void initialize(Chunks chunks) override; void initialize(Chunks chunks) override;
void consume(Chunk chunk, size_t source_num) override; void consume(Chunk & chunk, size_t source_num) override;
private: private:
SortDescription description; SortDescription description;

View File

@ -74,10 +74,10 @@ void MergingSortedAlgorithm::initialize(Chunks chunks)
queue_without_collation = SortingHeap<SortCursor>(cursors); queue_without_collation = SortingHeap<SortCursor>(cursors);
} }
void MergingSortedAlgorithm::consume(Chunk chunk, size_t source_num) void MergingSortedAlgorithm::consume(Chunk & chunk, size_t source_num)
{ {
prepareChunk(chunk); prepareChunk(chunk);
source_chunks[source_num] = std::move(chunk); source_chunks[source_num].swap(chunk);
cursors[source_num].reset(source_chunks[source_num].getColumns(), {}); cursors[source_num].reset(source_chunks[source_num].getColumns(), {});
if (has_collation) if (has_collation)

View File

@ -23,7 +23,7 @@ public:
void addInput(); void addInput();
void initialize(Chunks chunks) override; void initialize(Chunks chunks) override;
void consume(Chunk chunk, size_t source_num) override; void consume(Chunk & chunk, size_t source_num) override;
Status merge() override; Status merge() override;
const MergedData & getMergedData() const { return merged_data; } const MergedData & getMergedData() const { return merged_data; }

View File

@ -63,7 +63,7 @@ public:
free_chunks.push_back(i); free_chunks.push_back(i);
} }
SharedChunkPtr alloc(Chunk && chunk) SharedChunkPtr alloc(Chunk & chunk)
{ {
if (free_chunks.empty()) if (free_chunks.empty())
throw Exception("Not enough space in SharedChunkAllocator. " throw Exception("Not enough space in SharedChunkAllocator. "
@ -72,7 +72,7 @@ public:
auto pos = free_chunks.back(); auto pos = free_chunks.back();
free_chunks.pop_back(); free_chunks.pop_back();
chunks[pos] = std::move(chunk); chunks[pos].swap(chunk);
chunks[pos].position = pos; chunks[pos].position = pos;
chunks[pos].allocator = this; chunks[pos].allocator = this;
@ -110,9 +110,9 @@ private:
} }
/// Release memory. It is not obligatory. /// Release memory. It is not obligatory.
ptr->clear(); // ptr->clear();
ptr->all_columns.clear(); // ptr->all_columns.clear();
ptr->sort_columns.clear(); // ptr->sort_columns.clear();
free_chunks.push_back(ptr->position); free_chunks.push_back(ptr->position);
} }

View File

@ -632,10 +632,10 @@ void SummingSortedAlgorithm::initialize(Chunks chunks)
initializeQueue(std::move(chunks)); initializeQueue(std::move(chunks));
} }
void SummingSortedAlgorithm::consume(Chunk chunk, size_t source_num) void SummingSortedAlgorithm::consume(Chunk & chunk, size_t source_num)
{ {
preprocessChunk(chunk); preprocessChunk(chunk);
updateCursor(std::move(chunk), source_num); updateCursor(chunk, source_num);
} }
IMergingAlgorithm::Status SummingSortedAlgorithm::merge() IMergingAlgorithm::Status SummingSortedAlgorithm::merge()

View File

@ -23,7 +23,7 @@ public:
size_t max_block_size); size_t max_block_size);
void initialize(Chunks chunks) override; void initialize(Chunks chunks) override;
void consume(Chunk chunk, size_t source_num) override; void consume(Chunk & chunk, size_t source_num) override;
Status merge() override; Status merge() override;
struct AggregateDescription; struct AggregateDescription;

View File

@ -15,6 +15,7 @@ public:
size_t num_inputs, size_t num_inputs,
SortDescription description_, SortDescription description_,
const String & sign_column, const String & sign_column,
bool only_positive_sign,
size_t max_block_size, size_t max_block_size,
WriteBuffer * out_row_sources_buf_ = nullptr, WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false) bool use_average_block_sizes = false)
@ -24,6 +25,7 @@ public:
num_inputs, num_inputs,
std::move(description_), std::move(description_),
sign_column, sign_column,
only_positive_sign,
max_block_size, max_block_size,
out_row_sources_buf_, out_row_sources_buf_,
use_average_block_sizes, use_average_block_sizes,

View File

@ -1,4 +1,5 @@
#include <Processors/Merges/IMergingTransform.h> #include <Processors/Merges/IMergingTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB namespace DB
{ {
@ -135,7 +136,6 @@ IProcessor::Status IMergingTransformBase::prepare()
if (state.is_finished) if (state.is_finished)
{ {
if (is_port_full) if (is_port_full)
return Status::PortFull; return Status::PortFull;
@ -158,11 +158,11 @@ IProcessor::Status IMergingTransformBase::prepare()
if (!input.hasData()) if (!input.hasData())
return Status::NeedData; return Status::NeedData;
auto chunk = input.pull(); state.input_chunk = input.pull();
if (!chunk.hasRows() && !input.isFinished()) if (!state.input_chunk.hasRows() && !input.isFinished())
return Status::NeedData; return Status::NeedData;
state.input_chunk = std::move(chunk); state.has_input = true;
} }
state.need_data = false; state.need_data = false;
@ -174,4 +174,83 @@ IProcessor::Status IMergingTransformBase::prepare()
return Status::Ready; return Status::Ready;
} }
static void filterChunk(Chunk & chunk, size_t selector_position)
{
if (!chunk.getChunkInfo())
throw Exception("IMergingTransformBase expected ChunkInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
const auto * chunk_info = typeid_cast<const SelectorInfo *>(chunk.getChunkInfo().get());
if (!chunk_info)
throw Exception("IMergingTransformBase expected SelectorInfo for input chunk", ErrorCodes::LOGICAL_ERROR);
auto & selector = chunk_info->selector;
IColumn::Filter filter;
filter.resize_fill(selector.size());
size_t num_rows = chunk.getNumRows();
auto columns = chunk.detachColumns();
size_t num_result_rows = 0;
for (size_t row = 0; row < num_rows; ++row)
{
if (selector[row] == selector_position)
{
++num_result_rows;
filter[row] = 1;
}
}
for (auto & column : columns)
column = column->filter(filter, num_result_rows);
chunk.clear();
chunk.setColumns(std::move(columns), num_result_rows);
}
bool IMergingTransformBase::filterChunks()
{
if (state.selector_position < 0)
return true;
bool has_empty_chunk = false;
if (!state.init_chunks.empty())
{
for (size_t i = 0; i < input_states.size(); ++i)
{
auto & chunk = state.init_chunks[i];
if (!chunk || input_states[i].is_filtered)
continue;
filterChunk(chunk, state.selector_position);
if (!chunk.hasRows())
{
chunk.clear();
has_empty_chunk = true;
input_states[i].is_initialized = false;
is_initialized = false;
}
else
input_states[i].is_filtered = true;
}
}
if (state.has_input)
{
filterChunk(state.input_chunk, state.selector_position);
if (!state.input_chunk.hasRows())
{
state.has_input = false;
state.need_data = true;
has_empty_chunk = true;
}
}
return !has_empty_chunk;
}
} }

View File

@ -18,6 +18,10 @@ public:
const Block & output_header, const Block & output_header,
bool have_all_inputs_); bool have_all_inputs_);
virtual ~IMergingTransformBase() = default;
OutputPort & getOutputPort() { return outputs.front(); }
/// Methods to add additional input port. It is possible to do only before the first call of `prepare`. /// Methods to add additional input port. It is possible to do only before the first call of `prepare`.
void addInput(); void addInput();
/// Need to be called after all inputs are added. (only if have_all_inputs was not specified). /// Need to be called after all inputs are added. (only if have_all_inputs was not specified).
@ -25,20 +29,29 @@ public:
Status prepare() override; Status prepare() override;
/// Set position which will be used in selector if input chunk has attached SelectorInfo (see SelectorInfo.h).
/// Columns will be filtered, keep only rows labeled with this position.
/// It is used in parallel final.
void setSelectorPosition(size_t position) { state.selector_position = position; }
protected: protected:
virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false. virtual void onNewInput(); /// Is called when new input is added. Only if have_all_inputs = false.
virtual void onFinish() {} /// Is called when all data is processed. virtual void onFinish() {} /// Is called when all data is processed.
bool filterChunks(); /// Filter chunks if selector position was set. For parallel final.
/// Processor state. /// Processor state.
struct State struct State
{ {
Chunk output_chunk; Chunk output_chunk;
Chunk input_chunk; Chunk input_chunk;
bool has_input = false;
bool is_finished = false; bool is_finished = false;
bool need_data = false; bool need_data = false;
size_t next_input_to_read = 0; size_t next_input_to_read = 0;
Chunks init_chunks; Chunks init_chunks;
ssize_t selector_position = -1;
}; };
State state; State state;
@ -50,6 +63,7 @@ private:
InputPort & port; InputPort & port;
bool is_initialized = false; bool is_initialized = false;
bool is_filtered = false;
}; };
std::vector<InputState> input_states; std::vector<InputState> input_states;
@ -78,14 +92,18 @@ public:
void work() override void work() override
{ {
if (!filterChunks())
return;
if (!state.init_chunks.empty()) if (!state.init_chunks.empty())
algorithm.initialize(std::move(state.init_chunks)); algorithm.initialize(std::move(state.init_chunks));
if (state.input_chunk) if (state.has_input)
{ {
// std::cerr << "Consume chunk with " << state.input_chunk.getNumRows() // std::cerr << "Consume chunk with " << state.input_chunk.getNumRows()
// << " for input " << state.next_input_to_read << std::endl; // << " for input " << state.next_input_to_read << std::endl;
algorithm.consume(std::move(state.input_chunk), state.next_input_to_read); algorithm.consume(state.input_chunk, state.next_input_to_read);
state.has_input = false;
} }
IMergingAlgorithm::Status status = algorithm.merge(); IMergingAlgorithm::Status status = algorithm.merge();
@ -120,4 +138,6 @@ private:
using IMergingTransformBase::state; using IMergingTransformBase::state;
}; };
using MergingTransformPtr = std::shared_ptr<IMergingTransformBase>;
} }

View File

@ -96,6 +96,15 @@ Pipe::Pipe(Pipes && pipes, ProcessorPtr transform)
processors.emplace_back(std::move(transform)); processors.emplace_back(std::move(transform));
} }
Pipe::Pipe(OutputPort * port) : output_port(port)
{
}
void Pipe::addProcessors(const Processors & processors_)
{
processors.insert(processors.end(), processors_.begin(), processors_.end());
}
void Pipe::addSimpleTransform(ProcessorPtr transform) void Pipe::addSimpleTransform(ProcessorPtr transform)
{ {
checkSimpleTransform(*transform); checkSimpleTransform(*transform);

View File

@ -22,6 +22,8 @@ public:
/// Transform must have the number of inputs equals to the number of pipes. And single output. /// Transform must have the number of inputs equals to the number of pipes. And single output.
/// Will connect pipes outputs with transform inputs automatically. /// Will connect pipes outputs with transform inputs automatically.
Pipe(Pipes && pipes, ProcessorPtr transform); Pipe(Pipes && pipes, ProcessorPtr transform);
/// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape.
Pipe(OutputPort * port);
Pipe(const Pipe & other) = delete; Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default; Pipe(Pipe && other) = default;
@ -29,6 +31,9 @@ public:
Pipe & operator=(const Pipe & other) = delete; Pipe & operator=(const Pipe & other) = delete;
Pipe & operator=(Pipe && other) = default; Pipe & operator=(Pipe && other) = default;
/// Append processors to pipe. After this, it possibly will not have tree shape.
void addProcessors(const Processors & processors_);
OutputPort & getPort() const { return *output_port; } OutputPort & getPort() const { return *output_port; }
const Block & getHeader() const { return output_port->getHeader(); } const Block & getHeader() const { return output_port->getHeader(); }

View File

@ -0,0 +1,76 @@
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/SelectorInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
AddingSelectorTransform::AddingSelectorTransform(
const Block & header, size_t num_outputs_, ColumnNumbers key_columns_)
: ISimpleTransform(header, header, false)
, num_outputs(num_outputs_)
, key_columns(std::move(key_columns_))
, hash(0)
{
setInputNotNeededAfterRead(false);
if (num_outputs <= 1)
throw Exception("SplittingByHashTransform expects more than 1 outputs, got " + std::to_string(num_outputs),
ErrorCodes::LOGICAL_ERROR);
if (key_columns.empty())
throw Exception("SplittingByHashTransform cannot split by empty set of key columns",
ErrorCodes::LOGICAL_ERROR);
for (auto & column : key_columns)
if (column >= header.columns())
throw Exception("Invalid column number: " + std::to_string(column) +
". There is only " + std::to_string(header.columns()) + " columns in header",
ErrorCodes::LOGICAL_ERROR);
}
static void calculateWeakHash32(const Chunk & chunk, const ColumnNumbers & key_columns, WeakHash32 & hash)
{
auto num_rows = chunk.getNumRows();
const auto & columns = chunk.getColumns();
hash.reset(num_rows);
for (const auto & column_number : key_columns)
columns[column_number]->updateWeakHash32(hash);
}
static IColumn::Selector fillSelector(const WeakHash32 & hash, size_t num_outputs)
{
/// Row from interval [(2^32 / num_outputs) * i, (2^32 / num_outputs) * (i + 1)) goes to bucket with number i.
const auto & hash_data = hash.getData();
size_t num_rows = hash_data.size();
IColumn::Selector selector(num_rows);
for (size_t row = 0; row < num_rows; ++row)
{
selector[row] = hash_data[row]; /// [0, 2^32)
selector[row] *= num_outputs; /// [0, num_outputs * 2^32), selector stores 64 bit values.
selector[row] >>= 32u; /// [0, num_outputs)
}
return selector;
}
void AddingSelectorTransform::transform(Chunk & input_chunk, Chunk & output_chunk)
{
auto chunk_info = std::make_shared<SelectorInfo>();
calculateWeakHash32(input_chunk, key_columns, hash);
chunk_info->selector = fillSelector(hash, num_outputs);
input_chunk.swap(output_chunk);
output_chunk.setChunkInfo(std::move(chunk_info));
}
}

View File

@ -0,0 +1,26 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Processors/ISimpleTransform.h>
#include <Core/ColumnNumbers.h>
#include <Common/WeakHash.h>
namespace DB
{
/// Add IColumn::Selector to chunk (see SelectorInfo.h).
/// Selector is filled by formula (WeakHash(key_columns) * num_outputs / MAX_INT).
class AddingSelectorTransform : public ISimpleTransform
{
public:
AddingSelectorTransform(const Block & header, size_t num_outputs_, ColumnNumbers key_columns_);
String getName() const override { return "SplittingByHash"; }
void transform(Chunk & input_chunk, Chunk & output_chunk) override;
private:
size_t num_outputs;
ColumnNumbers key_columns;
WeakHash32 hash;
};
}

View File

@ -0,0 +1,108 @@
#include <Processors/Transforms/CopyTransform.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
CopyTransform::CopyTransform(const Block & header, size_t num_outputs)
: IProcessor(InputPorts(1, header), OutputPorts(num_outputs, header))
{
if (num_outputs <= 1)
throw Exception("CopyTransform expects more than 1 outputs, got " + std::to_string(num_outputs), ErrorCodes::LOGICAL_ERROR);
}
IProcessor::Status CopyTransform::prepare()
{
Status status = Status::Ready;
while (status == Status::Ready)
{
status = !has_data ? prepareConsume()
: prepareGenerate();
}
return status;
}
IProcessor::Status CopyTransform::prepareConsume()
{
auto & input = getInputPort();
/// Check all outputs are finished or ready to get data.
bool all_finished = true;
for (auto & output : outputs)
{
if (output.isFinished())
continue;
all_finished = false;
}
if (all_finished)
{
input.close();
return Status::Finished;
}
/// Try get chunk from input.
if (input.isFinished())
{
for (auto & output : outputs)
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
chunk = input.pull();
has_data = true;
was_output_processed.assign(outputs.size(), false);
return Status::Ready;
}
IProcessor::Status CopyTransform::prepareGenerate()
{
bool all_outputs_processed = true;
size_t chunk_number = 0;
for (auto & output : outputs)
{
auto & was_processed = was_output_processed[chunk_number];
++chunk_number;
if (was_processed)
continue;
if (output.isFinished())
continue;
if (!output.canPush())
{
all_outputs_processed = false;
continue;
}
output.push(chunk.clone());
was_processed = true;
}
if (all_outputs_processed)
{
has_data = false;
return Status::Ready;
}
return Status::PortFull;
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Processors/IProcessor.h>
namespace DB
{
/// Transform which has single input and num_outputs outputs.
/// Read chunk from input and copy it to all outputs.
class CopyTransform : public IProcessor
{
public:
CopyTransform(const Block & header, size_t num_outputs);
String getName() const override { return "Copy"; }
Status prepare() override;
InputPort & getInputPort() { return inputs.front(); }
private:
Chunk chunk;
bool has_data = false;
std::vector<char> was_output_processed;
Status prepareGenerate();
Status prepareConsume();
};
}

View File

@ -0,0 +1,14 @@
#pragma once
#include <Processors/Chunk.h>
#include <Common/PODArray.h>
namespace DB
{
/// ChunkInfo with IColumn::Selector. It is added by AddingSelectorTransform.
struct SelectorInfo : public ChunkInfo
{
IColumn::Selector selector;
};
}

View File

@ -106,8 +106,10 @@ SRCS(
Sources/SourceFromInputStream.cpp Sources/SourceFromInputStream.cpp
Sources/SourceWithProgress.cpp Sources/SourceWithProgress.cpp
Transforms/AddingMissedTransform.cpp Transforms/AddingMissedTransform.cpp
Transforms/AddingSelectorTransform.cpp
Transforms/AggregatingTransform.cpp Transforms/AggregatingTransform.cpp
Transforms/ConvertingTransform.cpp Transforms/ConvertingTransform.cpp
Transforms/CopyTransform.cpp
Transforms/CreatingSetsTransform.cpp Transforms/CreatingSetsTransform.cpp
Transforms/CubeTransform.cpp Transforms/CubeTransform.cpp
Transforms/DistinctTransform.cpp Transforms/DistinctTransform.cpp

View File

@ -752,7 +752,7 @@ MergeTreeData::MutableDataPartPtr MergeTreeDataMergerMutator::mergePartsToTempor
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged_transform = std::make_unique<CollapsingSortedTransform>( merged_transform = std::make_unique<CollapsingSortedTransform>(
header, pipes.size(), sort_description, data.merging_params.sign_column, header, pipes.size(), sort_description, data.merging_params.sign_column, false,
merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size); merge_block_size, rows_sources_write_buf.get(), blocks_are_granules_size);
break; break;

View File

@ -41,22 +41,25 @@ namespace std
#include <DataStreams/CollapsingFinalBlockInputStream.h> #include <DataStreams/CollapsingFinalBlockInputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h> #include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/ReverseBlockInputStream.h> #include <DataStreams/ReverseBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeDate.h> #include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeEnum.h> #include <DataTypes/DataTypeEnum.h>
#include <Storages/VirtualColumnUtils.h> #include <DataTypes/DataTypesNumber.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/ConcatProcessor.h> #include <Processors/ConcatProcessor.h>
#include <Processors/Executors/TreeExecutorBlockInputStream.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/CollapsingSortedTransform.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/Merges/ReplacingSortedTransform.h>
#include <Processors/Merges/SummingSortedTransform.h>
#include <Processors/Merges/VersionedCollapsingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Transforms/AddingConstColumnTransform.h>
#include <Processors/Transforms/AddingSelectorTransform.h>
#include <Processors/Transforms/CopyTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/VirtualColumnUtils.h>
namespace ProfileEvents namespace ProfileEvents
{ {
@ -617,6 +620,7 @@ Pipes MergeTreeDataSelectExecutor::readFromParts(
res = spreadMarkRangesAmongStreamsFinal( res = spreadMarkRangesAmongStreamsFinal(
std::move(parts_with_ranges), std::move(parts_with_ranges),
num_streams,
column_names_to_read, column_names_to_read,
max_block_size, max_block_size,
settings.use_uncompressed_cache, settings.use_uncompressed_cache,
@ -1017,6 +1021,7 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsWithOrder(
Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal( Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, RangesInDataParts && parts,
size_t num_streams,
const Names & column_names, const Names & column_names,
UInt64 max_block_size, UInt64 max_block_size,
bool use_uncompressed_cache, bool use_uncompressed_cache,
@ -1074,71 +1079,122 @@ Pipes MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
for (size_t i = 0; i < sort_columns_size; ++i) for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1); sort_description.emplace_back(header.getPositionByName(sort_columns[i]), 1, 1);
/// Converts pipes to BlockInputsStreams. auto get_merging_processor = [&]() -> MergingTransformPtr
/// It is temporary, till not all merging streams are implemented as processors.
auto streams_to_merge = [&pipes]()
{ {
size_t num_streams = pipes.size();
BlockInputStreams streams;
streams.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
streams.emplace_back(std::make_shared<TreeExecutorBlockInputStream>(std::move(pipes[i])));
pipes.clear();
return streams;
};
BlockInputStreamPtr merged;
ProcessorPtr merged_processor;
switch (data.merging_params.mode) switch (data.merging_params.mode)
{ {
case MergeTreeData::MergingParams::Ordinary: case MergeTreeData::MergingParams::Ordinary:
{ {
merged_processor = std::make_shared<MergingSortedTransform>(header, pipes.size(), return std::make_shared<MergingSortedTransform>(header, pipes.size(),
sort_description, max_block_size); sort_description, max_block_size);
break;
} }
case MergeTreeData::MergingParams::Collapsing: case MergeTreeData::MergingParams::Collapsing:
merged = std::make_shared<CollapsingFinalBlockInputStream>( return std::make_shared<CollapsingSortedTransform>(header, pipes.size(),
streams_to_merge(), sort_description, data.merging_params.sign_column); sort_description, data.merging_params.sign_column, true, max_block_size);
break;
case MergeTreeData::MergingParams::Summing: case MergeTreeData::MergingParams::Summing:
merged_processor = std::make_shared<SummingSortedTransform>(header, pipes.size(), return std::make_shared<SummingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.columns_to_sum, max_block_size); sort_description, data.merging_params.columns_to_sum, max_block_size);
break;
case MergeTreeData::MergingParams::Aggregating: case MergeTreeData::MergingParams::Aggregating:
merged_processor = std::make_shared<AggregatingSortedTransform>(header, pipes.size(), return std::make_shared<AggregatingSortedTransform>(header, pipes.size(),
sort_description, max_block_size); sort_description, max_block_size);
break;
case MergeTreeData::MergingParams::Replacing: /// TODO Make ReplacingFinalBlockInputStream case MergeTreeData::MergingParams::Replacing:
merged_processor = std::make_shared<ReplacingSortedTransform>(header, pipes.size(), return std::make_shared<ReplacingSortedTransform>(header, pipes.size(),
sort_description, data.merging_params.version_column, max_block_size); sort_description, data.merging_params.version_column, max_block_size);
break;
case MergeTreeData::MergingParams::VersionedCollapsing: /// TODO Make VersionedCollapsingFinalBlockInputStream case MergeTreeData::MergingParams::VersionedCollapsing:
merged_processor = std::make_shared<VersionedCollapsingTransform>(header, pipes.size(), return std::make_shared<VersionedCollapsingTransform>(header, pipes.size(),
sort_description, data.merging_params.sign_column, max_block_size); sort_description, data.merging_params.sign_column, max_block_size);
break;
case MergeTreeData::MergingParams::Graphite: case MergeTreeData::MergingParams::Graphite:
throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR); throw Exception("GraphiteMergeTree doesn't support FINAL", ErrorCodes::LOGICAL_ERROR);
} }
if (merged_processor) __builtin_unreachable();
};
if (num_streams > settings.max_final_threads)
num_streams = settings.max_final_threads;
if (num_streams <= 1 || sort_description.empty() || query_info.force_tree_shaped_pipeline)
{ {
Pipe pipe(std::move(pipes), std::move(merged_processor));
Pipe pipe(std::move(pipes), get_merging_processor());
pipes = Pipes(); pipes = Pipes();
pipes.emplace_back(std::move(pipe)); pipes.emplace_back(std::move(pipe));
return pipes;
} }
if (merged) ColumnNumbers key_columns;
pipes.emplace_back(std::make_shared<SourceFromInputStream>(merged)); key_columns.reserve(sort_description.size());
for (auto & desc : sort_description)
{
if (!desc.column_name.empty())
key_columns.push_back(header.getPositionByName(desc.column_name));
else
key_columns.emplace_back(desc.column_number);
}
Processors selectors;
Processors copiers;
selectors.reserve(pipes.size());
for (auto & pipe : pipes)
{
auto selector = std::make_shared<AddingSelectorTransform>(pipe.getHeader(), num_streams, key_columns);
auto copier = std::make_shared<CopyTransform>(pipe.getHeader(), num_streams);
connect(pipe.getPort(), selector->getInputPort());
connect(selector->getOutputPort(), copier->getInputPort());
selectors.emplace_back(std::move(selector));
copiers.emplace_back(std::move(copier));
}
Processors merges;
std::vector<InputPorts::iterator> input_ports;
merges.reserve(num_streams);
input_ports.reserve(num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
auto merge = get_merging_processor();
merge->setSelectorPosition(i);
input_ports.emplace_back(merge->getInputs().begin());
merges.emplace_back(std::move(merge));
}
/// Connect outputs of i-th splitter with i-th input port of every merge.
for (auto & resize : copiers)
{
size_t input_num = 0;
for (auto & output : resize->getOutputs())
{
connect(output, *input_ports[input_num]);
++input_ports[input_num];
++input_num;
}
}
Processors processors;
for (auto & pipe : pipes)
{
auto pipe_processors = std::move(pipe).detachProcessors();
processors.insert(processors.end(), pipe_processors.begin(), pipe_processors.end());
}
pipes.clear();
pipes.reserve(num_streams);
for (auto & merge : merges)
pipes.emplace_back(&merge->getOutputs().front());
pipes.front().addProcessors(processors);
pipes.front().addProcessors(selectors);
pipes.front().addProcessors(copiers);
pipes.front().addProcessors(merges);
return pipes; return pipes;
} }

View File

@ -71,6 +71,7 @@ private:
Pipes spreadMarkRangesAmongStreamsFinal( Pipes spreadMarkRangesAmongStreamsFinal(
RangesInDataParts && parts, RangesInDataParts && parts,
size_t num_streams,
const Names & column_names, const Names & column_names,
UInt64 max_block_size, UInt64 max_block_size,
bool use_uncompressed_cache, bool use_uncompressed_cache,

View File

@ -0,0 +1,51 @@
<test>
<settings>
<max_partitions_per_insert_block>1024</max_partitions_per_insert_block>
</settings>
<substitutions>
<substitution>
<name>collapsing</name>
<values>
<value>collapsing_final_16p_ord</value>
<value>collapsing_final_16p_rnd</value>
<value>collapsing_final_16p_int_keys_ord</value>
<value>collapsing_final_16p_int_keys_rnd</value>
<value>collapsing_final_16p_str_keys_ord</value>
<value>collapsing_final_16p_str_keys_rnd</value>
<value>collapsing_final_1024p_ord</value>
<value>collapsing_final_1024p_rnd</value>
</values>
</substitution>
</substitutions>
<create_query>create table collapsing_final_16p_ord (key1 UInt32, key2 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) partition by intDiv(key1, 8192 * 64) </create_query>
<create_query>create table collapsing_final_16p_rnd (key1 UInt32, key2 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2) partition by key1 % 16 </create_query>
<create_query>create table collapsing_final_16p_int_keys_ord (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64) </create_query>
<create_query>create table collapsing_final_16p_int_keys_rnd (key1 UInt32, key2 UInt32, key3 UInt32, key4 UInt32, key5 UInt32, key6 UInt32, key7 UInt32, key8 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 </create_query>
<create_query>create table collapsing_final_16p_str_keys_ord (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by intDiv(key1, 8192 * 64) </create_query>
<create_query>create table collapsing_final_16p_str_keys_rnd (key1 UInt32, key2 String, key3 String, key4 String, key5 String, key6 String, key7 String, key8 String, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1, key2, key3, key4, key5, key6, key7, key8) partition by key1 % 16 </create_query>
<create_query>create table collapsing_final_1024p_ord (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by intDiv(key1, 8192 * 2) </create_query>
<create_query>create table collapsing_final_1024p_rnd (key1 UInt32, sign Int8, s UInt64) engine = CollapsingMergeTree(sign) order by (key1) partition by key1 % 1024 </create_query>
<!-- 16 parts, 8192 * 1024 rows each -->
<fill_query>insert into collapsing_final_16p_ord select number, number, 1, number from numbers(8388608) </fill_query>
<fill_query>insert into collapsing_final_16p_rnd select sipHash64(number), number, 1, number from numbers(8388608) </fill_query>
<fill_query>insert into collapsing_final_16p_int_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608) </fill_query>
<fill_query>insert into collapsing_final_16p_int_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608) </fill_query>
<fill_query>insert into collapsing_final_16p_str_keys_ord select number, number, number, number, number, number, number, number, 1, number from numbers(8388608) </fill_query>
<fill_query>insert into collapsing_final_16p_str_keys_rnd select sipHash64(number), number, number, number, number, number, number, number, 1, number from numbers(8388608) </fill_query>
<!-- 1024 parts, 8192 * 2 rows each -->
<fill_query>insert into collapsing_final_1024p_ord select number, 1, number from numbers(16777216) </fill_query>
<fill_query>insert into collapsing_final_1024p_rnd select number, 1, number from numbers(16777216) </fill_query>
<fill_query>optimize table {collapsing} final</fill_query>
<query>SELECT count() FROM {collapsing} final</query>
<query>SELECT sum(s) FROM {collapsing} final group by key1 limit 10</query>
<query>SELECT sum(s) FROM {collapsing} final group by key1 % 8192 limit 10</query>
<drop_query>DROP TABLE IF EXISTS {collapsing}</drop_query>
</test>

View File

@ -4,11 +4,11 @@ CREATE TABLE aggregating_00191 (d Date DEFAULT '2000-01-01', k UInt64, u Aggrega
INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000) GROUP BY k; INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000) GROUP BY k;
INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100) + 50) AS u FROM (SELECT * FROM system.numbers LIMIT 500, 1000) GROUP BY k; INSERT INTO aggregating_00191 (k, u) SELECT intDiv(number, 100) AS k, uniqState(toUInt64(number % 100) + 50) AS u FROM (SELECT * FROM system.numbers LIMIT 500, 1000) GROUP BY k;
SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k;
OPTIMIZE TABLE aggregating_00191; OPTIMIZE TABLE aggregating_00191;
SELECT k, finalizeAggregation(u) FROM aggregating_00191; SELECT k, finalizeAggregation(u) FROM aggregating_00191;
SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL; SELECT k, finalizeAggregation(u) FROM aggregating_00191 FINAL order by k;
DROP TABLE aggregating_00191; DROP TABLE aggregating_00191;

View File

@ -3,7 +3,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -15,7 +15,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -27,7 +27,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -39,7 +39,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, version, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -53,7 +53,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 4 blocks final'; select 'table with 4 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 4 blocks optimized'; select 'table with 4 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -68,7 +68,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10;
select 'table with 5 blocks final'; select 'table with 5 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 5 blocks optimized'; select 'table with 5 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -80,7 +80,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -88,14 +88,14 @@ select * from mult_tab;
select '-------------------------'; select '-------------------------';
drop table if exists mult_tab; drop table if exists mult_tab;
create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version); create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(date, (date), 8192, sign, version);
insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128; insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128;
insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final settings max_block_size=33; select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select date, value, version, sign from mult_tab;
select '-------------------------'; select '-------------------------';
select 'Vertival merge'; select 'Vertival merge';
@ -106,7 +106,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -118,7 +118,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -130,7 +130,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -142,7 +142,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, version, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -156,7 +156,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(numb
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 10;
select 'table with 4 blocks final'; select 'table with 4 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 4 blocks optimized'; select 'table with 4 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -171,7 +171,7 @@ insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(numb
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 1, 1, -1) from system.numbers limit 10;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 1, if(number % 3 = 2, 1, -1) from system.numbers limit 10;
select 'table with 5 blocks final'; select 'table with 5 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 5 blocks optimized'; select 'table with 5 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -183,7 +183,7 @@ create table mult_tab (date Date, value String, version UInt64, sign Int8) engin
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, 1, -1) from system.numbers limit 1000000;
insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000; insert into mult_tab select '2018-01-31', 'str_' || toString(number), 0, if(number % 2, -1, 1) from system.numbers limit 1000000;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final; select * from mult_tab final order by date, value, sign;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select * from mult_tab;
@ -191,13 +191,13 @@ select * from mult_tab;
select '-------------------------'; select '-------------------------';
drop table if exists mult_tab; drop table if exists mult_tab;
create table mult_tab (date Date, value UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0; create table mult_tab (date Date, value UInt64, key UInt64, version UInt64, sign Int8) engine = VersionedCollapsingMergeTree(sign, version) order by (date) settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
insert into mult_tab select '2018-01-31', number, 0, if(number < 64, 1, -1) from system.numbers limit 128; insert into mult_tab select '2018-01-31', number, number, 0, if(number < 64, 1, -1) from system.numbers limit 128;
insert into mult_tab select '2018-01-31', number, 0, if(number < 64, -1, 1) from system.numbers limit 128; insert into mult_tab select '2018-01-31', number, number + 128, 0, if(number < 64, -1, 1) from system.numbers limit 128;
select 'table with 2 blocks final'; select 'table with 2 blocks final';
select * from mult_tab final settings max_block_size=33; select date, value, version, sign from mult_tab final order by date, key, sign settings max_block_size=33;
optimize table mult_tab; optimize table mult_tab;
select 'table with 2 blocks optimized'; select 'table with 2 blocks optimized';
select * from mult_tab; select date, value, version, sign from mult_tab;
DROP TABLE mult_tab; DROP TABLE mult_tab;

View File

@ -5,13 +5,13 @@ create table simple (id UInt64,val SimpleAggregateFunction(sum,Double)) engine=A
insert into simple select number,number from system.numbers limit 10; insert into simple select number,number from system.numbers limit 10;
select * from simple; select * from simple;
select * from simple final; select * from simple final order by id;
select toTypeName(val) from simple limit 1; select toTypeName(val) from simple limit 1;
-- merge -- merge
insert into simple select number,number from system.numbers limit 10; insert into simple select number,number from system.numbers limit 10;
select * from simple final; select * from simple final order by id;
optimize table simple final; optimize table simple final;
select * from simple; select * from simple;
@ -33,7 +33,7 @@ insert into simple values(1,null,'2','2.2.2.2', 2, ([1,3], [1,1]));
insert into simple values(10,'10','10','10.10.10.10', 4, ([2,3], [1,1])); insert into simple values(10,'10','10','10.10.10.10', 4, ([2,3], [1,1]));
insert into simple values(10,'2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222','20','20.20.20.20', 1, ([2, 4], [1,1])); insert into simple values(10,'2222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222222','20','20.20.20.20', 1, ([2, 4], [1,1]));
select * from simple final; select * from simple final order by id;
select toTypeName(nullable_str),toTypeName(low_str),toTypeName(ip),toTypeName(status), toTypeName(tup) from simple limit 1; select toTypeName(nullable_str),toTypeName(low_str),toTypeName(ip),toTypeName(status), toTypeName(tup) from simple limit 1;
optimize table simple final; optimize table simple final;

View File

@ -4,7 +4,7 @@ drop table if exists tst;
create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp);
insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2);
select * from tst final; select * from tst final order by timestamp;
select '-- 2 2'; select '-- 2 2';
select count() from tst; select count() from tst;
@ -34,7 +34,7 @@ drop table if exists tst;
create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); create table tst (timestamp DateTime, val Nullable(Int8)) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp);
insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4);
select * from tst final; select * from tst final order by timestamp;
select '-- 4 2'; select '-- 4 2';
select count() from tst; select count() from tst;
@ -64,7 +64,7 @@ drop table if exists tst;
create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp);
insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2);
select * from tst final; select * from tst final order by timestamp;
select '-- 2 2'; select '-- 2 2';
select count() from tst; select count() from tst;
@ -96,7 +96,7 @@ drop table if exists tst;
create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp); create table tst (timestamp DateTime, val Int8) engine SummingMergeTree partition by toYYYYMM(timestamp) ORDER by (timestamp);
insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4); insert into tst values ('2018-02-01 00:00:00', 1), ('2018-02-02 00:00:00', 2), ('2018-02-01 00:00:00', 3), ('2018-02-02 00:00:00', 4);
select * from tst final; select * from tst final order by timestamp;
select '-- 4 2'; select '-- 4 2';
select count() from tst; select count() from tst;