add test when chunk with 0 columns has to be produced in squashing

This commit is contained in:
Sema Checherinda 2024-07-05 19:12:23 +02:00
parent 5e4a244faf
commit 376472c8ce
7 changed files with 71 additions and 23 deletions

View File

@ -952,7 +952,7 @@ class IColumn;
#define OBSOLETE_SETTINGS(M, ALIAS) \
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
MAKE_OBSOLETE(M, Bool, update_insert_deduplication_token_in_dependent_materialized_views, 1) \
MAKE_OBSOLETE(M, Bool, update_insert_deduplication_token_in_dependent_materialized_views, 0) \
MAKE_OBSOLETE(M, UInt64, max_memory_usage_for_all_queries, 0) \
MAKE_OBSOLETE(M, UInt64, multiple_joins_rewriter_version, 0) \
MAKE_OBSOLETE(M, Bool, enable_debug_queries, false) \

View File

@ -1,5 +1,7 @@
#include <vector>
#include <Interpreters/Squashing.h>
#include "Common/Logger.h"
#include "Common/logger_useful.h"
#include <Common/CurrentThread.h>
#include <base/defines.h>
@ -16,6 +18,7 @@ Squashing::Squashing(Block header_, size_t min_block_size_rows_, size_t min_bloc
, min_block_size_bytes(min_block_size_bytes_)
, header(header_)
{
LOG_TEST(getLogger("Squashing"), "header columns {}", header.columns());
}
Chunk Squashing::flush()
@ -23,7 +26,7 @@ Chunk Squashing::flush()
if (!accumulated)
return {};
auto result = convertToChunk(accumulated.extract());
auto result = convertToChunk(extract());
chassert(result);
return result;
}
@ -43,6 +46,8 @@ Chunk Squashing::squash(Chunk && input_chunk)
Chunk Squashing::add(Chunk && input_chunk)
{
LOG_TEST(getLogger("Squashing"), "add columns {} rows {}", input_chunk.getNumColumns(), input_chunk.getNumRows());
if (!input_chunk)
return {};
@ -53,11 +58,11 @@ Chunk Squashing::add(Chunk && input_chunk)
if (!accumulated)
{
accumulated.add(std::move(input_chunk));
return convertToChunk(accumulated.extract());
return convertToChunk(extract());
}
/// Return accumulated data (maybe it has small size) and place new block to accumulated data.
Chunk res_chunk = convertToChunk(accumulated.extract());
Chunk res_chunk = convertToChunk(extract());
accumulated.add(std::move(input_chunk));
return res_chunk;
}
@ -66,7 +71,7 @@ Chunk Squashing::add(Chunk && input_chunk)
if (isEnoughSize())
{
/// Return accumulated data and place new block to accumulated data.
Chunk res_chunk = convertToChunk(accumulated.extract());
Chunk res_chunk = convertToChunk(extract());
accumulated.add(std::move(input_chunk));
return res_chunk;
}
@ -76,21 +81,25 @@ Chunk Squashing::add(Chunk && input_chunk)
/// If accumulated data is big enough, we send it
if (isEnoughSize())
return convertToChunk(accumulated.extract());
return convertToChunk(extract());
return {};
}
Chunk Squashing::convertToChunk(std::vector<Chunk> && chunks) const
Chunk Squashing::convertToChunk(CurrentData && data) const
{
if (chunks.empty())
LOG_TEST(getLogger("Squashing"), "convertToChunk {}", data.chunks.size());
if (data.chunks.empty())
return {};
auto info = std::make_shared<ChunksToSquash>();
info->chunks = std::move(chunks);
info->chunks = std::move(data.chunks);
// It is imortant that chunk is not empty, it has to have columns even if they are empty
auto aggr_chunk = Chunk(header.getColumns(), 0);
// Sometimes there are could be no columns in header but not empty rows in chunks
// That happens when we intend to add defaults for the missing columns after
auto aggr_chunk = Chunk(header.getColumns(), header.columns() ? 0 : data.getRows());
aggr_chunk.getChunkInfos().add(std::move(info));
chassert(aggr_chunk);
return aggr_chunk;
@ -149,17 +158,18 @@ bool Squashing::isEnoughSize(const Chunk & chunk) const
return isEnoughSize(chunk.getNumRows(), chunk.bytes());
}
void Squashing::CurrentSize::add(Chunk && chunk)
void Squashing::CurrentData::add(Chunk && chunk)
{
rows += chunk.getNumRows();
bytes += chunk.bytes();
chunks.push_back(std::move(chunk));
}
std::vector<Chunk> Squashing::CurrentSize::extract()
Squashing::CurrentData Squashing::extract()
{
auto result = std::move(chunks);
*this = {};
auto result = std::move(accumulated);
accumulated = {};
return result;
}
}

