Merge pull request #60729 from ClickHouse/remove-more-live-view

Remove more code from LIVE VIEW
This commit is contained in:
Alexey Milovidov 2024-03-04 18:58:21 +03:00 committed by GitHub
commit ce6dec65cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 4 additions and 138 deletions

View File

@ -379,7 +379,6 @@
M(467, CANNOT_PARSE_BOOL) \
M(468, CANNOT_PTHREAD_ATTR) \
M(469, VIOLATED_CONSTRAINT) \
M(470, QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW) \
M(471, INVALID_SETTING_VALUE) \
M(472, READONLY_SETTING) \
M(473, DEADLOCK_AVOIDED) \

View File

@ -177,7 +177,6 @@ using BlockPtr = std::shared_ptr<Block>;
using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BlocksPtr = std::shared_ptr<Blocks>;
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
/// Extends block with extra data in derived classes
struct ExtraBlock

View File

@ -13,18 +13,9 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Common/ErrorCodes.cpp
Common/UInt128.h
Core/Block.h
Core/Defines.h
Core/Settings.h
Databases/DatabasesCommon.cpp
IO/WriteBufferValidUTF8.cpp
Interpreters/InterpreterAlterQuery.cpp
Interpreters/InterpreterCreateQuery.cpp
Interpreters/InterpreterFactory.cpp
Parsers/ASTAlterQuery.cpp
Parsers/ASTAlterQuery.h
Parsers/ASTCreateQuery.cpp
Parsers/ASTCreateQuery.h
Parsers/ParserAlterQuery.cpp

View File

@ -60,8 +60,6 @@ ASTPtr ASTAlterCommand::clone() const
res->settings_resets = res->children.emplace_back(settings_resets->clone()).get();
if (select)
res->select = res->children.emplace_back(select->clone()).get();
if (values)
res->values = res->children.emplace_back(values->clone()).get();
if (rename_to)
res->rename_to = res->children.emplace_back(rename_to->clone()).get();
@ -518,7 +516,6 @@ void ASTAlterCommand::forEachPointerToChild(std::function<void(void**)> f)
f(reinterpret_cast<void **>(&settings_changes));
f(reinterpret_cast<void **>(&settings_resets));
f(reinterpret_cast<void **>(&select));
f(reinterpret_cast<void **>(&values));
f(reinterpret_cast<void **>(&rename_to));
}

View File

@ -166,9 +166,6 @@ public:
/// For MODIFY_SQL_SECURITY
IAST * sql_security = nullptr;
/// In ALTER CHANNEL, ADD, DROP, SUSPEND, RESUME, REFRESH, MODIFY queries, the list of live views is stored here
IAST * values = nullptr;
/// Target column name
IAST * rename_to = nullptr;

View File

@ -348,13 +348,6 @@ void ASTCreateQuery::formatQueryImpl(const FormatSettings & settings, FormatStat
settings.ostr << (settings.hilite ? hilite_keyword : "") << " FROM " << (settings.hilite ? hilite_none : "")
<< quoteString(*attach_from_path);
if (live_view_periodic_refresh)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " WITH" << (settings.hilite ? hilite_none : "")
<< (settings.hilite ? hilite_keyword : "") << " PERIODIC REFRESH " << (settings.hilite ? hilite_none : "")
<< *live_view_periodic_refresh;
}
formatOnCluster(settings);
}
else

View File

@ -122,7 +122,6 @@ public:
ASTDictionary * dictionary = nullptr; /// dictionary definition (layout, primary key, etc.)
ASTRefreshStrategy * refresh_strategy = nullptr; // For CREATE MATERIALIZED VIEW ... REFRESH ...
std::optional<UInt64> live_view_periodic_refresh; /// For CREATE LIVE VIEW ... WITH [PERIODIC] REFRESH ...
bool is_watermark_strictly_ascending{false}; /// STRICTLY ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW
bool is_watermark_ascending{false}; /// ASCENDING WATERMARK STRATEGY FOR WINDOW VIEW

View File

