Fix tests for LiveView and WindowView

This commit is contained in:
Sema Checherinda 2024-05-21 17:07:31 +02:00
parent 4fa59ca49d
commit ae124bf0b3
9 changed files with 116 additions and 61 deletions

View File

@ -23,7 +23,7 @@ namespace DeduplicationToken
String DB::DeduplicationToken::TokenInfo::getToken(bool enable_assert) const
{
chassert(stage == MATERIALIZE_VIEW_ID || !enable_assert);
chassert(stage == VIEW_ID || !enable_assert);
String result;
result.reserve(getTotalSize());
@ -38,7 +38,7 @@ void DB::DeduplicationToken::TokenInfo::setInitialToken(String part)
{
chassert(stage == INITIAL);
addTokenPart(std::move(part));
stage = MATERIALIZE_VIEW_ID;
stage = VIEW_ID;
}
void TokenInfo::setUserToken(const String & token)
@ -52,21 +52,21 @@ void TokenInfo::setSourceBlockNumber(size_t sbn)
{
chassert(stage == SOURCE_BLOCK_NUMBER);
addTokenPart(fmt::format(":source-number-{}", sbn));
stage = MATERIALIZE_VIEW_ID;
stage = VIEW_ID;
}
/// Appends the view identifier as the next token part and advances the building stage
/// so that a view block number is expected next.
/// NOTE(review): this listing is a rendered diff without +/- markers, so every changed line
/// appears twice in a row. Presumably the first of each pair (the MATERIALIZE_VIEW_* spelling)
/// is the removed line and the second one is its replacement - confirm against the applied file.
void TokenInfo::setMaterializeViewID(const String & id)
void TokenInfo::setViewID(const String & id)
{
chassert(stage == MATERIALIZE_VIEW_ID);
addTokenPart(fmt::format(":mv-{}", id));
stage = MATERIALIZE_VIEW_BLOCK_NUMBER;
/// The renamed variant also uses a different part prefix (":view-id-" instead of ":mv-").
chassert(stage == VIEW_ID);
addTokenPart(fmt::format(":view-id-{}", id));
stage = VIEW_BLOCK_NUMBER;
}
/// Appends the view block number as the next token part and moves the building stage
/// back to the "view id" stage.
/// NOTE(review): this listing is a rendered diff without +/- markers, so every changed line
/// appears twice in a row. Presumably the first of each pair (the MATERIALIZE_VIEW_* spelling)
/// is the removed line and the second one is its replacement - confirm against the applied file.
void TokenInfo::setMaterializeViewBlockNumber(size_t mvbn)
void TokenInfo::setViewBlockNumber(size_t mvbn)
{
chassert(stage == MATERIALIZE_VIEW_BLOCK_NUMBER);
addTokenPart(fmt::format(":mv-bn-{}", mvbn));
stage = MATERIALIZE_VIEW_ID;
/// The renamed variant also uses a different part prefix (":view-block-" instead of ":mv-bn-").
chassert(stage == VIEW_BLOCK_NUMBER);
addTokenPart(fmt::format(":view-block-{}", mvbn));
stage = VIEW_ID;
}
void TokenInfo::reset()
@ -116,8 +116,7 @@ void SetInitialTokenTransform::transform(Chunk & chunk)
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetInitialTokenTransform");
chassert(token_info);
if (!token_info || token_info->tokenInitialized())
if (token_info->tokenInitialized())
return;
SipHash hash;
@ -131,39 +130,52 @@ void SetInitialTokenTransform::transform(Chunk & chunk)
/// Looks up the TokenInfo attached to the chunk and stores the user token in it.
/// NOTE(review): this listing is a rendered diff without +/- markers. Presumably the two
/// chassert lines are the removed code and the throwing check is their replacement - confirm.
/// If so, the "token is not initialized yet" assertion is no longer made here.
void SetUserTokenTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
chassert(token_info);
chassert(!token_info->tokenInitialized());
/// A chunk without TokenInfo is reported as a LOGICAL_ERROR exception.
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetUserTokenTransform");
token_info->setUserToken(user_token);
}
/// Looks up the TokenInfo attached to the chunk and records the current source block number in it.
/// NOTE(review): this listing is a rendered diff without +/- markers. Presumably the two
/// chassert lines are the removed code and the throwing check is their replacement - confirm.
void SetSourceBlockNumberTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
chassert(token_info);
chassert(!token_info->tokenInitialized());
/// A chunk without TokenInfo is reported as a LOGICAL_ERROR exception.
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetSourceBlockNumberTransform");
/// Post-increment: the current counter value is used for this chunk, the next chunk gets the following one.
token_info->setSourceBlockNumber(block_number++);
}
/// Looks up the TokenInfo attached to the chunk and appends the stored view id to it.
/// NOTE(review): this listing is a rendered diff without +/- markers. Presumably the
/// SetMaterializeViewID* signature, the two chassert lines and the setMaterializeViewID call
/// are the removed code, and the SetViewID* lines are their replacement - confirm.
void SetMaterializeViewIDTransform::transform(Chunk & chunk)
void SetViewIDTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
chassert(token_info);
chassert(token_info->tokenInitialized());
token_info->setMaterializeViewID(mv_id);
/// A chunk without TokenInfo is reported as a LOGICAL_ERROR exception.
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetViewIDTransform");
token_info->setViewID(view_id);
}
/// Looks up the TokenInfo attached to the chunk and appends the current view block number to it.
/// NOTE(review): this listing is a rendered diff without +/- markers. Presumably the
/// SetMaterializeViewBlockNumber* signature, the two chassert lines and the
/// setMaterializeViewBlockNumber call are the removed code, and the SetViewBlockNumber* lines
/// are their replacement - confirm.
void SetMaterializeViewBlockNumberTransform::transform(Chunk & chunk)
void SetViewBlockNumberTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
chassert(token_info);
chassert(token_info->tokenInitialized());
token_info->setMaterializeViewBlockNumber(block_number++);
/// A chunk without TokenInfo is reported as a LOGICAL_ERROR exception.
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in SetViewBlockNumberTransform");
/// Post-increment: the current counter value is used for this chunk, the next chunk gets the following one.
token_info->setViewBlockNumber(block_number++);
}
/// Looks up the TokenInfo attached to the chunk, logs its current token and resets it.
/// NOTE(review): this listing is a rendered diff without +/- markers. Presumably the chassert
/// line is the removed code and the throwing check plus the LOG_DEBUG line were added - confirm.
void ResetTokenTransform::transform(Chunk & chunk)
{
auto token_info = chunk.getChunkInfos().get<TokenInfo>();
chassert(token_info);
/// A chunk without TokenInfo is reported as a LOGICAL_ERROR exception.
if (!token_info)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"TokenInfo is expected for consumed chunk in ResetTokenTransform");
/// getToken is called with enable_assert = false, so its stage assertion does not fire here.
LOG_DEBUG(getLogger("ResetTokenTransform"), "token_info was {}", token_info->getToken(false));
token_info->reset();
}