View File

@ -49,25 +49,23 @@ public:
const Block & getHeader() const { return header; }
private:
class CurrentSize
struct CurrentData
{
std::vector<Chunk> chunks = {};
size_t rows = 0;
size_t bytes = 0;
public:
explicit operator bool () const { return !chunks.empty(); }
size_t getRows() const { return rows; }
size_t getBytes() const { return bytes; }
void add(Chunk && chunk);
std::vector<Chunk> extract();
};
const size_t min_block_size_rows;
const size_t min_block_size_bytes;
Block header;
CurrentSize accumulated;
CurrentData accumulated;
static Chunk squash(std::vector<Chunk> && input_chunks, Chunk::ChunkInfoCollection && infos);
@ -75,7 +73,9 @@ private:
bool isEnoughSize(size_t rows, size_t bytes) const;
bool isEnoughSize(const Chunk & chunk) const;
Chunk convertToChunk(std::vector<Chunk> && chunks) const;
CurrentData extract();
Chunk convertToChunk(CurrentData && data) const;
};
}

View File

@ -148,9 +148,11 @@ void CheckTokenTransform::transform(Chunk & chunk)
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
if (!token_info)
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Chunk has to have DedupTokenInfo as ChunkInfo, {}", debug);
}
LOG_DEBUG(log, "debug: {}, token: {}", debug, token_info->debugToken());
LOG_TEST(log, "debug: {}, token: {}, columns {} rows {}", debug, token_info->debugToken(), chunk.getNumColumns(), chunk.getNumRows());
}
#endif

View File

@ -12,9 +12,9 @@ drop table if exists testXC;
create table testX (A Int64) engine=MergeTree order by tuple();
create materialized view testXA engine=MergeTree order by tuple() as select sleep(0.1) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleep(0.2), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleep(0.1) from testX;
create materialized view testXA engine=MergeTree order by tuple() as select sleepEachRow(0.2) from testX;
create materialized view testXB engine=MergeTree order by tuple() as select sleepEachRow(0.4), throwIf(A=1) from testX;
create materialized view testXC engine=MergeTree order by tuple() as select sleepEachRow(0.2) from testX;
-- { echoOn }
{% for parallel_view_processing in [0, 1] %}

View File

@ -0,0 +1,15 @@
-- { echo ON }
CREATE TABLE src (x UInt8) ENGINE = Memory;
CREATE TABLE dst (x UInt8) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv1 TO dst AS SELECT * FROM src;
INSERT INTO src VALUES (0);
SELECT * from dst;
0
TRUNCATE TABLE dst;
--DROP TABLE src SYNC;
--CREATE TABLE src (y String) ENGINE = MergeTree order by tuple();
ALTER TABLE src ADD COLUMN y UInt8;
ALTER TABLE src DROP COLUMN x;
INSERT INTO src VALUES (0);
SELECT * from dst;
0

View File

@ -0,0 +1,21 @@
DROP TABLE IF EXISTS mv;
DROP TABLE IF EXISTS src;
DROP TABLE IF EXISTS dst;
-- { echo ON }
CREATE TABLE src (x UInt8) ENGINE = Memory;
CREATE TABLE dst (x UInt8) ENGINE = Memory;
CREATE MATERIALIZED VIEW mv1 TO dst AS SELECT * FROM src;
INSERT INTO src VALUES (0);
SELECT * from dst;
TRUNCATE TABLE dst;
--DROP TABLE src SYNC;
--CREATE TABLE src (y String) ENGINE = MergeTree order by tuple();
ALTER TABLE src ADD COLUMN y UInt8;
ALTER TABLE src DROP COLUMN x;
INSERT INTO src VALUES (0);
SELECT * from dst;