build fix

This commit is contained in:
Vxider 2020-01-24 05:45:45 +03:00
parent 17a31771da
commit 4c2f5e1c8d
4 changed files with 40 additions and 70 deletions

View File

@ -84,18 +84,8 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
out = io.out; out = io.out;
} }
else if (dynamic_cast<const StorageLiveView *>(dependent_table.get())) else if (dynamic_cast<const StorageLiveView *>(dependent_table.get()))
out = std::make_shared<PushingToViewsBlockOutputStream>( out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr(), true);
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true);
else if (dynamic_cast<const StorageWindowView *>(dependent_table.get())) else if (dynamic_cast<const StorageWindowView *>(dependent_table.get()))
out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr(), true);
else
out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)});
}
else if (dynamic_cast<const StorageLiveView *>(dependent_table.get()))
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr(), true); out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr(), true);
else else
out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr()); out = std::make_shared<PushingToViewsBlockOutputStream>(dependent_table, *views_context, ASTPtr());

View File

@ -152,20 +152,19 @@ static void extractDependentTable(ASTSelectQuery & query, String & select_databa
void StorageWindowView::checkTableCanBeDropped() const void StorageWindowView::checkTableCanBeDropped() const
{ {
Dependencies dependencies = global_context.getDependencies(database_name, table_name); auto table_id = getStorageID();
Dependencies dependencies = global_context.getDependencies(table_id);
if (!dependencies.empty()) if (!dependencies.empty())
{ {
DatabaseAndTableName database_and_table_name = dependencies.front(); StorageID dependent_table_id = dependencies.front();
throw Exception( throw Exception("Table has dependency " + dependent_table_id.getNameForLogs(), ErrorCodes::TABLE_WAS_NOT_DROPPED);
"Table has dependency " + database_and_table_name.first + "." + database_and_table_name.second,
ErrorCodes::TABLE_WAS_NOT_DROPPED);
} }
} }
void StorageWindowView::drop(TableStructureWriteLockHolder &) void StorageWindowView::drop(TableStructureWriteLockHolder &)
{ {
global_context.removeDependency( auto table_id = getStorageID();
DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name)); global_context.removeDependency(select_table_id, table_id);
std::lock_guard lock(mutex); std::lock_guard lock(mutex);
is_dropped = true; is_dropped = true;
@ -207,7 +206,7 @@ UInt32 StorageWindowView::getWindowUpperBound(UInt32 time_sec)
void StorageWindowView::threadFuncToTable() void StorageWindowView::threadFuncToTable()
{ {
while (!shutdown_called && has_target_table) while (!shutdown_called && !target_table_id.empty())
{ {
std::unique_lock lock(flushTableMutex); std::unique_lock lock(flushTableMutex);
UInt64 timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds()); UInt64 timestamp_usec = static_cast<UInt64>(Poco::Timestamp().epochMicroseconds());
@ -236,7 +235,7 @@ BlockInputStreams StorageWindowView::watch(
size_t /*max_block_size*/, size_t /*max_block_size*/,
const unsigned /*num_streams*/) const unsigned /*num_streams*/)
{ {
if (has_target_table) if (!target_table_id.empty())
throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY); throw Exception("WATCH query is disabled for " + getName() + " when constructed with 'TO' clause.", ErrorCodes::INCORRECT_QUERY);
if (active_ptr.use_count() > 1) if (active_ptr.use_count() > 1)
@ -279,9 +278,8 @@ Block StorageWindowView::getHeader() const
{ {
if (!sample_block) if (!sample_block)
{ {
auto storage = global_context.getTable(select_database_name, select_table_name); auto storage = global_context.getTable(select_table_id);
sample_block = InterpreterSelectQuery( sample_block = InterpreterSelectQuery(getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete))
getInnerQuery(), global_context, storage, SelectQueryOptions(QueryProcessingStage::Complete))
.getSampleBlock(); .getSampleBlock();
for (size_t i = 0; i < sample_block.columns(); ++i) for (size_t i = 0; i < sample_block.columns(); ++i)
sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst(); sample_block.safeGetByPosition(i).column = sample_block.safeGetByPosition(i).column->convertToFullColumnIfConst();
@ -290,24 +288,14 @@ Block StorageWindowView::getHeader() const
return sample_block; return sample_block;
} }
StoragePtr & StorageWindowView::getParentStorage()
{
if (!parent_storage)
parent_storage = global_context.getTable(getSelectDatabaseName(), getSelectTableName());
return parent_storage;
}
StorageWindowView::StorageWindowView( StorageWindowView::StorageWindowView(
const String & table_name_, const StorageID & table_id_,
const String & database_name_,
Context & local_context, Context & local_context,
const ASTCreateQuery & query, const ASTCreateQuery & query,
const ColumnsDescription & columns_) const ColumnsDescription & columns_)
: table_name(table_name_) : IStorage(table_id_)
, database_name(database_name_)
, global_context(local_context.getGlobalContext()) , global_context(local_context.getGlobalContext())
, time_zone(DateLUT::instance()) , time_zone(DateLUT::instance())
, log(&Poco::Logger::get("StorageWindowView"))
{ {
setColumns(columns_); setColumns(columns_);
@ -315,14 +303,16 @@ StorageWindowView::StorageWindowView(
throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY); throw Exception("SELECT query is not specified for " + getName(), ErrorCodes::INCORRECT_QUERY);
/// Default value, if only table name exist in the query /// Default value, if only table name exist in the query
select_database_name = local_context.getCurrentDatabase();
if (query.select->list_of_selects->children.size() != 1) if (query.select->list_of_selects->children.size() != 1)
throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW); throw Exception("UNION is not supported for Window View", ErrorCodes::QUERY_IS_NOT_SUPPORTED_IN_WINDOW_VIEW);
auto inner_query_ = query.select->list_of_selects->children.at(0); auto inner_query_ = query.select->list_of_selects->children.at(0);
ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query_); ASTSelectQuery & select_query = typeid_cast<ASTSelectQuery &>(*inner_query_);
String select_database_name = local_context.getCurrentDatabase();
String select_table_name;
extractDependentTable(select_query, select_database_name, select_table_name); extractDependentTable(select_query, select_database_name, select_table_name);
select_table_id = StorageID(select_database_name, select_table_name);
inner_query = innerQueryParser(select_query); inner_query = innerQueryParser(select_query);
/// If the table is not specified - use the table `system.one` /// If the table is not specified - use the table `system.one`
@ -332,14 +322,13 @@ StorageWindowView::StorageWindowView(
select_table_name = "one"; select_table_name = "one";
} }
global_context.addDependency( global_context.addDependency(select_table_id, table_id_);
DatabaseAndTableName(select_database_name, select_table_name), DatabaseAndTableName(database_name, table_name));
parent_storage = local_context.getTable(select_table_id);
if (!query.to_table.empty()) if (!query.to_table.empty())
{ {
has_target_table = true; target_table_id = StorageID(query.to_database, query.to_table);
target_database_name = query.to_database;
target_table_name = query.to_table;
} }
is_temporary = query.temporary; is_temporary = query.temporary;
@ -349,7 +338,7 @@ StorageWindowView::StorageWindowView(
active_ptr = std::make_shared<bool>(true); active_ptr = std::make_shared<bool>(true);
toTableTask = global_context.getSchedulePool().createTask(log->name(), [this] { threadFuncToTable(); }); toTableTask = global_context.getSchedulePool().createTask(getStorageID().getFullTableName(), [this] { threadFuncToTable(); });
toTableTask->deactivate(); toTableTask->deactivate();
} }
@ -403,7 +392,7 @@ ASTPtr StorageWindowView::innerQueryParser(ASTSelectQuery & query)
void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context) void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context)
{ {
BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)}; BlockInputStreams streams = {std::make_shared<OneBlockInputStream>(block)};
auto window_proxy_storage = std::make_shared<WindowViewProxyStorage>( auto window_proxy_storage = std::make_shared<WindowViewProxyStorage>(StorageID("", "WindowViewProxyStorage"),
window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns); window_view.getParentStorage(), std::move(streams), QueryProcessingStage::FetchColumns);
InterpreterSelectQuery select_block( InterpreterSelectQuery select_block(
window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState); window_view.getInnerQuery(), context, window_proxy_storage, QueryProcessingStage::WithMergeableState);
@ -432,18 +421,18 @@ void StorageWindowView::writeIntoWindowView(StorageWindowView & window_view, con
StoragePtr StorageWindowView::getTargetTable() const StoragePtr StorageWindowView::getTargetTable() const
{ {
return global_context.getTable(target_database_name, target_table_name); return global_context.getTable(target_table_id);
} }
StoragePtr StorageWindowView::tryGetTargetTable() const StoragePtr StorageWindowView::tryGetTargetTable() const
{ {
return global_context.tryGetTable(target_database_name, target_table_name); return global_context.tryGetTable(target_table_id);
} }
void StorageWindowView::startup() void StorageWindowView::startup()
{ {
// Start the working thread // Start the working thread
if (has_target_table) if (!target_table_id.empty())
toTableTask->activateAndSchedule(); toTableTask->activateAndSchedule();
startNoUsersThread(temporary_window_view_timeout); startNoUsersThread(temporary_window_view_timeout);
} }
@ -553,7 +542,7 @@ BlockInputStreamPtr StorageWindowView::getNewBlocksInputStreamPtr()
BlockInputStreamPtr stream = std::make_shared<WindowViewBlocksBlockInputStream>(mergeable_blocks, sample_block_, mutex); BlockInputStreamPtr stream = std::make_shared<WindowViewBlocksBlockInputStream>(mergeable_blocks, sample_block_, mutex);
from.push_back(std::move(stream)); from.push_back(std::move(stream));
auto proxy_storage = std::make_shared<WindowViewProxyStorage>( auto proxy_storage = std::make_shared<WindowViewProxyStorage>(
getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState); StorageID("", "WindowViewProxyStorage"), getParentStorage(), std::move(from), QueryProcessingStage::WithMergeableState);
InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete); InterpreterSelectQuery select(getInnerQuery(), global_context, proxy_storage, QueryProcessingStage::Complete);
BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in); BlockInputStreamPtr data = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
return data; return data;
@ -575,6 +564,7 @@ void StorageWindowView::noUsersThread(std::shared_ptr<StorageWindowView> storage
if (storage->shutdown_called) if (storage->shutdown_called)
return; return;
auto table_id = storage->getStorageID();
{ {
while (1) while (1)
{ {
@ -584,7 +574,7 @@ void StorageWindowView::noUsersThread(std::shared_ptr<StorageWindowView> storage
storage->no_users_thread_wakeup = false; storage->no_users_thread_wakeup = false;
if (storage->shutdown_called) if (storage->shutdown_called)
return; return;
if (!storage->global_context.getDependencies(storage->database_name, storage->table_name).empty()) if (!storage->global_context.getDependencies(table_id).empty())
continue; continue;
drop_table = true; drop_table = true;
} }
@ -594,14 +584,14 @@ void StorageWindowView::noUsersThread(std::shared_ptr<StorageWindowView> storage
if (drop_table) if (drop_table)
{ {
if (storage->global_context.tryGetTable(storage->database_name, storage->table_name)) if (storage->global_context.tryGetTable(table_id))
{ {
try try
{ {
/// We create and execute `drop` query for this table /// We create and execute `drop` query for this table
auto drop_query = std::make_shared<ASTDropQuery>(); auto drop_query = std::make_shared<ASTDropQuery>();
drop_query->database = storage->database_name; drop_query->database = table_id.database_name;
drop_query->table = storage->table_name; drop_query->table = table_id.table_name;
drop_query->kind = ASTDropQuery::Kind::Drop; drop_query->kind = ASTDropQuery::Kind::Drop;
ASTPtr ast_drop_query = drop_query; ASTPtr ast_drop_query = drop_query;
InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context); InterpreterDropQuery drop_interpreter(ast_drop_query, storage->global_context);
@ -657,7 +647,8 @@ void registerStorageWindowView(StorageFactory & factory)
throw Exception( throw Exception(
"Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')", "Experimental WINDOW VIEW feature is not enabled (the setting 'allow_experimental_window_view')",
ErrorCodes::SUPPORT_IS_DISABLED); ErrorCodes::SUPPORT_IS_DISABLED);
return StorageWindowView::create(args.table_name, args.database_name, args.local_context, args.query, args.columns);
return StorageWindowView::create(args.table_id, args.local_context, args.query, args.columns);
}); });
} }
} }

View File

@ -23,10 +23,6 @@ class StorageWindowView : public ext::shared_ptr_helper<StorageWindowView>, publ
public: public:
~StorageWindowView() override; ~StorageWindowView() override;
String getName() const override { return "WindowView"; } String getName() const override { return "WindowView"; }
String getTableName() const override { return table_name; }
String getDatabaseName() const override { return database_name; }
String getSelectDatabaseName() const { return select_database_name; }
String getSelectTableName() const { return select_table_name; }
ASTPtr getInnerQuery() const { return inner_query->clone(); } ASTPtr getInnerQuery() const { return inner_query->clone(); }
@ -75,7 +71,7 @@ public:
Block getHeader() const; Block getHeader() const;
StoragePtr & getParentStorage(); StoragePtr & getParentStorage() { return parent_storage; }
static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context); static void writeIntoWindowView(StorageWindowView & window_view, const Block & block, const Context & context);
@ -84,10 +80,7 @@ public:
inline UInt32 getWindowUpperBound(UInt32 time_sec); inline UInt32 getWindowUpperBound(UInt32 time_sec);
private: private:
String select_database_name; StorageID select_table_id = StorageID::createEmpty();
String select_table_name;
String table_name;
String database_name;
ASTPtr inner_query; ASTPtr inner_query;
String window_column_name; String window_column_name;
String window_end_column_alias; String window_end_column_alias;
@ -111,9 +104,7 @@ private:
Int64 window_num_units; Int64 window_num_units;
const DateLUTImpl & time_zone; const DateLUTImpl & time_zone;
std::atomic<bool> has_target_table{false}; StorageID target_table_id = StorageID::createEmpty();
String target_database_name;
String target_table_name;
static void noUsersThread(std::shared_ptr<StorageWindowView> storage, const UInt64 & timeout); static void noUsersThread(std::shared_ptr<StorageWindowView> storage, const UInt64 & timeout);
inline void flushToTable(); inline void flushToTable();
@ -125,15 +116,13 @@ private:
std::atomic<bool> start_no_users_thread_called{false}; std::atomic<bool> start_no_users_thread_called{false};
UInt64 temporary_window_view_timeout; UInt64 temporary_window_view_timeout;
Poco::Logger * log;
Poco::Timestamp timestamp; Poco::Timestamp timestamp;
BackgroundSchedulePool::TaskHolder toTableTask; BackgroundSchedulePool::TaskHolder toTableTask;
BackgroundSchedulePool::TaskHolder toTableTask_preprocess; BackgroundSchedulePool::TaskHolder toTableTask_preprocess;
StorageWindowView( StorageWindowView(
const String & table_name_, const StorageID & table_id_,
const String & database_name_,
Context & local_context, Context & local_context,
const ASTCreateQuery & query, const ASTCreateQuery & query,
const ColumnsDescription & columns); const ColumnsDescription & columns);

View File

@ -12,14 +12,14 @@ namespace DB
class WindowViewProxyStorage : public IStorage class WindowViewProxyStorage : public IStorage
{ {
public: public:
WindowViewProxyStorage(StoragePtr storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_) WindowViewProxyStorage(const StorageID & table_id_, StoragePtr storage_, BlockInputStreams streams_, QueryProcessingStage::Enum to_stage_)
: storage(std::move(storage_)) : IStorage(table_id_)
, storage(std::move(storage_))
, streams(std::move(streams_)) , streams(std::move(streams_))
, to_stage(to_stage_) {} , to_stage(to_stage_) {}
public: public:
std::string getName() const override { return "WindowViewProxyStorage(" + storage->getName() + ")"; } std::string getName() const override { return "WindowViewProxyStorage(" + storage->getName() + ")"; }
std::string getTableName() const override { return storage->getTableName(); }
bool isRemote() const override { return storage->isRemote(); } bool isRemote() const override { return storage->isRemote(); }
bool supportsSampling() const override { return storage->supportsSampling(); } bool supportsSampling() const override { return storage->supportsSampling(); }