mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-28 10:31:57 +00:00
alter table support for windowview
This commit is contained in:
parent
9b354268fa
commit
3ecd9f972a
@ -39,6 +39,7 @@
|
||||
#include <Processors/Transforms/MaterializingTransform.h>
|
||||
#include <Processors/Executors/PipelineExecutor.h>
|
||||
#include <Processors/Sinks/EmptySink.h>
|
||||
#include <Storages/AlterCommands.h>
|
||||
#include <Storages/StorageFactory.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <base/sleep.h>
|
||||
@ -452,6 +453,65 @@ bool StorageWindowView::optimize(
|
||||
return getInnerStorage()->optimize(query, metadata_snapshot, partition, final, deduplicate, deduplicate_by_columns, local_context);
|
||||
}
|
||||
|
||||
void StorageWindowView::alter(
|
||||
const AlterCommands & params,
|
||||
ContextPtr local_context,
|
||||
AlterLockHolder &)
|
||||
{
|
||||
auto table_id = getStorageID();
|
||||
StorageInMemoryMetadata new_metadata = getInMemoryMetadata();
|
||||
StorageInMemoryMetadata old_metadata = getInMemoryMetadata();
|
||||
params.apply(new_metadata, local_context);
|
||||
|
||||
const auto & new_select = new_metadata.select;
|
||||
const auto & new_select_query = new_metadata.select.inner_query;
|
||||
|
||||
auto old_inner_table_id = inner_table_id;
|
||||
|
||||
modifying_query = true;
|
||||
shutdown();
|
||||
|
||||
auto inner_query = initInnerQuery(new_select_query->as<ASTSelectQuery &>(), local_context);
|
||||
|
||||
dropInnerTableIfAny(true, getContext());
|
||||
|
||||
/// create inner table
|
||||
std::exchange(has_inner_table, true);
|
||||
auto create_context = Context::createCopy(local_context);
|
||||
auto inner_create_query = getInnerTableCreateQuery(inner_query, inner_table_id);
|
||||
InterpreterCreateQuery create_interpreter(inner_create_query, create_context);
|
||||
create_interpreter.setInternal(true);
|
||||
create_interpreter.execute();
|
||||
|
||||
DatabaseCatalog::instance().addDependency(select_table_id, table_id);
|
||||
DatabaseCatalog::instance().updateDependency(old_inner_table_id, table_id, inner_table_id, table_id);
|
||||
|
||||
shutdown_called = false;
|
||||
|
||||
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
|
||||
fire_task = getContext()->getSchedulePool().createTask(
|
||||
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
|
||||
clean_cache_task->deactivate();
|
||||
fire_task->deactivate();
|
||||
|
||||
new_metadata.setSelectQuery(new_select);
|
||||
|
||||
DatabaseCatalog::instance().getDatabase(table_id.database_name)->alterTable(local_context, table_id, new_metadata);
|
||||
setInMemoryMetadata(new_metadata);
|
||||
|
||||
startup();
|
||||
modifying_query = false;
|
||||
}
|
||||
|
||||
void StorageWindowView::checkAlterIsPossible(const AlterCommands & commands, ContextPtr /*local_context*/) const
|
||||
{
|
||||
for (const auto & command : commands)
|
||||
{
|
||||
if (!command.isCommentAlter() && command.type != AlterCommand::MODIFY_QUERY)
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Alter of type '{}' is not supported by storage {}", command.type, getName());
|
||||
}
|
||||
}
|
||||
|
||||
std::pair<BlocksPtr, Block> StorageWindowView::getNewBlocks(UInt32 watermark)
|
||||
{
|
||||
UInt32 w_start = addTime(watermark, window_kind, -window_num_units, *time_zone);
|
||||
@ -600,13 +660,12 @@ inline void StorageWindowView::fire(UInt32 watermark)
|
||||
}
|
||||
}
|
||||
|
||||
std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
|
||||
const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name)
|
||||
ASTPtr StorageWindowView::getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id)
|
||||
{
|
||||
/// We will create a query to create an internal table.
|
||||
auto inner_create_query = std::make_shared<ASTCreateQuery>();
|
||||
inner_create_query->setDatabase(database_name);
|
||||
inner_create_query->setTable(table_name);
|
||||
inner_create_query->setDatabase(inner_table_id.getDatabaseName());
|
||||
inner_create_query->setTable(inner_table_id.getTableName());
|
||||
|
||||
Aliases aliases;
|
||||
QueryAliasesVisitor(aliases).visit(inner_query);
|
||||
@ -684,33 +743,34 @@ std::shared_ptr<ASTCreateQuery> StorageWindowView::getInnerTableCreateQuery(
|
||||
};
|
||||
|
||||
auto new_storage = std::make_shared<ASTStorage>();
|
||||
/// storage != nullptr in case create window view with ENGINE syntax
|
||||
if (storage)
|
||||
/// inner_storage_engine != nullptr in case create window view with ENGINE syntax
|
||||
if (inner_table_engine)
|
||||
{
|
||||
new_storage->set(new_storage->engine, storage->engine->clone());
|
||||
auto storage = inner_table_engine->as<ASTStorage &>();
|
||||
new_storage->set(new_storage->engine, storage.engine->clone());
|
||||
|
||||
if (storage->ttl_table)
|
||||
if (storage.ttl_table)
|
||||
throw Exception(
|
||||
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW,
|
||||
"TTL is not supported for inner table in Window View");
|
||||
|
||||
if (!endsWith(storage->engine->name, "MergeTree"))
|
||||
if (!endsWith(storage.engine->name, "MergeTree"))
|
||||
throw Exception(
|
||||
ErrorCodes::INCORRECT_QUERY,
|
||||
"The ENGINE of WindowView must be MergeTree family of table engines "
|
||||
"including the engines with replication support");
|
||||
|
||||
if (storage->partition_by)
|
||||
new_storage->set(new_storage->partition_by, visit(storage->partition_by));
|
||||
if (storage->primary_key)
|
||||
new_storage->set(new_storage->primary_key, visit(storage->primary_key));
|
||||
if (storage->order_by)
|
||||
new_storage->set(new_storage->order_by, visit(storage->order_by));
|
||||
if (storage->sample_by)
|
||||
new_storage->set(new_storage->sample_by, visit(storage->sample_by));
|
||||
if (storage.partition_by)
|
||||
new_storage->set(new_storage->partition_by, visit(storage.partition_by));
|
||||
if (storage.primary_key)
|
||||
new_storage->set(new_storage->primary_key, visit(storage.primary_key));
|
||||
if (storage.order_by)
|
||||
new_storage->set(new_storage->order_by, visit(storage.order_by));
|
||||
if (storage.sample_by)
|
||||
new_storage->set(new_storage->sample_by, visit(storage.sample_by));
|
||||
|
||||
if (storage->settings)
|
||||
new_storage->set(new_storage->settings, storage->settings->clone());
|
||||
if (storage.settings)
|
||||
new_storage->set(new_storage->settings, storage.settings->clone());
|
||||
}
|
||||
else
|
||||
{
|
||||
@ -969,6 +1029,7 @@ StorageWindowView::StorageWindowView(
|
||||
: IStorage(table_id_)
|
||||
, WithContext(context_->getGlobalContext())
|
||||
, log(&Poco::Logger::get(fmt::format("StorageWindowView({}.{})", table_id_.database_name, table_id_.table_name)))
|
||||
, clean_interval_ms(context_->getSettingsRef().window_view_clean_interval.totalMilliseconds())
|
||||
{
|
||||
StorageInMemoryMetadata storage_metadata;
|
||||
storage_metadata.setColumns(columns_);
|
||||
@ -982,11 +1043,47 @@ StorageWindowView::StorageWindowView(
|
||||
ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW,
|
||||
"UNION is not supported for {}", getName());
|
||||
|
||||
select_query = query.select->list_of_selects->children.at(0)->clone();
|
||||
/// Extract information about watermark, lateness.
|
||||
eventTimeParser(query);
|
||||
|
||||
target_table_id = query.to_table_id;
|
||||
|
||||
if (query.storage)
|
||||
inner_table_engine = query.storage->clone();
|
||||
|
||||
auto inner_query = initInnerQuery(query.select->list_of_selects->children.at(0)->as<ASTSelectQuery &>(), context_);
|
||||
|
||||
if(is_proctime)
|
||||
next_fire_signal = getWindowUpperBound(std::time(nullptr));
|
||||
|
||||
std::exchange(has_inner_table, true);
|
||||
if (!attach_)
|
||||
{
|
||||
auto inner_create_query = getInnerTableCreateQuery(inner_query, inner_table_id);
|
||||
auto create_context = Context::createCopy(context_);
|
||||
InterpreterCreateQuery create_interpreter(inner_create_query, create_context);
|
||||
create_interpreter.setInternal(true);
|
||||
create_interpreter.execute();
|
||||
}
|
||||
|
||||
DatabaseCatalog::instance().addDependency(select_table_id, getStorageID());
|
||||
|
||||
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
|
||||
fire_task = getContext()->getSchedulePool().createTask(
|
||||
getStorageID().getFullTableName(), [this] { is_proctime ? threadFuncFireProc() : threadFuncFireEvent(); });
|
||||
clean_cache_task->deactivate();
|
||||
fire_task->deactivate();
|
||||
}
|
||||
|
||||
ASTPtr StorageWindowView::initInnerQuery(ASTSelectQuery query, ContextPtr context_)
|
||||
{
|
||||
select_query = query.clone();
|
||||
sample_block.clear();
|
||||
|
||||
String select_database_name = getContext()->getCurrentDatabase();
|
||||
String select_table_name;
|
||||
auto select_query_tmp = select_query->clone();
|
||||
extractDependentTable(getContext(), select_query_tmp, select_database_name, select_table_name);
|
||||
auto select_query_tmp = query.clone();
|
||||
extractDependentTable(context_, select_query_tmp, select_database_name, select_table_name);
|
||||
|
||||
/// If the table is not specified - use the table `system.one`
|
||||
if (select_table_name.empty())
|
||||
@ -995,77 +1092,40 @@ StorageWindowView::StorageWindowView(
|
||||
select_table_name = "one";
|
||||
}
|
||||
select_table_id = StorageID(select_database_name, select_table_name);
|
||||
DatabaseCatalog::instance().addDependency(select_table_id, table_id_);
|
||||
|
||||
/// Extract all info from query; substitute Function_tumble and Function_hop with Function_windowID.
|
||||
auto inner_query = innerQueryParser(select_query->as<ASTSelectQuery &>());
|
||||
auto inner_query = innerQueryParser(query);
|
||||
|
||||
// Parse mergeable query
|
||||
/// Parse mergeable query
|
||||
mergeable_query = inner_query->clone();
|
||||
ReplaceFunctionNowData func_now_data;
|
||||
ReplaceFunctionNowVisitor(func_now_data).visit(mergeable_query);
|
||||
is_time_column_func_now = func_now_data.is_time_column_func_now;
|
||||
if (!is_proctime && is_time_column_func_now)
|
||||
throw Exception("now() is not supported for Event time processing.", ErrorCodes::INCORRECT_QUERY);
|
||||
if (is_time_column_func_now)
|
||||
window_id_name = func_now_data.window_id_name;
|
||||
|
||||
// Parse final query (same as mergeable query but has tumble/hop instead of windowID)
|
||||
/// Parse final query (same as mergeable query but has tumble/hop instead of windowID)
|
||||
final_query = mergeable_query->clone();
|
||||
ReplaceWindowIdMatcher::Data final_query_data;
|
||||
if (is_tumble)
|
||||
final_query_data.window_name = "tumble";
|
||||
else
|
||||
final_query_data.window_name = "hop";
|
||||
final_query_data.window_name = is_tumble ? "tumble" : "hop";
|
||||
ReplaceWindowIdMatcher::Visitor(final_query_data).visit(final_query);
|
||||
|
||||
is_watermark_strictly_ascending = query.is_watermark_strictly_ascending;
|
||||
is_watermark_ascending = query.is_watermark_ascending;
|
||||
is_watermark_bounded = query.is_watermark_bounded;
|
||||
target_table_id = query.to_table_id;
|
||||
window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), is_tumble ? "tumble" : "hop");
|
||||
|
||||
/// Extract information about watermark, lateness.
|
||||
eventTimeParser(query);
|
||||
|
||||
if (is_tumble)
|
||||
window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), "tumble");
|
||||
else
|
||||
window_column_name = std::regex_replace(window_id_name, std::regex("windowID"), "hop");
|
||||
|
||||
auto generate_inner_table_name = [](const StorageID & storage_id)
|
||||
auto generate_inner_table_id = [](const StorageID & storage_id)
|
||||
{
|
||||
if (storage_id.hasUUID())
|
||||
return ".inner." + toString(storage_id.uuid);
|
||||
return ".inner." + storage_id.table_name;
|
||||
StorageID table_id = StorageID::createEmpty();
|
||||
table_id.database_name = storage_id.database_name;
|
||||
table_id.table_name = ".inner." + (storage_id.hasUUID() ? toString(storage_id.uuid) : storage_id.table_name);
|
||||
return table_id;
|
||||
};
|
||||
inner_table_id = generate_inner_table_id(getStorageID());
|
||||
|
||||
if (attach_)
|
||||
{
|
||||
inner_table_id = StorageID(table_id_.database_name, generate_inner_table_name(table_id_));
|
||||
}
|
||||
else
|
||||
{
|
||||
auto inner_create_query
|
||||
= getInnerTableCreateQuery(inner_query, query.storage, table_id_.database_name, generate_inner_table_name(table_id_));
|
||||
|
||||
auto create_context = Context::createCopy(context_);
|
||||
InterpreterCreateQuery create_interpreter(inner_create_query, create_context);
|
||||
create_interpreter.setInternal(true);
|
||||
create_interpreter.execute();
|
||||
inner_table_id = StorageID(inner_create_query->getDatabase(), inner_create_query->getTable());
|
||||
}
|
||||
|
||||
clean_interval_ms = getContext()->getSettingsRef().window_view_clean_interval.totalMilliseconds();
|
||||
next_fire_signal = getWindowUpperBound(std::time(nullptr));
|
||||
|
||||
clean_cache_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncCleanup(); });
|
||||
if (is_proctime)
|
||||
fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireProc(); });
|
||||
else
|
||||
fire_task = getContext()->getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncFireEvent(); });
|
||||
clean_cache_task->deactivate();
|
||||
fire_task->deactivate();
|
||||
return inner_query;
|
||||
}
|
||||
|
||||
|
||||
ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
|
||||
{
|
||||
if (!query.groupBy())
|
||||
@ -1127,13 +1187,16 @@ ASTPtr StorageWindowView::innerQueryParser(const ASTSelectQuery & query)
|
||||
|
||||
void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
|
||||
{
|
||||
watermark_num_units = 0;
|
||||
lateness_num_units = 0;
|
||||
is_watermark_strictly_ascending = query.is_watermark_strictly_ascending;
|
||||
is_watermark_ascending = query.is_watermark_ascending;
|
||||
is_watermark_bounded = query.is_watermark_bounded;
|
||||
|
||||
if (query.is_watermark_strictly_ascending || query.is_watermark_ascending || query.is_watermark_bounded)
|
||||
{
|
||||
is_proctime = false;
|
||||
|
||||
if (is_time_column_func_now)
|
||||
throw Exception("now() is not supported for Event time processing.", ErrorCodes::INCORRECT_QUERY);
|
||||
|
||||
if (query.is_watermark_ascending)
|
||||
{
|
||||
is_watermark_bounded = true;
|
||||
@ -1147,6 +1210,8 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
|
||||
"Illegal type WATERMARK function should be Interval");
|
||||
}
|
||||
}
|
||||
else
|
||||
is_proctime = true;
|
||||
|
||||
if (query.allowed_lateness)
|
||||
{
|
||||
@ -1155,11 +1220,16 @@ void StorageWindowView::eventTimeParser(const ASTCreateQuery & query)
|
||||
query.lateness_function, lateness_kind, lateness_num_units,
|
||||
"Illegal type ALLOWED_LATENESS function should be Interval");
|
||||
}
|
||||
else
|
||||
allowed_lateness = false;
|
||||
}
|
||||
|
||||
void StorageWindowView::writeIntoWindowView(
|
||||
StorageWindowView & window_view, const Block & block, ContextPtr local_context)
|
||||
{
|
||||
while (window_view.modifying_query)
|
||||
std::this_thread::sleep_for(std::chrono::milliseconds(100));
|
||||
|
||||
Pipe pipe(std::make_shared<SourceFromSingleChunk>(block.cloneEmpty(), Chunk(block.getColumns(), block.rows())));
|
||||
|
||||
UInt32 lateness_bound = 0;
|
||||
@ -1340,6 +1410,7 @@ void StorageWindowView::shutdown()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
fire_condition.notify_all();
|
||||
fire_signal_condition.notify_all();
|
||||
}
|
||||
|
||||
clean_cache_task->deactivate();
|
||||
|
@ -134,6 +134,10 @@ public:
|
||||
const Names & deduplicate_by_columns,
|
||||
ContextPtr context) override;
|
||||
|
||||
void alter(const AlterCommands & params, ContextPtr context, AlterLockHolder & table_lock_holder) override;
|
||||
|
||||
void checkAlterIsPossible(const AlterCommands & commands, ContextPtr context) const override;
|
||||
|
||||
void startup() override;
|
||||
void shutdown() override;
|
||||
|
||||
@ -161,10 +165,11 @@ private:
|
||||
/// Used to fetch the mergeable state and generate the final result. e.g. SELECT * FROM * GROUP BY tumble(____timestamp, *)
|
||||
ASTPtr final_query;
|
||||
|
||||
bool is_proctime{true};
|
||||
bool is_proctime;
|
||||
bool is_time_column_func_now;
|
||||
bool is_tumble; // false if is hop
|
||||
std::atomic<bool> shutdown_called{false};
|
||||
std::atomic<bool> modifying_query{false};
|
||||
bool has_inner_table{true};
|
||||
mutable Block sample_block;
|
||||
UInt64 clean_interval_ms;
|
||||
@ -172,10 +177,10 @@ private:
|
||||
UInt32 max_timestamp = 0;
|
||||
UInt32 max_watermark = 0; // next watermark to fire
|
||||
UInt32 max_fired_watermark = 0;
|
||||
bool is_watermark_strictly_ascending{false};
|
||||
bool is_watermark_ascending{false};
|
||||
bool is_watermark_bounded{false};
|
||||
bool allowed_lateness{false};
|
||||
bool is_watermark_strictly_ascending;
|
||||
bool is_watermark_ascending;
|
||||
bool is_watermark_bounded;
|
||||
bool allowed_lateness;
|
||||
UInt32 next_fire_signal;
|
||||
std::deque<UInt32> fire_signal;
|
||||
std::list<std::weak_ptr<WindowViewSource>> watch_streams;
|
||||
@ -195,8 +200,8 @@ private:
|
||||
Int64 window_num_units;
|
||||
Int64 hop_num_units;
|
||||
Int64 slice_num_units;
|
||||
Int64 watermark_num_units = 0;
|
||||
Int64 lateness_num_units = 0;
|
||||
Int64 watermark_num_units;
|
||||
Int64 lateness_num_units;
|
||||
Int64 slide_num_units;
|
||||
String window_id_name;
|
||||
String window_id_alias;
|
||||
@ -207,6 +212,8 @@ private:
|
||||
StorageID target_table_id = StorageID::createEmpty();
|
||||
StorageID inner_table_id = StorageID::createEmpty();
|
||||
|
||||
ASTPtr inner_table_engine;
|
||||
|
||||
BackgroundSchedulePool::TaskHolder clean_cache_task;
|
||||
BackgroundSchedulePool::TaskHolder fire_task;
|
||||
|
||||
@ -215,9 +222,8 @@ private:
|
||||
|
||||
ASTPtr innerQueryParser(const ASTSelectQuery & query);
|
||||
void eventTimeParser(const ASTCreateQuery & query);
|
||||
ASTPtr initInnerQuery(ASTSelectQuery query, ContextPtr context);
|
||||
|
||||
std::shared_ptr<ASTCreateQuery> getInnerTableCreateQuery(
|
||||
const ASTPtr & inner_query, ASTStorage * storage, const String & database_name, const String & table_name);
|
||||
UInt32 getCleanupBound();
|
||||
ASTPtr getCleanupQuery();
|
||||
|
||||
@ -235,6 +241,7 @@ private:
|
||||
|
||||
ASTPtr getFinalQuery() const { return final_query->clone(); }
|
||||
ASTPtr getFetchColumnQuery(UInt32 w_start, UInt32 w_end) const;
|
||||
ASTPtr getInnerTableCreateQuery(const ASTPtr & inner_query, const StorageID & inner_table_id);
|
||||
|
||||
StoragePtr getParentStorage() const;
|
||||
|
||||
|
@ -51,6 +51,8 @@ protected:
|
||||
Block block;
|
||||
UInt32 watermark;
|
||||
std::tie(block, watermark) = generateImpl();
|
||||
if (!block)
|
||||
return Chunk();
|
||||
if (is_events)
|
||||
{
|
||||
return Chunk(
|
||||
|
Loading…
Reference in New Issue
Block a user