View File

@ -13,14 +13,21 @@ namespace DB
public:
/// Stores the collection of chunk infos that transform() later appends to each chunk.
/// The same header is passed to ISimpleTransform as both input and output header.
/// NOTE(review): this listing is a rendered diff without +/- markers. The two `chunk_infos(...)`
/// initializer lines are presumably the removed (copying) and the added (moving) variant, and
/// the LOG_TRACE call is presumably newly added - confirm against the applied file.
RestoreChunkInfosTransform(Chunk::ChunkInfoCollection chunk_infos_, const Block & header_)
: ISimpleTransform(header_, header_, true)
, chunk_infos(chunk_infos_)
, chunk_infos(std::move(chunk_infos_))
{
/// Trace-level record of how many infos were captured and their debug representation.
LOG_TRACE(getLogger("RestoreChunkInfosTransform"), "create RestoreChunkInfosTransform to append {}:{}",
chunk_infos.size(), chunk_infos.debug());
}
/// Returns the fixed name of this transform.
String getName() const override { return "RestoreChunkInfosTransform"; }
/// Appends a clone of the stored chunk infos to the infos already attached to the chunk.
/// Before doing so, logs the chunk's existing infos, the infos to append and the chunk's row count.
/// NOTE(review): in this marker-less diff rendering the LOG_TRACE call is presumably an added line - confirm.
void transform(Chunk & chunk) override
{
LOG_TRACE(getLogger("RestoreChunkInfosTransform"), "chunk infos before: {}:{}, append: {}:{}, chunk has rows {}",
chunk.getChunkInfos().size(), chunk.getChunkInfos().debug(),
chunk_infos.size(), chunk_infos.debug(),
chunk.getNumRows());
/// A clone is appended, so the stored collection stays available for subsequent chunks.
chunk.getChunkInfos().append(chunk_infos.clone());
}
@ -45,8 +52,8 @@ namespace DeduplicationToken
void setInitialToken(String part);
void setUserToken(const String & token);
void setSourceBlockNumber(size_t sbn);
void setMaterializeViewID(const String & id);
void setMaterializeViewBlockNumber(size_t mvbn);
void setViewID(const String & id);
void setViewBlockNumber(size_t mvbn);
void reset();
private:
@ -57,8 +64,8 @@ namespace DeduplicationToken
{
INITIAL,
SOURCE_BLOCK_NUMBER,
MATERIALIZE_VIEW_ID,
MATERIALIZE_VIEW_BLOCK_NUMBER,
VIEW_ID,
VIEW_BLOCK_NUMBER,
};
BuildingStage stage = INITIAL;
@ -71,7 +78,7 @@ namespace DeduplicationToken
public:
/// Stores the debug label and the must_be_present flag for this transform.
/// The same header is passed to ISimpleTransform as both input and output header.
/// NOTE(review): this listing is a rendered diff without +/- markers. The two `debug(...)`
/// initializer lines are presumably the removed (copying) and the added (moving) variant - confirm.
CheckTokenTransform(String debug_, bool must_be_present_, const Block & header_)
: ISimpleTransform(header_, header_, true)
, debug(debug_)
, debug(std::move(debug_))
, must_be_present(must_be_present_)
{
}
@ -165,38 +172,38 @@ namespace DeduplicationToken
};
/// Simple transform that holds a view id string; its transform() is only declared here and
/// defined out of line.
/// NOTE(review): this listing is a rendered diff without +/- markers, so every renamed line
/// appears twice in a row. Presumably the SetMaterializeViewID* / mv_id spellings are the removed
/// lines and the SetViewID* / view_id spellings are their replacements - confirm against the
/// applied file.
class SetMaterializeViewIDTransform : public ISimpleTransform
class SetViewIDTransform : public ISimpleTransform
{
public:
/// Takes the id by value and moves it into the member; the same header is used as input and output header.
SetMaterializeViewIDTransform(String mv_id_, const Block & header_)
SetViewIDTransform(String view_id_, const Block & header_)
: ISimpleTransform(header_, header_, true)
, mv_id(std::move(mv_id_))
, view_id(std::move(view_id_))
{
}
/// Returns the fixed, namespace-qualified name of this transform.
String getName() const override { return "DeduplicationToken::SetMaterializeViewIDTransform"; }
String getName() const override { return "DeduplicationToken::SetViewIDTransform"; }
void transform(Chunk & chunk) override;
private:
/// Identifier of the view, set once in the constructor.
String mv_id;
String view_id;
};
/// Simple transform that keeps a running block counter; its transform() is only declared here
/// and defined out of line.
/// NOTE(review): this listing is a rendered diff without +/- markers, so every changed line
/// appears twice in a row. Presumably the SetMaterializeViewBlockNumber* spellings are the removed
/// lines and the SetViewBlockNumber* spellings are their replacements - confirm against the
/// applied file.
class SetMaterializeViewBlockNumberTransform : public ISimpleTransform
class SetViewBlockNumberTransform : public ISimpleTransform
{
public:
/// The same header is used as input and output header; the constructor does not touch block_number.
explicit SetMaterializeViewBlockNumberTransform(const Block & header_)
explicit SetViewBlockNumberTransform(const Block & header_)
: ISimpleTransform(header_, header_, true)
{
}
/// Returns the fixed, namespace-qualified name of this transform.
String getName() const override { return "DeduplicationToken::SetMaterializeViewBlockNumberTransform"; }
String getName() const override { return "DeduplicationToken::SetViewBlockNumberTransform"; }
void transform(Chunk & chunk) override;
private:
/// Block counter. The first declaration has no initializer and the constructor does not set it,
/// so only the second declaration (`= 0`) gives the counter a defined starting value.
size_t block_number;
size_t block_number = 0;
};
}