@ -138,7 +138,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserList parser_reset_setting(
std::make_unique<ParserIdentifier>(), std::make_unique<ParserToken>(TokenType::Comma),
/* allow_empty = */ false);
ParserNameList values_p;
ParserSelectWithUnionQuery select_p;
ParserSQLSecurity sql_security_p;
ParserRefreshStrategy refresh_p;
@ -163,7 +162,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ASTPtr command_settings_changes;
ASTPtr command_settings_resets;
ASTPtr command_select;
ASTPtr command_values;
ASTPtr command_rename_to;
ASTPtr command_sql_security;
@ -944,8 +942,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->settings_resets = command->children.emplace_back(std::move(command_settings_resets)).get();
if (command_select)
command->select = command->children.emplace_back(std::move(command_select)).get();
if (command_values)
command->values = command->children.emplace_back(std::move(command_values)).get();
if (command_sql_security)
command->sql_security = command->children.emplace_back(std::move(command_sql_security)).get();
if (command_rename_to)

View File

@ -917,15 +917,11 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
ASTPtr as_database;
ASTPtr as_table;
ASTPtr select;
ASTPtr live_view_periodic_refresh;
ASTPtr sql_security;
String cluster_str;
bool attach = false;
bool if_not_exists = false;
bool with_and = false;
bool with_timeout = false;
bool with_periodic_refresh = false;
if (!s_create.ignore(pos, expected))
{
@ -949,23 +945,6 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (!table_name_p.parse(pos, table, expected))
return false;
if (ParserKeyword{"WITH"}.ignore(pos, expected))
{
if (ParserKeyword{"REFRESH"}.ignore(pos, expected) || ParserKeyword{"PERIODIC REFRESH"}.ignore(pos, expected))
{
if (!ParserNumber{}.parse(pos, live_view_periodic_refresh, expected))
live_view_periodic_refresh = std::make_shared<ASTLiteral>(static_cast<UInt64>(60));
with_periodic_refresh = true;
}
else if (with_and)
return false;
if (!with_timeout && !with_periodic_refresh)
return false;
}
if (ParserKeyword{"ON"}.ignore(pos, expected))
{
if (!ASTQueryWithOnCluster::parse(pos, cluster_str, expected))
@ -1028,9 +1007,6 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
tryGetIdentifierNameInto(as_table, query->as_table);
query->set(query->select, select);
if (live_view_periodic_refresh)
query->live_view_periodic_refresh.emplace(live_view_periodic_refresh->as<ASTLiteral &>().value.safeGet<UInt64>());
if (comment)
query->set(query->comment, comment);

View File

@ -57,7 +57,7 @@ namespace ErrorCodes
{
extern const int INCORRECT_QUERY;
extern const int TABLE_WAS_NOT_DROPPED;
extern const int QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW;
extern const int NOT_IMPLEMENTED;
extern const int SUPPORT_IS_DISABLED;
extern const int UNSUPPORTED_METHOD;
}
@ -86,14 +86,14 @@ SelectQueryDescription buildSelectQueryDescription(const ASTPtr & select_query,
if (inner_select_with_union_query)
{
if (inner_select_with_union_query->list_of_selects->children.size() != 1)
throw Exception(ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW, "UNION is not supported for LIVE VIEW");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "UNION is not supported for LIVE VIEW");
inner_query = inner_select_with_union_query->list_of_selects->children[0];
}
auto * inner_select_query = inner_query->as<ASTSelectQuery>();
if (!inner_select_query)
throw Exception(DB::ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_LIVE_VIEW,
throw Exception(DB::ErrorCodes::NOT_IMPLEMENTED,
"LIVE VIEWs are only supported for queries from tables, "
"but there is no table name in select query.");
@ -226,29 +226,9 @@ StorageLiveView::StorageLiveView(
DatabaseCatalog::instance().addViewDependency(select_query_description.select_table_id, table_id_);
if (query.live_view_periodic_refresh)
{
is_periodically_refreshed = true;
periodic_live_view_refresh = Seconds {*query.live_view_periodic_refresh};
}
blocks_ptr = std::make_shared<BlocksPtr>();
blocks_metadata_ptr = std::make_shared<BlocksMetadataPtr>();
active_ptr = std::make_shared<bool>(true);
periodic_refresh_task = getContext()->getSchedulePool().createTask("LiveViewPeriodicRefreshTask",
[this]
{
try
{
periodicRefreshTaskFunc();
}
catch (...)
{
tryLogCurrentException(log, "Exception in LiveView periodic refresh task in BackgroundSchedulePool");
}
});
periodic_refresh_task->deactivate();
}
StorageLiveView::~StorageLiveView()
@ -285,17 +265,12 @@ void StorageLiveView::drop()
void StorageLiveView::startup()
{
if (is_periodically_refreshed)
periodic_refresh_task->activate();
}
void StorageLiveView::shutdown(bool)
{
shutdown_called = true;
if (is_periodically_refreshed)
periodic_refresh_task->deactivate();
DatabaseCatalog::instance().removeViewDependency(select_query_description.select_table_id, getStorageID());
}
@ -311,17 +286,7 @@ Pipe StorageLiveView::read(
std::lock_guard lock(mutex);
if (!(*blocks_ptr))
{
refreshImpl(lock);
}
else if (is_periodically_refreshed)
{
Seconds current_time = std::chrono::duration_cast<Seconds>(std::chrono::system_clock::now().time_since_epoch());
Seconds blocks_time = std::chrono::duration_cast<Seconds>(getBlocksTime(lock).time_since_epoch());
if ((current_time - periodic_live_view_refresh) >= blocks_time)
refreshImpl(lock);
}
return Pipe(std::make_shared<BlocksSource>(*blocks_ptr, getHeader()));
}
@ -362,9 +327,6 @@ Pipe StorageLiveView::watch(
if (!(*blocks_ptr))
refreshImpl(lock);
if (is_periodically_refreshed)
scheduleNextPeriodicRefresh(lock);
}
processed_stage = QueryProcessingStage::Complete;
@ -746,39 +708,6 @@ bool StorageLiveView::getNewBlocks(const std::lock_guard<std::mutex> & lock)
return updated;
}
void StorageLiveView::periodicRefreshTaskFunc()
{
LOG_TRACE(log, "periodic refresh task");
std::lock_guard lock(mutex);
if (hasActiveUsers(lock))
scheduleNextPeriodicRefresh(lock);
}
void StorageLiveView::scheduleNextPeriodicRefresh(const std::lock_guard<std::mutex> & lock)
{
Seconds current_time = std::chrono::duration_cast<Seconds>(std::chrono::system_clock::now().time_since_epoch());
Seconds blocks_time = std::chrono::duration_cast<Seconds>(getBlocksTime(lock).time_since_epoch());
if ((current_time - periodic_live_view_refresh) >= blocks_time)
{
refreshImpl(lock);
blocks_time = std::chrono::duration_cast<Seconds>(getBlocksTime(lock).time_since_epoch());
}
current_time = std::chrono::duration_cast<Seconds>(std::chrono::system_clock::now().time_since_epoch());
auto next_refresh_time = blocks_time + periodic_live_view_refresh;
if (current_time >= next_refresh_time)
periodic_refresh_task->scheduleAfter(0);
else
{
auto schedule_time = std::chrono::duration_cast<MilliSeconds> (next_refresh_time - current_time);
periodic_refresh_task->scheduleAfter(static_cast<size_t>(schedule_time.count()));
}
}
void registerStorageLiveView(StorageFactory & factory)
{
factory.registerStorage("LiveView", [](const StorageFactory::Arguments & args)

View File

@ -21,6 +21,7 @@ limitations under the License. */
namespace DB
{
using BlocksPtrs = std::shared_ptr<std::vector<BlocksPtr>>;
struct BlocksMetadata
{
@ -172,11 +173,6 @@ private:
/// Read new data blocks that store query result
bool getNewBlocks(const std::lock_guard<std::mutex> & lock);
void periodicRefreshTaskFunc();
/// Must be called with mutex locked
void scheduleNextPeriodicRefresh(const std::lock_guard<std::mutex> & lock);
SelectQueryDescription select_query_description;
/// Query over the mergeable blocks to produce final result
@ -186,9 +182,6 @@ private:
LoggerPtr log;
bool is_periodically_refreshed = false;
Seconds periodic_live_view_refresh;
/// Mutex to protect access to sample block and inner_blocks_query
mutable std::mutex sample_block_lock;
mutable Block sample_block;
@ -208,9 +201,6 @@ private:
MergeableBlocksPtr mergeable_blocks;
std::atomic<bool> shutdown_called = false;
/// Periodic refresh task used when [PERIODIC] REFRESH is specified in create statement
BackgroundSchedulePool::TaskHolder periodic_refresh_task;
};
}