View File

@ -620,8 +620,8 @@ static QueryPipeline process(Block block, ViewRuntimeData & view, const ViewsDat
if (!disable_deduplication_for_children)
{
String materialize_view_id = view.table_id.hasUUID() ? toString(view.table_id.uuid) : view.table_id.getFullNameNotQuoted();
pipeline.addTransform(std::make_shared<DeduplicationToken::SetMaterializeViewIDTransform>(std::move(materialize_view_id), pipeline.getHeader()));
pipeline.addTransform(std::make_shared<DeduplicationToken::SetMaterializeViewBlockNumberTransform>(pipeline.getHeader()));
pipeline.addTransform(std::make_shared<DeduplicationToken::SetViewIDTransform>(std::move(materialize_view_id), pipeline.getHeader()));
pipeline.addTransform(std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(pipeline.getHeader()));
}
else
{
@ -766,7 +766,7 @@ PushingToLiveViewSink::PushingToLiveViewSink(const Block & header, StorageLiveVi
void PushingToLiveViewSink::consume(Chunk & chunk)
{
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
live_view.writeBlock(getHeader(), chunk, context);
live_view.writeBlock(live_view, getHeader().cloneWithColumns(chunk.detachColumns()), std::move(chunk.getChunkInfos()), context);
if (auto process = context->getProcessListElement())
process->updateProgressIn(local_progress);
@ -790,7 +790,7 @@ void PushingToWindowViewSink::consume(Chunk & chunk)
{
Progress local_progress(chunk.getNumRows(), chunk.bytes(), 0);
StorageWindowView::writeIntoWindowView(
window_view, getHeader(), chunk, context);
window_view, getHeader().cloneWithColumns(chunk.detachColumns()), std::move(chunk.getChunkInfos()), context);
if (auto process = context->getProcessListElement())
process->updateProgressIn(local_progress);

View File

@ -27,6 +27,7 @@ limitations under the License. */
#include <Common/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/SipHash.h>
#include "Processors/Transforms/NumberBlocksTransform.h"
#include <base/hex.h>
#include <Storages/LiveView/StorageLiveView.h>
@ -330,7 +331,7 @@ Pipe StorageLiveView::watch(
return reader;
}
void StorageLiveView::writeBlock(const Block & header, Chunk & chunk, ContextPtr local_context)
void StorageLiveView::writeBlock(StorageLiveView & live_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr local_context)
{
auto output = std::make_shared<LiveViewSink>(*this);
@ -363,7 +364,7 @@ void StorageLiveView::writeBlock(const Block & header, Chunk & chunk, ContextPtr
if (!is_block_processed)
{
Pipes pipes;
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(header, chunk.clone()));
pipes.emplace_back(std::make_shared<SourceFromSingleChunk>(block));
auto creator = [&](const StorageID & blocks_id_global)
{
@ -407,6 +408,21 @@ void StorageLiveView::writeBlock(const Block & header, Chunk & chunk, ContextPtr
builder = interpreter.buildQueryPipeline();
}
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<RestoreChunkInfosTransform>(chunk_infos.clone(), cur_header);
});
String live_view_id = live_view.getStorageID().hasUUID() ? toString(live_view.getStorageID().uuid) : live_view.getStorageID().getFullNameNotQuoted();
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::SetViewIDTransform>(live_view_id, stream_header);
});
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(stream_header);
});
builder.addSimpleTransform([&](const Block & cur_header)
{
return std::make_shared<MaterializingTransform>(cur_header);

View File

@ -118,7 +118,7 @@ public:
return 0;
}
void writeBlock(const Block & header, Chunk & chunk, ContextPtr context);
void writeBlock(StorageLiveView & live_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr context);
void refresh();

View File

@ -1416,25 +1416,27 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
}
void StorageWindowView::writeIntoWindowView(
StorageWindowView & window_view, const Block & header, Chunk & chunk, ContextPtr local_context)
StorageWindowView & window_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr local_context)
{
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: rows {}, infos {} with {}, window column {}",
block.rows(),
chunk_infos.size(), chunk_infos.debug(),
window_view.timestamp_column_name);
window_view.throwIfWindowViewIsDisabled(local_context);
while (window_view.modifying_query)
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (!window_view.is_proctime && window_view.max_watermark == 0 && chunk.getNumRows() > 0)
if (!window_view.is_proctime && window_view.max_watermark == 0 && block.rows() > 0)
{
std::lock_guard lock(window_view.fire_signal_mutex);
const auto & window_column = header.getByName(window_view.timestamp_column_name);
const auto & window_column = block.getByName(window_view.timestamp_column_name);
const ColumnUInt32::Container & window_end_data = static_cast<const ColumnUInt32 &>(*window_column.column).getData();
UInt32 first_record_timestamp = window_end_data[0];
window_view.max_watermark = window_view.getWindowUpperBound(first_record_timestamp);
}
auto chunk_infos = chunk.getChunkInfos();
chunk.setChunkInfos({});
Pipe pipe(std::make_shared<SourceFromSingleChunk>(header.cloneEmpty(), std::move(chunk)));
Pipe pipe(std::make_shared<SourceFromSingleChunk>(block));
UInt32 lateness_bound = 0;
UInt32 t_max_watermark = 0;
@ -1465,6 +1467,9 @@ void StorageWindowView::writeIntoWindowView(
lateness_bound = t_max_fired_watermark;
}
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: lateness_bound {}, window_view.is_proctime {}",
lateness_bound, window_view.is_proctime);
if (lateness_bound > 0) /// Add filter, which leaves rows with timestamp >= lateness_bound
{
auto filter_function = makeASTFunction(
@ -1540,7 +1545,18 @@ void StorageWindowView::writeIntoWindowView(
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<RestoreChunkInfosTransform>(std::move(chunk_infos), stream_header);
// Can't move chunk_infos here: this lambda could be called several times
return std::make_shared<RestoreChunkInfosTransform>(chunk_infos.clone(), stream_header);
});
String window_view_id = window_view.getStorageID().hasUUID() ? toString(window_view.getStorageID().uuid) : window_view.getStorageID().getFullNameNotQuoted();
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::SetViewIDTransform>(window_view_id, stream_header);
});
builder.addSimpleTransform([&](const Block & stream_header)
{
return std::make_shared<DeduplicationToken::SetViewBlockNumberTransform>(stream_header);
});
builder.addSimpleTransform([&](const Block & stream_header)
@ -1548,6 +1564,7 @@ void StorageWindowView::writeIntoWindowView(
return std::make_shared<DeduplicationToken::CheckTokenTransform>("StorageWindowView: Afrer tmp table before squasing", true, stream_header);
});
builder.addSimpleTransform([&](const Block & current_header)
{
return std::make_shared<SquashingChunksTransform>(
@ -1561,7 +1578,7 @@ void StorageWindowView::writeIntoWindowView(
UInt32 block_max_timestamp = 0;
if (window_view.is_watermark_bounded || window_view.allowed_lateness)
{
const auto & timestamp_column = *header.getByName(window_view.timestamp_column_name).column;
const auto & timestamp_column = *block.getByName(window_view.timestamp_column_name).column;
const auto & timestamp_data = typeid_cast<const ColumnUInt32 &>(timestamp_column).getData();
for (const auto & timestamp : timestamp_data)
block_max_timestamp = std::max(timestamp, block_max_timestamp);
@ -1569,6 +1586,9 @@ void StorageWindowView::writeIntoWindowView(
if (block_max_timestamp)
window_view.updateMaxTimestamp(block_max_timestamp);
LOG_TRACE(getLogger("StorageWindowView"), "writeIntoWindowView: block_max_timestamp {}",
block_max_timestamp);
}
UInt32 lateness_upper_bound = 0;

View File

@ -166,7 +166,7 @@ public:
BlockIO populate();
static void writeIntoWindowView(StorageWindowView & window_view, const Block & header, Chunk & chunk, ContextPtr context);
static void writeIntoWindowView(StorageWindowView & window_view, Block && block, Chunk::ChunkInfoCollection && chunk_infos, ContextPtr context);
ASTPtr getMergeableQuery() const { return mergeable_query->clone(); }

View File

@ -29,7 +29,7 @@ INSERT INTO without_deduplication VALUES (43);
SELECT count() FROM with_deduplication;
SELECT count() FROM without_deduplication;
-- Implicit insert is deduplicated even for MV without_deduplication_mv
-- Implicit insert isn't deduplicated, because deduplicate_blocks_in_dependent_materialized_views = 0 by default
SELECT '';
SELECT countMerge(cnt) FROM with_deduplication_mv;
SELECT countMerge(cnt) FROM without_deduplication_mv;